RabbitMQ 实现分组消费满足服务器集群部署
实现思路
- 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。
- 为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消费者都订阅这个队列。
- 确保同一组内的消费者竞争消费:RabbitMQ 会将消息推送给组内第一个可用的消费者,确保同一消息不会被同一组中的多个消费者处理。
示例场景
假设我们有两个消费者组:GroupA 和 GroupB。我们希望发送的消息能够同时被 GroupA 和 GroupB 消费,但每个组内的多个消费者只会有一个成员消费该消息。
Spring Boot + RabbitMQ 广播式分组消费示例
1. 引入依赖
首先,在 pom.xml 中添加 RabbitMQ 和 Spring AMQP 的依赖:
<dependencies><!-- Spring Boot Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 其他依赖... -->
</dependencies>
2. 配置 RabbitMQ 交换机和队列
在 application.yml 中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout 交换机,并为每个消费者组创建一个独立的队列。
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# RabbitMQ 配置rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认消息concurrency: 5 # 每个队列的并发消费者数量max-concurrency: 10 # 最大并发消费者数量# 自定义队列和交换机配置rabbitmq:queues:group-a-queue:name: group_a_queuedurable: truegroup-b-queue:name: group_b_queuedurable: trueexchanges:fanout-exchange:name: fanout_exchangetype: fanoutdurable: truebindings:group-a-binding:exchange: fanout_exchangequeue: group_a_queuegroup-b-binding:exchange: fanout_exchangequeue: group_b_queue
3. 创建 RabbitMQ 配置类
创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout_exchange", true, false);}// 定义 Group A 队列@Beanpublic Queue groupAQueue() {return new Queue("group_a_queue", true);}// 定义 Group B 队列@Beanpublic Queue groupBQueue() {return new Queue("group_b_queue", true);}// 绑定 Group A 队列到扇出交换机@Beanpublic Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupAQueue).to(fanoutExchange);}// 绑定 Group B 队列到扇出交换机@Beanpublic Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupBQueue).to(fanoutExchange);}
}
4. 发送消息
创建一个服务类来发送消息到扇出交换机。由于 fanout 交换机不使用路由键,它会将消息广播到所有绑定的队列。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic MessageProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}// 发送消息到扇出交换机public void sendMessage(String message) {System.out.println("Sending message: " + message);rabbitTemplate.convertAndSend("fanout_exchange", "", message);}
}
5. 创建消费者
为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 @RabbitListener 注解来监听队列中的消息。
Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupAConsumer {@RabbitListener(queues = "group_a_queue")public void consumeMessage(String message) {System.out.println("Group A consumer received: " + message);// 处理 Group A 的逻辑}
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupBConsumer {@RabbitListener(queues = "group_b_queue")public void consumeMessage(String message) {System.out.println("Group B consumer received: " + message);// 处理 Group B 的逻辑}
}
6. 启动应用程序并测试
启动 Spring Boot 应用程序后,GroupAConsumer 和 GroupBConsumer 会分别监听 group_a_queue 和 group_b_queue。通过以下方式测试消息的发送和消费:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class TestRunner implements CommandLineRunner {private final MessageProducer messageProducer;@Autowiredpublic TestRunner(MessageProducer messageProducer) {this.messageProducer = messageProducer;}@Overridepublic void run(String... args) throws Exception {// 发送一条消息messageProducer.sendMessage("This is a broadcast message");}
}
7. 运行结果
当你启动应用程序时,TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列,因此 group_a_queue 和 group_b_queue 都会接收到这条消息。
相关文章:
RabbitMQ 实现分组消费满足服务器集群部署
实现思路 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消…...
Chromium网络调试篇-Fiddler 5.21.0 使用指南:捕获浏览器HTTP(S)流量(二)
概述 在上一篇文章中,我们介绍了Fiddler的基础功能和如何安装它。今天我们将深入探讨如何使用Fiddler来捕获HTTP请求,这是Fiddler的一个核心能力,对于前端开发者、测试人员以及安全研究人员来说非常有用。捕获HTTP请求可以帮助我们更好地理解…...
个人IP建设:简易指南
许多个体创业者面临的一个关键挑战是如何为其企业创造稳定的需求。 作为个体创业者,您无法使用营销团队,因此许多人通过推荐和他们的网络来产生需求。因此,扩大您的网络是发展您的业务和产生持续需求的最佳策略。 这就是个人IP和品牌发挥作…...
智能指针【C++11】
文章目录 智能指针std::auto_ptr std::unique_ptrstd::shared_ptrstd::shared_ptr的线程安全问题std::weak_ptr 智能指针 std::auto_ptr 管理权转移 auto_ptr是C98中引入的智能指针,auto_ptr通过管理权转移的方式解决智能指针的拷贝问题,保证一个资源…...
【Linux 篇】Docker 启动和停止的精准掌舵:操控指南
文章目录 【Linux篇】Docker 启动和停止的精准掌舵:操控指南前言docker基本命令1. 帮助手册 2. 指令介绍 常用命令1. 查看镜像2. 搜索镜像3. 拉取镜像4. 删除镜像5. 从Docker Hub拉取 容器的相关命令1. 查看容器2. 创建与启动容器3. 查看镜像4. 启动容器5. 查看容器…...
Cursor vs VSCode:主要区别与优势分析
Cursor - The AI Code Editor 1. AI 集成能力 Cursor的优势 原生AI集成: # Cursor可以直接通过快捷键调用AI # 例如:按下 Ctrl K 可以直接获取代码建议 def complex_function():# 在这里,你可以直接询问AI如何实现功能# AI会直接在编辑器中…...
从单体到微服务:如何借助 Spring Cloud 实现架构转型
一、Spring Cloud简介 Spring Cloud 是一套基于 Spring 框架的微服务架构解决方案,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。 Spring Cloud 提供了诸如服务发现、配置管理、负载均衡、断路器、消息总线…...
RocketMq基础学习+SpringBoot集成
学习贴:参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心,支持Topic和Broker的动态注册和发现。作用主…...
分布式cap
P(分区安全)都能保证,就是在C(强一致)和A(性能)之间做取舍。 (即立马做主从同步,还是先返回写入结果等会再做主从同步。类似的还有,缓存和db之间的同步。&am…...
mybatis-xml映射文件及mybatis动态sql
规范 XML映射文件的名称与Mapper接口名称一致,并且将XML映射文件和Mapper接口放置在相同包下(同包同名)。 XML映射文件的namespace属性为Mapper接口全限定名一致。 XML映射文件中sql语句的id与Mapper接口中的方法名一致,并保持返回类型一致…...
计算机网络复习——概念强化作业
物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…...
用友BIP与旺店通数据集成方案解析
用友BIP与旺店通企业奇门的供应商集成同步方案 在现代企业的数据管理中,跨平台的数据集成是实现高效业务运作的关键环节。本文将分享一个实际案例:如何通过轻易云数据集成平台,将用友BIP系统中的供应商数据无缝对接到旺店通企业奇门…...
string类函数的手动实现
在上一篇文章中,我们讲解了一些string类的函数,但是对于我们要熟练掌握c是远远不够的,今天,我将手动实现一下这些函数~ 注意:本篇文章中会大量应用复用,这是一种很巧妙的方法 和以往一样,还是…...
Oceanbase离线集群部署
准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …...
transformers生成式对话机器人
简介 生成式对话机器人是一种先进的人工智能系统,它能够通过学习大量的自然语言数据来模拟人类进行开放、连贯且创造性的对话。与基于规则或检索式的聊天机器人不同,生成式对话机器人并不局限于预定义的回答集,而是可以根据对话上下文动态地…...
WPF中的VisualState(视觉状态)
以前在设置控件样式或自定义控件时,都是使用触发器来进行样式更改。触发器可以在属性值发生更改时启动操作。 像这样: <Style TargetType"ListBoxItem"><Setter Property"Opacity" Value"0.5" /><Setter …...
C#设计模式--状态模式(State Pattern)
状态模式是一种行为设计模式,它允许对象在其内部状态发生变化时改变其行为。这种模式的核心思想是将状态封装在独立的对象中,而不是将状态逻辑散布在整个程序中。 用途 简化复杂的条件逻辑:通过将不同的状态封装在不同的类中,可…...
〔 MySQL 〕索引
目录 1. 没有索引,可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘: 在看看磁盘中一个盘片编辑 扇区 定位扇区编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…...
计算机网络研究实训室建设方案
一、概述 本方案旨在规划并实施一个先进的计算机网络研究实训室,旨在为学生提供一个深入学习、实践和研究网络技术的平台。实训室将集教学、实验、研究于一体,覆盖网络基础、网络架构、网络安全、网络管理等多个领域,以培养具备扎实理论基础…...
韩企研学团造访图为科技:共探人工智能创新前沿
今日,一支由韩国知名企业研学专家组成的代表团莅临图为科技深圳总部,展开了一场深度技术交流与研讨活动。 此次访问旨在通过实地探访中国领先的科技企业,促进中韩两国在科技创新领域的深入合作与交流。 韩国游学团合影 图为科技作为一家在人…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
STM32HAL库USART源代码解析及应用
STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...
深入理解Optional:处理空指针异常
1. 使用Optional处理可能为空的集合 在Java开发中,集合判空是一个常见但容易出错的场景。传统方式虽然可行,但存在一些潜在问题: // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...
DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态
前言 在人工智能技术飞速发展的今天,深度学习与大模型技术已成为推动行业变革的核心驱动力,而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心,系统性地呈现了两部深度技术著作的精华:…...
C++--string的模拟实现
一,引言 string的模拟实现是只对string对象中给的主要功能经行模拟实现,其目的是加强对string的底层了解,以便于在以后的学习或者工作中更加熟练的使用string。本文中的代码仅供参考并不唯一。 二,默认成员函数 string主要有三个成员变量,…...
