当前位置: 首页 > news >正文

RocketMQ广播消费消息

1、 基础概念

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费模式(Cluster)
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。

广播消费模式(Broadcast)
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。

怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。

2、 实现

消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** 广播模式*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//根据情况修改消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式//此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费/**这里可以设置两种模式: 默认都是CLUSTERING("CLUSTERING")*     BROADCASTING("BROADCASTING") 广播模式*     CLUSTERING("CLUSTERING") 集群模式*/consumer.setMessageModel(MessageModel.BROADCASTING);//根据情况修改消费的topicconsumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}

生产者

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("defaultGroup");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定mq的地址producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {{Message msg = new Message("TopicTest", // 发送的topic"AAA",  //tags"BBB", // keys"CCC".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}

相关文章:

RocketMQ广播消费消息

1、 基础概念 RocketMQ 支持两种消息模式&#xff1a;集群消费&#xff08; Clustering &#xff09;和广播消费&#xff08; Broadcasting &#xff09;。 集群消费模式&#xff08;Cluster&#xff09;&#xff1a; 在集群消费模式下&#xff0c;同一个消费者组&#xff08…...

C#基础(2)枚举

前言 我们其实在前面已经了解过枚举到底有什么作用&#xff0c;但是那毕竟是概念性的语言&#xff0c;理解起来很抽象&#xff0c;今天我们会具体来讲一讲枚举&#xff0c;并谈一谈它的应用。 希望你能从今天的C#基础中有所收获。 基本概念 1.枚举&#xff1a;是一个比较特…...

Linux之MySQL日志

前言 数据库就像一个庞大的图书馆&#xff0c;而日志则是记录这个图书馆内每一本书的目录。正如在图书馆中找到特定书籍一样&#xff0c;数据库日志帮助我们追溯数据的变更、定位问题和还原状态。 在MySQL中&#xff0c;日志是非常重要的一个组成部分&#xff0c;它记录了数据…...

Redis集群模式—主从集群、哨兵集群、分片集群

主从集群 主从模式中&#xff0c;包括一个主节点&#xff08;Master&#xff09;和一个或多个从节点&#xff08;Slave&#xff09;。主节点负责处理所有写操作和读操作&#xff0c;而从节点则复制主节点的数据&#xff0c;并且只能处理读操作。当主节点发生故障时&#xff0c;…...

并发工具类(二):CyclicBarrier

1、CyclicBarrier 介绍 从字面上看 CyclicBarrier 就是 一个循环屏障&#xff0c;它也是一个同步助手工具&#xff0c;它允许多个线程 在执行完相应的操作后彼此等待共同到达一个屏障点。 CyclicBarrier可以被循环使用&#xff0c;当屏障点值变为0之后&#xff0c;可以在接下来…...

Spring Cloud全解析:负载均衡之Ribbon简介

Ribbon简介 Ribbon是一种客户端的软件负载均衡算法&#xff0c;将Netflix的中间层服务连接在一起&#xff0c;提供了一系列完善的配置如连接超时、重试等&#xff0c;Ribbon会自动的帮助基于某种规则(如简单轮询、随机连接等)去连接那些机器&#xff0c;也可以自定义的负载均衡…...

Kettle安装与使用指南

1. 介绍 什么是Kettle&#xff1f; Kettle&#xff0c;全称Pentaho Data Integration (PDI)&#xff0c;是Pentaho BI套件的一部分。它提供了一个可视化的ETL工具&#xff0c;允许用户通过图形界面设计复杂的数据集成流程。Kettle支持多种数据源&#xff0c;包括关系型数据库…...

教育行业解决方案:智能PPT在教育行业的创新应用

在信息化时代&#xff0c;教育行业面临着巨大的变革。随着人工智能技术的不断发展&#xff0c;传统教学方式正在被重新定义。彩漩科技作为 AI 技术的先行者&#xff0c;推出了歌者 PPT &彩漩 PPT&#xff0c;为教师、学生和家长提供了一种全新的教育体验&#xff0c;实现了…...

Matlab程序练习

Part1 1.求 [100,999] 之间能被 21整除的数的个数。 程序&#xff1a; 主文件&#xff1a;main.m clear; start_num 100; end_num 999; div_num 21; res div(start_num,end_num,div_num); fprintf("[%d,%d]之间能被%d整除的数的个数为%d个\n",start_num,end_…...

cesium可不可以改变影像底图颜色,如何给地球底图影像添加一层滤镜蒙版?

废话&#xff1a;你的球是不是很丑&#xff1f;是不是没有科技感&#xff1f;是不是没有好看的影像&#xff1f; 因果&#xff1a; 因&#xff1a;客户问&#xff0c;底图可不可以改变颜色&#xff0c;想让球更漂亮一些。 答&#xff1a;可以改变影像饱和度&#xff0c;透明度…...

MyBatis-MappedStatement什么时候生成?QueryWrapper如何做到动态生成了SQL?

通过XML配置的MappedStatement 这部分MappedStatement主要是由MybatisXMLMapperBuilder进行解析&#xff0c;核心逻辑如下&#xff1a; 通过注解配置的MappedStatement 核心逻辑就在这个里面了&#xff1a; 继承BaseMapper的MappedStatement 我们看看这个类&#xff0c;里…...

Netty系列-2 NioServerSocketChannel和NioSocketChannel介绍

背景 本文介绍Netty的通道组件NioServerSocketChannel和NioSocketChannel&#xff0c;从源码的角度介绍其实现原理。 1.NioServerSocketChannel Netty本质是对NIO的封装和增强&#xff0c;因此Netty框架中必然包含了对于ServerSocketChannel的构建、配置以及向选择器注册&am…...

智能客服的四大优势,提升企业服务效率

在这个信息化快速发展的时代&#xff0c;客户服务的重要性越来越凸显。传统的客服方式已经无法满足企业日益增长的服务需求&#xff0c;于是智能客服服务应运而生。智能客服服务不仅改变了企业与客户的互动方式&#xff0c;还提高了服务效率和客户满意度。本文将深入探讨智能客…...

AutoGPT开源项目解读

AutoGPT开源项目解读 (qq.com) AutoGPT旨在创建一个自动化的自我改进系统&#xff0c;能够自主执行和学习各种任务 项目基本信息 首先阅读项目的README.md&#xff0c;下述代理和智能体两个名词可互换 项目简介&#xff1a;一个创建和运行智能体的工具&#xff0c;这些智能体…...

Linux离线安装fontconfig

Linux离线下载yum包&#xff0c;安装字体库 一、下载安装包 以CentOS Linux release 7.9.2009下载fontconfig的rpm包的为例 http://mirror.centos.org/centos/7/按提示跳转历史库 找到对应版本的centos https://vault.centos.org/7.9.2009/os/x86_64/Packages/在Packages目…...

海山数据库(He3DB)+AI:(一)神经网络基础

文章目录 1 引言2 基本结构2.1 神经元2.2 模型结构 3 训练过程3.1 损失函数3.2 反向传播3.3 基于梯度的优化算法 4 总结 1 引言 神经网络可以被视为一个万能的拟合器&#xff0c;通过深层的隐藏层实现输入数据到输出结果的映射。神经网络的思想源于对大脑的模拟&#xff0c;在…...

CSS中选择器有哪些?(史上最全选择器)

CSS选择器是用来选择和应用样式到HTML元素上的工具。以下是所有主要的CSS选择器的详细分类和描述&#xff1a; 1. 基本选择器 通配符选择器 (*)&#xff1a;选择所有元素。例如&#xff0c;* { color: red; } 会将所有元素的文字颜色设置为红色。元素选择器&#xff1a;选择指…...

本地部署 AI 智能体,Dify 搭建保姆级教程(下):知识库 RAG + API 调用,我捏了一个红楼解读大师

话接上篇&#xff1a; 本地部署 AI 智能体&#xff0c;Dify 搭建保姆级教程&#xff08;上&#xff09;&#xff1a;工作流 Agent&#xff0c;把 AI 接入个人微信 相信大家已经在本地搭建好 Dify 了。 今日分享&#xff0c;继续介绍 Dify 的另外两项重要功能&#xff1a; 知…...

HarmonyOS应用开发者高级认证,Next版本发布后最新题库 - 答案纯享版

这篇文章是高级题库答案纯享版&#xff0c;只有需要选择的选项。如果需要查看所有选项&#xff0c;可以点击下方链接跳转。以考代学&#xff0c;还是推荐点击下方链接&#xff0c;查看完整的题库&#xff0c;边看边学习鸿蒙应用开发。此题库已更新完毕&#xff0c;笔者将不继续…...

基于PHP的文件包含介绍

引言&#xff1a;在实际开发过程中&#xff0c;经常会遇到部分模块功能需要重复使用的情况&#xff0c;比如数据库的增删改查&#xff0c;文件包含通过将需要重复使用的功能模块代码引入其他文件的内容&#xff0c;实现重用代码、分离配置等。然而&#xff0c;如果文件包含操作…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...

日常一水C

多态 言简意赅&#xff1a;就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过&#xff0c;当子类和父类的函数名相同时&#xff0c;会隐藏父类的同名函数转而调用子类的同名函数&#xff0c;如果要调用父类的同名函数&#xff0c;那么就需要对父类进行引用&#…...

C++--string的模拟实现

一,引言 string的模拟实现是只对string对象中给的主要功能经行模拟实现&#xff0c;其目的是加强对string的底层了解&#xff0c;以便于在以后的学习或者工作中更加熟练的使用string。本文中的代码仅供参考并不唯一。 二,默认成员函数 string主要有三个成员变量&#xff0c;…...