当前位置: 首页 > article >正文

Kafka 如何保证顺序消费

在消息队列的应用场景中,保证消息的顺序消费对于一些业务至关重要,例如金融交易中的订单处理、电商系统的库存变更等。Kafka 作为高性能的分布式消息队列系统,通过巧妙的设计和配置,能够实现消息的顺序消费。接下来,我们将深入探讨 Kafka 保证顺序消费的原理与方法,并结合图文进行详细说明。

一、Kafka 顺序消费的基础:分区特性

Kafka 的主题(Topic)由多个分区(Partition)组成,每个分区都是一个有序的、不可变的消息序列。在单个分区内,消息按照生产者发送的顺序依次追加到日志文件中,并且每个消息都有唯一的偏移量(Offset)来标识其在分区内的位置。这就意味着,在同一个分区内,消息天然是有序的,这是 Kafka 实现顺序消费的基础。

1.1 分区的作用

分区的存在使得 Kafka 能够实现水平扩展,提高消息处理的并行度。多个生产者可以同时向不同的分区发送消息,多个消费者也可以同时从不同的分区消费消息。然而,这种并行处理的方式可能会导致消息在主题层面的顺序被打乱,因为不同分区之间的消息是相互独立的,没有严格的顺序关系。所以,要保证消息的顺序消费,就需要对分区进行合理的管理和控制。

1.2 分区与消息顺序的关系

如下图所示,一个主题包含三个分区,每个分区内的消息都是有序的,但分区之间的消息顺序无法保证。例如,分区 1 中的消息 M1、M2、M3 按顺序写入,分区 2 中的消息 N1、N2、N3 也按顺序写入,但从主题整体来看,无法确定 M1 和 N1 谁先被处理。

二、保证顺序消费的方法

2.1 生产者端:控制消息发送到同一分区

为了保证消息的顺序消费,生产者需要将具有顺序依赖关系的消息发送到同一个分区。可以通过以下几种方式实现:

  • 自定义分区器:开发者可以实现Partitioner接口来自定义分区策略。例如,在电商系统中,可以根据订单 ID 的哈希值将同一订单相关的消息发送到同一个分区。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 假设key为订单ID,通过哈希值取模分配到固定分区
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> configs) {}
}

在生产者配置中指定自定义分区器:

Properties properties = new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());

  • 固定分区策略:如果业务场景允许,也可以直接将消息发送到固定的分区。例如,在一些简单的日志记录场景中,将所有日志消息都发送到分区 0。

ProducerRecord<String, String> record = new ProducerRecord<>("log_topic", 0, "log_key", "log_message");

2.2 消费者端:一个分区仅由一个消费者消费

在消费者端,为了确保分区内的消息按顺序消费,需要保证一个分区只能被消费者组内的一个消费者消费。Kafka 的消费者组机制天然支持这一点,当消费者组内的消费者数量小于或等于分区数量时,Kafka 会自动将分区分配给消费者,且每个分区最多被一个消费者消费。

如下图所示,消费者组中有两个消费者,主题有四个分区,Kafka 会将分区 0 和 1 分配给消费者 1,分区 2 和 3 分配给消费者 2,这样每个消费者都能按顺序消费自己负责的分区内的消息。

2.3 消费者端:控制消费线程

如果消费者使用多线程处理消息,需要注意控制线程的消费顺序,避免出现乱序消费的情况。一种常见的方法是为每个分区分配一个独立的消费线程,确保同一分区的消息在同一个线程中按顺序处理。例如:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class OrderedConsumer {
    private static final int THREAD_POOL_SIZE = 10;
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executorService;
    private final Map<String, Thread> partitionThreads = new ConcurrentHashMap<>();
    public OrderedConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
        this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    }
    public void startConsuming() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String partition = record.partition() + "";
                    Thread thread = partitionThreads.get(partition);
                    if (thread == null ||!thread.isAlive()) {
                        thread = new Thread(() -> consumePartition(partition));
                        partitionThreads.put(partition, thread);
                        executorService.submit(thread);
                    }
                }
            }
        } finally {
            consumer.close();
            executorService.shutdown();
        }
    }
    private void consumePartition(String partition) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                if (record.partition() + "".equals(partition)) {
                    // 处理消息
                    processMessage(record);
                }
            }
        }
    }
    private void processMessage(ConsumerRecord<String, String> record) {
        // 具体的消息处理逻辑
        System.out.println("Consumed message: " + record.value());
    }
}

三、顺序消费的应用场景

