RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式 顺序消费
- RocketMQ-消息消费模式
- 集群模式
- 集群模式的演示(本身就默认)
- Rocketmq存储队列
- 广播模式
- 顺序消费
- 如何改实现顺序消费
RocketMQ-消息消费模式
集群模式
在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
集群模式的演示(本身就默认)
假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条
Rocketmq存储队列
在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。
为什么是四个队列?
因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
广播模式
在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:
生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
顺序消费
实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程
假设我们没有实现顺序消费的时候
创建生产者
1.创建实体类
@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}
2.创建测试类
public class OrderUtil {public static List<OrderStep> buildOrders(){List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
3.创建生产者类
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();}
}
创建消费者类
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:
可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
如何改实现顺序消费
生产者类
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列个数"+list.size());Long orderId = (Long) o;int index = (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();}
}
消费者类
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("orderTopic","*");//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();}
}
相关文章:

RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式 集群模式 在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到 集群模…...

一、Java并发编程之线程、synchronized
黑马课程 文章目录1. Java线程1.1 创建和运行线程方法一:Thread方法二:Runnable(推荐)lambda精简Thread和runnable原理方法三:FutureTask配合Thread1.2 查看进程和线程的方法1.3 线程运行原理栈与栈帧线程上下文切换1.…...

12.hadoop系列之MapReduce分区实践
本文我们学习MapReduce默认分区以及自定义分区实践 当我们要求将统计结果按照条件输出到不同文件(分区),比如按照统计结果将手机归属地不同省份输出到不同文件中(分区) 1.默认Partitioner分区 public class HashPartitioner<K, V> extends Partitioner<…...

有了独自开,一个人就是一个团队
文章目录 简单介绍优点 优秀案例平台福利总结 简单介绍 独自开是一个基于商品与服务交易全流程的PaaS开发平台。对于开发者,独自开可以协助开发者一个人独自开发一套系统。 优点 独自开有独创的分层标准化平台架构,可以满足系统的任何个性化需求。 …...

web期末复习 2023.02.11
文章目录Web 的概念Web 组成用户通过浏览器请求资源的过程:HTML 超文本标记语言CSS插入样式表的方法有三种:对象,类,实例一个完整的 JavaScript 实现是由以下 3 个不同部分组成的:JavaScript 用法什么是 Java Server Pages?JSP 注释JSP 的 J…...
第44章 用户密码实体及其约束规则的定义实现
1 说明: 由当前程序需要兼容实现多种用户密码的加密操作,所以必须把“CustomerPassword”定义为实体类,该类用于用于把加密方式、密钥及其加密后的密码持久化到“CustomerPassword”表中,以便用为用户登录操作提供验证支撑。 如果…...

聊聊并发与锁
持续坚持原创输出,点击蓝字关注我吧1.并发与并行并发可以充分地利用 CPU 资源,一般都会使用多线程实现。多线程的作用是提高任务的平均执行速度,但是会导致程序可理解性变差,编程难度加大。关于对并发与并行的概念,大家…...

开源项目 —— 原生JS实现斗地主游戏 ——代码极少、功能都有、直接粘贴即用
目录 效果如下 目录结构 GameEntity.js GrawGame.js konva.min.js PlayGame.js veriable.js index.html 结语: 前期回顾 卡通形象人物2 写代码-睡觉 丝滑如德芙_0.活在风浪里的博客-CSDN博客本文实现了包含形象的卡通小人吃、睡、电脑工作的网页动画https://…...
Linux第四讲
目录 四、shell脚本 4.1 shell和shell脚本 4.2 脚本语言分类 4.2.1 编译型语言 4.2.2 解释型语言 4.2.3 脚本语言 4.3 shell常见种类 4.3.1 shell分类介绍 4.3.2 查看bash版本 4.3.3 sh和bash的关系 4.4 脚本书写规范 4.4.1 选择解释器 4.4.2 开发规范 4.5 shell…...
Redis 持久化
持久化是指数据写到物理硬盘里,即便程序崩溃、或者电脑重启,依然能够恢复。Redis提供了两种持久化机制:RDB和AOF。 RDB(Redis Database): RDB文件相当于内存快照,保存了某个时间点数据库信息。使用RDB文件恢复很简单,将…...

Python语言零基础入门教程(十三)
Python 字典(Dictionary) 字典是另一种可变容器模型,且可存储任意类型对象。 字典的每个键值 key:value 对用冒号 : 分割,每个键值对之间用逗号 , 分割,整个字典包括在花括号 {} 中 ,格式如下所示: d {key1 : value1, key2 : …...

江苏五年制专转本应该复习几轮?
五年制专转本应该复习几轮? 据调查统计:2022年专转本17%的考生复习三轮及以上,23%的考生复习了两轮。这两类的考生录取率高至85%。可见复习轮数多,专转本上岸的概率也大。综合多方因素,建议同学们专转本复习四轮&#…...

微信小程序的优化方案之主包与分包的研究
什么是分包? 某些情况下,开发者需要将小程序划分成不同的子包,在构建时打包成不同的分包,用户在使用时按需进行加载。 在构建小程序分包项目时,构建会输出一个或多个分包。每个使用分包小程序必定含有一个主包。所谓的…...
从手工测试转型web自动化测试继而转型成专门做自动化测试的学习路线。
在开始之前先自学两个工具 商业web自动化测试工具请自学QTP;QTP的学习可以跳过,我是跳过了的。 开源web自动化测试工具请自学Selenium;我当年是先学watir(耗时1周),再学selenium(也耗时1周&…...
【计组笔记03】计算机组成原理之系统五大部件介绍、主存模型和CPU结构介绍
这篇文章,主要介绍计算机组成原理之系统五大部件、主存模型和CPU结构。 目录 一、计算机五大部件 1.1、体系结构 (1)冯诺依曼体系结构...
微信小程序解析用户加密数据
微信公众号 IT果果日记前言在上一篇文章“微信小程序如何获取用户信息”中我们完成了用户明文数据的校验工作,本文将学习解密用户的非明文用户信息,也就是获取用户的openId和unionId。解密调用wx.getUserProfile后将返回encryptedData和iv两个数据。encr…...

毕业四年换了3份软件测试工作,我为何仍焦虑?
今天一看日历:2023.2.11 ,才突然意识到自己毕业已经四年了。四年时间里一直在测试行业摸爬滚打,现在是时候记录一下了。 下面我来分享下我这4年软件测试经验及成长历程,或许能帮助你解决很多工作中的迷惑。 01、我是如何开始做…...
嵌入式C基础知识(7)
是否可以传递任何参数并从 ISR 返回值不可以。不能传递任何参数并从 ISR 返回值。 ISR 不返回任何内容,并且不允许传递任何参数。 当硬件或软件事件发生时调用 ISR,而代码不会调用它。 这就是为什么不向 ISR 传递参数的原因。 由于代码不调用 ISR&#x…...

大数据系列之:安装pulsar详细步骤
大数据系列之:安装pulsar详细步骤一、Pulsar版本和jdk对应关系二、安装JDK三、设置和激活jdk环境变量四、下载和解压Pulsar五、查看Pulsar目录六、启动Pulsar standalone cluster七、创建Kafka Topic八、往Topic写入数据九、消费pulsar的Topic一、Pulsar版本和jdk对…...

色彩-基础理论
颜色三大指标 色相 色相是颜色的一个属性,只有黑白灰没有色相这个属性(那银灰色是什么?) 颜色的相貌,指的也是给颜色一个名字 例如:暗红、酒红、土黄、墨绿 饱和度 颜色的鲜艳程度 纯度 饱和度主要取决于含色成分和消色成分&a…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...