当前位置: 首页 > news >正文

RabbitMQ 实现分组消费满足服务器集群部署

实现思路

  1. 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。
  2. 为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消费者都订阅这个队列。
  3. 确保同一组内的消费者竞争消费:RabbitMQ 会将消息推送给组内第一个可用的消费者,确保同一消息不会被同一组中的多个消费者处理。

示例场景

假设我们有两个消费者组:GroupAGroupB。我们希望发送的消息能够同时被 GroupAGroupB 消费,但每个组内的多个消费者只会有一个成员消费该消息。

Spring Boot + RabbitMQ 广播式分组消费示例

1. 引入依赖

首先,在 pom.xml 中添加 RabbitMQ 和 Spring AMQP 的依赖:

<dependencies><!-- Spring Boot Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 其他依赖... -->
</dependencies>
2. 配置 RabbitMQ 交换机和队列

application.yml 中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout 交换机,并为每个消费者组创建一个独立的队列。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# RabbitMQ 配置rabbitmq:listener:simple:acknowledge-mode: manual  # 手动确认消息concurrency: 5           # 每个队列的并发消费者数量max-concurrency: 10      # 最大并发消费者数量# 自定义队列和交换机配置rabbitmq:queues:group-a-queue:name: group_a_queuedurable: truegroup-b-queue:name: group_b_queuedurable: trueexchanges:fanout-exchange:name: fanout_exchangetype: fanoutdurable: truebindings:group-a-binding:exchange: fanout_exchangequeue: group_a_queuegroup-b-binding:exchange: fanout_exchangequeue: group_b_queue
3. 创建 RabbitMQ 配置类

创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout_exchange", true, false);}// 定义 Group A 队列@Beanpublic Queue groupAQueue() {return new Queue("group_a_queue", true);}// 定义 Group B 队列@Beanpublic Queue groupBQueue() {return new Queue("group_b_queue", true);}// 绑定 Group A 队列到扇出交换机@Beanpublic Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupAQueue).to(fanoutExchange);}// 绑定 Group B 队列到扇出交换机@Beanpublic Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupBQueue).to(fanoutExchange);}
}
4. 发送消息

创建一个服务类来发送消息到扇出交换机。由于 fanout 交换机不使用路由键,它会将消息广播到所有绑定的队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic MessageProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}// 发送消息到扇出交换机public void sendMessage(String message) {System.out.println("Sending message: " + message);rabbitTemplate.convertAndSend("fanout_exchange", "", message);}
}
5. 创建消费者