3.1 金融交易场景

在金融交易系统中,订单的创建、支付、退款等操作必须按照顺序进行处理,否则可能会导致资金错误或业务逻辑混乱。通过将同一订单的相关消息发送到同一个分区,并保证分区按顺序消费,可以确保订单操作的正确性和一致性。

3.2 数据库变更日志

在数据库的变更数据捕获(Change Data Capture,CDC)场景中,Kafka 可以用于记录数据库表的增删改操作。为了保证数据库状态的一致性,这些变更日志必须按照顺序消费和应用。利用 Kafka 的顺序消费特性,能够准确地将数据库变更同步到其他系统。

3.3 电商库存管理

在电商系统中,库存的扣减和回补操作需要严格按顺序执行,否则可能会出现超卖或库存数据不准确的问题。将库存相关的消息发送到同一分区并顺序消费,可以保证库存操作的准确性。

Kafka 通过分区特性、生产者分区策略以及消费者消费方式的控制,能够有效地保证消息的顺序消费。在实际应用中,开发者需要根据具体的业务场景,合理配置和使用这些机制,以满足业务对消息顺序性的要求。同时,也要注意顺序消费可能带来的性能影响,在保证顺序的前提下,通过合理的优化措施提高系统的整体性能。

相关文章:

Kafka 如何保证顺序消费

在消息队列的应用场景中&#xff0c;保证消息的顺序消费对于一些业务至关重要&#xff0c;例如金融交易中的订单处理、电商系统的库存变更等。Kafka 作为高性能的分布式消息队列系统&#xff0c;通过巧妙的设计和配置&#xff0c;能够实现消息的顺序消费。接下来&#xff0c;我…...

【算法题】算法一本通

每周更新至完结&#xff0c;建议关注收藏点赞。 目录 待整理文章已整理的文章方法论思想总结模版工具总结排序 数组与哈希表栈双指针&#xff08;滑动窗口、二分查找、链表&#xff09;树前缀树堆 优先队列&#xff08;区间/间隔问题、贪心 &#xff09;回溯图一维DP位操作数学…...

Modbus转Ethernet IP赋能挤出吹塑机智能监控

在现代工业自动化领域&#xff0c;小疆智控Modbus转Ethernet IP网关GW-EIP-001与挤出吹塑机的应用越来越广泛。这篇文章将为您详细解读这两者的结合是如何提高生产效率&#xff0c;降低维护成本的。首先了解什么是Modbus和Ethernet IP。Modbus是一种串行通信协议&#xff0c;它…...

C++中如何遍历map?

文章目录 1. 使用范围for循环&#xff08;C11及以上&#xff09;2. 使用迭代器3. 使用反向迭代器注意事项 在C中&#xff0c; std::map 是一种关联容器&#xff0c;它存储的是键值对&#xff08;key-value pairs&#xff09;&#xff0c;并且按键的顺序进行排序。遍历 std::m…...

什么是终端安全管理系统(终端安全管理软件2024科普)

在当今数字化迅速发展的时代&#xff0c;企业面临着越来越多的信息安全威胁。为了应对这些挑战&#xff0c;保障公司数据的安全性和完整性&#xff0c;终端安全管理系统&#xff08;Endpoint Security Management System&#xff09;应运而生。 本文将为您深入浅出地科普2024年…...

书籍转圈打印矩阵(8)0604

题目 给定一个整型矩阵matrix&#xff0c;请按照转圈的方式打印它。 例如&#xff1a; 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 打印结果为&#xff1a;1&#xff0c;2&#xff0c;3&#xff…...

【JVM】Java类加载机制

【JVM】Java类加载机制 什么是类加载&#xff1f; 在 Java 的世界里&#xff0c;每一个类或接口在经过编译后&#xff0c;都会生成对应的 .class 字节码文件。 所谓类加载机制&#xff0c;就是 JVM 将这些 .class 文件中的二进制数据加载到内存中&#xff0c;并对其进行校验…...

Elasticsearch中的自定义分析器(Custom Analyzer)介绍

在 Elasticsearch 中,自定义分析器(Custom Analyzer) 是一种可配置的文本处理组件,允许用户通过组合分词器(Tokenizer)、过滤器(Token Filter)和字符过滤器(Character Filter)来定义特定的文本分析逻辑。这使得 Elasticsearch 能够针对不同语言、业务场景或特殊需求,…...

《C++初阶之入门基础》【C++的前世今生】

