当前位置: 首页 > news >正文

Kafka 之顺序消息

前言:

在分布式消息系统中,消息的顺序性是一个重要的问题,也是一个常见的业务场景,那 Kafka 作为一个高性能的分布式消息中间件,又是如何实现顺序消息的呢?本篇我们将对 Kafka 的顺序消息展开讨论。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

顺序消息的使用场景

顺序消息的使用场景众多,这里我简单列举几个如下:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致。
  • 电商中下单后,订单创建、支付、订单发货和物流更新的顺序性。
  • 手机充值过程中的扣款短信和重置成功的短信应该有顺序性。
  • 。。。。等等等场景。

Kafka 如何保证消息的顺序性

讨论 Kafka 消息的顺序性,需要分单分区和多分区来讨论,具体如下:

  • 单分区:单分区的消息顺序性相对简单,因为消息在单分区中是相对有序的,只需要保证消息发送顺序和消费顺序即可。
  • 多分区:多分区要保证消息有序,就需要额外的设计来保证消息全局有序了。

根据上面的简单分析,我们知道 Kafka 单分区的消息有序相对简单,接下来我们分析一下 Kafka 如何保证单分区消息有序。

Kafka 如何保证单分区消息有序

Kafka 保证单分区消息有序需要从两个方面来讲,一个是消息生产者,一个是消息消费者,具体如下:

消息生产者:

  • 使用相同的分区键(Partition Key):生产者发送消息时,指定相同的分区键,使得所有消息都发送到同一个分区。
  • 指定消息 key,如果没有指定分区,我们指定一个相同的消息 Key,Kafka 会根据 Key 进行 Hash 计算出一个分区号,如果消息的 Key 相同,那么也会计算一个相同的分区号,消息也会发送到同一个分区了。
  • 自定义分区器:如果想要实现更复杂的分区逻辑,可以实现自定义分区器,来达到消息最终到达同一个分区。

消息消费者:

生产这已经保证了消费的发送有序,因此消息消费者使用单线程消费即可。

Kafka 顺序消息实现案例

上面我们对 Kafka 顺序消息的实现做了基本分析,下面我们就使用代码来实现 Kafka 的顺序消息。

Kafka 顺序消息 Producer

在 Producer 中分别实现了两种顺序消息的方式,分别是指定分区和指定 Key,具体代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.ExecutionException;/*** @ClassName: MyKafkaOrderlyProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 顺序消息发送者*/
@Slf4j
@Component
public class MyKafkaOrderlyProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//指定分区public void sendOrderlyByPartitionMessage() {try {this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666创建").get();this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666支付").get();this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}//指定 keypublic void sendOrderlyByKeyMessage() {try {this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}

在 Producer 代码中我们使用了 Kafka 的同步发送消息。

Kafka 顺序消息 Consumer

顺序消息的消费者代码十分简单,还是使用 @KafkaListener 完成消息消费,注意是单线程消费即可。

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @ClassName: MyKafkaConsumer* @Author: zhangyong* @Date: 2024/10/22 19:22* @Description: MyKafkaOrderlyConsumer*/
@Slf4j
@Component
public class MyKafkaOrderlyConsumer {@KafkaListener(id = "my-kafka-order-consumer",groupId = "my-kafka-consumer-groupId",topics = "my-topic",containerFactory = "myContainerFactory")public void listen(String message) {log.info("消息消费成功消息内容:{}", message);}}

Kafka 顺序消息发送消费验证

验证指定分区情况下的顺序消息:

2024-10-28 20:55:18.495  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666创建
2024-10-28 20:55:18.599  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666支付
2024-10-28 20:55:18.704  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666发货

消息是按照发送顺序来消费的,结果符合预期。

验证指定 Key 情况下的顺序消息:

2024-10-28 20:56:13.238  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666创建
2024-10-28 20:56:13.341  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666支付
2024-10-28 20:56:13.443  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666发货

消息是按照发送顺序来消费的,结果符合预期。

Kafka 自定义分区器

自定义分区器就是按自己的规则来指定消息最终要发送的分区,可以根据自己的需求灵活实现,案例代码中先获取分区数量,然后使用的是 key 的 Hash 值进行 Hash 取模的方式获取分区,具体代码如下:

package com.order.service.kafka;import com.order.service.exception.BusinessException;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;/*** @ClassName: CustomPartitioner* @Author: Author* @Date: 2024/10/28 20:57* @Description:*/
public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取 分区数量List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);if (key == null || keyBytes == null && !(key instanceof String)) {throw new BusinessException("key 不能为空且需要是字符串类型");}String keyStr = key.toString();int partition = keyStr.hashCode() % partitionInfos.size();return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

配置自定义分区器

自定义了分区器后还需要再 Kafka 配置中配置上我们自定义的分区器,关键配置如下:

