当前位置: 首页 > article >正文

RocketMQ 中如何实现消息的可靠传递?

引言

作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。

在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。

 

生产者端

1. 同步发送消息

生产者使用同步发送模式时,会等待 Broker 返回发送结果,确保消息成功发送到 Broker 才会继续后续操作。若发送失败,生产者可以进行重试。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));try {// 同步发送消息producer.send(msg);} catch (Exception e) {// 发送失败,可进行重试等处理e.printStackTrace();}producer.shutdown();}
}

2. 重试机制

生产者在发送消息失败时,可配置重试次数。RocketMQ 支持自动重试,当遇到网络抖动、Broker 临时不可用等情况时,会自动尝试重新发送消息。

producer.setRetryTimesWhenSendFailed(3); // 设置发送失败时的重试次数为 3 次

3. 消息幂等性处理

为避免因重试导致消息重复发送,生产者可以为每条消息生成唯一的 ID。Broker 在接收消息时,会根据消息 ID 进行去重处理,确保相同 ID 的消息只被处理一次。

Broker 端

1. 刷盘策略

  • 同步刷盘:当 Broker 收到消息后,会先将消息写入磁盘,再返回响应给生产者。这种策略保证了消息不会因 Broker 异常重启而丢失,但会降低系统的吞吐量。
    flushDiskType = SYNC_FLUSH
  • 异步刷盘:Broker 收到消息后,先将消息写入内存缓冲区,然后立即返回响应给生产者,由专门的线程将消息异步写入磁盘。这种策略性能较高,但在 Broker 异常崩溃时,可能会丢失部分内存中的消息。

    2. 主从复制

    RocketMQ 支持主从复制架构,主 Broker 接收消息后,会将消息同步复制到从 Broker。当主 Broker 出现故障时,可以切换到从 Broker 继续提供服务,保证消息的可用性。

    brokerRole = SYNC_MASTER # 主 Broker 配置为同步主节点
    brokerRole = SLAVE # 从 Broker 配置为从节点

    消费者端

    1. 手动提交消费偏移量

    消费者在处理完消息后,手动向 Broker 提交消费偏移量,确保只有在消息处理成功后才更新消费进度。这样,当消费者出现异常时,可以从上次提交的偏移量处继续消费,避免消息丢失。

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class ManualCommitConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");// 手动提交消费偏移量consumer.setAutoCommit(false);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 处理消息System.out.println(new String(msg.getBody()));} catch (Exception e) {// 处理失败,返回重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 手动提交消费偏移量context.setAckIndex(msgs.size() - 1);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
    }

    2. 消费重试机制

    当消费者处理消息失败时,RocketMQ 会自动进行重试。消费者可以根据业务需求,设置重试次数和重试间隔,确保消息能够被成功处理。

    3. 幂等消费

    消费者在处理消息时,要保证消息的幂等性,即多次处理相同的消息不会产生额外的影响。可以通过消息 ID 或业务唯一标识来判断消息是否已经处理过,避免重复处理。

总结

  1. 持久化策略:内存注定是不可靠的,刷盘一定是可靠性首选,但是刷盘导致的IO延时如何优化,是评判中间件性能的关键。
  2. 重试机制:3次重试应该是各个开源框架的默认重试次数。
  3. 集群化策略:单个节点注定不是高可用的最终形态,主从复制多节点可靠是最终态。
  4. 幂等机制:保持消息的重复消费可靠性,幂等键或者其他策略都是可参考的。

相关文章:

RocketMQ 中如何实现消息的可靠传递?

引言 作为头部消息队列开源中间件&#xff0c;学习其中的技术方案并且总结可靠性和健壮性&#xff0c;提升我们的架构思维和解决问题的能力 。 在 RocketMQ 中实现消息的可靠传递可以从多个方面入手&#xff0c;涵盖生产者、Broker 以及消费者等不同环节。 生产者端 1. 同步…...

Elasticsearch+kibana安装(简单易上手)

