当前位置: 首页 > news >正文

Kafka 之批量消息发送消费

前言:

前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 消息批量发送

Kafka 没有提供批量发送消息的 API,Kafka 的方式是提供一个 RecordAccumulator 消息收集器,将发送给同一个 Topic 同一个 Partition 的消息先缓存起来,当其达到某些条件后,才会一次性的将消息提交给 Kafka Broker。

Kafka 消息的批量发送主要跟以下三个参数有关:

  • batch.size:批量发送消息的大小,默认 16KB,产生的消息达到这个数量后,即刻触发消息批量提交到 Kafka Broker。
  • buffer.memory:生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数,模式是 32 MB,如果超过这个数量,即刻触发消息批量提交到 Kafka Broker。
  • linger.ms:批量发送的的最大时间间隔,单位是毫秒,当达到配置的时间之后,会立刻触发消息批量提交大 Kafka Broker。

以上三个条件满足一个就会触发消息的批量提交。

官方文档传送门

Kafka 批量消息 参数配置

上面我们分析了 Kafka 没有提供批量发送的 API,而是使用了三个参数来控制批量发送的,换句话说,其实我们每次使用 Kafka 发送消息的时候都是批量发送,Kafka 批量发送消息的代码没有什么特殊之处,只需要对上面解释的三个参数进行按需配置即可,本案例的配置如下:

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

Kafka 批量消息 Producer 代码演示

Kafka 批量发送消息代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: MyKafkaBatchProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

在 Kafka 发送完成消息后,我们记录了当前时间,这个时间是用来证明消息是被批量发送的。

Kafka 批量消息 Consumer 代码演示

Kafka 批量消息的代码也没有什么特殊之处,还是使用 @KafkaListener注解来监听消息,只不过参数变成了 List<ConsumerRecord<String, String>> 类型,然后我们在配置中配置了批量消费的模式,批量消费的配置如下:

#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = batch

Consumer 代码如下:

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** @ClassName: MyKafkaBatchConsumer * @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchConsumer {@KafkaListener(id = "my-kafka-consumer-01",groupId = "my-kafka-consumer-groupId-01",topics = "my-topic",containerFactory = "myContainerFactory",properties = {"max.poll.records:10"})public void listen(List<ConsumerRecord<String, String>> consumerRecords) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}", dateStr, consumerRecords.size());for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String value = consumerRecord.value();log.info("消息内容:{}",value);}}}

这里我们使用了 properties 这个属性配置,后面详细讲解。

** Kafka 批量消息验证**

触发消息发送消费结果如下:

2024-10-27 15:27:17.563  INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer    : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第9条 kafka 消息

2024-10-27 15:27:17 完成消息发送,:2024-10-27 15:27:22 完成消息消费,时间间隔是 5秒,消息是 10 条,符合预期。

我们修改配置再次演示,将批量发送消息的时间间隔改为 10 秒,同时一次性发送 1000 条消息,是消息的总大小大于 1KB。

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

调整消息发送端代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @ClassName: KafkaProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

触发消息发送消费结果如下:

2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10

可以看到消息发送和消息消费几乎是同时进行的,因为这里我们打印的是时间只有秒,是看不出差异的,但是也可以根据这个结果看出,消费者并没有等到 10秒后才开始消费,是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交,符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker,结果符合预期。

关于 buffer-memory 这个参数这里不做验证了,有兴趣的朋友可以自己去验证哈。

spring.kafka.consumer.max-poll-records 参数讨论

spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数,默认为 500 条,上面的案例中我们使用了 properties = {“max.poll.records:10”} 这个配置,其实这个配置也是配置批量拉去消息的最大数量,我们配置的是 10,日志记录每次最多拉去的数量就是 10,使用 properties 的配置方式可以覆盖掉项目配置文件中的配置,也就是局部配置覆盖全局配置,这样做的好处是显而易见的,我们可以针对每个消费端按需做出灵活配置。

总结:本篇简单分享了 Kafka 批量发送消息消费的一些案例,希望可以帮助到有需要的朋友,分享有错误的地方也欢迎大家提出纠正。

如有不正确的地方欢迎各位指出纠正。

相关文章:

Kafka 之批量消息发送消费

前言&#xff1a; 前面我们分享了 Kafka 的一些基础知识&#xff0c;以及 Spring Boot 集成 Kafka 完成消息发送消费&#xff0c;本篇我们来分享一下 Kafka 的批量消息发送消费。 Kafka 系列文章传送门 Kafka 简介及核心概念讲解 Spring Boot 整合 Kafka 详解 Kafka Kafka…...

【大数据学习 | kafka】kafka的偏移量管理

1. 偏移量的概念 消费者在消费数据的时候需要将消费的记录存储到一个位置&#xff0c;防止因为消费者程序宕机而引起断点消费数据丢失问题&#xff0c;下一次可以按照相应的位置从kafka中找寻数据&#xff0c;这个消费位置记录称之为偏移量offset。 kafka0.9以前版本将偏移量信…...

实景三维赋能森林防灭火指挥调度智慧化

森林防灭火工作是保护森林资源和生态环境的重要任务。随着信息技术的发展&#xff0c;实景三维技术在森林防灭火指挥调度中的应用日益广泛&#xff0c;为提升防灭火工作的效率和效果提供了有力支持。 一、森林防灭火面临的挑战 森林火灾具有突发性强、破坏性大、蔓延速度快、…...

【C++课程学习】:string的模拟实现

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 一.string的主体框架&#xff1a; 二.string的分析&#xff1a; &#x1f354;构造函数和析构函数&a…...

Linux(VMware + CentOS )设置固定ip

