Kafka 之批量消息发送消费
前言:
前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 消息批量发送
Kafka 没有提供批量发送消息的 API,Kafka 的方式是提供一个 RecordAccumulator 消息收集器,将发送给同一个 Topic 同一个 Partition 的消息先缓存起来,当其达到某些条件后,才会一次性的将消息提交给 Kafka Broker。
Kafka 消息的批量发送主要跟以下三个参数有关:
- batch.size:批量发送消息的大小,默认 16KB,产生的消息达到这个数量后,即刻触发消息批量提交到 Kafka Broker。
- buffer.memory:生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数,模式是 32 MB,如果超过这个数量,即刻触发消息批量提交到 Kafka Broker。
- linger.ms:批量发送的的最大时间间隔,单位是毫秒,当达到配置的时间之后,会立刻触发消息批量提交大 Kafka Broker。
以上三个条件满足一个就会触发消息的批量提交。
官方文档传送门
Kafka 批量消息 参数配置
上面我们分析了 Kafka 没有提供批量发送的 API,而是使用了三个参数来控制批量发送的,换句话说,其实我们每次使用 Kafka 发送消息的时候都是批量发送,Kafka 批量发送消息的代码没有什么特殊之处,只需要对上面解释的三个参数进行按需配置即可,本案例的配置如下:
#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000
Kafka 批量消息 Producer 代码演示
Kafka 批量发送消息代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: MyKafkaBatchProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}
在 Kafka 发送完成消息后,我们记录了当前时间,这个时间是用来证明消息是被批量发送的。
Kafka 批量消息 Consumer 代码演示
Kafka 批量消息的代码也没有什么特殊之处,还是使用 @KafkaListener注解来监听消息,只不过参数变成了 List<ConsumerRecord<String, String>> 类型,然后我们在配置中配置了批量消费的模式,批量消费的配置如下:
#Kafka 的消费模式 single 每次单条消费消息 batch 每次批量消费消息
spring.kafka.listener.type = batch
Consumer 代码如下:
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** @ClassName: MyKafkaBatchConsumer * @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchConsumer {@KafkaListener(id = "my-kafka-consumer-01",groupId = "my-kafka-consumer-groupId-01",topics = "my-topic",containerFactory = "myContainerFactory",properties = {"max.poll.records:10"})public void listen(List<ConsumerRecord<String, String>> consumerRecords) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}", dateStr, consumerRecords.size());for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String value = consumerRecord.value();log.info("消息内容:{}",value);}}}
这里我们使用了 properties 这个属性配置,后面详细讲解。
** Kafka 批量消息验证**
触发消息发送消费结果如下:
2024-10-27 15:27:17.563 INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第9条 kafka 消息
2024-10-27 15:27:17 完成消息发送,:2024-10-27 15:27:22 完成消息消费,时间间隔是 5秒,消息是 10 条,符合预期。
我们修改配置再次演示,将批量发送消息的时间间隔改为 10 秒,同时一次性发送 1000 条消息,是消息的总大小大于 1KB。
#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000
调整消息发送端代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @ClassName: KafkaProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}
触发消息发送消费结果如下:
2024-10-27 15:41:39.530 INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530 INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
可以看到消息发送和消息消费几乎是同时进行的,因为这里我们打印的是时间只有秒,是看不出差异的,但是也可以根据这个结果看出,消费者并没有等到 10秒后才开始消费,是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交,符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker,结果符合预期。
关于 buffer-memory 这个参数这里不做验证了,有兴趣的朋友可以自己去验证哈。
spring.kafka.consumer.max-poll-records 参数讨论
spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数,默认为 500 条,上面的案例中我们使用了 properties = {“max.poll.records:10”} 这个配置,其实这个配置也是配置批量拉去消息的最大数量,我们配置的是 10,日志记录每次最多拉去的数量就是 10,使用 properties 的配置方式可以覆盖掉项目配置文件中的配置,也就是局部配置覆盖全局配置,这样做的好处是显而易见的,我们可以针对每个消费端按需做出灵活配置。
总结:本篇简单分享了 Kafka 批量发送消息消费的一些案例,希望可以帮助到有需要的朋友,分享有错误的地方也欢迎大家提出纠正。
如有不正确的地方欢迎各位指出纠正。
相关文章:
Kafka 之批量消息发送消费
前言: 前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。 Kafka 系列文章传送门 Kafka 简介及核心概念讲解 Spring Boot 整合 Kafka 详解 Kafka Kafka…...
【大数据学习 | kafka】kafka的偏移量管理
1. 偏移量的概念 消费者在消费数据的时候需要将消费的记录存储到一个位置,防止因为消费者程序宕机而引起断点消费数据丢失问题,下一次可以按照相应的位置从kafka中找寻数据,这个消费位置记录称之为偏移量offset。 kafka0.9以前版本将偏移量信…...
实景三维赋能森林防灭火指挥调度智慧化
森林防灭火工作是保护森林资源和生态环境的重要任务。随着信息技术的发展,实景三维技术在森林防灭火指挥调度中的应用日益广泛,为提升防灭火工作的效率和效果提供了有力支持。 一、森林防灭火面临的挑战 森林火灾具有突发性强、破坏性大、蔓延速度快、…...
【C++课程学习】:string的模拟实现
🎁个人主页:我们的五年 🔍系列专栏:C课程学习 🎉欢迎大家点赞👍评论📝收藏⭐文章 目录 一.string的主体框架: 二.string的分析: 🍔构造函数和析构函数&a…...
Linux(VMware + CentOS )设置固定ip
需求:设置ip为 192.168.88.130 先关闭虚拟机 启动虚拟机 查看当前自动获取的ip 使用 FinalShell 通过 ssh 服务远程登录系统,更换到 root 用户 修改ip配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 重启网卡 systemctl restart network …...
安卓 android studio各版本下载地址(官方)
https://developer.android.google.cn/studio/archive 别用中文,右上角的语言切换成英文...
如何在一个 Docker 容器中运行多个进程 ?
在容器化的世界里,Docker 彻底改变了开发人员构建、发布和运行应用程序的方式。Docker 容器封装了运行应用程序所需的所有依赖项,使其易于跨不同环境一致地部署。然而,在单个 Docker 容器中管理多个进程可能具有挑战性,这就是 Sup…...
poetry 配置多个cuda环境心得
操作系统:ubuntu22.04 LTS python版本:3.12.7 最近学习了用poetry配置python虚拟环境,当为不同的项目配置cuda时,会遇到不同的项目使用的cuda版本不一致的情况。 像torch 这样的库,它们会对cuda-toolkit有依赖&…...
网络编程入门
目录 1.网络编程入门 1.1 网络编程概述【理解】 1.2 网络编程三要素【理解】 1.3 IP地址【理解】 1.4InetAddress【应用】 1.5端口和协议【理解】 2.UDP通信程序 2.1 UDP发送数据【应用】 2.2UDP接收数据【应用】 2.3UDP通信程序练习【应用】 3.TCP通信程序 3.1TCP…...
Linux-socket详解
Linux-socket详解_socket linux-CSDN博客...
SQL Server 2022安装要求(硬件、软件、操作系统等)
SQL Server 2022安装要求 1、硬件要求2、软件要求3、操作系统支持4、Server Core 支持5、跨语言支持6、磁盘空间要求 1、硬件要求 以下内存和处理器要求适用于所有版本的 SQL Server: 组件要求存储SQL Server 要求最少 6 GB 的可用硬盘驱动器空间。 磁盘空间要求随…...
“众店模式”:创新驱动下的商业新生态
在数字化浪潮的推动下,传统商业模式正经历着前所未有的转型。“众店模式”作为一种新兴的商业模式,以其独特的商业逻辑和创新的玩法,为商家和消费者构建了一个共赢的商业新生态。 一、“众店模式”的核心构成 “众店模式”的成功࿰…...
54. 螺旋矩阵
https://leetcode.cn/problems/spiral-matrix/description/?envTypestudy-plan-v2&envIdtop-100-liked观察示例中的输出轨迹我们可以想到如下设计: 1.在朝某一方向行进到头后的改变方向是确定的,左->下,下->右,右->…...
剧本杀小程序,市场发展下的新机遇
剧本杀作为休闲娱乐的一种游戏方式,在短时间内进入了大众视野中,受到了广泛关注。近几年,剧本杀行业面临着创新挑战,商家需求寻求新的发展机遇,在市场饱和度下降的趋势下,获得市场份额。 随着科技的不断进…...
【系统架构设计师】论文:论基于 ABSD 的软件开发
更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 摘要正文摘要 2022年5月,我就职的公司承接了xx的智慧党建工作,建设“党建红云” 系统,为xx公司的党组织提供觉务管理、服务功能,促进党员学习和党组织交流。我在该项目中承担架构设计师的职责,主导需求分析和…...
为什么OLED透明屏在同类产品中显示效果最好
说起OLED透明屏,这家伙在同类产品里那真的是“一枝独秀”啊!为啥这么说呢?且听我细细道来。 首先,OLED透明屏的透明度那是杠杠的!它不像传统显示屏那样有个固定的背景,而是可以实现像素级的透明效果。这样一…...
深度学习基础知识-Batch Normalization(BN)超详细解析
一、背景和问题定义 在深层神经网络(Deep Neural Networks, DNNs)中,层与层之间的输入分布会随着参数更新不断发生变化,这种现象被称为内部协变量偏移(Internal Covariate Shift)。具体来说,由…...
基于单片机的燃气报警阀门系统
本设计基于单片机的燃气报警阀门系统,燃气报警阀门系统采用STM32主控制器为核心芯片,外围电路由燃气传感器、OLED液晶显示模块、按键模块、蜂鸣器报警模块、电磁阀以及SIM800模块等模块组成。燃气传感器模块负责采集燃气浓度数据,采集完成由S…...
watch与computed的区别、运用的场景
computed和watch都是响应式数据变化的重要机制,但它们在功能、使用场景和性能表现上有显著的区别。 主要区别 功能和用途 1、computed:计算属性,用于基于其他数据属性进行计算,并返回一个结果。它具有缓存机制,只有当…...
【ESP32+MicroPython】开发环境部署
本教程将指导你如何在Visual Studio Code(VSCode)中设置ESP32的MicroPython开发环境。我们将涵盖从安装Python到烧录MicroPython固件的整个过程,以及如何配置VSCode以便与ESP32进行交互。 准备工作 安装Python 确保你的计算机上安装了Pyth…...
基于SpringCloud的微服务架构技术研究
随着互联网技术与校园信息化建设的快速发展,传统单体架构系统在业务迭代、功能扩展、并发处理与后期维护方面逐渐暴露出诸多短板。单体架构将所有业务逻辑、数据接口与功能模块耦合在同一个项目中,在系统体量较小、业务需求简单的场景下能够满足开发需求…...
多模态LLM落地实战:从架构选型到推理部署的12个生死关卡
1. 这不是“多模态大模型”的科普文,而是一份一线工程师拆解真实系统时的现场笔记“Understanding Multimodal LLMs: The Next Evolution of AI”——这个标题在2024年已经刷屏了太多次。但你有没有发现,几乎所有公开资料都在讲“它能看图说话”“它能理…...
Java解析支付宝PKCS#8私钥失败的根源与解决方案
1. 这不是密钥格式错了,是Java对PKCS#8私钥的“认知偏差”在作祟 你刚把支付宝开放平台下载的 .pem 私钥文件丢进 Java 项目,调用 AlipayClient.execute() 就立刻报错:“RSA2签名遭遇异常,请检查私钥格式是否正确”。第一反应…...
WarcraftHelper终极教程:5分钟搞定魔兽争霸3现代化优化
WarcraftHelper终极教程:5分钟搞定魔兽争霸3现代化优化 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为《魔兽争霸3》这款经典游戏在…...
武汉专升本民办 vs 公办机构怎么选
每年到了专科大三的春天,武汉的专升本备考群里总会出现类似的问题:“公办机构是不是比民办靠谱?”“民办会不会拿钱不办事?”“集训营到底该冲公办还是选民办?”说实话,这个问题没有标准答案,因…...
5-8倍加速:ncnn 3×3卷积模块
5-8倍加速:ncnn 33矩阵卷积模块 我把腾讯ncnn的33卷积从手工循环替换成了自己的算法(Im2Col GEMM),实测加速5到8倍。 适用于大通道数(inch≥16, outch≥32)、大分辨率特征图、服务端推理场景。小通道建议…...
Unity连接Arduino BLE实战:5分钟实现PC端双向通信
1. 这不是“配对”,而是让Unity像手机App一样和Arduino对话很多人第一次尝试Unity连接Arduino蓝牙模块时,会下意识打开Windows的“蓝牙设置”去“添加设备”——结果折腾半小时,Unity里依然收不到任何数据。我最初也这么干过,直到…...
PdrER算法:扩展解析在模型检查中的高效应用
1. PdrER算法核心原理与技术突破1.1 传统PDR算法的局限性分析Property Directed Reachability(PDR,也称为IC3)是当前最先进的模型检查算法之一,广泛应用于硬件和软件系统的安全属性验证。该算法通过构建归纳不变量(ind…...
ARM架构中APB外设与External PPB空间部署解析
1. APB系统外设与External PPB空间的关系解析在嵌入式系统设计中,APB(Advanced Peripheral Bus)作为ARM架构中广泛使用的低速外设总线,其常规部署位置通常位于SoC内部。但近年来,随着异构计算和模块化设计的普及,将APB外设放置在E…...
回归分析((>^ω^<)喵)
回归分析找到2个数据以上的的关系做预测的。是预测数字形的而不是男还是女这种问题1.举例略说这是一张图,是学习时间与成绩的回归分析,这条红线是回归线Xx是自变量,是用于预测的,例如学习时间,是因Yy是因变量 …...