下载ES( Download Elasticsearch | Elastic ) 将ES安装包解压缩 解压后目录如下: 修改ES服务端口&#xff08;可以不修改&#xff09; 启动ES 记住这些内容 验证ES是否启动成功 下载kibana( Download Kibana Free | Get Started Now | Elastic ) 解压后的kibana目…...

视频多模态模型——视频版ViT

大家好&#xff0c;这里是好评笔记&#xff0c;公主号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本文详细解读多模态论文《ViViT: A Video Vision Transformer》&#xff0c;2021由google 提出用于视频处理的视觉 Transformer 模型&#xff0c;在视频多模态领域有…...

单机伪分布Hadoop详细配置

目录 1. 引言2. 配置单机Hadoop2.1 下载并解压JDK1.8、Hadoop3.3.62.2 配置环境变量2.3 验证JDK、Hadoop配置 3. 伪分布Hadoop3.1 配置ssh免密码登录3.2 配置伪分布Hadoop3.2.1 修改hadoop-env.sh3.2.2 修改core-site.xml3.2.3 修改hdfs-site.xml3.2.4 修改yarn-site.xml3.2.5 …...

Ollama windows安装

Ollama 是一个开源项目&#xff0c;专注于帮助用户本地化运行大型语言模型&#xff08;LLMs&#xff09;。它提供了一个简单易用的框架&#xff0c;让开发者和个人用户能够在自己的设备上部署和运行 LLMs&#xff0c;而无需依赖云服务或外部 API。这对于需要数据隐私、离线使用…...

鸿蒙next 自定义日历组件

效果图预览 20250124-113957 使用说明 1.选择日期左右箭头&#xff0c;实现每月日历切换&#xff0c;示例中超出当前月份&#xff0c;禁止进入下一月&#xff0c;可在代码更改 2.日历中显示当前选择的日期&#xff0c;选中的日期颜色可自定义 3.日历中可展示历史记录作为数据…...

Nginx 开发总结

文章目录 1. Nginx 基础概念1-1、什么是 Nginx1-2、Nginx 的工作原理1-3、Nginx 的核心特点1-4、Nginx 的常见应用场景1-5、Nginx 与 Apache 的区别1-6、 Nginx 配置的基本结构1-7、Nginx 常见指令 2. Nginx 配置基础2-1、Nginx 配置文件结构2-2、全局配置 (Global Block)2-3、…...

Van-Nav:新年,将自己学习的项目地址统一整理搭建自己的私人导航站,供自己后续查阅使用,做技术的同学应该都有一个自己网站的梦想

嗨&#xff0c;大家好&#xff0c;我是小华同学&#xff0c;关注我们获得“最新、最全、最优质”开源项目和高效工作学习方法 Van-Nav是一个基于Vue.js开发的导航组件库&#xff0c;它提供了多种预设的样式和灵活的配置选项&#xff0c;使得开发者可以轻松地定制出符合项目需求…...

层次聚类构建层次结构的簇

层次聚类&#xff08;Hierarchical Clustering&#xff09;可以通过自定义函数来完成。层次聚类可以分为两种方法&#xff1a;凝聚型&#xff08;Agglomerative&#xff09;和分裂型&#xff08;Divisive&#xff09;。这里主要介绍一种常用的凝聚型方法&#xff0c;它是自底向…...

计算机网络__基础知识问答

Question: 1&#xff09;在计算机网络的5层结构中&#xff0c;每一层的功能大概是什么&#xff1f; 2&#xff09;交换机的功能&#xff1f;https://www.bilibili.com/video/BV1na4y1L7Ev 3&#xff09;路由器的功能&#xff1f;https://www.bilibili.com/video/BV1hv411k7n…...

网易云音乐歌名可视化:词云生成与GitHub-Pages部署实践

引言 本文将基于前一篇爬取的网易云音乐数据, 利用Python的wordcloud、matplotlib等库, 对歌名数据进行深入的词云可视化分析. 我们将探索不同random_state对词云布局的影响, 并详细介绍如何将生成的词云图部署到GitHub Pages, 实现数据可视化的在线展示. 介绍了如何从原始数据…...

