Kafka Producer发送消息流程之消息异步发送和同步发送
文章目录
- 1. 异步发送
- 2. 同步发送

1. 异步发送
Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。
public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}
Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。
2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。
2. 同步发送
消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。
按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。
public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
相关文章:
Kafka Producer发送消息流程之消息异步发送和同步发送
文章目录 1. 异步发送2. 同步发送 1. 异步发送 Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。 public class KafkaProducerCallbackTest {public static void mai…...
Flutter 状态管理调研总结
一, 候选状态管理组件简介 0. flutter_hooks 一个 React 钩子在 Flutter 上的实现:Making Sense of React Hooks 钩子是一种用来管理 Widget 生命周期的新对象,以减少重复代码、增加组件间复用性,允许将视图逻辑提取到通用的用例中并重用&…...
入门C语言只需一个星期(星期二)
点击上方"蓝字"关注我们 01、算术运算符 int myNum = 100 + 50;int sum1 = 100 + 50; // 150 (100 + 50)int sum2 = sum1 + 250; // 400 (150 + 250)int sum3 = sum2 + sum2; // 800 (400 + 400) + 加 将两个值相加 x + y - 减 从另一个值中减去一个值 …...
切换node版本
一、在Linux上切换Node.js版本有多种实现方法: 1.使用nvm(Node Version Manager): 安装nvm:可以通过curl或wget来安装nvm,具体请参考nvm的官方文档。 安装不同版本的Node.js:使用nvm可以轻松…...
【常见开源库的二次开发】基于openssl的加密与解密——Base的编解码(二进制转ascll)(二)
目录: 目录: 一、 Base64概述和应用场景 1.1 概述 1.2 应用场景 二、Base16 2.1 Base16编码 2.2 Base16编解码 三、Base64 四、OpenSSL BIO接☐ 4.1 Filter BIOs: 4.2 Source/Sink BIOs: 4.3 应用场景: 4.4 具体使用&…...
ssrf复习(及ctfshow351-360)
1. SSRF 概述 服务器会根据用户提交的URL发送一个HTTP请求。使用用户指定的URL,Web应用可以获取图片或者文件资源等。典型的例子是百度识图功能。 如果没有对用户提交URL和远端服务器所返回的信息做合适的验证或过滤,就有可能存在“请求伪造"的缺陷…...
请求通过Spring Cloud Gateway 503
最近想处理一个通用的网关服务。 但是我在处理好所有配置的时候发现,网络请求过网关的时候,一直503,我所有的配置都没问题。 环境: JDK: 17 Spring Cloud: 2023.0.2 在 Spring Cloud Gateway 的早期版本中ÿ…...
C++代码_让室友坑我
引子 今天古文波在外地上C集训营,结果却被一起学习的室友坑了。啊,好气,我要报复室友。 所以,我写出了死亡代码。 如果你也想报复某些人,可以看下去。 代码构造: 头文件 想要使用一些函数,如…...
AG32 的MCU与FPGA的主频可以达到568MHz吗
Customers: AG32/ AGRV2K 这个芯片主频和定时器最高速度是多少?用户期望 CPLD计时器功能0.1ns以下。 AGM RE: CPLD做不到 0.1ns的速率,这个需要10G以上的时钟。 那AGRV2K最高多少MHz呢? 一般200MHZ比较容易实现。 进一步说明࿱…...
怎样减少视频的容量 怎样减少视频内存保持清晰度
在数字媒体时代,视频内容已经成为人们日常交流和信息传递的重要方式。然而,视频往往占用大量存储空间,给我们的设备带来不小的负担。如何在不损失视频质量的前提下,减少视频文件的大小呢?本文将为你揭秘几个实用的技巧…...
谈一谈一条SQL的查询、更新语句究竟是如何执行的?
文章目录 理解执行流程衍生知识redo logbinlog 本篇文章是基于《MySQL45讲》来写的个人理解与感悟。 理解 先看下图: 上一篇文章我们讨论了一条SQL查询语句的执行流程,并介绍了执行过程中涉及的处理模块。 回顾一下: 大体来说,…...
自动驾驶AVM环视算法–全景和标定全功能算法实现和exe测试demo
参考:全景和标定全功能算法实现和exe测试demo-金书世界 1、测试环境 opencv310vs2022 2、使用的编程语言 c和c 3、测试的demo的获取 更新:测试的exe程序,无需解压码就可以体验算法测试效果 百度网盘: 链接:http…...
【Docker 系列】学习路线
学习基本概念: 了解容器化与虚拟化的区别了解Docker的基本概念、术语和架构 安装Docker: 根据所使用的操作系统,安装Docker Desktop(Windows、macOS)或Docker Engine(Linux) Docker镜像…...
蓝色系信息工作室建站网站源码系统 带模版手机端 带完整的源代码包以及搭建部署教程
系统概述 信息工作室建站网站源码系统是一款专为追求高效、灵活与个性化建站需求的用户设计的综合性平台。该系统不仅提供了丰富的网站构建模块和预设模版,还支持手机端自适应布局,确保网站在不同设备上都能展现出最佳效果。此外,系统附带完…...
什么是带宽限制,如何影响服务器数据传输?
什么是带宽限制? 带宽限制是指网络连接中的数据传输速率上限,通常以每秒传输的数据量(比特或字节)来衡量。例如,一个服务器的带宽限制为100 Mbps,意味着它在理想情况下每秒最多能传输100兆比特的数据。带宽限制由网络服务提供商或数据中心设…...
RISC-V在线反汇编工具
RISC-V在线反汇编工具: https://luplab.gitlab.io/rvcodecjs/#q34179073&abifalse&isaAUTO 不过,似乎,只支持RV32I、RV64I、RV128I指令集:...
从零手写实现 nginx-32-load balance 负载均衡算法 java 实现
前言 大家好,我是老马。很高兴遇到你。 我们为 java 开发者实现了 java 版本的 nginx https://github.com/houbb/nginx4j 如果你想知道 servlet 如何处理的,可以参考我的另一个项目: 手写从零实现简易版 tomcat minicat 手写 nginx 系列 …...
基于STC89C51单片机的烟雾报警器设计(煤气火灾检测报警)(含文档、源码与proteus仿真,以及系统详细介绍)
本篇文章论述的是基于STC89C51单片机的烟雾报警器设计的详情介绍,如果对您有帮助的话,还请关注一下哦,如果有资源方面的需要可以联系我。 目录 摘要 原理图 实物图 仿真图 元件清单 代码 系统论文 资源下载 摘要 随着现代家庭用火、…...
SpringBoot整合阿里云RocketMQ对接,商业版
1.需要阿里云开通商业版RocketMQ 普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组 2.结构目录 3.引入依赖 <!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</…...
modbus slave 设备通过 网关thingsboard-gateway 将数据上传到thingsboard云平台
搭建thingsboard物联网云平台花了大量时间,从小白到最后搭建成功,折磨了好几天,也感谢网友的帮助,提供了思路最终成功搞定,特此记录。 一、thingsboard环境搭建(Ubuntu20.04LTS) 参考官方文档&a…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...
Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?
Pod IP 的本质与特性 Pod IP 的定位 纯端点地址:Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址(如 10.244.1.2)无特殊名称:在 Kubernetes 中,它通常被称为 “Pod IP” 或 “容器 IP”生命周期:与 Pod …...
WEB3全栈开发——面试专业技能点P8DevOps / 区块链部署
一、Hardhat / Foundry 进行合约部署 概念介绍 Hardhat 和 Foundry 都是以太坊智能合约开发的工具套件,支持合约的编译、测试和部署。 它们允许开发者在本地或测试网络快速开发智能合约,并部署到链上(测试网或主网)。 部署过程…...
Flask和Django,你怎么选?
Flask 和 Django 是 Python 两大最流行的 Web 框架,但它们的设计哲学、目标和适用场景有显著区别。以下是详细的对比: 核心区别:哲学与定位 Django: 定位: "全栈式" Web 框架。奉行"开箱即用"的理念。 哲学: "包含…...
Java + Spring Boot + Mybatis 插入数据后,获取自增 id 的方法
在 MyBatis 中使用 useGeneratedKeys"true" 获取新插入记录的自增 ID 值,可通过以下步骤实现: 1. 配置 Mapper XML 在插入语句的 <insert> 标签中设置: xml 复制 下载 运行 <insert id"insertUser" para…...
(33)课54:3 张表的 join-on 连接举例,多表查询总结。数据库编程补述及游标综合例题。静态 sqL与动态sqL(可带参数)
(112)3 张表的 join-on 连接举例 : (113) 多表查询总结 : (114)数据库编程补述 : 综合例题 : 以上没有动手练习,不知道这样的语法是否…...
