当前位置: 首页 > news >正文

Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

相关文章:

Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录 1. 异步发送2. 同步发送 1. 异步发送 Kafka默认就是异步发送&#xff0c;在Main线程中的多条消息&#xff0c;没有严格的先后顺序&#xff0c;Sender发送后就继续下一条&#xff0c;异步接受结果。 public class KafkaProducerCallbackTest {public static void mai…...

Flutter 状态管理调研总结

一, 候选状态管理组件简介 0. flutter_hooks 一个 React 钩子在 Flutter 上的实现&#xff1a;Making Sense of React Hooks 钩子是一种用来管理 Widget 生命周期的新对象&#xff0c;以减少重复代码、增加组件间复用性&#xff0c;允许将视图逻辑提取到通用的用例中并重用&…...

入门C语言只需一个星期(星期二)

点击上方"蓝字"关注我们 01、算术运算符 int myNum = 100 + 50;int sum1 = 100 + 50; // 150 (100 + 50)int sum2 = sum1 + 250; // 400 (150 + 250)int sum3 = sum2 + sum2; // 800 (400 + 400) + 加 将两个值相加 x + y - 减 从另一个值中减去一个值 …...

切换node版本

一、在Linux上切换Node.js版本有多种实现方法&#xff1a; 1.使用nvm&#xff08;Node Version Manager&#xff09;&#xff1a; 安装nvm&#xff1a;可以通过curl或wget来安装nvm&#xff0c;具体请参考nvm的官方文档。 安装不同版本的Node.js&#xff1a;使用nvm可以轻松…...

【常见开源库的二次开发】基于openssl的加密与解密——Base的编解码(二进制转ascll)(二)

目录&#xff1a; 目录&#xff1a; 一、 Base64概述和应用场景 1.1 概述 1.2 应用场景 二、Base16 2.1 Base16编码 2.2 Base16编解码 三、Base64 四、OpenSSL BIO接☐ 4.1 Filter BIOs&#xff1a; 4.2 Source/Sink BIOs&#xff1a; 4.3 应用场景&#xff1a; 4.4 具体使用&…...

ssrf复习(及ctfshow351-360)

1. SSRF 概述 服务器会根据用户提交的URL发送一个HTTP请求。使用用户指定的URL&#xff0c;Web应用可以获取图片或者文件资源等。典型的例子是百度识图功能。 如果没有对用户提交URL和远端服务器所返回的信息做合适的验证或过滤&#xff0c;就有可能存在“请求伪造"的缺陷…...

请求通过Spring Cloud Gateway 503

最近想处理一个通用的网关服务。 但是我在处理好所有配置的时候发现&#xff0c;网络请求过网关的时候&#xff0c;一直503&#xff0c;我所有的配置都没问题。 环境&#xff1a; JDK&#xff1a; 17 Spring Cloud: 2023.0.2 在 Spring Cloud Gateway 的早期版本中&#xff…...

C++代码_让室友坑我

引子 今天古文波在外地上C集训营&#xff0c;结果却被一起学习的室友坑了。啊&#xff0c;好气&#xff0c;我要报复室友。 所以&#xff0c;我写出了死亡代码。 如果你也想报复某些人&#xff0c;可以看下去。 代码构造&#xff1a; 头文件 想要使用一些函数&#xff0c;如…...

AG32 的MCU与FPGA的主频可以达到568MHz吗

Customers: AG32/ AGRV2K 这个芯片主频和定时器最高速度是多少&#xff1f;用户期望 CPLD计时器功能0.1ns以下。 AGM RE: CPLD做不到 0.1ns的速率&#xff0c;这个需要10G以上的时钟。 那AGRV2K最高多少MHz呢&#xff1f; 一般200MHZ比较容易实现。 进一步说明&#xff1…...

怎样减少视频的容量 怎样减少视频内存保持清晰度

在数字媒体时代&#xff0c;视频内容已经成为人们日常交流和信息传递的重要方式。然而&#xff0c;视频往往占用大量存储空间&#xff0c;给我们的设备带来不小的负担。如何在不损失视频质量的前提下&#xff0c;减少视频文件的大小呢&#xff1f;本文将为你揭秘几个实用的技巧…...

谈一谈一条SQL的查询、更新语句究竟是如何执行的?

