当前位置: 首页 > news >正文

RocketMQ-消息消费模式 顺序消费

RocketMQ-消息消费模式 顺序消费

  • RocketMQ-消息消费模式
    • 集群模式
      • 集群模式的演示(本身就默认)
      • Rocketmq存储队列
    • 广播模式
  • 顺序消费
    • 如何改实现顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
在这里插入图片描述
在这里插入图片描述

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

在这里插入图片描述在这里插入图片描述

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
在这里插入图片描述

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
在这里插入图片描述
在这里插入图片描述


顺序消费

实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

1.创建实体类

@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}

2.创建测试类

public class OrderUtil {public static List<OrderStep> buildOrders(){List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

3.创建生产者类

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();}
}

创建消费者类

public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
在这里插入图片描述

如何改实现顺序消费

生产者类

public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列个数"+list.size());Long orderId = (Long) o;int index = (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();}
}

消费者类

public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();}
}

相关文章:

RocketMQ-消息消费模式 顺序消费

RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式 集群模式 在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到 集群模…...

一、Java并发编程之线程、synchronized

黑马课程 文章目录1. Java线程1.1 创建和运行线程方法一&#xff1a;Thread方法二&#xff1a;Runnable&#xff08;推荐&#xff09;lambda精简Thread和runnable原理方法三&#xff1a;FutureTask配合Thread1.2 查看进程和线程的方法1.3 线程运行原理栈与栈帧线程上下文切换1.…...

12.hadoop系列之MapReduce分区实践

本文我们学习MapReduce默认分区以及自定义分区实践 当我们要求将统计结果按照条件输出到不同文件(分区)&#xff0c;比如按照统计结果将手机归属地不同省份输出到不同文件中(分区) 1.默认Partitioner分区 public class HashPartitioner<K, V> extends Partitioner<…...

有了独自开,一个人就是一个团队

文章目录 简单介绍优点 优秀案例平台福利总结 简单介绍 独自开是一个基于商品与服务交易全流程的PaaS开发平台。对于开发者&#xff0c;独自开可以协助开发者一个人独自开发一套系统。 优点 独自开有独创的分层标准化平台架构&#xff0c;可以满足系统的任何个性化需求。 …...

web期末复习 2023.02.11

文章目录Web 的概念Web 组成用户通过浏览器请求资源的过程:HTML 超文本标记语言CSS插入样式表的方法有三种:对象&#xff0c;类&#xff0c;实例一个完整的 JavaScript 实现是由以下 3 个不同部分组成的&#xff1a;JavaScript 用法什么是 Java Server Pages?JSP 注释JSP 的 J…...

第44章 用户密码实体及其约束规则的定义实现

1 说明&#xff1a; 由当前程序需要兼容实现多种用户密码的加密操作&#xff0c;所以必须把“CustomerPassword”定义为实体类&#xff0c;该类用于用于把加密方式、密钥及其加密后的密码持久化到“CustomerPassword”表中&#xff0c;以便用为用户登录操作提供验证支撑。 如果…...

聊聊并发与锁

持续坚持原创输出&#xff0c;点击蓝字关注我吧1.并发与并行并发可以充分地利用 CPU 资源&#xff0c;一般都会使用多线程实现。多线程的作用是提高任务的平均执行速度&#xff0c;但是会导致程序可理解性变差&#xff0c;编程难度加大。关于对并发与并行的概念&#xff0c;大家…...

开源项目 —— 原生JS实现斗地主游戏 ——代码极少、功能都有、直接粘贴即用

目录 效果如下 目录结构 GameEntity.js GrawGame.js konva.min.js PlayGame.js veriable.js index.html 结语&#xff1a; 前期回顾 卡通形象人物2 写代码-睡觉 丝滑如德芙_0.活在风浪里的博客-CSDN博客本文实现了包含形象的卡通小人吃、睡、电脑工作的网页动画https://…...

Linux第四讲

目录 四、shell脚本 4.1 shell和shell脚本 4.2 脚本语言分类 4.2.1 编译型语言 4.2.2 解释型语言 4.2.3 脚本语言 4.3 shell常见种类 4.3.1 shell分类介绍 4.3.2 查看bash版本 4.3.3 sh和bash的关系 4.4 脚本书写规范 4.4.1 选择解释器 4.4.2 开发规范 4.5 shell…...

Redis 持久化

