RabbitMQ 实现分组消费满足服务器集群部署
实现思路
- 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。
- 为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消费者都订阅这个队列。
- 确保同一组内的消费者竞争消费:RabbitMQ 会将消息推送给组内第一个可用的消费者,确保同一消息不会被同一组中的多个消费者处理。
示例场景
假设我们有两个消费者组:GroupA
和 GroupB
。我们希望发送的消息能够同时被 GroupA
和 GroupB
消费,但每个组内的多个消费者只会有一个成员消费该消息。
Spring Boot + RabbitMQ 广播式分组消费示例
1. 引入依赖
首先,在 pom.xml
中添加 RabbitMQ 和 Spring AMQP 的依赖:
<dependencies><!-- Spring Boot Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 其他依赖... -->
</dependencies>
2. 配置 RabbitMQ 交换机和队列
在 application.yml
中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout
交换机,并为每个消费者组创建一个独立的队列。
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# RabbitMQ 配置rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认消息concurrency: 5 # 每个队列的并发消费者数量max-concurrency: 10 # 最大并发消费者数量# 自定义队列和交换机配置rabbitmq:queues:group-a-queue:name: group_a_queuedurable: truegroup-b-queue:name: group_b_queuedurable: trueexchanges:fanout-exchange:name: fanout_exchangetype: fanoutdurable: truebindings:group-a-binding:exchange: fanout_exchangequeue: group_a_queuegroup-b-binding:exchange: fanout_exchangequeue: group_b_queue
3. 创建 RabbitMQ 配置类
创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout_exchange", true, false);}// 定义 Group A 队列@Beanpublic Queue groupAQueue() {return new Queue("group_a_queue", true);}// 定义 Group B 队列@Beanpublic Queue groupBQueue() {return new Queue("group_b_queue", true);}// 绑定 Group A 队列到扇出交换机@Beanpublic Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupAQueue).to(fanoutExchange);}// 绑定 Group B 队列到扇出交换机@Beanpublic Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupBQueue).to(fanoutExchange);}
}
4. 发送消息
创建一个服务类来发送消息到扇出交换机。由于 fanout
交换机不使用路由键,它会将消息广播到所有绑定的队列。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic MessageProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}// 发送消息到扇出交换机public void sendMessage(String message) {System.out.println("Sending message: " + message);rabbitTemplate.convertAndSend("fanout_exchange", "", message);}
}
5. 创建消费者
为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 @RabbitListener
注解来监听队列中的消息。
Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupAConsumer {@RabbitListener(queues = "group_a_queue")public void consumeMessage(String message) {System.out.println("Group A consumer received: " + message);// 处理 Group A 的逻辑}
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupBConsumer {@RabbitListener(queues = "group_b_queue")public void consumeMessage(String message) {System.out.println("Group B consumer received: " + message);// 处理 Group B 的逻辑}
}
6. 启动应用程序并测试
启动 Spring Boot 应用程序后,GroupAConsumer
和 GroupBConsumer
会分别监听 group_a_queue
和 group_b_queue
。通过以下方式测试消息的发送和消费:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class TestRunner implements CommandLineRunner {private final MessageProducer messageProducer;@Autowiredpublic TestRunner(MessageProducer messageProducer) {this.messageProducer = messageProducer;}@Overridepublic void run(String... args) throws Exception {// 发送一条消息messageProducer.sendMessage("This is a broadcast message");}
}
7. 运行结果
当你启动应用程序时,TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列,因此 group_a_queue 和 group_b_queue 都会接收到这条消息。
相关文章:

RabbitMQ 实现分组消费满足服务器集群部署
实现思路 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消…...

Chromium网络调试篇-Fiddler 5.21.0 使用指南:捕获浏览器HTTP(S)流量(二)
概述 在上一篇文章中,我们介绍了Fiddler的基础功能和如何安装它。今天我们将深入探讨如何使用Fiddler来捕获HTTP请求,这是Fiddler的一个核心能力,对于前端开发者、测试人员以及安全研究人员来说非常有用。捕获HTTP请求可以帮助我们更好地理解…...

个人IP建设:简易指南
许多个体创业者面临的一个关键挑战是如何为其企业创造稳定的需求。 作为个体创业者,您无法使用营销团队,因此许多人通过推荐和他们的网络来产生需求。因此,扩大您的网络是发展您的业务和产生持续需求的最佳策略。 这就是个人IP和品牌发挥作…...

智能指针【C++11】
文章目录 智能指针std::auto_ptr std::unique_ptrstd::shared_ptrstd::shared_ptr的线程安全问题std::weak_ptr 智能指针 std::auto_ptr 管理权转移 auto_ptr是C98中引入的智能指针,auto_ptr通过管理权转移的方式解决智能指针的拷贝问题,保证一个资源…...

【Linux 篇】Docker 启动和停止的精准掌舵:操控指南
文章目录 【Linux篇】Docker 启动和停止的精准掌舵:操控指南前言docker基本命令1. 帮助手册 2. 指令介绍 常用命令1. 查看镜像2. 搜索镜像3. 拉取镜像4. 删除镜像5. 从Docker Hub拉取 容器的相关命令1. 查看容器2. 创建与启动容器3. 查看镜像4. 启动容器5. 查看容器…...

Cursor vs VSCode:主要区别与优势分析
Cursor - The AI Code Editor 1. AI 集成能力 Cursor的优势 原生AI集成: # Cursor可以直接通过快捷键调用AI # 例如:按下 Ctrl K 可以直接获取代码建议 def complex_function():# 在这里,你可以直接询问AI如何实现功能# AI会直接在编辑器中…...

从单体到微服务:如何借助 Spring Cloud 实现架构转型
一、Spring Cloud简介 Spring Cloud 是一套基于 Spring 框架的微服务架构解决方案,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。 Spring Cloud 提供了诸如服务发现、配置管理、负载均衡、断路器、消息总线…...

RocketMq基础学习+SpringBoot集成
学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主…...

分布式cap
P(分区安全)都能保证,就是在C(强一致)和A(性能)之间做取舍。 (即立马做主从同步,还是先返回写入结果等会再做主从同步。类似的还有,缓存和db之间的同步。&am…...

mybatis-xml映射文件及mybatis动态sql
规范 XML映射文件的名称与Mapper接口名称一致,并且将XML映射文件和Mapper接口放置在相同包下(同包同名)。 XML映射文件的namespace属性为Mapper接口全限定名一致。 XML映射文件中sql语句的id与Mapper接口中的方法名一致,并保持返回类型一致…...

计算机网络复习——概念强化作业
物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…...

用友BIP与旺店通数据集成方案解析
用友BIP与旺店通企业奇门的供应商集成同步方案 在现代企业的数据管理中,跨平台的数据集成是实现高效业务运作的关键环节。本文将分享一个实际案例:如何通过轻易云数据集成平台,将用友BIP系统中的供应商数据无缝对接到旺店通企业奇门…...

string类函数的手动实现
在上一篇文章中,我们讲解了一些string类的函数,但是对于我们要熟练掌握c是远远不够的,今天,我将手动实现一下这些函数~ 注意:本篇文章中会大量应用复用,这是一种很巧妙的方法 和以往一样,还是…...

Oceanbase离线集群部署
准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …...

transformers生成式对话机器人
简介 生成式对话机器人是一种先进的人工智能系统,它能够通过学习大量的自然语言数据来模拟人类进行开放、连贯且创造性的对话。与基于规则或检索式的聊天机器人不同,生成式对话机器人并不局限于预定义的回答集,而是可以根据对话上下文动态地…...

WPF中的VisualState(视觉状态)
以前在设置控件样式或自定义控件时,都是使用触发器来进行样式更改。触发器可以在属性值发生更改时启动操作。 像这样: <Style TargetType"ListBoxItem"><Setter Property"Opacity" Value"0.5" /><Setter …...

C#设计模式--状态模式(State Pattern)
状态模式是一种行为设计模式,它允许对象在其内部状态发生变化时改变其行为。这种模式的核心思想是将状态封装在独立的对象中,而不是将状态逻辑散布在整个程序中。 用途 简化复杂的条件逻辑:通过将不同的状态封装在不同的类中,可…...

〔 MySQL 〕索引
目录 1. 没有索引,可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘: 在看看磁盘中一个盘片编辑 扇区 定位扇区编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…...

计算机网络研究实训室建设方案
一、概述 本方案旨在规划并实施一个先进的计算机网络研究实训室,旨在为学生提供一个深入学习、实践和研究网络技术的平台。实训室将集教学、实验、研究于一体,覆盖网络基础、网络架构、网络安全、网络管理等多个领域,以培养具备扎实理论基础…...

韩企研学团造访图为科技:共探人工智能创新前沿
今日,一支由韩国知名企业研学专家组成的代表团莅临图为科技深圳总部,展开了一场深度技术交流与研讨活动。 此次访问旨在通过实地探访中国领先的科技企业,促进中韩两国在科技创新领域的深入合作与交流。 韩国游学团合影 图为科技作为一家在人…...

html button 按钮单选且 高亮
<DIV class"middle"> <div class"containerTarget"> <span class"hover-target1" οnclick"btn(1);">韵达 </span> <span class"hover-target2" οnclick"btn(2);">中通 </span…...

图片上传HTML
alioss sky:jwt:# 设置jwt签名加密时使用的秘钥admin-secret-key: itcast# 设置jwt过期时间admin-ttl: 7200000# 设置前端传递过来的令牌名称admin-token-name: tokenalioss:endpoint: ${sky.alioss.endpoint}access-key-id: ${sky.alioss.access-key-id}access-key-secret: $…...

C++学习-函数
C 函数 目录 函数默认参数引用传参函数重载 数量不同类型不同 内联函数 函数默认参数 #include<iostream>using std::cout; using std::endl;int power(int n, int x2); // x2 是默认参数int main() {cout << power(5) << endl; // 没有传 x 的值&#x…...

spring boot 测试 mybatis mapper类
spring boot 测试 mybatis mapper类 针对 mybatis plus不启动 webserver指定加载 xml 【过滤 “classpath*:/mapper/**/*.xml” 下的xml】, mapper xml文件名和mapper java文件名称要一样,是根据文件名称过滤的。默认情况加载和解析所有mapper.xml 自定义 MapperT…...

远程游戏新体验!
在这个数字化的时代,游戏已经不仅限于家里的电视或书房的电脑了。远程游戏,也就是通过远程控制软件在不同地点操作游戏设备,给玩家带来了前所未有的自由和灵活性。RayLink远程控制软件,凭借其出色的性能和专为游戏设计的功能&…...

Let up bring up a linux.part2 [十一]
之前的篇幅中我们已经将 Linux 内核 bringup 起来了,不知道大家有没有去尝试将根文件系统运行起来,今天我就带领大家完成这个事情,可以跟着下面的步骤一步步来完成: 在这里我们使用 busybox 构建 rootfs: 下载 busyb…...

调用大模型api 批量处理图像 保存到excel
最近需要调用大模型,并将结果保存到excel中,效果如下: 代码: import base64 from zhipuai import ZhipuAI import os import pandas as pd from openpyxl import Workbook from openpyxl.drawing.image import Image from io i…...

使用 Flownex 模拟热环境对原油运输的影响
石油和天然气行业经常使用管道仿真来模拟原油的流动。为了准确估计管道容量,必须考虑环境对管道的热影响以及环境温度如何影响油品特性。本博客介绍了如何通过将传热元件集成到管道流网中,以及使用新的工作液材料 Flownex 来模拟各种传热机制。 使用 Fl…...

【WRF-Urban】WPS中有关Urban的变量设置
【WRF-Urban】WPS中有关Urban的变量设置 地理数据源的配置WRF-Urban所需静态地理数据1、LANDUSE:包含城市地表分类的土地利用数据。2、URB_PARAM:城市参数数据集。3、FRC_URB2D:城市覆盖度数据集WRF默认设置(美国)数据集1-National urban dataset in China NUDC(中国)数…...

Socket编程-tcp
1. 前言 在tcp套接字编程这里,我们将完成两份代码,一份是基于tcp实现普通的对话,另一份加上业务,client输入要执行的命令,server将执行结果返回给client 2. tcp_echo_server 与udp类似,前两步࿱…...