RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式 顺序消费
- RocketMQ-消息消费模式
- 集群模式
- 集群模式的演示(本身就默认)
- Rocketmq存储队列
- 广播模式
- 顺序消费
- 如何改实现顺序消费
RocketMQ-消息消费模式
集群模式
在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
集群模式的演示(本身就默认)
假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条
Rocketmq存储队列
在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。
为什么是四个队列?
因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
广播模式
在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:
生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
顺序消费
实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程
假设我们没有实现顺序消费的时候
创建生产者
1.创建实体类
@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}
2.创建测试类
public class OrderUtil {public static List<OrderStep> buildOrders(){List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
3.创建生产者类
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();}
}
创建消费者类
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:
可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
如何改实现顺序消费
生产者类
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列个数"+list.size());Long orderId = (Long) o;int index = (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();}
}
消费者类
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();}
}
相关文章:

RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式 集群模式 在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到 集群模…...

一、Java并发编程之线程、synchronized
黑马课程 文章目录1. Java线程1.1 创建和运行线程方法一:Thread方法二:Runnable(推荐)lambda精简Thread和runnable原理方法三:FutureTask配合Thread1.2 查看进程和线程的方法1.3 线程运行原理栈与栈帧线程上下文切换1.…...

12.hadoop系列之MapReduce分区实践
本文我们学习MapReduce默认分区以及自定义分区实践 当我们要求将统计结果按照条件输出到不同文件(分区),比如按照统计结果将手机归属地不同省份输出到不同文件中(分区) 1.默认Partitioner分区 public class HashPartitioner<K, V> extends Partitioner<…...

有了独自开,一个人就是一个团队
文章目录 简单介绍优点 优秀案例平台福利总结 简单介绍 独自开是一个基于商品与服务交易全流程的PaaS开发平台。对于开发者,独自开可以协助开发者一个人独自开发一套系统。 优点 独自开有独创的分层标准化平台架构,可以满足系统的任何个性化需求。 …...

web期末复习 2023.02.11
文章目录Web 的概念Web 组成用户通过浏览器请求资源的过程:HTML 超文本标记语言CSS插入样式表的方法有三种:对象,类,实例一个完整的 JavaScript 实现是由以下 3 个不同部分组成的:JavaScript 用法什么是 Java Server Pages?JSP 注释JSP 的 J…...
第44章 用户密码实体及其约束规则的定义实现
1 说明: 由当前程序需要兼容实现多种用户密码的加密操作,所以必须把“CustomerPassword”定义为实体类,该类用于用于把加密方式、密钥及其加密后的密码持久化到“CustomerPassword”表中,以便用为用户登录操作提供验证支撑。 如果…...

聊聊并发与锁
持续坚持原创输出,点击蓝字关注我吧1.并发与并行并发可以充分地利用 CPU 资源,一般都会使用多线程实现。多线程的作用是提高任务的平均执行速度,但是会导致程序可理解性变差,编程难度加大。关于对并发与并行的概念,大家…...

开源项目 —— 原生JS实现斗地主游戏 ——代码极少、功能都有、直接粘贴即用
目录 效果如下 目录结构 GameEntity.js GrawGame.js konva.min.js PlayGame.js veriable.js index.html 结语: 前期回顾 卡通形象人物2 写代码-睡觉 丝滑如德芙_0.活在风浪里的博客-CSDN博客本文实现了包含形象的卡通小人吃、睡、电脑工作的网页动画https://…...
Linux第四讲
目录 四、shell脚本 4.1 shell和shell脚本 4.2 脚本语言分类 4.2.1 编译型语言 4.2.2 解释型语言 4.2.3 脚本语言 4.3 shell常见种类 4.3.1 shell分类介绍 4.3.2 查看bash版本 4.3.3 sh和bash的关系 4.4 脚本书写规范 4.4.1 选择解释器 4.4.2 开发规范 4.5 shell…...
Redis 持久化
持久化是指数据写到物理硬盘里,即便程序崩溃、或者电脑重启,依然能够恢复。Redis提供了两种持久化机制:RDB和AOF。 RDB(Redis Database): RDB文件相当于内存快照,保存了某个时间点数据库信息。使用RDB文件恢复很简单,将…...

