RocketMQ 中如何实现消息的可靠传递?
引言
作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。
在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。
生产者端
1. 同步发送消息
生产者使用同步发送模式时,会等待 Broker 返回发送结果,确保消息成功发送到 Broker 才会继续后续操作。若发送失败,生产者可以进行重试。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));try {// 同步发送消息producer.send(msg);} catch (Exception e) {// 发送失败,可进行重试等处理e.printStackTrace();}producer.shutdown();}
}
2. 重试机制
生产者在发送消息失败时,可配置重试次数。RocketMQ 支持自动重试,当遇到网络抖动、Broker 临时不可用等情况时,会自动尝试重新发送消息。
producer.setRetryTimesWhenSendFailed(3); // 设置发送失败时的重试次数为 3 次
3. 消息幂等性处理
为避免因重试导致消息重复发送,生产者可以为每条消息生成唯一的 ID。Broker 在接收消息时,会根据消息 ID 进行去重处理,确保相同 ID 的消息只被处理一次。
Broker 端
1. 刷盘策略
- 同步刷盘:当 Broker 收到消息后,会先将消息写入磁盘,再返回响应给生产者。这种策略保证了消息不会因 Broker 异常重启而丢失,但会降低系统的吞吐量。
flushDiskType = SYNC_FLUSH - 异步刷盘:Broker 收到消息后,先将消息写入内存缓冲区,然后立即返回响应给生产者,由专门的线程将消息异步写入磁盘。这种策略性能较高,但在 Broker 异常崩溃时,可能会丢失部分内存中的消息。
2. 主从复制
RocketMQ 支持主从复制架构,主 Broker 接收消息后,会将消息同步复制到从 Broker。当主 Broker 出现故障时,可以切换到从 Broker 继续提供服务,保证消息的可用性。
brokerRole = SYNC_MASTER # 主 Broker 配置为同步主节点 brokerRole = SLAVE # 从 Broker 配置为从节点消费者端
1. 手动提交消费偏移量
消费者在处理完消息后,手动向 Broker 提交消费偏移量,确保只有在消息处理成功后才更新消费进度。这样,当消费者出现异常时,可以从上次提交的偏移量处继续消费,避免消息丢失。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class ManualCommitConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");// 手动提交消费偏移量consumer.setAutoCommit(false);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 处理消息System.out.println(new String(msg.getBody()));} catch (Exception e) {// 处理失败,返回重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 手动提交消费偏移量context.setAckIndex(msgs.size() - 1);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} }2. 消费重试机制
当消费者处理消息失败时,RocketMQ 会自动进行重试。消费者可以根据业务需求,设置重试次数和重试间隔,确保消息能够被成功处理。
3. 幂等消费
消费者在处理消息时,要保证消息的幂等性,即多次处理相同的消息不会产生额外的影响。可以通过消息 ID 或业务唯一标识来判断消息是否已经处理过,避免重复处理。
总结
- 持久化策略:内存注定是不可靠的,刷盘一定是可靠性首选,但是刷盘导致的IO延时如何优化,是评判中间件性能的关键。
- 重试机制:3次重试应该是各个开源框架的默认重试次数。
- 集群化策略:单个节点注定不是高可用的最终形态,主从复制多节点可靠是最终态。
- 幂等机制:保持消息的重复消费可靠性,幂等键或者其他策略都是可参考的。
相关文章:
RocketMQ 中如何实现消息的可靠传递?
引言 作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。 在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。 生产者端 1. 同步…...
Elasticsearch+kibana安装(简单易上手)
下载ES( Download Elasticsearch | Elastic ) 将ES安装包解压缩 解压后目录如下: 修改ES服务端口(可以不修改) 启动ES 记住这些内容 验证ES是否启动成功 下载kibana( Download Kibana Free | Get Started Now | Elastic ) 解压后的kibana目…...
视频多模态模型——视频版ViT
大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细解读多模态论文《ViViT: A Video Vision Transformer》,2021由google 提出用于视频处理的视觉 Transformer 模型,在视频多模态领域有…...
单机伪分布Hadoop详细配置
目录 1. 引言2. 配置单机Hadoop2.1 下载并解压JDK1.8、Hadoop3.3.62.2 配置环境变量2.3 验证JDK、Hadoop配置 3. 伪分布Hadoop3.1 配置ssh免密码登录3.2 配置伪分布Hadoop3.2.1 修改hadoop-env.sh3.2.2 修改core-site.xml3.2.3 修改hdfs-site.xml3.2.4 修改yarn-site.xml3.2.5 …...
Ollama windows安装
Ollama 是一个开源项目,专注于帮助用户本地化运行大型语言模型(LLMs)。它提供了一个简单易用的框架,让开发者和个人用户能够在自己的设备上部署和运行 LLMs,而无需依赖云服务或外部 API。这对于需要数据隐私、离线使用…...
鸿蒙next 自定义日历组件
效果图预览 20250124-113957 使用说明 1.选择日期左右箭头,实现每月日历切换,示例中超出当前月份,禁止进入下一月,可在代码更改 2.日历中显示当前选择的日期,选中的日期颜色可自定义 3.日历中可展示历史记录作为数据…...
Nginx 开发总结
文章目录 1. Nginx 基础概念1-1、什么是 Nginx1-2、Nginx 的工作原理1-3、Nginx 的核心特点1-4、Nginx 的常见应用场景1-5、Nginx 与 Apache 的区别1-6、 Nginx 配置的基本结构1-7、Nginx 常见指令 2. Nginx 配置基础2-1、Nginx 配置文件结构2-2、全局配置 (Global Block)2-3、…...
Van-Nav:新年,将自己学习的项目地址统一整理搭建自己的私人导航站,供自己后续查阅使用,做技术的同学应该都有一个自己网站的梦想
嗨,大家好,我是小华同学,关注我们获得“最新、最全、最优质”开源项目和高效工作学习方法 Van-Nav是一个基于Vue.js开发的导航组件库,它提供了多种预设的样式和灵活的配置选项,使得开发者可以轻松地定制出符合项目需求…...
层次聚类构建层次结构的簇
层次聚类(Hierarchical Clustering)可以通过自定义函数来完成。层次聚类可以分为两种方法:凝聚型(Agglomerative)和分裂型(Divisive)。这里主要介绍一种常用的凝聚型方法,它是自底向…...
计算机网络__基础知识问答
Question: 1)在计算机网络的5层结构中,每一层的功能大概是什么? 2)交换机的功能?https://www.bilibili.com/video/BV1na4y1L7Ev 3)路由器的功能?https://www.bilibili.com/video/BV1hv411k7n…...
网易云音乐歌名可视化:词云生成与GitHub-Pages部署实践
引言 本文将基于前一篇爬取的网易云音乐数据, 利用Python的wordcloud、matplotlib等库, 对歌名数据进行深入的词云可视化分析. 我们将探索不同random_state对词云布局的影响, 并详细介绍如何将生成的词云图部署到GitHub Pages, 实现数据可视化的在线展示. 介绍了如何从原始数据…...
PHP根据IP地址获取地理位置城市和经纬度信息
/** 根据IP地址 获取地理位置*/ function getLocationByIP($ip) {$url "http://ip-api.com/json/{$ip}?langzh-CN&fieldsstatus,message,country,countryCode,region,regionName,city,lat,lon,timezone,isp,org,as";$response file_get_contents($url);$data …...
渲染流程概述
渲染流程包括 CPU应用程序端渲染逻辑 和 GPU渲染管线 一、CPU应用程序端渲染逻辑 剔除操作对物体进行渲染排序打包数据调用Shader SetPassCall 和 Drawcall 1.剔除操作 视椎体剔除 (给物体一个包围盒,利用包围盒和摄像机的视椎体进行碰撞检测…...
【单细胞-第三节 多样本数据分析】
文件在单细胞\5_GC_py\1_single_cell\1.GSE183904.Rmd GSE183904 数据原文 1.获取临床信息 筛选样本可以参考临床信息 rm(list ls()) library(tinyarray) a geo_download("GSE183904")$pd head(a) table(a$Characteristics_ch1) #统计各样本有多少2.批量读取 学…...
libOnvif通过组播不能发现相机
使用libOnvif库OnvifDiscoveryClient类, auto discovery new OnvifDiscoveryClient(QUrl(“soap.udp://239.255.255.250:3702”), cb.Build()); 会有错误: end of file or no input: message transfer interrupted or timed out(30 sec max recv delay)…...
项目集成GateWay
文章目录 1.环境搭建1.创建sunrays-common-cloud-gateway-starter模块2.目录结构3.自动配置1.GateWayAutoConfiguration.java2.spring.factories 3.pom.xml4.注意:GateWay不能跟Web一起引入! 1.环境搭建 1.创建sunrays-common-cloud-gateway-starter模块…...
2025年01月28日Github流行趋势
项目名称:maybe 项目地址url:https://github.com/maybe-finance/maybe项目语言:Ruby历史star数:37540今日star数:1004项目维护者:zachgoll, apps/dependabot, tmyracle, Shpigford, crnsh项目简介ÿ…...
使用Ollama本地部署DeepSeek R1
前言 DeepSeek是一款开源的智能搜索引擎,能够通过深度学习技术提高搜索的智能化水平。如果你正在寻找一种方式来将DeepSeek部署在本地环境中,Ollama是一个非常方便的工具,它允许你在本地快速部署并管理各种基于AI的模型。 在本篇博客中&…...
doris:异常数据处理
在导入过程中,源数据列与目标列的数据类型可能存在不一致的情况。导入过程会对这些类型不一致的数据进行转换,但在转换过程中可能会出现字段类型不匹配、字段超长、精度不匹配等问题,从而导致转换失败。 为了处理这些异常情况,Do…...
chrome源码剖析—UI架构消息机制
Chrome 浏览器的 UI 架构是高度模块化且基于现代图形技术和用户界面设计理念构建的。它的 UI 架构涵盖了窗口、标签页、控件、通知、菜单等组件的管理和交互。Chrome 的 UI 基本上是通过 views 框架和 Aura(Chrome 自己的 UI 层)构建的,后者又…...
.NET 9.0 的 Blazor Web App 项目、Bootstrap Blazor 组件库、自定义日志 TLog 使用备忘
一、设计目标:通用、容易修改、使用简单,所有代码保存在一个文件中,方便移植到其他项目使用。 注:示例使用 Bootstrap Blazor 组件库和 EF Core 、Sqlite,需要先使用 Nuget包管理器 添加对应的包。 namespace Blazor…...
单片机基础模块学习——超声波传感器
一、超声波原理 左边发射超声波信号,右边接收超声波信号 左边的芯片用来处理超声波发射信号,中间的芯片用来处理接收的超声波信号 二、超声波原理图 T——transmit 发送R——Recieve 接收 U18芯片对输入的N_A1信号进行放大,然后输入给超声…...
java 正则表达式匹配Matcher 类
Matcher 类 用法 在 Java 中,Matcher 类是用于匹配正则表达式的工具,而 group() 方法是 Matcher 类中的一个重要方法,用于提取匹配结果中的捕获组(captured groups)。以下是对 group() 方法的详细解释: 1.…...
使用 OpenResty 构建高效的动态图片水印代理服务20250127
使用 OpenResty 构建高效的动态图片水印代理服务 在当今数字化的时代,图片在各种业务场景中广泛应用。为了保护版权、统一品牌形象,动态图片水印功能显得尤为重要。然而,直接在后端服务中集成水印功能,往往会带来代码复杂度增加、…...
Elastic Agent 对 Kafka 的新输出:数据收集和流式传输的无限可能性
作者:来 Elastic Valerio Arvizzigno, Geetha Anne 及 Jeremy Hogan 介绍 Elastic Agent 的新功能:原生输出到 Kafka。借助这一最新功能,Elastic 用户现在可以轻松地将数据路由到 Kafka 集群,从而实现数据流和处理中无与伦比的可扩…...
Elasticsearch 性能测试工具 Loadgen 之 002——命令行及参数详解
上一讲,我们讲解了 Loadgen 的极简部署方式、配置文件、快速使用从 0 到 1 方式。 本讲,我们主要解读一下 Loadgen 的丰富的命令行及参数含义。 有同学可能会说,上面不是介绍很清楚了吗?但,咱们还是有必要详细中文解读…...
书生大模型实战营3
文章目录 L0——入门岛git基础Git 是什么?Git 中的一些基本概念工作区、暂存区和 Git 仓库区文件状态分支主要功能 Git 平台介绍GitHubGitLabGitee Git 下载配置验证下载 Git配置 Git验证 Git配置 Git常用操作Git简易入门四部曲Git其他指令 闯关任务任务1: 破冰活动…...
CTF-web: Python YAML反序列化利用
PyYAML存在以下几个特殊标签,如果这些标签被不安全的解析,会造成解析漏洞 从 PyYaml 版本 6.0 开始,load 的默认加载器已切换到 SafeLoader,以降低远程代码执行的风险。更新后易受攻击的是 yaml.unsafe_load 和 yaml.load(input, Loaderyaml.UnsafeLoade…...
【玩转全栈】----靓号管理系统实现
先赞后看,养成习惯。。。 目录 数据库设置 基本功能 路由器 靓号显示 靓号添加 靓号编辑 视图函数 额外功能 搜索功能 分页 一般逻辑 动态页码 上下页 首尾页 数据库设置 新建一个数据库(或者就用之前部门、用户管理的也行),用Dja…...
【Attention】KV Cache
1 什么是KV Cache? 定义:KV Cache 即 Key-Value Cache,是用于加速 Transformer 模型推理长序列过程的一种技术。 核心原理:在 Transformer 的自注意力机制中,将历史输入 token 中的 Key 和 Value 缓存下来,…...
