Kafka Producer发送消息流程之消息异步发送和同步发送
文章目录
- 1. 异步发送
- 2. 同步发送

1. 异步发送
Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。
public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}
Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。
2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。
2. 同步发送
消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。
按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。
public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
相关文章:
Kafka Producer发送消息流程之消息异步发送和同步发送
文章目录 1. 异步发送2. 同步发送 1. 异步发送 Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。 public class KafkaProducerCallbackTest {public static void mai…...
Flutter 状态管理调研总结
一, 候选状态管理组件简介 0. flutter_hooks 一个 React 钩子在 Flutter 上的实现:Making Sense of React Hooks 钩子是一种用来管理 Widget 生命周期的新对象,以减少重复代码、增加组件间复用性,允许将视图逻辑提取到通用的用例中并重用&…...
入门C语言只需一个星期(星期二)
点击上方"蓝字"关注我们 01、算术运算符 int myNum = 100 + 50;int sum1 = 100 + 50; // 150 (100 + 50)int sum2 = sum1 + 250; // 400 (150 + 250)int sum3 = sum2 + sum2; // 800 (400 + 400) + 加 将两个值相加 x + y - 减 从另一个值中减去一个值 …...
切换node版本
一、在Linux上切换Node.js版本有多种实现方法: 1.使用nvm(Node Version Manager): 安装nvm:可以通过curl或wget来安装nvm,具体请参考nvm的官方文档。 安装不同版本的Node.js:使用nvm可以轻松…...
【常见开源库的二次开发】基于openssl的加密与解密——Base的编解码(二进制转ascll)(二)
目录: 目录: 一、 Base64概述和应用场景 1.1 概述 1.2 应用场景 二、Base16 2.1 Base16编码 2.2 Base16编解码 三、Base64 四、OpenSSL BIO接☐ 4.1 Filter BIOs: 4.2 Source/Sink BIOs: 4.3 应用场景: 4.4 具体使用&…...
ssrf复习(及ctfshow351-360)
1. SSRF 概述 服务器会根据用户提交的URL发送一个HTTP请求。使用用户指定的URL,Web应用可以获取图片或者文件资源等。典型的例子是百度识图功能。 如果没有对用户提交URL和远端服务器所返回的信息做合适的验证或过滤,就有可能存在“请求伪造"的缺陷…...
请求通过Spring Cloud Gateway 503
最近想处理一个通用的网关服务。 但是我在处理好所有配置的时候发现,网络请求过网关的时候,一直503,我所有的配置都没问题。 环境: JDK: 17 Spring Cloud: 2023.0.2 在 Spring Cloud Gateway 的早期版本中ÿ…...
C++代码_让室友坑我
引子 今天古文波在外地上C集训营,结果却被一起学习的室友坑了。啊,好气,我要报复室友。 所以,我写出了死亡代码。 如果你也想报复某些人,可以看下去。 代码构造: 头文件 想要使用一些函数,如…...
AG32 的MCU与FPGA的主频可以达到568MHz吗
Customers: AG32/ AGRV2K 这个芯片主频和定时器最高速度是多少?用户期望 CPLD计时器功能0.1ns以下。 AGM RE: CPLD做不到 0.1ns的速率,这个需要10G以上的时钟。 那AGRV2K最高多少MHz呢? 一般200MHZ比较容易实现。 进一步说明࿱…...
怎样减少视频的容量 怎样减少视频内存保持清晰度
在数字媒体时代,视频内容已经成为人们日常交流和信息传递的重要方式。然而,视频往往占用大量存储空间,给我们的设备带来不小的负担。如何在不损失视频质量的前提下,减少视频文件的大小呢?本文将为你揭秘几个实用的技巧…...
谈一谈一条SQL的查询、更新语句究竟是如何执行的?
文章目录 理解执行流程衍生知识redo logbinlog 本篇文章是基于《MySQL45讲》来写的个人理解与感悟。 理解 先看下图: 上一篇文章我们讨论了一条SQL查询语句的执行流程,并介绍了执行过程中涉及的处理模块。 回顾一下: 大体来说,…...
自动驾驶AVM环视算法–全景和标定全功能算法实现和exe测试demo
参考:全景和标定全功能算法实现和exe测试demo-金书世界 1、测试环境 opencv310vs2022 2、使用的编程语言 c和c 3、测试的demo的获取 更新:测试的exe程序,无需解压码就可以体验算法测试效果 百度网盘: 链接:http…...
【Docker 系列】学习路线
学习基本概念: 了解容器化与虚拟化的区别了解Docker的基本概念、术语和架构 安装Docker: 根据所使用的操作系统,安装Docker Desktop(Windows、macOS)或Docker Engine(Linux) Docker镜像…...
蓝色系信息工作室建站网站源码系统 带模版手机端 带完整的源代码包以及搭建部署教程
系统概述 信息工作室建站网站源码系统是一款专为追求高效、灵活与个性化建站需求的用户设计的综合性平台。该系统不仅提供了丰富的网站构建模块和预设模版,还支持手机端自适应布局,确保网站在不同设备上都能展现出最佳效果。此外,系统附带完…...
什么是带宽限制,如何影响服务器数据传输?
什么是带宽限制? 带宽限制是指网络连接中的数据传输速率上限,通常以每秒传输的数据量(比特或字节)来衡量。例如,一个服务器的带宽限制为100 Mbps,意味着它在理想情况下每秒最多能传输100兆比特的数据。带宽限制由网络服务提供商或数据中心设…...
RISC-V在线反汇编工具
RISC-V在线反汇编工具: https://luplab.gitlab.io/rvcodecjs/#q34179073&abifalse&isaAUTO 不过,似乎,只支持RV32I、RV64I、RV128I指令集:...
从零手写实现 nginx-32-load balance 负载均衡算法 java 实现
前言 大家好,我是老马。很高兴遇到你。 我们为 java 开发者实现了 java 版本的 nginx https://github.com/houbb/nginx4j 如果你想知道 servlet 如何处理的,可以参考我的另一个项目: 手写从零实现简易版 tomcat minicat 手写 nginx 系列 …...
基于STC89C51单片机的烟雾报警器设计(煤气火灾检测报警)(含文档、源码与proteus仿真,以及系统详细介绍)
本篇文章论述的是基于STC89C51单片机的烟雾报警器设计的详情介绍,如果对您有帮助的话,还请关注一下哦,如果有资源方面的需要可以联系我。 目录 摘要 原理图 实物图 仿真图 元件清单 代码 系统论文 资源下载 摘要 随着现代家庭用火、…...
SpringBoot整合阿里云RocketMQ对接,商业版
1.需要阿里云开通商业版RocketMQ 普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组 2.结构目录 3.引入依赖 <!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</…...
modbus slave 设备通过 网关thingsboard-gateway 将数据上传到thingsboard云平台
搭建thingsboard物联网云平台花了大量时间,从小白到最后搭建成功,折磨了好几天,也感谢网友的帮助,提供了思路最终成功搞定,特此记录。 一、thingsboard环境搭建(Ubuntu20.04LTS) 参考官方文档&a…...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...