Python语言零基础入门教程(十三)
Python 字典(Dictionary) 字典是另一种可变容器模型,且可存储任意类型对象。 字典的每个键值 key:value 对用冒号 : 分割,每个键值对之间用逗号 , 分割,整个字典包括在花括号 {} 中 ,格式如下所示: d {key1 : value1, key2 : …...

江苏五年制专转本应该复习几轮?
五年制专转本应该复习几轮? 据调查统计:2022年专转本17%的考生复习三轮及以上,23%的考生复习了两轮。这两类的考生录取率高至85%。可见复习轮数多,专转本上岸的概率也大。综合多方因素,建议同学们专转本复习四轮&#…...

微信小程序的优化方案之主包与分包的研究
什么是分包? 某些情况下,开发者需要将小程序划分成不同的子包,在构建时打包成不同的分包,用户在使用时按需进行加载。 在构建小程序分包项目时,构建会输出一个或多个分包。每个使用分包小程序必定含有一个主包。所谓的…...
从手工测试转型web自动化测试继而转型成专门做自动化测试的学习路线。
在开始之前先自学两个工具 商业web自动化测试工具请自学QTP;QTP的学习可以跳过,我是跳过了的。 开源web自动化测试工具请自学Selenium;我当年是先学watir(耗时1周),再学selenium(也耗时1周&…...
【计组笔记03】计算机组成原理之系统五大部件介绍、主存模型和CPU结构介绍
这篇文章,主要介绍计算机组成原理之系统五大部件、主存模型和CPU结构。 目录 一、计算机五大部件 1.1、体系结构 (1)冯诺依曼体系结构...
微信小程序解析用户加密数据
微信公众号 IT果果日记前言在上一篇文章“微信小程序如何获取用户信息”中我们完成了用户明文数据的校验工作,本文将学习解密用户的非明文用户信息,也就是获取用户的openId和unionId。解密调用wx.getUserProfile后将返回encryptedData和iv两个数据。encr…...

毕业四年换了3份软件测试工作,我为何仍焦虑?
今天一看日历:2023.2.11 ,才突然意识到自己毕业已经四年了。四年时间里一直在测试行业摸爬滚打,现在是时候记录一下了。 下面我来分享下我这4年软件测试经验及成长历程,或许能帮助你解决很多工作中的迷惑。 01、我是如何开始做…...
嵌入式C基础知识(7)
是否可以传递任何参数并从 ISR 返回值不可以。不能传递任何参数并从 ISR 返回值。 ISR 不返回任何内容,并且不允许传递任何参数。 当硬件或软件事件发生时调用 ISR,而代码不会调用它。 这就是为什么不向 ISR 传递参数的原因。 由于代码不调用 ISR&#x…...

大数据系列之:安装pulsar详细步骤
大数据系列之:安装pulsar详细步骤一、Pulsar版本和jdk对应关系二、安装JDK三、设置和激活jdk环境变量四、下载和解压Pulsar五、查看Pulsar目录六、启动Pulsar standalone cluster七、创建Kafka Topic八、往Topic写入数据九、消费pulsar的Topic一、Pulsar版本和jdk对…...

色彩-基础理论
颜色三大指标 色相 色相是颜色的一个属性,只有黑白灰没有色相这个属性(那银灰色是什么?) 颜色的相貌,指的也是给颜色一个名字 例如:暗红、酒红、土黄、墨绿 饱和度 颜色的鲜艳程度 纯度 饱和度主要取决于含色成分和消色成分&a…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

uniapp手机号一键登录保姆级教程(包含前端和后端)
目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号(第三种)后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...