【C的前世今生】目录 前言&#xff1a;---------------起源---------------一、历史背景二、横空出世---------------发展---------------三、标准立世C98&#xff1a;首个国际标准版本C03&#xff1a;小修订版本 四、现代进化C11&#xff1a;现代C的开端C14&#xff1a;对C11的…...

Apache APISIX

目录 Apache APISIX是什么&#xff1f; Lua Lua 的主要特点&#xff1a; Lua 的常见应用&#xff1a; CVE-2020-13945(Apache APISIX默认API Token导致远程Lua代码执行) ​编辑Lua脚本解析 CVE-2021-45232(Apache APISIX Dashboard API权限绕过导致RCE) Apache …...

如何在 git dev 中创建合并请求

先将 自己的代码 推到 自己的远程的 分支上 在 创建 合并请求 根据提示 将 自己的远程的 源码 合并到 对应的分支上 然后 创建 合并请求 等待 对应的 人 来 进行合并就行...

基于nlohmann/json 实现 从C++对象转换成JSON数据格式

C对象的JSON序列化与反序列化 基于JsonCpp库实现C对象序列化与反序列化 JSON 介绍 JSON作为一种轻量级的数据交换格式&#xff0c;在Web服务和应用程序中广泛使用。 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人阅读…...

Java枚举类映射MySQL的深度解析与实践指南

Java枚举类映射MySQL的深度解析与实践指南 一、枚举类型映射的四大核心策略 1. 序数映射法&#xff08;ordinal映射&#xff09; ​​实现原理​​&#xff1a;存储枚举值的下标顺序 public enum OrderStatus {PENDING, // 存储为0PROCESSING, // 存储为1SHIPPED, //…...

代码训练LeetCode(21)跳跃游戏2

代码训练(21)LeetCode之跳跃游戏2 Author: Once Day Date: 2025年6月4日 漫漫长路&#xff0c;才刚刚开始… 全系列文章可参考专栏: 十年代码训练_Once-Day的博客-CSDN博客 参考文章: 45. 跳跃游戏 II - 力扣&#xff08;LeetCode&#xff09;力扣 (LeetCode) 全球极客挚爱…...

【HarmonyOS 5】鸿蒙APP使用【团结引擎Unity】开发的案例教程

以下是基于团结引擎开发鸿蒙Unity应用的详细案例教程&#xff0c;整合环境配置、工程适配、跨语言通信等核心环节 一、环境配置&#xff08;关键前置步骤&#xff09; 1. ‌工具安装‌ ‌工具‌‌版本要求‌‌作用‌团结引擎Hub≥1.2.3Unity鸿蒙项目构建管理DevEco Studio≥…...

《T/CI 404-2024 医疗大数据智能采集及管理技术规范》全面解读与实施分析

规范背景与详细信息 《T/CI 404-2024 医疗大数据智能采集及管理技术规范》是由中国国际科技促进会联合河南科技大学、河南科技大学第一附属医院、深圳市人民医院等十余家医疗机构与企业共同制定的团体标准,于2024年5月正式发布实施。该规范是我国医疗大数据领域的重要技术标准…...

国产三维CAD皇冠CAD在「金属压力容器制造」建模教程:蒸汽锅炉

面对蒸汽锅炉设计中复杂的曲面封头、密集的管板开孔、多变的支撑结构以及严格的强度与安全规范&#xff08;如GB150、ASME等&#xff09;&#xff0c;传统二维设计手段往往捉襟见肘&#xff0c;易出错、效率低、协同难。国产三维CAD皇冠CAD&#xff08;CrownCAD&#xff09;凭借…...

Mysql避免索引失效

1. 在索引列上使用函数或表达式 问题描述 SELECT * FROM users WHERE YEAR(create_time) 2023; 如果create_time列上有索引&#xff0c;上述查询会导致索引失效&#xff0c;因为MySQL无法直接利用索引的B树结构。 解决方法 将函数应用于条件值&#xff0c;而不是列&#…...

python爬虫:Ruia的详细使用(一个基于asyncio和aiohttp的异步爬虫框架)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Ruia概述1.1 Ruia介绍1.2 Ruia特点1.3 安装Ruia1.4 使用案例二、基本使用2.1 Request 请求2.2 Response - 响应2.3 Item - 数据提取2.4 Field 提取数据2.5 Spider - 爬虫类2.6 Middleware - 中间件三、高级功能3.1 …...

C++中单例模式详解