PHP根据IP地址获取地理位置城市和经纬度信息

/** 根据IP地址 获取地理位置*/ function getLocationByIP($ip) {$url "http://ip-api.com/json/{$ip}?langzh-CN&fieldsstatus,message,country,countryCode,region,regionName,city,lat,lon,timezone,isp,org,as";$response file_get_contents($url);$data …...

渲染流程概述

渲染流程包括 CPU应用程序端渲染逻辑 和 GPU渲染管线 一、CPU应用程序端渲染逻辑 剔除操作对物体进行渲染排序打包数据调用Shader SetPassCall 和 Drawcall 1.剔除操作 视椎体剔除 &#xff08;给物体一个包围盒&#xff0c;利用包围盒和摄像机的视椎体进行碰撞检测&#xf…...

【单细胞-第三节 多样本数据分析】

文件在单细胞\5_GC_py\1_single_cell\1.GSE183904.Rmd GSE183904 数据原文 1.获取临床信息 筛选样本可以参考临床信息 rm(list ls()) library(tinyarray) a geo_download("GSE183904")$pd head(a) table(a$Characteristics_ch1) #统计各样本有多少2.批量读取 学…...

libOnvif通过组播不能发现相机

使用libOnvif库OnvifDiscoveryClient类&#xff0c; auto discovery new OnvifDiscoveryClient(QUrl(“soap.udp://239.255.255.250:3702”), cb.Build()); 会有错误&#xff1a; end of file or no input: message transfer interrupted or timed out(30 sec max recv delay)…...

项目集成GateWay

文章目录 1.环境搭建1.创建sunrays-common-cloud-gateway-starter模块2.目录结构3.自动配置1.GateWayAutoConfiguration.java2.spring.factories 3.pom.xml4.注意&#xff1a;GateWay不能跟Web一起引入&#xff01; 1.环境搭建 1.创建sunrays-common-cloud-gateway-starter模块…...

2025年01月28日Github流行趋势

项目名称&#xff1a;maybe 项目地址url&#xff1a;https://github.com/maybe-finance/maybe项目语言&#xff1a;Ruby历史star数&#xff1a;37540今日star数&#xff1a;1004项目维护者&#xff1a;zachgoll, apps/dependabot, tmyracle, Shpigford, crnsh项目简介&#xff…...

使用Ollama本地部署DeepSeek R1

前言 DeepSeek是一款开源的智能搜索引擎&#xff0c;能够通过深度学习技术提高搜索的智能化水平。如果你正在寻找一种方式来将DeepSeek部署在本地环境中&#xff0c;Ollama是一个非常方便的工具&#xff0c;它允许你在本地快速部署并管理各种基于AI的模型。 在本篇博客中&…...

doris:异常数据处理

在导入过程中&#xff0c;源数据列与目标列的数据类型可能存在不一致的情况。导入过程会对这些类型不一致的数据进行转换&#xff0c;但在转换过程中可能会出现字段类型不匹配、字段超长、精度不匹配等问题&#xff0c;从而导致转换失败。 为了处理这些异常情况&#xff0c;Do…...

chrome源码剖析—UI架构消息机制

Chrome 浏览器的 UI 架构是高度模块化且基于现代图形技术和用户界面设计理念构建的。它的 UI 架构涵盖了窗口、标签页、控件、通知、菜单等组件的管理和交互。Chrome 的 UI 基本上是通过 views 框架和 Aura&#xff08;Chrome 自己的 UI 层&#xff09;构建的&#xff0c;后者又…...

.NET 9.0 的 Blazor Web App 项目、Bootstrap Blazor 组件库、自定义日志 TLog 使用备忘

一、设计目标&#xff1a;通用、容易修改、使用简单&#xff0c;所有代码保存在一个文件中&#xff0c;方便移植到其他项目使用。 注&#xff1a;示例使用 Bootstrap Blazor 组件库和 EF Core 、Sqlite&#xff0c;需要先使用 Nuget包管理器 添加对应的包。 namespace Blazor…...

单片机基础模块学习——超声波传感器

一、超声波原理 左边发射超声波信号&#xff0c;右边接收超声波信号 左边的芯片用来处理超声波发射信号&#xff0c;中间的芯片用来处理接收的超声波信号 二、超声波原理图 T——transmit 发送R——Recieve 接收 U18芯片对输入的N_A1信号进行放大&#xff0c;然后输入给超声…...

java 正则表达式匹配Matcher 类

Matcher 类 用法 在 Java 中&#xff0c;Matcher 类是用于匹配正则表达式的工具&#xff0c;而 group() 方法是 Matcher 类中的一个重要方法&#xff0c;用于提取匹配结果中的捕获组&#xff08;captured groups&#xff09;。以下是对 group() 方法的详细解释&#xff1a; 1.…...

使用 OpenResty 构建高效的动态图片水印代理服务20250127

使用 OpenResty 构建高效的动态图片水印代理服务 在当今数字化的时代&#xff0c;图片在各种业务场景中广泛应用。为了保护版权、统一品牌形象&#xff0c;动态图片水印功能显得尤为重要。然而&#xff0c;直接在后端服务中集成水印功能&#xff0c;往往会带来代码复杂度增加、…...

Elastic Agent 对 Kafka 的新输出:数据收集和流式传输的无限可能性

作者&#xff1a;来 Elastic Valerio Arvizzigno, Geetha Anne 及 Jeremy Hogan 介绍 Elastic Agent 的新功能&#xff1a;原生输出到 Kafka。借助这一最新功能&#xff0c;Elastic 用户现在可以轻松地将数据路由到 Kafka 集群&#xff0c;从而实现数据流和处理中无与伦比的可扩…...

Elasticsearch 性能测试工具 Loadgen 之 002——命令行及参数详解

上一讲&#xff0c;我们讲解了 Loadgen 的极简部署方式、配置文件、快速使用从 0 到 1 方式。 本讲&#xff0c;我们主要解读一下 Loadgen 的丰富的命令行及参数含义。 有同学可能会说&#xff0c;上面不是介绍很清楚了吗&#xff1f;但&#xff0c;咱们还是有必要详细中文解读…...

书生大模型实战营3

文章目录 L0——入门岛git基础Git 是什么&#xff1f;Git 中的一些基本概念工作区、暂存区和 Git 仓库区文件状态分支主要功能 Git 平台介绍GitHubGitLabGitee Git 下载配置验证下载 Git配置 Git验证 Git配置 Git常用操作Git简易入门四部曲Git其他指令 闯关任务任务1: 破冰活动…...

CTF-web: Python YAML反序列化利用

PyYAML存在以下几个特殊标签,如果这些标签被不安全的解析,会造成解析漏洞 从 PyYaml 版本 6.0 开始&#xff0c;load 的默认加载器已切换到 SafeLoader&#xff0c;以降低远程代码执行的风险。更新后易受攻击的是 yaml.unsafe_load 和 yaml.load(input, Loaderyaml.UnsafeLoade…...

【玩转全栈】----靓号管理系统实现

先赞后看&#xff0c;养成习惯。。。 目录 数据库设置 基本功能 路由器 靓号显示 靓号添加 靓号编辑 视图函数 额外功能 搜索功能 分页 一般逻辑 动态页码 上下页 首尾页 数据库设置 新建一个数据库&#xff08;或者就用之前部门、用户管理的也行&#xff09;&#xff0c;用Dja…...

【Attention】KV Cache

1 什么是KV Cache&#xff1f; 定义&#xff1a;KV Cache 即 Key-Value Cache&#xff0c;是用于加速 Transformer 模型推理长序列过程的一种技术。 核心原理&#xff1a;在 Transformer 的自注意力机制中&#xff0c;将历史输入 token 中的 Key 和 Value 缓存下来&#xff0c…...