当前位置: 首页 > news >正文

RocketMQ教程-(5)-功能特性-消费者分类

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者,本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。

背景信息​

Apache RocketMQ 面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和控制方式都不一样。了解如下问题,可以帮助您选择更匹配业务场景的消费者类型。

  • 如何实现并发消费:消费者如何使用并发的多线程机制处理消息,以此提高消息处理效率?

  • 如何实现同步、异步消息处理:对于不同的集成场景,消费者获取消息后可能会将消息异步分发到业务逻辑中处理,此时,消息异步化处理如何实现?

  • 如何实现消息可靠处理:消费者处理消息时如何返回响应结果?如何在消息异常情况进行重试,保证消息的可靠处理?

以上问题的具体答案,请参考下文。

功能概述

消息消费流程

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

危险

生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

PushConsumer​

PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。

使用方式

PushConsumer的使用方式比较固定,在消费者初始化时注册一个消费监听器,并在消费监听器内部实现消息处理逻辑。由 Apache RocketMQ 的SDK在后台完成消息获取、触发监听器调用以及进行消息重试处理。

示例代码如下:

// 消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()// 设置消费者分组。.setConsumerGroup("YourConsumerGroup")// 设置接入点。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {// 消费消息并返回处理结果。return ConsumeResult.SUCCESS;}}).build();

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。

  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略。

出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。

内部原理

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

PushConsumer原理

 可靠性重试

PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发。消息重试具体信息,请参见PushConsumer消费重试策略。

使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则 Apache RocketMQ 无法保证消息的可靠性。

  • 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,Apache RocketMQ 服务端是无法感知的,因此不会进行消费重试。

  • 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,Apache RocketMQ 服务端同样无法感知,因此也不会进行消费重试。

顺序性保障

基于 Apache RocketMQ 顺序消息的定义,如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序。

适用场景

PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:

  • 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。

  • 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成

使用方式

SimpleConsumer 的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:

// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()// 设置消费者分组。.setConsumerGroup("YourConsumerGroup")// 设置接入点。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置从服务端接受消息的最大等待时间.setAwaitDuration(Duration.ofSeconds(1)).build();
try {// SimpleConsumer 需要主动获取消息,并处理。List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);// 消费处理完成后,需要主动调用 ACK 提交消费结果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);}});
} catch (ClientException e) {// 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。logger.error("Failed to receive message", e);
}

SimpleConsumer主要涉及以下几个接口行为:

 可靠性重试

SimpleConsumer消费者类型中,客户端SDK和服务端通过ReceiveMessageAckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。更多信息,请参见SimpleConsumer消费重试策略。

顺序性保障

基于 Apache RocketMQ 顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息未处理完成,则无法获取到后面的消息。

适用场景

SimpleConsumer提供原子接口,用于消息获取和提交消费结果,相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景:

  • 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。

  • 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。

  • 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。

PullConsumer

使用建议​

PushConsumer合理控制消费耗时,避免无限阻塞

对于PushConsumer消费类型,需要严格控制消息的消费耗时,尽量避免出现消息处理超时导致消息重复。如果业务经常会出现一些预期外的长时间耗时的消息,建议使用SimpleConsumer,并设置好消费不可见时间。

相关文章:

RocketMQ教程-(5)-功能特性-消费者分类

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者&#xff0c;本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。 背景信息​ Apache RocketMQ 面向不同的业务场景提供了不同消费者类型&…...

Kafka原理剖析

一、简介 Kafka是一个分布式的、分区的、多副本的消息发布-订阅系统&#xff0c;它提供了类似于JMS的特性&#xff0c;但在设计上完全不同&#xff0c;它具有消息持久化、高吞吐、分布式、多客户端支持、实时等特性&#xff0c;适用于离线和在线的消息消费&#xff0c;如常规的…...

word怎么转换成pdf?分享几种转换方法

word怎么转换成pdf&#xff1f;将Word文档转换成PDF文件有几个好处。首先&#xff0c;PDF文件通常比Word文档更容易在不同设备和操作系统上查看和共享。其次&#xff0c;PDF文件通常比Word文档更难以修改&#xff0c;这使得它们在需要保护文件内容的情况下更加安全可靠。最后&a…...

基于XDMA 中断模式的 PCIE3.0 QT上位机与FPGA数据交互架构 提供工程源码和QT上位机源码

目录 1、前言2、我已有的PCIE方案3、PCIE理论4、总体设计思路和方案图像产生、发送、缓存数据处理XDMA简介XDMA中断模式图像读取、输出、显示QT上位机及其源码 5、vivado工程详解6、上板调试验证7、福利&#xff1a;工程代码的获取 1、前言 PCIE&#xff08;PCI Express&#…...

Vue 中通用的 css 列表入场动画效果

