当前位置: 首页 > news >正文

RocketMQ-消息消费模式 顺序消费

RocketMQ-消息消费模式 顺序消费

  • RocketMQ-消息消费模式
    • 集群模式
      • 集群模式的演示(本身就默认)
      • Rocketmq存储队列
    • 广播模式
  • 顺序消费
    • 如何改实现顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
在这里插入图片描述
在这里插入图片描述

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

在这里插入图片描述在这里插入图片描述

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
在这里插入图片描述

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
在这里插入图片描述
在这里插入图片描述


顺序消费

实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

1.创建实体类

@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}

2.创建测试类

public class OrderUtil {public static List<OrderStep> buildOrders(){List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

3.创建生产者类

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();}
}

创建消费者类

public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
在这里插入图片描述

如何改实现顺序消费

生产者类

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列个数"+list.size());Long orderId = (Long) o;int index = (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();}
}

消费者类

public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();}
}

相关文章:

RocketMQ-消息消费模式 顺序消费

RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式 集群模式 在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到 集群模…...

一、Java并发编程之线程、synchronized

黑马课程 文章目录1. Java线程1.1 创建和运行线程方法一&#xff1a;Thread方法二&#xff1a;Runnable&#xff08;推荐&#xff09;lambda精简Thread和runnable原理方法三&#xff1a;FutureTask配合Thread1.2 查看进程和线程的方法1.3 线程运行原理栈与栈帧线程上下文切换1.…...

12.hadoop系列之MapReduce分区实践

本文我们学习MapReduce默认分区以及自定义分区实践 当我们要求将统计结果按照条件输出到不同文件(分区)&#xff0c;比如按照统计结果将手机归属地不同省份输出到不同文件中(分区) 1.默认Partitioner分区 public class HashPartitioner<K, V> extends Partitioner<…...

有了独自开,一个人就是一个团队

文章目录 简单介绍优点 优秀案例平台福利总结 简单介绍 独自开是一个基于商品与服务交易全流程的PaaS开发平台。对于开发者&#xff0c;独自开可以协助开发者一个人独自开发一套系统。 优点 独自开有独创的分层标准化平台架构&#xff0c;可以满足系统的任何个性化需求。 …...

web期末复习 2023.02.11

文章目录Web 的概念Web 组成用户通过浏览器请求资源的过程:HTML 超文本标记语言CSS插入样式表的方法有三种:对象&#xff0c;类&#xff0c;实例一个完整的 JavaScript 实现是由以下 3 个不同部分组成的&#xff1a;JavaScript 用法什么是 Java Server Pages?JSP 注释JSP 的 J…...

第44章 用户密码实体及其约束规则的定义实现

1 说明&#xff1a; 由当前程序需要兼容实现多种用户密码的加密操作&#xff0c;所以必须把“CustomerPassword”定义为实体类&#xff0c;该类用于用于把加密方式、密钥及其加密后的密码持久化到“CustomerPassword”表中&#xff0c;以便用为用户登录操作提供验证支撑。 如果…...

聊聊并发与锁

持续坚持原创输出&#xff0c;点击蓝字关注我吧1.并发与并行并发可以充分地利用 CPU 资源&#xff0c;一般都会使用多线程实现。多线程的作用是提高任务的平均执行速度&#xff0c;但是会导致程序可理解性变差&#xff0c;编程难度加大。关于对并发与并行的概念&#xff0c;大家…...

开源项目 —— 原生JS实现斗地主游戏 ——代码极少、功能都有、直接粘贴即用

目录 效果如下 目录结构 GameEntity.js GrawGame.js konva.min.js PlayGame.js veriable.js index.html 结语&#xff1a; 前期回顾 卡通形象人物2 写代码-睡觉 丝滑如德芙_0.活在风浪里的博客-CSDN博客本文实现了包含形象的卡通小人吃、睡、电脑工作的网页动画https://…...

Linux第四讲

目录 四、shell脚本 4.1 shell和shell脚本 4.2 脚本语言分类 4.2.1 编译型语言 4.2.2 解释型语言 4.2.3 脚本语言 4.3 shell常见种类 4.3.1 shell分类介绍 4.3.2 查看bash版本 4.3.3 sh和bash的关系 4.4 脚本书写规范 4.4.1 选择解释器 4.4.2 开发规范 4.5 shell…...

Redis 持久化

