Kafka 之顺序消息
前言:
在分布式消息系统中,消息的顺序性是一个重要的问题,也是一个常见的业务场景,那 Kafka 作为一个高性能的分布式消息中间件,又是如何实现顺序消息的呢?本篇我们将对 Kafka 的顺序消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
顺序消息的使用场景
顺序消息的使用场景众多,这里我简单列举几个如下:
- 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致。
- 电商中下单后,订单创建、支付、订单发货和物流更新的顺序性。
- 手机充值过程中的扣款短信和重置成功的短信应该有顺序性。
- 。。。。等等等场景。
Kafka 如何保证消息的顺序性
讨论 Kafka 消息的顺序性,需要分单分区和多分区来讨论,具体如下:
- 单分区:单分区的消息顺序性相对简单,因为消息在单分区中是相对有序的,只需要保证消息发送顺序和消费顺序即可。
- 多分区:多分区要保证消息有序,就需要额外的设计来保证消息全局有序了。
根据上面的简单分析,我们知道 Kafka 单分区的消息有序相对简单,接下来我们分析一下 Kafka 如何保证单分区消息有序。
Kafka 如何保证单分区消息有序
Kafka 保证单分区消息有序需要从两个方面来讲,一个是消息生产者,一个是消息消费者,具体如下:
消息生产者:
- 使用相同的分区键(Partition Key):生产者发送消息时,指定相同的分区键,使得所有消息都发送到同一个分区。
- 指定消息 key,如果没有指定分区,我们指定一个相同的消息 Key,Kafka 会根据 Key 进行 Hash 计算出一个分区号,如果消息的 Key 相同,那么也会计算一个相同的分区号,消息也会发送到同一个分区了。
- 自定义分区器:如果想要实现更复杂的分区逻辑,可以实现自定义分区器,来达到消息最终到达同一个分区。
消息消费者:
生产这已经保证了消费的发送有序,因此消息消费者使用单线程消费即可。
Kafka 顺序消息实现案例
上面我们对 Kafka 顺序消息的实现做了基本分析,下面我们就使用代码来实现 Kafka 的顺序消息。
Kafka 顺序消息 Producer
在 Producer 中分别实现了两种顺序消息的方式,分别是指定分区和指定 Key,具体代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.ExecutionException;/*** @ClassName: MyKafkaOrderlyProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 顺序消息发送者*/
@Slf4j
@Component
public class MyKafkaOrderlyProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//指定分区public void sendOrderlyByPartitionMessage() {try {this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666创建").get();this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666支付").get();this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}//指定 keypublic void sendOrderlyByKeyMessage() {try {this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
在 Producer 代码中我们使用了 Kafka 的同步发送消息。
Kafka 顺序消息 Consumer
顺序消息的消费者代码十分简单,还是使用 @KafkaListener 完成消息消费,注意是单线程消费即可。
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @ClassName: MyKafkaConsumer* @Author: zhangyong* @Date: 2024/10/22 19:22* @Description: MyKafkaOrderlyConsumer*/
@Slf4j
@Component
public class MyKafkaOrderlyConsumer {@KafkaListener(id = "my-kafka-order-consumer",groupId = "my-kafka-consumer-groupId",topics = "my-topic",containerFactory = "myContainerFactory")public void listen(String message) {log.info("消息消费成功消息内容:{}", message);}}
Kafka 顺序消息发送消费验证
验证指定分区情况下的顺序消息:
2024-10-28 20:55:18.495 INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Partition--订单666创建
2024-10-28 20:55:18.599 INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Partition--订单666支付
2024-10-28 20:55:18.704 INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Partition--订单666发货
消息是按照发送顺序来消费的,结果符合预期。
验证指定 Key 情况下的顺序消息:
2024-10-28 20:56:13.238 INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666创建
2024-10-28 20:56:13.341 INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666支付
2024-10-28 20:56:13.443 INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666发货
消息是按照发送顺序来消费的,结果符合预期。
Kafka 自定义分区器
自定义分区器就是按自己的规则来指定消息最终要发送的分区,可以根据自己的需求灵活实现,案例代码中先获取分区数量,然后使用的是 key 的 Hash 值进行 Hash 取模的方式获取分区,具体代码如下:
package com.order.service.kafka;import com.order.service.exception.BusinessException;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;/*** @ClassName: CustomPartitioner* @Author: Author* @Date: 2024/10/28 20:57* @Description:*/
public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取 分区数量List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);if (key == null || keyBytes == null && !(key instanceof String)) {throw new BusinessException("key 不能为空且需要是字符串类型");}String keyStr = key.toString();int partition = keyStr.hashCode() % partitionInfos.size();return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
配置自定义分区器
自定义了分区器后还需要再 Kafka 配置中配置上我们自定义的分区器,关键配置如下:
//自定义分区器配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
完整的配置 KafkaProducerConfig 配置如下:
package com.order.service.config;import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author :author* @description:* @modified By:* @version: V1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Value("${spring.kafka.producer.properties.linger.ms}")private String lingerMs;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(10);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送消息的大小 默认 16KBprops.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);//生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32Mprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);//批量发送的的最大时间间隔,单位是毫秒props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);//自定义分区器配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}
自定义分区 Consumer 代码案例
自定义分区 Consumer 代码没有什么特殊之处,指定一个 key 即可,key 一致就可以保证消息发送到同一个 Partition 中,保证消息的顺序,具体代码如下:
//自定义分区发送消息
public void sendOrderlyByCustomPartitionerMessage() {try {this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}
自定义分区顺序消息验证
触发消息发送后 debugger 如下:

控制台记录消费日志如下:
2024-10-30 17:24:52.716 INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666创建
2024-10-30 17:24:52.819 INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666支付
2024-10-30 17:24:52.921 INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666发货
消息是按顺序消费的,结果符合预期。
总结:Kafka 只能在单个 Partition 中保持消息的顺序存储,要想保证消息的顺序性就必须让需要保持顺序的消息发送到同一个 Partition,对于消费端,消费消息的顺序性只需要保证使用单线程进行消费即可,一般来说比较少用到 Kafka 的顺序消息,这里分享一下还是希望可以帮助到有需要的朋友。
如有不正确的地方欢迎各位指出纠正。
相关文章:
Kafka 之顺序消息
前言: 在分布式消息系统中,消息的顺序性是一个重要的问题,也是一个常见的业务场景,那 Kafka 作为一个高性能的分布式消息中间件,又是如何实现顺序消息的呢?本篇我们将对 Kafka 的顺序消息展开讨论。 Kafk…...
Kafka 之批量消息发送消费
前言: 前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。 Kafka 系列文章传送门 Kafka 简介及核心概念讲解 Spring Boot 整合 Kafka 详解 Kafka Kafka…...
【大数据学习 | kafka】kafka的偏移量管理
1. 偏移量的概念 消费者在消费数据的时候需要将消费的记录存储到一个位置,防止因为消费者程序宕机而引起断点消费数据丢失问题,下一次可以按照相应的位置从kafka中找寻数据,这个消费位置记录称之为偏移量offset。 kafka0.9以前版本将偏移量信…...
实景三维赋能森林防灭火指挥调度智慧化
森林防灭火工作是保护森林资源和生态环境的重要任务。随着信息技术的发展,实景三维技术在森林防灭火指挥调度中的应用日益广泛,为提升防灭火工作的效率和效果提供了有力支持。 一、森林防灭火面临的挑战 森林火灾具有突发性强、破坏性大、蔓延速度快、…...
【C++课程学习】:string的模拟实现
🎁个人主页:我们的五年 🔍系列专栏:C课程学习 🎉欢迎大家点赞👍评论📝收藏⭐文章 目录 一.string的主体框架: 二.string的分析: 🍔构造函数和析构函数&a…...
Linux(VMware + CentOS )设置固定ip
需求:设置ip为 192.168.88.130 先关闭虚拟机 启动虚拟机 查看当前自动获取的ip 使用 FinalShell 通过 ssh 服务远程登录系统,更换到 root 用户 修改ip配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 重启网卡 systemctl restart network …...
安卓 android studio各版本下载地址(官方)
https://developer.android.google.cn/studio/archive 别用中文,右上角的语言切换成英文...
如何在一个 Docker 容器中运行多个进程 ?
在容器化的世界里,Docker 彻底改变了开发人员构建、发布和运行应用程序的方式。Docker 容器封装了运行应用程序所需的所有依赖项,使其易于跨不同环境一致地部署。然而,在单个 Docker 容器中管理多个进程可能具有挑战性,这就是 Sup…...
poetry 配置多个cuda环境心得
操作系统:ubuntu22.04 LTS python版本:3.12.7 最近学习了用poetry配置python虚拟环境,当为不同的项目配置cuda时,会遇到不同的项目使用的cuda版本不一致的情况。 像torch 这样的库,它们会对cuda-toolkit有依赖&…...
网络编程入门
目录 1.网络编程入门 1.1 网络编程概述【理解】 1.2 网络编程三要素【理解】 1.3 IP地址【理解】 1.4InetAddress【应用】 1.5端口和协议【理解】 2.UDP通信程序 2.1 UDP发送数据【应用】 2.2UDP接收数据【应用】 2.3UDP通信程序练习【应用】 3.TCP通信程序 3.1TCP…...
Linux-socket详解
Linux-socket详解_socket linux-CSDN博客...
SQL Server 2022安装要求(硬件、软件、操作系统等)
SQL Server 2022安装要求 1、硬件要求2、软件要求3、操作系统支持4、Server Core 支持5、跨语言支持6、磁盘空间要求 1、硬件要求 以下内存和处理器要求适用于所有版本的 SQL Server: 组件要求存储SQL Server 要求最少 6 GB 的可用硬盘驱动器空间。 磁盘空间要求随…...
“众店模式”:创新驱动下的商业新生态
在数字化浪潮的推动下,传统商业模式正经历着前所未有的转型。“众店模式”作为一种新兴的商业模式,以其独特的商业逻辑和创新的玩法,为商家和消费者构建了一个共赢的商业新生态。 一、“众店模式”的核心构成 “众店模式”的成功࿰…...
54. 螺旋矩阵
https://leetcode.cn/problems/spiral-matrix/description/?envTypestudy-plan-v2&envIdtop-100-liked观察示例中的输出轨迹我们可以想到如下设计: 1.在朝某一方向行进到头后的改变方向是确定的,左->下,下->右,右->…...
剧本杀小程序,市场发展下的新机遇
剧本杀作为休闲娱乐的一种游戏方式,在短时间内进入了大众视野中,受到了广泛关注。近几年,剧本杀行业面临着创新挑战,商家需求寻求新的发展机遇,在市场饱和度下降的趋势下,获得市场份额。 随着科技的不断进…...
【系统架构设计师】论文:论基于 ABSD 的软件开发
更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 摘要正文摘要 2022年5月,我就职的公司承接了xx的智慧党建工作,建设“党建红云” 系统,为xx公司的党组织提供觉务管理、服务功能,促进党员学习和党组织交流。我在该项目中承担架构设计师的职责,主导需求分析和…...
为什么OLED透明屏在同类产品中显示效果最好
说起OLED透明屏,这家伙在同类产品里那真的是“一枝独秀”啊!为啥这么说呢?且听我细细道来。 首先,OLED透明屏的透明度那是杠杠的!它不像传统显示屏那样有个固定的背景,而是可以实现像素级的透明效果。这样一…...
深度学习基础知识-Batch Normalization(BN)超详细解析
一、背景和问题定义 在深层神经网络(Deep Neural Networks, DNNs)中,层与层之间的输入分布会随着参数更新不断发生变化,这种现象被称为内部协变量偏移(Internal Covariate Shift)。具体来说,由…...
基于单片机的燃气报警阀门系统
本设计基于单片机的燃气报警阀门系统,燃气报警阀门系统采用STM32主控制器为核心芯片,外围电路由燃气传感器、OLED液晶显示模块、按键模块、蜂鸣器报警模块、电磁阀以及SIM800模块等模块组成。燃气传感器模块负责采集燃气浓度数据,采集完成由S…...
watch与computed的区别、运用的场景
computed和watch都是响应式数据变化的重要机制,但它们在功能、使用场景和性能表现上有显著的区别。 主要区别 功能和用途 1、computed:计算属性,用于基于其他数据属性进行计算,并返回一个结果。它具有缓存机制,只有当…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
虚拟电厂发展三大趋势:市场化、技术主导、车网互联
市场化:从政策驱动到多元盈利 政策全面赋能 2025年4月,国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》,首次明确虚拟电厂为“独立市场主体”,提出硬性目标:2027年全国调节能力≥2000万千瓦࿰…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