持久化是指数据写到物理硬盘里&#xff0c;即便程序崩溃、或者电脑重启&#xff0c;依然能够恢复。Redis提供了两种持久化机制&#xff1a;RDB和AOF。 RDB(Redis Database): RDB文件相当于内存快照&#xff0c;保存了某个时间点数据库信息。使用RDB文件恢复很简单&#xff0c;将…...

Python语言零基础入门教程(十三)

Python 字典(Dictionary) 字典是另一种可变容器模型&#xff0c;且可存储任意类型对象。 字典的每个键值 key:value 对用冒号 : 分割&#xff0c;每个键值对之间用逗号 , 分割&#xff0c;整个字典包括在花括号 {} 中 ,格式如下所示&#xff1a; d {key1 : value1, key2 : …...

江苏五年制专转本应该复习几轮?

五年制专转本应该复习几轮&#xff1f; 据调查统计&#xff1a;2022年专转本17%的考生复习三轮及以上&#xff0c;23%的考生复习了两轮。这两类的考生录取率高至85%。可见复习轮数多&#xff0c;专转本上岸的概率也大。综合多方因素&#xff0c;建议同学们专转本复习四轮&#…...

微信小程序的优化方案之主包与分包的研究

什么是分包&#xff1f; 某些情况下&#xff0c;开发者需要将小程序划分成不同的子包&#xff0c;在构建时打包成不同的分包&#xff0c;用户在使用时按需进行加载。 在构建小程序分包项目时&#xff0c;构建会输出一个或多个分包。每个使用分包小程序必定含有一个主包。所谓的…...

从手工测试转型web自动化测试继而转型成专门做自动化测试的学习路线。

在开始之前先自学两个工具 商业web自动化测试工具请自学QTP&#xff1b;QTP的学习可以跳过&#xff0c;我是跳过了的。 开源web自动化测试工具请自学Selenium&#xff1b;我当年是先学watir&#xff08;耗时1周&#xff09;&#xff0c;再学selenium&#xff08;也耗时1周&…...

【计组笔记03】计算机组成原理之系统五大部件介绍、主存模型和CPU结构介绍

这篇文章,主要介绍计算机组成原理之系统五大部件、主存模型和CPU结构。 目录 一、计算机五大部件 1.1、体系结构 (1)冯诺依曼体系结构...

微信小程序解析用户加密数据

微信公众号 IT果果日记前言在上一篇文章“微信小程序如何获取用户信息”中我们完成了用户明文数据的校验工作&#xff0c;本文将学习解密用户的非明文用户信息&#xff0c;也就是获取用户的openId和unionId。解密调用wx.getUserProfile后将返回encryptedData和iv两个数据。encr…...

毕业四年换了3份软件测试工作,我为何仍焦虑?

​今天一看日历&#xff1a;2023.2.11 &#xff0c;才突然意识到自己毕业已经四年了。四年时间里一直在测试行业摸爬滚打&#xff0c;现在是时候记录一下了。 下面我来分享下我这4年软件测试经验及成长历程&#xff0c;或许能帮助你解决很多工作中的迷惑。 01、我是如何开始做…...

嵌入式C基础知识(7)

是否可以传递任何参数并从 ISR 返回值不可以。不能传递任何参数并从 ISR 返回值。 ISR 不返回任何内容&#xff0c;并且不允许传递任何参数。 当硬件或软件事件发生时调用 ISR&#xff0c;而代码不会调用它。 这就是为什么不向 ISR 传递参数的原因。 由于代码不调用 ISR&#x…...

大数据系列之:安装pulsar详细步骤

大数据系列之&#xff1a;安装pulsar详细步骤一、Pulsar版本和jdk对应关系二、安装JDK三、设置和激活jdk环境变量四、下载和解压Pulsar五、查看Pulsar目录六、启动Pulsar standalone cluster七、创建Kafka Topic八、往Topic写入数据九、消费pulsar的Topic一、Pulsar版本和jdk对…...

色彩-基础理论

颜色三大指标 色相 色相是颜色的一个属性&#xff0c;只有黑白灰没有色相这个属性(那银灰色是什么&#xff1f;) 颜色的相貌&#xff0c;指的也是给颜色一个名字 例如&#xff1a;暗红、酒红、土黄、墨绿 饱和度 颜色的鲜艳程度 纯度 饱和度主要取决于含色成分和消色成分&a…...