文章目录 理解执行流程衍生知识redo logbinlog 本篇文章是基于《MySQL45讲》来写的个人理解与感悟。 理解 先看下图&#xff1a; 上一篇文章我们讨论了一条SQL查询语句的执行流程&#xff0c;并介绍了执行过程中涉及的处理模块。 回顾一下&#xff1a; 大体来说&#xff0c;…...

自动驾驶AVM环视算法–全景和标定全功能算法实现和exe测试demo

参考&#xff1a;全景和标定全功能算法实现和exe测试demo-金书世界 1、测试环境 opencv310vs2022 2、使用的编程语言 c和c 3、测试的demo的获取 更新&#xff1a;测试的exe程序&#xff0c;无需解压码就可以体验算法测试效果 百度网盘&#xff1a; 链接&#xff1a;http…...

【Docker 系列】学习路线

学习基本概念&#xff1a; 了解容器化与虚拟化的区别了解Docker的基本概念、术语和架构 安装Docker&#xff1a; 根据所使用的操作系统&#xff0c;安装Docker Desktop&#xff08;Windows、macOS&#xff09;或Docker Engine&#xff08;Linux&#xff09; Docker镜像&#xf…...

蓝色系信息工作室建站网站源码系统 带模版手机端 带完整的源代码包以及搭建部署教程

系统概述 信息工作室建站网站源码系统是一款专为追求高效、灵活与个性化建站需求的用户设计的综合性平台。该系统不仅提供了丰富的网站构建模块和预设模版&#xff0c;还支持手机端自适应布局&#xff0c;确保网站在不同设备上都能展现出最佳效果。此外&#xff0c;系统附带完…...

什么是带宽限制,如何影响服务器数据传输?

什么是带宽限制? 带宽限制是指网络连接中的数据传输速率上限&#xff0c;通常以每秒传输的数据量(比特或字节)来衡量。例如&#xff0c;一个服务器的带宽限制为100 Mbps&#xff0c;意味着它在理想情况下每秒最多能传输100兆比特的数据。带宽限制由网络服务提供商或数据中心设…...

RISC-V在线反汇编工具

RISC-V在线反汇编工具&#xff1a; https://luplab.gitlab.io/rvcodecjs/#q34179073&abifalse&isaAUTO 不过&#xff0c;似乎&#xff0c;只支持RV32I、RV64I、RV128I指令集&#xff1a;...

从零手写实现 nginx-32-load balance 负载均衡算法 java 实现

前言 大家好&#xff0c;我是老马。很高兴遇到你。 我们为 java 开发者实现了 java 版本的 nginx https://github.com/houbb/nginx4j 如果你想知道 servlet 如何处理的&#xff0c;可以参考我的另一个项目&#xff1a; 手写从零实现简易版 tomcat minicat 手写 nginx 系列 …...

基于STC89C51单片机的烟雾报警器设计(煤气火灾检测报警)(含文档、源码与proteus仿真,以及系统详细介绍)

本篇文章论述的是基于STC89C51单片机的烟雾报警器设计的详情介绍&#xff0c;如果对您有帮助的话&#xff0c;还请关注一下哦&#xff0c;如果有资源方面的需要可以联系我。 目录 摘要 原理图 实物图 仿真图 元件清单 代码 系统论文 资源下载 摘要 随着现代家庭用火、…...

SpringBoot整合阿里云RocketMQ对接,商业版

1.需要阿里云开通商业版RocketMQ 普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组 2.结构目录 3.引入依赖 <!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</…...

modbus slave 设备通过 网关thingsboard-gateway 将数据上传到thingsboard云平台

搭建thingsboard物联网云平台花了大量时间&#xff0c;从小白到最后搭建成功&#xff0c;折磨了好几天&#xff0c;也感谢网友的帮助&#xff0c;提供了思路最终成功搞定&#xff0c;特此记录。 一、thingsboard环境搭建&#xff08;Ubuntu20.04LTS&#xff09; 参考官方文档&a…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log&#xff0c;共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题&#xff0c;不能使用ELK只能使用…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

python如何将word的doc另存为docx

将 DOCX 文件另存为 DOCX 格式&#xff08;Python 实现&#xff09; 在 Python 中&#xff0c;你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是&#xff0c;.doc 是旧的 Word 格式&#xff0c;而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...