RocketMQ广播消费消息
1、 基础概念
RocketMQ 支持两种消息模式:集群消费
( Clustering )和广播消费
( Broadcasting )。
集群消费模式(Cluster):
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。
广播消费模式(Broadcast):
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。
2、 实现
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** 广播模式*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//根据情况修改消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式//此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费/**这里可以设置两种模式: 默认都是CLUSTERING("CLUSTERING")* BROADCASTING("BROADCASTING") 广播模式* CLUSTERING("CLUSTERING") 集群模式*/consumer.setMessageModel(MessageModel.BROADCASTING);//根据情况修改消费的topicconsumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}
生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("defaultGroup");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定mq的地址producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {{Message msg = new Message("TopicTest", // 发送的topic"AAA", //tags"BBB", // keys"CCC".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}
相关文章:
RocketMQ广播消费消息
1、 基础概念 RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。 集群消费模式(Cluster): 在集群消费模式下,同一个消费者组(…...
C#基础(2)枚举
前言 我们其实在前面已经了解过枚举到底有什么作用,但是那毕竟是概念性的语言,理解起来很抽象,今天我们会具体来讲一讲枚举,并谈一谈它的应用。 希望你能从今天的C#基础中有所收获。 基本概念 1.枚举:是一个比较特…...

Linux之MySQL日志
前言 数据库就像一个庞大的图书馆,而日志则是记录这个图书馆内每一本书的目录。正如在图书馆中找到特定书籍一样,数据库日志帮助我们追溯数据的变更、定位问题和还原状态。 在MySQL中,日志是非常重要的一个组成部分,它记录了数据…...

Redis集群模式—主从集群、哨兵集群、分片集群
主从集群 主从模式中,包括一个主节点(Master)和一个或多个从节点(Slave)。主节点负责处理所有写操作和读操作,而从节点则复制主节点的数据,并且只能处理读操作。当主节点发生故障时,…...

并发工具类(二):CyclicBarrier
1、CyclicBarrier 介绍 从字面上看 CyclicBarrier 就是 一个循环屏障,它也是一个同步助手工具,它允许多个线程 在执行完相应的操作后彼此等待共同到达一个屏障点。 CyclicBarrier可以被循环使用,当屏障点值变为0之后,可以在接下来…...
Spring Cloud全解析:负载均衡之Ribbon简介
Ribbon简介 Ribbon是一种客户端的软件负载均衡算法,将Netflix的中间层服务连接在一起,提供了一系列完善的配置如连接超时、重试等,Ribbon会自动的帮助基于某种规则(如简单轮询、随机连接等)去连接那些机器,也可以自定义的负载均衡…...
Kettle安装与使用指南
1. 介绍 什么是Kettle? Kettle,全称Pentaho Data Integration (PDI),是Pentaho BI套件的一部分。它提供了一个可视化的ETL工具,允许用户通过图形界面设计复杂的数据集成流程。Kettle支持多种数据源,包括关系型数据库…...

教育行业解决方案:智能PPT在教育行业的创新应用
在信息化时代,教育行业面临着巨大的变革。随着人工智能技术的不断发展,传统教学方式正在被重新定义。彩漩科技作为 AI 技术的先行者,推出了歌者 PPT &彩漩 PPT,为教师、学生和家长提供了一种全新的教育体验,实现了…...

Matlab程序练习
Part1 1.求 [100,999] 之间能被 21整除的数的个数。 程序: 主文件:main.m clear; start_num 100; end_num 999; div_num 21; res div(start_num,end_num,div_num); fprintf("[%d,%d]之间能被%d整除的数的个数为%d个\n",start_num,end_…...

cesium可不可以改变影像底图颜色,如何给地球底图影像添加一层滤镜蒙版?
废话:你的球是不是很丑?是不是没有科技感?是不是没有好看的影像? 因果: 因:客户问,底图可不可以改变颜色,想让球更漂亮一些。 答:可以改变影像饱和度,透明度…...

MyBatis-MappedStatement什么时候生成?QueryWrapper如何做到动态生成了SQL?
通过XML配置的MappedStatement 这部分MappedStatement主要是由MybatisXMLMapperBuilder进行解析,核心逻辑如下: 通过注解配置的MappedStatement 核心逻辑就在这个里面了: 继承BaseMapper的MappedStatement 我们看看这个类,里…...
Netty系列-2 NioServerSocketChannel和NioSocketChannel介绍
背景 本文介绍Netty的通道组件NioServerSocketChannel和NioSocketChannel,从源码的角度介绍其实现原理。 1.NioServerSocketChannel Netty本质是对NIO的封装和增强,因此Netty框架中必然包含了对于ServerSocketChannel的构建、配置以及向选择器注册&am…...

智能客服的四大优势,提升企业服务效率
在这个信息化快速发展的时代,客户服务的重要性越来越凸显。传统的客服方式已经无法满足企业日益增长的服务需求,于是智能客服服务应运而生。智能客服服务不仅改变了企业与客户的互动方式,还提高了服务效率和客户满意度。本文将深入探讨智能客…...
AutoGPT开源项目解读
AutoGPT开源项目解读 (qq.com) AutoGPT旨在创建一个自动化的自我改进系统,能够自主执行和学习各种任务 项目基本信息 首先阅读项目的README.md,下述代理和智能体两个名词可互换 项目简介:一个创建和运行智能体的工具,这些智能体…...

Linux离线安装fontconfig
Linux离线下载yum包,安装字体库 一、下载安装包 以CentOS Linux release 7.9.2009下载fontconfig的rpm包的为例 http://mirror.centos.org/centos/7/按提示跳转历史库 找到对应版本的centos https://vault.centos.org/7.9.2009/os/x86_64/Packages/在Packages目…...

海山数据库(He3DB)+AI:(一)神经网络基础
文章目录 1 引言2 基本结构2.1 神经元2.2 模型结构 3 训练过程3.1 损失函数3.2 反向传播3.3 基于梯度的优化算法 4 总结 1 引言 神经网络可以被视为一个万能的拟合器,通过深层的隐藏层实现输入数据到输出结果的映射。神经网络的思想源于对大脑的模拟,在…...
CSS中选择器有哪些?(史上最全选择器)
CSS选择器是用来选择和应用样式到HTML元素上的工具。以下是所有主要的CSS选择器的详细分类和描述: 1. 基本选择器 通配符选择器 (*):选择所有元素。例如,* { color: red; } 会将所有元素的文字颜色设置为红色。元素选择器:选择指…...

本地部署 AI 智能体,Dify 搭建保姆级教程(下):知识库 RAG + API 调用,我捏了一个红楼解读大师
话接上篇: 本地部署 AI 智能体,Dify 搭建保姆级教程(上):工作流 Agent,把 AI 接入个人微信 相信大家已经在本地搭建好 Dify 了。 今日分享,继续介绍 Dify 的另外两项重要功能: 知…...
HarmonyOS应用开发者高级认证,Next版本发布后最新题库 - 答案纯享版
这篇文章是高级题库答案纯享版,只有需要选择的选项。如果需要查看所有选项,可以点击下方链接跳转。以考代学,还是推荐点击下方链接,查看完整的题库,边看边学习鸿蒙应用开发。此题库已更新完毕,笔者将不继续…...
基于PHP的文件包含介绍
引言:在实际开发过程中,经常会遇到部分模块功能需要重复使用的情况,比如数据库的增删改查,文件包含通过将需要重复使用的功能模块代码引入其他文件的内容,实现重用代码、分离配置等。然而,如果文件包含操作…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...

深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...

如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...

HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...

论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...