从入门到精通:Python开发在Web后端的实战应用

在当今快速发展的互联网时代&#xff0c;Web后端开发作为连接前端界面与数据库的核心&#xff0c;其重要性不言而喻。Python&#xff0c;凭借其简洁的语法、强大的库支持以及活跃的社区&#xff0c;已成为Web后端开发的热门选择。本文将带你从零开始&#xff0c;逐步掌握Python…...

3D设计工作流救星:STL转STEP一键转换,让CAD协作不再卡顿 [特殊字符]

3D设计工作流救星&#xff1a;STL转STEP一键转换&#xff0c;让CAD协作不再卡顿 &#x1f60a; 【免费下载链接】stltostp Convert stl files to STEP brep files 项目地址: https://gitcode.com/gh_mirrors/st/stltostp 您是否遇到过这样的困境&#xff1f;精心设计的3…...

AI编程助手代码质量守护:Quality Guardian MCP实战指南

1. 项目概述&#xff1a;为AI编程助手打造的“质量守门员”如果你和我一样&#xff0c;日常重度依赖 Claude Code、Cursor 这类 AI 编程助手来写代码&#xff0c;那你肯定也遇到过这个头疼的问题&#xff1a;助手写的代码&#xff0c;语法上没问题&#xff0c;但一跑静态检查&a…...

从IMU到GPS:手把手教你用ESKF实现机器人定位(附代码避坑指南)

从IMU到GPS&#xff1a;手把手教你用ESKF实现机器人定位&#xff08;附代码避坑指南&#xff09; 在机器人定位领域&#xff0c;误差状态卡尔曼滤波&#xff08;Error-State Kalman Filter, ESKF&#xff09;正逐渐成为处理IMU和GPS数据融合的主流方法。本文将带您深入理解ESK…...

OpenClaw-Readwise:开源高亮同步工具的设计与实现

1. 项目概述&#xff1a;一个连接知识碎片的“机械爪” 如果你和我一样&#xff0c;是个重度阅读爱好者&#xff0c;并且习惯把在各种地方&#xff08;比如Kindle、网页文章、PDF文档&#xff09;看到的好句子、有启发的段落&#xff0c;用高亮&#xff08;Highlight&#xff…...

从苹果FBI解锁案看现代加密技术与工程师伦理抉择

1. 事件背景与核心争议点2016年初&#xff0c;美国联邦调查局&#xff08;FBI&#xff09;向苹果公司提出了一项史无前例的要求&#xff1a;协助解锁一部属于圣贝纳迪诺枪击案枪手的iPhone 5c。这部手机设置了密码保护&#xff0c;并启用了“数据自毁”功能&#xff0c;即在连续…...

Claude集成Spring Boot全链路实践:从零搭建智能API网关的7步标准化流程

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Claude集成Spring Boot全链路实践&#xff1a;从零搭建智能API网关的7步标准化流程 环境准备与依赖声明 确保 JDK 17、Maven 3.8 和 Spring Boot 3.2.x 基础环境就绪。在 pom.xml 中引入 Claude 官方…...

结构函数:电子封装热分析的关键技术解析

1. 结构函数&#xff1a;热分析领域的核心桥梁在电子封装设计与散热方案开发中&#xff0c;热特性分析一直是个令人头疼的问题。想象一下&#xff0c;你手里拿着一块正在发烫的芯片&#xff0c;却无法直接"看到"热量是如何在内部传递的——这就像医生无法用X光检查病…...

保姆级教程:手把手配置英飞凌TC397开发板的调试环境(含板载MiniWiggler与外部DAP接口详解)

英飞凌TC397开发板调试环境全攻略&#xff1a;从接口选择到实战配置 拿到英飞凌TC397开发板的第一天&#xff0c;面对板载的miniWiggler、引出的DAP接口以及各种调试选项&#xff0c;不少开发者都会陷入选择困难。这块功能强大的开发板确实提供了多种调试路径&#xff0c;但每种…...

LangGraph多智能体系统运维:从部署到监控的自动化方案

LangGraph多智能体系统运维:从部署到监控的全链路自动化方案 一、引言 钩子:你是否也踩过LangGraph上线的这些坑? 上周接到某企业AI团队的紧急求助:他们基于LangGraph搭建的客户服务多智能体系统上线仅3小时就全线崩溃,1.2万条用户咨询全部卡住,技术团队排查了2个小时才…...