持久化是指数据写到物理硬盘里&#xff0c;即便程序崩溃、或者电脑重启&#xff0c;依然能够恢复。Redis提供了两种持久化机制&#xff1a;RDB和AOF。 RDB(Redis Database): RDB文件相当于内存快照&#xff0c;保存了某个时间点数据库信息。使用RDB文件恢复很简单&#xff0c;将…...

Python语言零基础入门教程(十三)

Python 字典(Dictionary) 字典是另一种可变容器模型&#xff0c;且可存储任意类型对象。 字典的每个键值 key:value 对用冒号 : 分割&#xff0c;每个键值对之间用逗号 , 分割&#xff0c;整个字典包括在花括号 {} 中 ,格式如下所示&#xff1a; d {key1 : value1, key2 : …...

江苏五年制专转本应该复习几轮?

五年制专转本应该复习几轮&#xff1f; 据调查统计&#xff1a;2022年专转本17%的考生复习三轮及以上&#xff0c;23%的考生复习了两轮。这两类的考生录取率高至85%。可见复习轮数多&#xff0c;专转本上岸的概率也大。综合多方因素&#xff0c;建议同学们专转本复习四轮&#…...

微信小程序的优化方案之主包与分包的研究

什么是分包&#xff1f; 某些情况下&#xff0c;开发者需要将小程序划分成不同的子包&#xff0c;在构建时打包成不同的分包&#xff0c;用户在使用时按需进行加载。 在构建小程序分包项目时&#xff0c;构建会输出一个或多个分包。每个使用分包小程序必定含有一个主包。所谓的…...

从手工测试转型web自动化测试继而转型成专门做自动化测试的学习路线。

在开始之前先自学两个工具 商业web自动化测试工具请自学QTP&#xff1b;QTP的学习可以跳过&#xff0c;我是跳过了的。 开源web自动化测试工具请自学Selenium&#xff1b;我当年是先学watir&#xff08;耗时1周&#xff09;&#xff0c;再学selenium&#xff08;也耗时1周&…...

【计组笔记03】计算机组成原理之系统五大部件介绍、主存模型和CPU结构介绍

这篇文章,主要介绍计算机组成原理之系统五大部件、主存模型和CPU结构。 目录 一、计算机五大部件 1.1、体系结构 (1)冯诺依曼体系结构...

微信小程序解析用户加密数据

微信公众号 IT果果日记前言在上一篇文章“微信小程序如何获取用户信息”中我们完成了用户明文数据的校验工作&#xff0c;本文将学习解密用户的非明文用户信息&#xff0c;也就是获取用户的openId和unionId。解密调用wx.getUserProfile后将返回encryptedData和iv两个数据。encr…...

毕业四年换了3份软件测试工作,我为何仍焦虑?

​今天一看日历&#xff1a;2023.2.11 &#xff0c;才突然意识到自己毕业已经四年了。四年时间里一直在测试行业摸爬滚打&#xff0c;现在是时候记录一下了。 下面我来分享下我这4年软件测试经验及成长历程&#xff0c;或许能帮助你解决很多工作中的迷惑。 01、我是如何开始做…...

嵌入式C基础知识(7)

是否可以传递任何参数并从 ISR 返回值不可以。不能传递任何参数并从 ISR 返回值。 ISR 不返回任何内容&#xff0c;并且不允许传递任何参数。 当硬件或软件事件发生时调用 ISR&#xff0c;而代码不会调用它。 这就是为什么不向 ISR 传递参数的原因。 由于代码不调用 ISR&#x…...

大数据系列之:安装pulsar详细步骤

大数据系列之&#xff1a;安装pulsar详细步骤一、Pulsar版本和jdk对应关系二、安装JDK三、设置和激活jdk环境变量四、下载和解压Pulsar五、查看Pulsar目录六、启动Pulsar standalone cluster七、创建Kafka Topic八、往Topic写入数据九、消费pulsar的Topic一、Pulsar版本和jdk对…...

色彩-基础理论

颜色三大指标 色相 色相是颜色的一个属性&#xff0c;只有黑白灰没有色相这个属性(那银灰色是什么&#xff1f;) 颜色的相貌&#xff0c;指的也是给颜色一个名字 例如&#xff1a;暗红、酒红、土黄、墨绿 饱和度 颜色的鲜艳程度 纯度 饱和度主要取决于含色成分和消色成分&a…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

GitHub 趋势日报 (2025年06月08日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信

文章目录 Linux C语言网络编程详细入门教程&#xff1a;如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket&#xff08;服务端和客户端都要&#xff09;2. 绑定本地地址和端口&#x…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路&#xff1a; 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑&#xff1a;async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

Windows安装Miniconda

一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...