在C中&#xff0c;单例模式 (Singleton Pattern) 确保一个类只有一个实例&#xff0c;并提供一个全局访问点来获取这个实例。这在需要一个全局对象来协调整个系统行为的场景中非常有用。 为什么要有单例模式&#xff1f; 在许多项目中&#xff0c;某些类从逻辑上讲只需要一个实…...

舆情监控系统爬虫技术解析

之前我已经详细解释过爬虫在系统中的角色和技术要点&#xff0c;这次需要更聚焦“如何实现”这个动作。 我注意到上次回复偏重架构设计&#xff0c;这次应该拆解为更具体的操作步骤&#xff1a;从目标定义到数据落地的完整流水线。尤其要强调动态调度这个容易被忽视的环节——…...

Windows上用FFmpeg采集摄像头推流 → MediaMTX服务器转发流 → WSL2上拉流播放

1. Windows上 FFmpeg 推流&#xff08;摄像头采集&#xff09; 设备名称可用 ffmpeg -list_devices true -f dshow -i dummy 查询&#xff0c;假设为Integrated Camera 采集推流示例&#xff08;推RTMP到MediaMTX&#xff09;&#xff1a; ffmpeg -rtbufsize 100M -f dshow …...

cpp多线程学习

1.thread std::thread是 C11 引入的跨平台线程管理类&#xff0c;封装了操作系统的线程 API&#xff08;如 pthread、Windows 线程&#xff09;&#xff0c;提供统一的线程操作接口。线程的生命周期由join()和detach()控制。 thread在创建时就开始执行 join()&#xff1a;阻…...

Vue3中Ant-design-vue的使用-附完整代码

前言 首先介绍一下什么是Ant-design-vue Ant Design Vue 是基于 Vue 3 的企业级 UI 组件库&#xff08;同时兼容 Vue 2&#xff09;&#xff0c;是蚂蚁金服开源项目 Ant Design 的 Vue 实现版本。它遵循 Ant Design 的设计规范&#xff0c;提供丰富的组件和高质量的设计体系&…...

k8s热更新-subPath 不支持热更新

文章目录 k8s热更新-subPath 不支持热更新背景subPath 不支持热更新1. 为什么 subPath 不支持热更新&#xff1f;2. 挂载整个目录为何支持热更新&#xff1f;使用demo举例&#xff1a;挂载整个目录&#xff08;不使用 subPath&#xff09; k8s热更新-subPath 不支持热更新 背景…...

Redis Sorted Set 深度解析:从原理到实战应用

Redis Sorted Set 深度解析&#xff1a;从原理到实战应用 在 Redis 丰富的数据结构家族中&#xff0c;Sorted Set&#xff08;有序集合&#xff09;凭借独特的设计和强大的功能&#xff0c;成为处理有序数据场景的得力工具。无论是构建实时排行榜&#xff0c;还是实现基于时间的…...

docker中组合这几个命令来排查 import 模块失败 的问题

pwd ls echo $PYTHONPATH这三个命令是你在 Linux 或 Docker 容器中常用来「查看环境状态」的基础命令。 ✅ 1. echo $PYTHONPATH &#x1f50d; 含义 这是在查看当前的 Python 模块搜索路径。 &#x1f9e0; 分解解释&#xff1a; echo&#xff1a;打印某个变量的值&#x…...

若依框架修改模板,添加通过excel导入数据功能

版本&#xff1a;我后端使用的是RuoYi-Vue-fast版本&#xff0c;前端是RuoYi-Vue3 需求: 我需要每个侧边栏功能都需要具有导入excel功能&#xff0c;但是若依只有用户才具备&#xff0c;我需要代码生成的每个功能都拥有导入功能。​ 每次生成一个一个改实在是太麻烦了。索性…...

web全栈开发学习-01html基础

背景 最近在付费网站学习web全栈开发&#xff0c;记录一下阶段性学习。今天刚好学完html基础&#xff0c;跟着教程画了个基础的网站。 样品展示: 开发工具 vscode Visual Studio Code - Code Editing. Redefined 常用插件 Prettier&#xff1a;格式优化 Live Sever:实时调…...

基于Socketserver+ThreadPoolExecutor+Thread构造的TCP网络实时通信程序

目录 介绍&#xff1a; 源代码&#xff1a; Socketserver-服务端代码 Socketserver客户端代码&#xff1a; 介绍&#xff1a; socketserver是一种传统的传输层网络编程接口&#xff0c;相比WebSocket这种应用层的协议来说&#xff0c;socketserver比较底层&#xff0c;soc…...