需求&#xff1a;设置ip为 192.168.88.130 先关闭虚拟机 启动虚拟机 查看当前自动获取的ip 使用 FinalShell 通过 ssh 服务远程登录系统&#xff0c;更换到 root 用户 修改ip配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 重启网卡 systemctl restart network …...

安卓 android studio各版本下载地址(官方)

https://developer.android.google.cn/studio/archive 别用中文&#xff0c;右上角的语言切换成英文...

如何在一个 Docker 容器中运行多个进程 ?

在容器化的世界里&#xff0c;Docker 彻底改变了开发人员构建、发布和运行应用程序的方式。Docker 容器封装了运行应用程序所需的所有依赖项&#xff0c;使其易于跨不同环境一致地部署。然而&#xff0c;在单个 Docker 容器中管理多个进程可能具有挑战性&#xff0c;这就是 Sup…...

poetry 配置多个cuda环境心得

操作系统&#xff1a;ubuntu22.04 LTS python版本&#xff1a;3.12.7 最近学习了用poetry配置python虚拟环境&#xff0c;当为不同的项目配置cuda时&#xff0c;会遇到不同的项目使用的cuda版本不一致的情况。 像torch 这样的库&#xff0c;它们会对cuda-toolkit有依赖&…...

网络编程入门

目录 1.网络编程入门 1.1 网络编程概述【理解】 1.2 网络编程三要素【理解】 1.3 IP地址【理解】 1.4InetAddress【应用】 1.5端口和协议【理解】 2.UDP通信程序 2.1 UDP发送数据【应用】 2.2UDP接收数据【应用】 2.3UDP通信程序练习【应用】 3.TCP通信程序 3.1TCP…...

Linux-socket详解

Linux-socket详解_socket linux-CSDN博客...

SQL Server 2022安装要求(硬件、软件、操作系统等)

SQL Server 2022安装要求 1、硬件要求2、软件要求3、操作系统支持4、Server Core 支持5、跨语言支持6、磁盘空间要求 1、硬件要求 以下内存和处理器要求适用于所有版本的 SQL Server&#xff1a; 组件要求存储SQL Server 要求最少 6 GB 的可用硬盘驱动器空间。 磁盘空间要求随…...

“众店模式”:创新驱动下的商业新生态

在数字化浪潮的推动下&#xff0c;传统商业模式正经历着前所未有的转型。“众店模式”作为一种新兴的商业模式&#xff0c;以其独特的商业逻辑和创新的玩法&#xff0c;为商家和消费者构建了一个共赢的商业新生态。 一、“众店模式”的核心构成 “众店模式”的成功&#xff0…...

54. 螺旋矩阵

https://leetcode.cn/problems/spiral-matrix/description/?envTypestudy-plan-v2&envIdtop-100-liked观察示例中的输出轨迹我们可以想到如下设计&#xff1a; 1.在朝某一方向行进到头后的改变方向是确定的&#xff0c;左->下&#xff0c;下->右&#xff0c;右->…...

剧本杀小程序,市场发展下的新机遇

剧本杀作为休闲娱乐的一种游戏方式&#xff0c;在短时间内进入了大众视野中&#xff0c;受到了广泛关注。近几年&#xff0c;剧本杀行业面临着创新挑战&#xff0c;商家需求寻求新的发展机遇&#xff0c;在市场饱和度下降的趋势下&#xff0c;获得市场份额。 随着科技的不断进…...

【系统架构设计师】论文:论基于 ABSD 的软件开发

更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 摘要正文摘要 2022年5月,我就职的公司承接了xx的智慧党建工作,建设“党建红云” 系统,为xx公司的党组织提供觉务管理、服务功能,促进党员学习和党组织交流。我在该项目中承担架构设计师的职责,主导需求分析和…...

为什么OLED透明屏在同类产品中显示效果最好

说起OLED透明屏&#xff0c;这家伙在同类产品里那真的是“一枝独秀”啊&#xff01;为啥这么说呢&#xff1f;且听我细细道来。 首先&#xff0c;OLED透明屏的透明度那是杠杠的&#xff01;它不像传统显示屏那样有个固定的背景&#xff0c;而是可以实现像素级的透明效果。这样一…...

深度学习基础知识-Batch Normalization(BN)超详细解析

一、背景和问题定义 在深层神经网络&#xff08;Deep Neural Networks, DNNs&#xff09;中&#xff0c;层与层之间的输入分布会随着参数更新不断发生变化&#xff0c;这种现象被称为内部协变量偏移&#xff08;Internal Covariate Shift&#xff09;。具体来说&#xff0c;由…...

基于单片机的燃气报警阀门系统

本设计基于单片机的燃气报警阀门系统&#xff0c;燃气报警阀门系统采用STM32主控制器为核心芯片&#xff0c;外围电路由燃气传感器、OLED液晶显示模块、按键模块、蜂鸣器报警模块、电磁阀以及SIM800模块等模块组成。燃气传感器模块负责采集燃气浓度数据&#xff0c;采集完成由S…...

watch与computed的区别、运用的场景

computed和watch都是响应式数据变化的重要机制&#xff0c;但它们在功能、使用场景和性能表现上有显著的区别。 主要区别 功能和用途 1、computed&#xff1a;计算属性&#xff0c;用于基于其他数据属性进行计算&#xff0c;并返回一个结果。它具有缓存机制&#xff0c;只有当…...

【ESP32+MicroPython】开发环境部署

本教程将指导你如何在Visual Studio Code&#xff08;VSCode&#xff09;中设置ESP32的MicroPython开发环境。我们将涵盖从安装Python到烧录MicroPython固件的整个过程&#xff0c;以及如何配置VSCode以便与ESP32进行交互。 准备工作 安装Python 确保你的计算机上安装了Pyth…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行&#xff01; sudo su - 1. CentOS 系统&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...

Python 高效图像帧提取与视频编码:实战指南

Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...