//自定义分区器配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

完整的配置 KafkaProducerConfig 配置如下:

package com.order.service.config;import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author :author* @description:* @modified By:* @version: V1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Value("${spring.kafka.producer.properties.linger.ms}")private String lingerMs;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(10);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送消息的大小 默认 16KBprops.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);//生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32Mprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);//批量发送的的最大时间间隔,单位是毫秒props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);//自定义分区器配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}

自定义分区 Consumer 代码案例

自定义分区 Consumer 代码没有什么特殊之处,指定一个 key 即可,key 一致就可以保证消息发送到同一个 Partition 中,保证消息的顺序,具体代码如下:

//自定义分区发送消息
public void sendOrderlyByCustomPartitionerMessage() {try {this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}

自定义分区顺序消息验证

触发消息发送后 debugger 如下:

在这里插入图片描述

控制台记录消费日志如下:

2024-10-30 17:24:52.716  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666创建
2024-10-30 17:24:52.819  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666支付
2024-10-30 17:24:52.921  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666发货

消息是按顺序消费的,结果符合预期。

总结:Kafka 只能在单个 Partition 中保持消息的顺序存储,要想保证消息的顺序性就必须让需要保持顺序的消息发送到同一个 Partition,对于消费端,消费消息的顺序性只需要保证使用单线程进行消费即可,一般来说比较少用到 Kafka 的顺序消息,这里分享一下还是希望可以帮助到有需要的朋友。

如有不正确的地方欢迎各位指出纠正。

相关文章:

Kafka 之顺序消息

前言&#xff1a; 在分布式消息系统中&#xff0c;消息的顺序性是一个重要的问题&#xff0c;也是一个常见的业务场景&#xff0c;那 Kafka 作为一个高性能的分布式消息中间件&#xff0c;又是如何实现顺序消息的呢&#xff1f;本篇我们将对 Kafka 的顺序消息展开讨论。 Kafk…...

Kafka 之批量消息发送消费

前言&#xff1a; 前面我们分享了 Kafka 的一些基础知识&#xff0c;以及 Spring Boot 集成 Kafka 完成消息发送消费&#xff0c;本篇我们来分享一下 Kafka 的批量消息发送消费。 Kafka 系列文章传送门 Kafka 简介及核心概念讲解 Spring Boot 整合 Kafka 详解 Kafka Kafka…...

【大数据学习 | kafka】kafka的偏移量管理

1. 偏移量的概念 消费者在消费数据的时候需要将消费的记录存储到一个位置&#xff0c;防止因为消费者程序宕机而引起断点消费数据丢失问题&#xff0c;下一次可以按照相应的位置从kafka中找寻数据&#xff0c;这个消费位置记录称之为偏移量offset。 kafka0.9以前版本将偏移量信…...

实景三维赋能森林防灭火指挥调度智慧化

森林防灭火工作是保护森林资源和生态环境的重要任务。随着信息技术的发展&#xff0c;实景三维技术在森林防灭火指挥调度中的应用日益广泛&#xff0c;为提升防灭火工作的效率和效果提供了有力支持。 一、森林防灭火面临的挑战 森林火灾具有突发性强、破坏性大、蔓延速度快、…...

【C++课程学习】:string的模拟实现

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 一.string的主体框架&#xff1a; 二.string的分析&#xff1a; &#x1f354;构造函数和析构函数&a…...

Linux(VMware + CentOS )设置固定ip

需求&#xff1a;设置ip为 192.168.88.130 先关闭虚拟机 启动虚拟机 查看当前自动获取的ip 使用 FinalShell 通过 ssh 服务远程登录系统&#xff0c;更换到 root 用户 修改ip配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 重启网卡 systemctl restart network …...

安卓 android studio各版本下载地址(官方)

https://developer.android.google.cn/studio/archive 别用中文&#xff0c;右上角的语言切换成英文...

如何在一个 Docker 容器中运行多个进程 ?

在容器化的世界里&#xff0c;Docker 彻底改变了开发人员构建、发布和运行应用程序的方式。Docker 容器封装了运行应用程序所需的所有依赖项&#xff0c;使其易于跨不同环境一致地部署。然而&#xff0c;在单个 Docker 容器中管理多个进程可能具有挑战性&#xff0c;这就是 Sup…...

poetry 配置多个cuda环境心得

操作系统&#xff1a;ubuntu22.04 LTS python版本&#xff1a;3.12.7 最近学习了用poetry配置python虚拟环境&#xff0c;当为不同的项目配置cuda时&#xff0c;会遇到不同的项目使用的cuda版本不一致的情况。 像torch 这样的库&#xff0c;它们会对cuda-toolkit有依赖&…...

网络编程入门

目录 1.网络编程入门 1.1 网络编程概述【理解】 1.2 网络编程三要素【理解】 1.3 IP地址【理解】 1.4InetAddress【应用】 1.5端口和协议【理解】 2.UDP通信程序 2.1 UDP发送数据【应用】 2.2UDP接收数据【应用】 2.3UDP通信程序练习【应用】 3.TCP通信程序 3.1TCP…...

Linux-socket详解

Linux-socket详解_socket linux-CSDN博客...

SQL Server 2022安装要求(硬件、软件、操作系统等)

SQL Server 2022安装要求 1、硬件要求2、软件要求3、操作系统支持4、Server Core 支持5、跨语言支持6、磁盘空间要求 1、硬件要求 以下内存和处理器要求适用于所有版本的 SQL Server&#xff1a; 组件要求存储SQL Server 要求最少 6 GB 的可用硬盘驱动器空间。 磁盘空间要求随…...

“众店模式”:创新驱动下的商业新生态

在数字化浪潮的推动下&#xff0c;传统商业模式正经历着前所未有的转型。“众店模式”作为一种新兴的商业模式&#xff0c;以其独特的商业逻辑和创新的玩法&#xff0c;为商家和消费者构建了一个共赢的商业新生态。 一、“众店模式”的核心构成 “众店模式”的成功&#xff0…...

54. 螺旋矩阵

https://leetcode.cn/problems/spiral-matrix/description/?envTypestudy-plan-v2&envIdtop-100-liked观察示例中的输出轨迹我们可以想到如下设计&#xff1a; 1.在朝某一方向行进到头后的改变方向是确定的&#xff0c;左->下&#xff0c;下->右&#xff0c;右->…...

剧本杀小程序,市场发展下的新机遇

剧本杀作为休闲娱乐的一种游戏方式&#xff0c;在短时间内进入了大众视野中&#xff0c;受到了广泛关注。近几年&#xff0c;剧本杀行业面临着创新挑战&#xff0c;商家需求寻求新的发展机遇&#xff0c;在市场饱和度下降的趋势下&#xff0c;获得市场份额。 随着科技的不断进…...

【系统架构设计师】论文:论基于 ABSD 的软件开发

更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 摘要正文摘要 2022年5月,我就职的公司承接了xx的智慧党建工作,建设“党建红云” 系统,为xx公司的党组织提供觉务管理、服务功能,促进党员学习和党组织交流。我在该项目中承担架构设计师的职责,主导需求分析和…...

为什么OLED透明屏在同类产品中显示效果最好

说起OLED透明屏&#xff0c;这家伙在同类产品里那真的是“一枝独秀”啊&#xff01;为啥这么说呢&#xff1f;且听我细细道来。 首先&#xff0c;OLED透明屏的透明度那是杠杠的&#xff01;它不像传统显示屏那样有个固定的背景&#xff0c;而是可以实现像素级的透明效果。这样一…...

深度学习基础知识-Batch Normalization(BN)超详细解析

一、背景和问题定义 在深层神经网络&#xff08;Deep Neural Networks, DNNs&#xff09;中&#xff0c;层与层之间的输入分布会随着参数更新不断发生变化&#xff0c;这种现象被称为内部协变量偏移&#xff08;Internal Covariate Shift&#xff09;。具体来说&#xff0c;由…...

基于单片机的燃气报警阀门系统

本设计基于单片机的燃气报警阀门系统&#xff0c;燃气报警阀门系统采用STM32主控制器为核心芯片&#xff0c;外围电路由燃气传感器、OLED液晶显示模块、按键模块、蜂鸣器报警模块、电磁阀以及SIM800模块等模块组成。燃气传感器模块负责采集燃气浓度数据&#xff0c;采集完成由S…...

watch与computed的区别、运用的场景

computed和watch都是响应式数据变化的重要机制&#xff0c;但它们在功能、使用场景和性能表现上有显著的区别。 主要区别 功能和用途 1、computed&#xff1a;计算属性&#xff0c;用于基于其他数据属性进行计算&#xff0c;并返回一个结果。它具有缓存机制&#xff0c;只有当…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件

在选煤厂、化工厂、钢铁厂等过程生产型企业&#xff0c;其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进&#xff0c;需提前预防假检、错检、漏检&#xff0c;推动智慧生产运维系统数据的流动和现场赋能应用。同时&#xff0c;…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

linux arm系统烧录

1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 &#xff08;忘了有没有这步了 估计有&#xff09; 刷机程序 和 镜像 就不提供了。要刷的时…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

相机从app启动流程

一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程

STM32F1 本教程使用零知标准板&#xff08;STM32F103RBT6&#xff09;通过I2C驱动ICM20948九轴传感器&#xff0c;实现姿态解算&#xff0c;并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化&#xff0c;适合嵌入式及物联网开发者。在基础驱动上新增…...