css 代码 .gradientAnimation {animation-name: gradient;animation-duration: 0.85s;animation-fill-mode: forwards;opacity: 0; }/* 不带前缀的放到最后 */ keyframes gradient {0% {opacity: 0;transform: translate(-100px, 0px);}100% {opacity: 1;transform: translate…...

微分流形2:流形上的矢量场和张量场

来了来了&#xff0c;切向量&#xff0c;切空间。流形上的所有的线性泛函的集合&#xff0c;注意是函数的集合。然后取流形上的某点p&#xff0c;它的切向量为&#xff0c;线性泛函到实数的映射。没错&#xff0c;是函数到实数的映射&#xff0c;是不是想到了求导。我们要逐渐熟…...

C++数组、向量和列表的练习

运行代码&#xff1a; //C数组、向量和列表的练习 #include"std_lib_facilities.h"int main() try {int ii[10] { 0,1,2,3,4,5,6,7,8,9 };for (int i 0; i < 10; i)//把数组中的每个元素值加2ii[i] 2;vector<int>vv(10);for (int i 0; i < 10; i)vv…...

视频剪辑矩阵分发系统Unable to load FFProbe报错技术处理?

问题一 报错处理 对于视频剪辑矩阵分发系统中出现的“Unable to load FFProbe”报错问题&#xff0c;可以采取以下技术处理措施进行解决。 1.检查系统中是否正确安装了FFProbe工具&#xff0c;并确保其路径正确配置。 2.检查系统环境变量是否正确设置&#xff0c;包括FFPr…...

Docker轻量级可视化工具Portainer

Portainer是一个轻量级的管理UI界面&#xff0c;用于管理Docker容器、镜像、卷和网络。它支持端口映射、容器启动、停止、删除、日志查看等功能&#xff0c;同时也提供了可视化的监控和统计功能&#xff0c;可以快速轻松的管理多个Docker主机。Portainer不需要额外安装依赖&…...

功率放大器在电光调制中的应用有哪些

电光调制是一种利用光电效应将电信号转化为光信号的技术。在实现电光调制的过程中&#xff0c;功率放大器作为一个重要的组件&#xff0c;具有对输入电信号进行放大和控制的功能。本文将介绍功率放大器的基本原理、特点以及在电光调制中的应用。 基本原理 功率放大器是一种能够…...

MyBatis入门程序

1.MyBatis 入门程序开发步骤 SqlSession&#xff1a;代表Java程序和数据库之间的会话。&#xff08;HttpSession是Java程序和浏览器之间的会话&#xff09; SqlSessionFactory&#xff1a;是“生产”SqlSession的“工厂”。 工厂模式&#xff1a;如果创建某一个对象&#xff…...

C++快速切换 头文件和源文件

有没有一种快速的方法 &#xff0c; 将头文件中的声明 直接在源文件中自动写出来&#xff0c; 毕竟头文件中已经有声明了&#xff0c; 我只需要写具体实现就行了&#xff0c;没有必要把声明的部分再敲一遍在 Visual Studio 中&#xff0c;你可以使用快速生成函数定义的功能来实…...

对原型、原型链的理解

在 JavaScript 中是使用构造两数来新建一个对象的&#xff0c;每一个构造函数的内部都有一个 prototype 属性&#xff0c;它的属性值是一个对象&#xff0c;这个对象包含了可以由该构造西数的所有实例共享的属性和方法。当使用构造函数新建一个对象后&#xff0c;在这个对象的内…...

7月26日,每日信息差

1、日本经产省将讨论让消费者负担核电站重启费。若被采用&#xff0c;那么即便是与把源自可再生能源作为卖点的新电力公司签约的消费者&#xff0c;也将负担重启核电站的费用 2、国家发改委&#xff1a;电厂存煤和出力均达历史同期最高水平 3、国家深改委&#xff1a;全国统调…...

git修改已经push后的commit注释

回到倒数第8次提交 git rebase -i HEAD~8修改注释&#xff0c;然后把最前面的pick改成edit 修改注释 git commit --amendrebase确认 git rebase --continue强制提交 git push -f origin master参考&#xff1a;https://blog.csdn.net/qq_16942727/article/details/1260355…...

网络云存储服务器,数据库服务器|PetaExpress

云存储服务器是什么&#xff1f; 云存储服务器是一种在线存储(英语:Cloud storage&#xff09;该模式是将数据存储在通常由第三方托管的多个虚拟服务器上&#xff0c;而不是独家服务器上。 云存储服务器有几种结构 架构方法分为两类&#xff1a;一类是通过服务进行架构&…...

java语法基础--基本数据类型

一、数据类型概括 1、整数类型 2、浮点型 3、布尔类型 4、字符类型 二、数据类型的使用 1、整数类型的使用 超出类型范围 //1.1 定义一个byte类型的变量&#xff0c;并且设置它超过byte类型范围// 如果定义的数值在byte类型范围内&#xff0c;那么就能正常使用&#xff0c;//…...

uniapp 微信小程序 预览pdf方法

效果图&#xff1a; 1、在小程序中 // #ifdef MP */ 是区分运行的环境&#xff0c;在小程序中可使用如下方法uni.downloadFile({url: item.link,//文件地址success: function (res) {var filePath res.tempFilePath;uni.openDocument({filePath: filePath,showMenu: false…...

基于vue+uniapp微信小程序公司企业后勤服务(设备)系统

本系统分为用户和管理员两个角色&#xff0c;其中用户可以注册登陆系统&#xff0c;查看公司公告&#xff0c;查看设备&#xff0c;设备入库&#xff0c;查看通讯录&#xff0c;会议室预约&#xff0c;申请出入&#xff0c;申请请假等功能。管理员可以对员工信息&#xff0c;会…...

Linux命令(54)之blkid

Linux命令之blkid 1.blkid介绍 linux命令blkid被用来查询系统块设备文件系统的类型、卷标、UUID等信息 2.blkid用法 blkid [参数] [设备] blkid参数 参数说明-L <卷标>将卷标转换为设备名-U <UUID>将UUID转换为设备名-p转换设备块-i显示I/O信息 3.实例 3.1.查…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

Java多线程实现之Callable接口深度解析

Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

JDK 17 新特性

#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持&#xff0c;不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的&#xff…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...