KafkaRocketMQ
Kafka 消息生产与消费流程
1. 消息生产
-
生产者创建消息:
- 指定目标 Topic、Key(可选)、Value。
- 可附加 Header 信息(如时间戳、自定义元数据)。
-
选择分区(Partition):
- 若指定 Key,按 Key 的哈希值分配到对应 Partition。
- 若未指定 Key,按轮询或粘性分区策略分配。
-
发送消息到 Broker:
- 生产者将消息发送至对应 Partition 的 Leader Broker。
- 同步或异步发送,通过
acks
配置确认机制:acks=0
:不等待确认(可能丢失消息)。acks=1
:等待 Leader 写入成功。acks=all
:等待 Leader 和所有 ISR(In-Sync Replicas)副本写入成功。
-
Broker 持久化消息:
- Leader Broker 将消息追加到 Partition 的日志文件(顺序写入)。
- Follower Brokers 从 Leader 拉取消息进行副本同步。
2. 消息消费
-
消费者组订阅 Topic:
- 消费者组(Consumer Group)内的每个消费者分配到一个或多个 Partition。
- 每个 Partition 只能被组内的一个消费者消费。
-
拉取消息(Pull 模式):
- 消费者定期向 Broker 发送拉取请求,指定 Topic、Partition 及 Offset。
- 消费者维护当前消费的 Offset(可存储在 Kafka 内部 Topic 或外部系统)。
-
处理消息:
- 消费者处理消息后提交 Offset(自动或手动提交)。
- 若处理失败,可选择重试或跳过(需业务逻辑处理)。
-
分区再平衡(Rebalance):
- 消费者加入或离开时,触发 Rebalance,重新分配 Partition。
- 通过 Kafka 的 Coordinator(内部组件)管理消费者组状态。
RocketMQ 消息生产与消费流程
1. 消息生产
-
生产者创建消息:
- 指定目标 Topic、Tag(过滤标签)、Key(唯一标识)、Body。
- 可设置事务标识(用于事务消息)。
-
选择消息队列(MessageQueue):
- Topic 下分为多个 MessageQueue(默认 4 个)。
- 生产者按轮询、哈希或手动选择策略发送到某个 MessageQueue。
-
发送消息到 Broker:
- 消息发送至对应的 Broker Master 节点。
- 支持同步、异步、单向(Oneway)发送:
- 同步发送:等待 Broker 返回写入结果。
- 异步发送:通过回调处理结果。
- 单向发送:不等待响应(可能丢失消息)。
-
Broker 持久化消息:
- Broker 将消息写入 CommitLog(全局顺序写入的日志文件)。
- 异步构建 ConsumeQueue(消费队列索引)和 IndexFile(消息检索索引)。
2. 消息消费
-
消费者组订阅 Topic:
- 消费者组(Consumer Group)可设置为集群模式(消息负载均衡)或广播模式(全量广播)。
- 每个 MessageQueue 在同一时刻只能被组内的一个消费者消费。
-
拉取消息(Pull 模式):
- 消费者从 Broker 拉取消息,指定 Topic、MessageQueue 及消费位点(Offset)。
- RocketMQ 支持长轮询(Long Polling)减少无效请求。
-
处理消息:
- 消费者处理消息后返回消费状态(
CONSUME_SUCCESS
或RECONSUME_LATER
)。 - 若消费失败,消息进入重试队列(Retry Topic),最多重试 16 次后进入死信队列(DLQ)。
- 消费者处理消息后返回消费状态(
-
位点管理:
- 消费位点存储在 Broker(集群模式)或本地(广播模式)。
- 支持从指定时间点开始消费(如回溯历史消息)。
Kafka vs RocketMQ 核心差异
维度 | Kafka | RocketMQ |
---|---|---|
设计目标 | 高吞吐、日志流处理 | 高可靠、事务消息、顺序消息 |
存储模型 | Partition 日志文件,每个 Partition 独立存储 | CommitLog 统一存储,异步构建消费队列索引 |
消息确认机制 | 基于 acks 参数控制副本同步 | 支持同步/异步刷盘,主从同步复制 |
事务支持 | 有限支持(需配合外部事务) | 原生支持分布式事务消息(2PC) |
消息重试 | 需自行实现(如死信队列) | 内置重试队列和死信队列 |
消费模式 | 仅集群模式 | 支持集群模式和广播模式 |
运维复杂度 | 依赖 ZooKeeper,部署较复杂 | 依赖 NameServer,部署更轻量 |
1. 消息顺序性
Kafka
- 分区顺序性:Kafka通过分区(Partition)保证顺序。同一分区内的消息按写入顺序存储,消费者按顺序消费。
- 实现方式:
- 生产者需将同一业务键(如订单ID)的消息发送到同一分区(通过指定Key哈希选择分区)。
- 消费者单线程消费同一分区(或使用
max.poll.records=1
避免并发)。
- 局限性:全局顺序需单分区,牺牲扩展性。
RocketMQ
- 队列顺序性:通过队列(Queue)实现顺序性,每个队列内消息有序。
- 实现方式:
- 生产者使用
MessageQueueSelector
将同一业务标识的消息发送到同一队列。 - 消费者通过
MessageListenerOrderly
以加锁方式单线程消费队列。
- 生产者使用
- 支持模式:支持局部顺序(如订单操作)和严格全局顺序(需单队列,性能受限)。
对比
- 相似点:均依赖分区/队列的单线程处理。
- 差异:RocketMQ提供更显式的顺序消费API,Kafka需手动控制分区分配。
2. 消息不丢失
Kafka
- 生产者端:
- 设置
acks=all
:等待所有ISR副本确认写入。 - 启用重试机制(
retries
)和幂等性(enable.idempotence=true
)。
- 设置
- Broker端:
- 消息持久化到磁盘(可配置刷盘策略)。
- 多副本同步(ISR机制),
min.insync.replicas
确保最小存活副本数。
- 消费者端:
- 手动提交偏移量(
enable.auto.commit=false
),处理完消息后提交。
- 手动提交偏移量(
RocketMQ
- 生产者端:
- 同步发送(
sendSync
)等待Broker确认。 - 事务消息机制(两阶段提交)保障事务一致性。
- 同步发送(
- Broker端:
- 同步刷盘(
flushDiskType=SYNC_FLUSH
)确保消息落盘。 - 主从复制(同步双写或异步复制)。
- 同步刷盘(
- 消费者端:
- 消费者处理完成后手动ACK,失败时重试(重试队列+死信队列)。
对比
- 相似点:均依赖生产者确认、持久化、副本同步和消费者手动确认。
- 差异:
- Kafka通过ISR动态管理副本,RocketMQ支持同步刷盘和事务消息。
- RocketMQ的重试队列机制更结构化,Kafka依赖消费者自行处理。
3. 高可用性
Kafka
- 副本机制:
- 每个分区有多个副本,分布在不同Broker。
- Leader处理读写,Follower同步数据,Leader故障时从ISR选举新Leader。
- 控制器(Controller):
- 负责分区Leader选举和集群状态管理。
- 依赖ZooKeeper:
- 存储元数据和Broker协调信息(未来版本将移除ZooKeeper依赖)。
RocketMQ
- 主从架构:
- Broker分Master和Slave,Master处理写请求,Slave异步/同步复制数据。
- Master故障时,Slave可切换为Master(需手动或通过DLedger自动切换)。
- DLedger模式:
- 基于Raft协议实现多副本一致性,自动选举Leader。
- Namesrv:
- 轻量级元数据管理服务(无强一致性依赖),Broker定期注册信息。
对比
- 相似点:均通过多副本和故障转移实现高可用。
- 差异:
- Kafka依赖ZooKeeper协调,RocketMQ使用Namesrv和DLedger。
- RocketMQ的DLedger提供强一致性的自动选主,Kafka的ISR更侧重可用性。
总结
特性 | Kafka | RocketMQ |
---|---|---|
顺序性 | 分区内有序,依赖Key选择分区。 | 队列内有序,显式选择队列和顺序监听器。 |
消息不丢失 | ISR副本同步、生产者ACK、手动提交偏移。 | 同步刷盘、事务消息、主从复制。 |
高可用 | 多副本+ZooKeeper协调。 | 主从+DLedger自动选主+Namesrv。 |
适用场景
-
Kafka:
适合日志采集、流数据处理、实时分析等高吞吐场景,如 ELK 日志系统、用户行为追踪。 -
RocketMQ:
适合金融交易、订单处理、消息重试等高可靠性场景,如电商订单状态同步、支付事务消息。
总结
- Kafka 以吞吐量和水平扩展见长,适合大数据流式处理。
- RocketMQ 以事务消息和可靠性为核心,适合企业级复杂业务场景。
- 选择时需根据业务需求(吞吐量、可靠性、事务支持)及运维成本综合评估。
相关文章:
KafkaRocketMQ
Kafka 消息生产与消费流程 1. 消息生产 生产者创建消息: 指定目标 Topic、Key(可选)、Value。可附加 Header 信息(如时间戳、自定义元数据)。 选择分区(Partition): 若指定 Key&am…...

HarmonyOS Next 中的状态管理
在声明式UI编程框架中,UI是程序状态的运行结果,用户构建了一个UI模型,其中应用的运行时的状态是参数。当参数改变时,UI作为返回结果,也将进行对应的改变。这些运行时的状态变化所带来的UI的重新渲染,在ArkU…...

基于qiime2的16S数据分析全流程:从导入数据到下游分析一条龙
目录 创建metadata 把数据导入qiime2 去除引物序列 双端合并 (dada2不需要) 质控 (dada2不需要) 使用deblur获得特征序列 使用dada2生成代表序列与特征表 物种鉴定 可视化物种鉴定结果 构建进化树(ITS一般不构建进化树…...
【软件测试开发】:软件测试常用函数1.0(C++)
1. 元素的定位 web⾃动化测试的操作核⼼是能够找到⻚⾯对应的元素,然后才能对元素进⾏具体的操作。 常⻅的元素定位⽅式⾮常多,如id,classname,tagname,xpath,cssSelector 常⽤的主要由cssSelector和xpath…...

vue2项目修改浏览器显示的网页图标
1.准备一个新的图标文件,通常是. ico格式,也可以是. Png、. Svg等格式 2.将新的图标文件(例如:faviconAt.png)放入项目的public文件夹中。如下图 public文件夹中的所有文件都会在构建时原样复制到最终的输出目录(通常是dist) 3. 修改vue项目…...

开源、创新与人才发展:机器人产业的战略布局与稚晖君成功案例解析
目录 引言 一、开源:机器人产业的战略布局 促进技术进步和生态建设 吸引人才和合作伙伴 建立标准和网络效应 降低研发风险与成本 二、稚晖君:华为"天才少年计划"的成功典范 深厚的技术积累与动手能力 强烈的探索和创新意识 持续公开…...

线程相关作业
1.创建两个线程,分支线程1拷贝文件的前一部分,分支线程2拷贝文件的后一部分 #include "head.h"#define BUFFER_SIZE 1024// 线程参数结构体,包含文件名和文件偏移量 typedef struct {FILE *src_file;FILE *dest_file;long start_o…...

通义万相2.1开源版本地化部署攻略,生成视频再填利器
2025 年 2 月 25 日晚上 11:00 通义万相 2.1 开源发布,前两周太忙没空搞它,这个周末,也来本地化部署一个,体验生成效果如何,总的来说,它在国内文生视频、图生视频的行列处于领先位置,…...

【模拟CMOS集成电路设计】带隙基准(Bandgap)设计与仿真(基于运放的电流模BGR)
【模拟CMOS集成电路设计】带隙基准(Bandgap)设计与仿真 前言工程文件&部分参数计算过程,私聊~ 一、 设计指标指标分析: 二、 电路分析三、 仿真3.1仿真电路图3.2仿真结果(1)运放增益(2)基准温度系数仿真(3)瞬态启动仿真(4)静态…...

如何选择国产串口屏?
目录 1、迪文 2、淘晶驰 3、广州大彩 4、金玺智控 5、欣瑞达 6、富莱新 7、冠显 8、有彩 串口屏,顾名思义,就是通过串口通信接口(如RS232、RS485、TTL UART等)与主控设备进行通信的显示屏。其核心功能是显示信息和接收输入…...

Solana中的程序派生地址(PDAs):是什么,为什么,以及如何?
程序派生地址 (PDA) 在 Solana 中的应用:什么、为什么和如何? 在学习 Solana 时,你会经常听到关于 程序派生地址 (PDAs) 的讨论。它们就像这样 —— 强大、多功能,而且最重要的是,稍微被误解。如果你是一个开发者&…...

利用FatJar彻底解决Jar包冲突(一)
利用FatJar彻底解决Jar包冲突 序FatJar的加载与隔离⼀、 FatJar概念⼆、FatJar的加载三、FatJar的隔离四、隔离机制验证五、 FatJar的定位六、 打包注意点 序 今天整理旧电脑里的资料,偶然翻到大概10年前实习时写的笔记,之前经常遇到Java依赖冲突的问题…...
Spring MVC笔记
01 什么是Spring MVC Spring MVC 是 Spring 框架中的一个核心模块,专门用于构建 Web 应用程序。它基于经典的 MVC 设计模式(Model-View-Controller),但通过 Spring 的特性(如依赖注入、注解驱动)大幅简化了…...

BurpSuite插件jsEncrypter使用教程
一、前言 在当今Web应用安全测试中,前端加密已成为开发者保护敏感数据的常用手段。然而,这也给安全测试人员带来了挑战,传统的抓包方式难以获取明文数据,测试效率大打折扣。BurpSuite作为一款强大的Web安全测试工具,其…...

【C#实现手写Ollama服务交互,实现本地模型对话】
前言 C#手写Ollama服务交互,实现本地模型对话 最近使用C#调用OllamaSharpe库实现Ollama本地对话,然后思考着能否自己实现这个功能。经过一番查找,和查看OllamaSharpe源码发现确实可以。其实就是开启Ollama服务后,发送HTTP请求&a…...
Android Glide 框架线程管理模块原理的源码级别深入分析
一、引言 在现代的 Android 应用开发中,图片加载是一个常见且重要的功能。Glide 作为一款广泛使用的图片加载框架,以其高效、灵活和易用的特点受到了开发者的青睐。其中,线程管理模块是 Glide 框架中至关重要的一部分,它负责协调…...
每天记录一道Java面试题---day32
MySQL索引的数据结构、各自优劣 回答重点 B树:是一个平衡的多叉树,从根节点到每个叶子节点的高度差不超过1,而且同层级的节点间有指针相互连接。在B树上的常规检索,从根节点到叶子节点的搜索效率基本相当,不会出现大…...

Vue3 Pinia 符合直觉的Vue.js状态管理库
Pinia 符合直觉的Vue.js状态管理库 什么时候使用Pinia 当两个关系非常远的组件,要传递参数时使用Pinia组件的公共参数使用Pinia...
深度学习与大模型基础-向量
大家好!今天我们来聊聊向量(Vector)。别被这个词吓到,其实向量在我们的生活中无处不在,只是我们没注意罢了。 1. 向量是什么? 简单来说,向量就是有大小和方向的量。比如你从家走到学校&#x…...
【网络编程】完成端口 IOCP
10.11 完成端口 10.11.1 基本概念 完成端口的全称是I/O 完成端口,英文为IOCP(I/O Completion Port) 。IOCP是一个异 步I/O 的 API, 可以高效地将I/O 事件通知给应用程序。与使用select() 或是其他异步方法不同 的是,一个套接字与一个完成端口关联了起来…...

铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...

家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...

【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Pydantic + Function Calling的结合
1、Pydantic Pydantic 是一个 Python 库,用于数据验证和设置管理,通过 Python 类型注解强制执行数据类型。它广泛用于 API 开发(如 FastAPI)、配置管理和数据解析,核心功能包括: 数据验证:通过…...