为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 @RabbitListener 注解来监听队列中的消息。

Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupAConsumer {@RabbitListener(queues = "group_a_queue")public void consumeMessage(String message) {System.out.println("Group A consumer received: " + message);// 处理 Group A 的逻辑}
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupBConsumer {@RabbitListener(queues = "group_b_queue")public void consumeMessage(String message) {System.out.println("Group B consumer received: " + message);// 处理 Group B 的逻辑}
}
6. 启动应用程序并测试

启动 Spring Boot 应用程序后,GroupAConsumerGroupBConsumer 会分别监听 group_a_queuegroup_b_queue。通过以下方式测试消息的发送和消费:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class TestRunner implements CommandLineRunner {private final MessageProducer messageProducer;@Autowiredpublic TestRunner(MessageProducer messageProducer) {this.messageProducer = messageProducer;}@Overridepublic void run(String... args) throws Exception {// 发送一条消息messageProducer.sendMessage("This is a broadcast message");}
}

7. 运行结果
当你启动应用程序时,TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列,因此 group_a_queue 和 group_b_queue 都会接收到这条消息。

相关文章:

RabbitMQ 实现分组消费满足服务器集群部署

实现思路 使用扇出交换机&#xff08;Fanout Exchange&#xff09;&#xff1a;扇出交换机会将消息广播到所有绑定的队列&#xff0c;确保每个消费者组都能接收到相同的消息。为每个消费者组创建独立的队列&#xff1a;每个消费者组拥有自己的队列&#xff0c;所有属于该组的消…...

Chromium网络调试篇-Fiddler 5.21.0 使用指南:捕获浏览器HTTP(S)流量(二)

概述 在上一篇文章中&#xff0c;我们介绍了Fiddler的基础功能和如何安装它。今天我们将深入探讨如何使用Fiddler来捕获HTTP请求&#xff0c;这是Fiddler的一个核心能力&#xff0c;对于前端开发者、测试人员以及安全研究人员来说非常有用。捕获HTTP请求可以帮助我们更好地理解…...

个人IP建设:简易指南

许多个体创业者面临的一个关键挑战是如何为其企业创造稳定的需求。 作为个体创业者&#xff0c;您无法使用营销团队&#xff0c;因此许多人通过推荐和他们的网络来产生需求。因此&#xff0c;扩大您的网络是发展您的业务和产生持续需求的最佳策略。 这就是个人IP和品牌发挥作…...

智能指针【C++11】

文章目录 智能指针std::auto_ptr std::unique_ptrstd::shared_ptrstd::shared_ptr的线程安全问题std::weak_ptr 智能指针 std::auto_ptr 管理权转移 auto_ptr是C98中引入的智能指针&#xff0c;auto_ptr通过管理权转移的方式解决智能指针的拷贝问题&#xff0c;保证一个资源…...

【Linux 篇】Docker 启动和停止的精准掌舵:操控指南

文章目录 【Linux篇】Docker 启动和停止的精准掌舵&#xff1a;操控指南前言docker基本命令1. 帮助手册 2. 指令介绍 常用命令1. 查看镜像2. 搜索镜像3. 拉取镜像4. 删除镜像5. 从Docker Hub拉取 容器的相关命令1. 查看容器2. 创建与启动容器3. 查看镜像4. 启动容器5. 查看容器…...

Cursor vs VSCode:主要区别与优势分析

Cursor - The AI Code Editor 1. AI 集成能力 Cursor的优势 原生AI集成&#xff1a; # Cursor可以直接通过快捷键调用AI # 例如&#xff1a;按下 Ctrl K 可以直接获取代码建议 def complex_function():# 在这里&#xff0c;你可以直接询问AI如何实现功能# AI会直接在编辑器中…...

从单体到微服务:如何借助 Spring Cloud 实现架构转型

一、Spring Cloud简介 Spring Cloud 是一套基于 Spring 框架的微服务架构解决方案&#xff0c;它提供了一系列的工具和组件&#xff0c;帮助开发者快速构建分布式系统&#xff0c;尤其是微服务架构。 Spring Cloud 提供了诸如服务发现、配置管理、负载均衡、断路器、消息总线…...

RocketMq基础学习+SpringBoot集成

学习贴&#xff1a;参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心&#xff0c;支持Topic和Broker的动态注册和发现。作用主…...

分布式cap

P&#xff08;分区安全&#xff09;都能保证&#xff0c;就是在C&#xff08;强一致&#xff09;和A&#xff08;性能&#xff09;之间做取舍。 &#xff08;即立马做主从同步&#xff0c;还是先返回写入结果等会再做主从同步。类似的还有&#xff0c;缓存和db之间的同步。&am…...

mybatis-xml映射文件及mybatis动态sql

规范 XML映射文件的名称与Mapper接口名称一致&#xff0c;并且将XML映射文件和Mapper接口放置在相同包下(同包同名&#xff09;。 XML映射文件的namespace属性为Mapper接口全限定名一致。 XML映射文件中sql语句的id与Mapper接口中的方法名一致&#xff0c;并保持返回类型一致…...

计算机网络复习——概念强化作业

物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…...

用友BIP与旺店通数据集成方案解析

用友BIP与旺店通企业奇门的供应商集成同步方案 在现代企业的数据管理中&#xff0c;跨平台的数据集成是实现高效业务运作的关键环节。本文将分享一个实际案例&#xff1a;如何通过轻易云数据集成平台&#xff0c;将用友BIP系统中的供应商数据无缝对接到旺店通企业奇门&#xf…...

string类函数的手动实现

在上一篇文章中&#xff0c;我们讲解了一些string类的函数&#xff0c;但是对于我们要熟练掌握c是远远不够的&#xff0c;今天&#xff0c;我将手动实现一下这些函数~ 注意&#xff1a;本篇文章中会大量应用复用&#xff0c;这是一种很巧妙的方法 和以往一样&#xff0c;还是…...

Oceanbase离线集群部署

准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …...

transformers生成式对话机器人

简介 生成式对话机器人是一种先进的人工智能系统&#xff0c;它能够通过学习大量的自然语言数据来模拟人类进行开放、连贯且创造性的对话。与基于规则或检索式的聊天机器人不同&#xff0c;生成式对话机器人并不局限于预定义的回答集&#xff0c;而是可以根据对话上下文动态地…...

WPF中的VisualState(视觉状态)

以前在设置控件样式或自定义控件时&#xff0c;都是使用触发器来进行样式更改。触发器可以在属性值发生更改时启动操作。 像这样&#xff1a; <Style TargetType"ListBoxItem"><Setter Property"Opacity" Value"0.5" /><Setter …...

C#设计模式--状态模式(State Pattern)

状态模式是一种行为设计模式&#xff0c;它允许对象在其内部状态发生变化时改变其行为。这种模式的核心思想是将状态封装在独立的对象中&#xff0c;而不是将状态逻辑散布在整个程序中。 用途 简化复杂的条件逻辑&#xff1a;通过将不同的状态封装在不同的类中&#xff0c;可…...

〔 MySQL 〕索引

目录 1. 没有索引&#xff0c;可能会有什么问题 2. 认识磁盘 MySQL与存储 先来研究一下磁盘&#xff1a; 在看看磁盘中一个盘片​编辑 扇区 定位扇区​编辑 结论 磁盘随机访问(Random Access)与连续访问(Sequential Access) 3. MySQL 与磁盘交互基本单位 4. 建立共识…...

计算机网络研究实训室建设方案

一、概述 本方案旨在规划并实施一个先进的计算机网络研究实训室&#xff0c;旨在为学生提供一个深入学习、实践和研究网络技术的平台。实训室将集教学、实验、研究于一体&#xff0c;覆盖网络基础、网络架构、网络安全、网络管理等多个领域&#xff0c;以培养具备扎实理论基础…...

韩企研学团造访图为科技:共探人工智能创新前沿

今日&#xff0c;一支由韩国知名企业研学专家组成的代表团莅临图为科技深圳总部&#xff0c;展开了一场深度技术交流与研讨活动。 此次访问旨在通过实地探访中国领先的科技企业&#xff0c;促进中韩两国在科技创新领域的深入合作与交流。 韩国游学团合影 图为科技作为一家在人…...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

基于江科大stm32屏幕驱动,实现OLED多级菜单(动画效果),结构体链表实现(独创源码)

引言 在嵌入式系统中&#xff0c;用户界面的设计往往直接影响到用户体验。本文将以STM32微控制器和OLED显示屏为例&#xff0c;介绍如何实现一个多级菜单系统。该系统支持用户通过按键导航菜单&#xff0c;执行相应操作&#xff0c;并提供平滑的滚动动画效果。 本文设计了一个…...

李沐--动手学深度学习--GRU

1.GRU从零开始实现 #9.1.2GRU从零开始实现 import torch from torch import nn from d2l import torch as d2l#首先读取 8.5节中使用的时间机器数据集 batch_size,num_steps 32,35 train_iter,vocab d2l.load_data_time_machine(batch_size,num_steps) #初始化模型参数 def …...

「Java基本语法」变量的使用

变量定义 变量是程序中存储数据的容器&#xff0c;用于保存可变的数据值。在Java中&#xff0c;变量必须先声明后使用&#xff0c;声明时需指定变量的数据类型和变量名。 语法 数据类型 变量名 [ 初始值]; 示例&#xff1a;声明与初始化 public class VariableDemo {publi…...

多模态学习路线(2)——DL基础系列

目录 前言 一、归一化 1. Layer Normalization (LN) 2. Batch Normalization (BN) 3. Instance Normalization (IN) 4. Group Normalization (GN) 5. Root Mean Square Normalization&#xff08;RMSNorm&#xff09; 二、激活函数 1. Sigmoid激活函数&#xff08;二分类&…...