当前位置: 首页 > news >正文

RabbitMQ中的Work Queues模式

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种消息传递模式。其中,Work Queues(工作队列)模式是一种常见的模式,用于在多个消费者之间分配任务,从而实现负载均衡和提高系统的处理能力。下面将详细介绍 RabbitMQ 中的 Work Queues 模式。

1. 什么是 Work Queues 模式?

Work Queues 模式(也称为任务队列模式)是一种消息传递模式,用于在多个消费者之间分配任务。在这种模式下,生产者将任务(消息)发送到队列中,多个消费者从队列中获取任务并进行处理。每个任务只会被一个消费者处理,从而实现负载均衡。

在这里插入图片描述

Work Queues 模式的主要优点包括:

  1. 负载均衡:多个消费者可以并行处理任务,从而提高系统的处理能力。
  2. 解耦:生产者和消费者之间通过队列进行通信,彼此之间不需要直接交互。
  3. 异步处理:任务可以异步处理,生产者不需要等待任务完成即可继续执行其他操作。

2. Work Queues 模式的工作原理

2.1 生产者(Producer)

生产者负责将任务(消息)发送到队列中。生产者不需要知道有多少消费者会处理这些任务,只需要将任务发送到队列即可。

2.2 队列(Queue)

队列是消息的缓冲区,用于存储生产者发送的任务。队列可以有多个消费者,但每个任务只会被一个消费者处理。

2.3 消费者(Consumer)

消费者从队列中获取任务并进行处理。多个消费者可以并行处理任务,从而实现负载均衡。消费者可以是独立的进程、线程或服务。

2.4 消息确认(Message Acknowledgment)

为了确保任务能够可靠地处理,RabbitMQ 提供了消息确认机制。消费者在处理完任务后,需要向 RabbitMQ 发送确认消息,告知任务已经处理完成。如果消费者在处理任务时崩溃,RabbitMQ 会将未确认的任务重新分配给其他消费者。

2.5 公平分发(Fair Dispatch)

默认情况下,RabbitMQ 会按顺序将任务分发给消费者。然而,如果某些消费者处理任务的速度较慢,可能会导致任务堆积。为了避免这种情况,可以使用 basicQos 方法设置预取计数(prefetch count),限制每个消费者一次可以获取的任务数量,从而实现更公平的分发。

3. 环境准备

在开始之前,确保你已经安装了以下环境:

  • Java 开发环境(JDK 8 或更高版本)
  • RabbitMQ 服务器(已启动并运行)
  • Maven(用于管理依赖)

3.1 添加依赖

首先,在你的项目中添加 RabbitMQ 客户端库的依赖。如果你使用的是 Maven,可以在 pom.xml 中添加以下依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

4. 代码案例

4.1 生产者代码

生产者负责将任务发送到队列。以下是一个简单的生产者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.将消息发送到队列for (int i = 1; i <= 10; i++) {String message = "Task to be processed, " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}}
}

代码解释:

  • ConnectionFactory: 用于创建与 RabbitMQ 服务器的连接。
  • Connection: 表示与 RabbitMQ 服务器的物理连接。
  • Channel: 表示与 RabbitMQ 服务器的逻辑连接,用于发送和接收消息。
  • queueDeclare: 声明一个队列,true 表示队列是持久的。
  • basicPublish: 将消息发布到队列,使用默认交换机(空字符串)。

4.2 消费者代码

消费者负责从队列中获取任务并处理。以下是一个简单的消费者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1); // 每次只处理一个消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");//手动ack,因为不同的机器处理速度不一样,因此不同的机器会在不同时间应答,这样机器就可以根据实际能力处理了channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

代码解释:

  • basicQos(1): 设置每次只处理一个消息,确保任务的公平分配。
  • DeliverCallback: 定义消息处理逻辑,doWork 方法模拟任务处理过程。
  • basicAck: 确认消息处理完成,从队列中移除消息。

5. 运行示例

  1. 启动 RabbitMQ 服务器:确保 RabbitMQ 服务器已启动并运行。
  2. 运行多个消费者:启动多个消费者实例,确保它们连接到同一个队列。
  3. 运行生产者:启动生产者实例,发送任务到队列。

示例输出:

  • 生产者输出

     [x] Sent 'Task to be processed, 1'[x] Sent 'Task to be processed, 2'[x] Sent 'Task to be processed, 3'[x] Sent 'Task to be processed, 4'[x] Sent 'Task to be processed, 5'[x] Sent 'Task to be processed, 6'[x] Sent 'Task to be processed, 7'[x] Sent 'Task to be processed, 8'[x] Sent 'Task to be processed, 9'[x] Sent 'Task to be processed, 10'
    
  • 消费者1输出

     [*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 1'[x] Done[x] Received 'Task to be processed, 4'[x] Done[x] Received 'Task to be processed, 6'[x] Done[x] Received 'Task to be processed, 8'[x] Done[x] Received 'Task to be processed, 10'[x] Done
    
  • 消费者2输出

     [*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 2'[x] Done[x] Received 'Task to be processed, 3'[x] Done[x] Received 'Task to be processed, 5'[x] Done[x] Received 'Task to be processed, 7'[x] Done[x] Received 'Task to be processed, 9'[x] Done
    

在这里插入图片描述

6. 总结

本文详细介绍了如何在 RabbitMQ 中实现 Work Queues 模式,包括生产者、默认交换机、队列和多个消费者的设计与实现。通过使用 RabbitMQ 的 Java 客户端库,我们可以轻松地实现任务的分配和处理。Work Queues 模式非常适合需要将任务分配给多个消费者处理的场景,如任务调度、日志处理等。

相关文章:

RabbitMQ中的Work Queues模式

在现代分布式系统中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件&#xff0c;支持多种消息传递模式。其中&#xff0c;Work Queues&#xff08;工作队列&#xff09;模式是一…...

GESP202412 四级【Recamán】题解(AC)

》》》点我查看「视频」详解》》》 [GESP202412 四级] Recamn 题目描述 小杨最近发现了有趣的 Recamn 数列&#xff0c;这个数列是这样生成的&#xff1a; 数列的第一项 a 1 a_1 a1​ 是 1 1 1&#xff1b;如果 a k − 1 − k a_{k-1}-k ak−1​−k 是正整数并且没有在数…...

蓝桥杯新年题解 | 第15届蓝桥杯迎新篇

蓝桥杯新年题解 | 第15届蓝桥杯迎新篇 2024年的蓝桥杯即将拉开序幕&#xff01;对于许多编程爱好者来说&#xff0c;这不仅是一次展示自我能力的舞台&#xff0c;更是一次学习和成长的机会。作为一名大一新生的小蓝&#xff0c;对蓝桥杯充满了期待&#xff0c;但面对初次参赛的…...

3D 生成重建035-DiffRF直接生成nerf

3D 生成重建035-DiffRF直接生成nerf 文章目录 0 论文工作1 论文方法2 实验结果 0 论文工作 本文提出了一种基于渲染引导的三维辐射场扩散新方法DiffRF&#xff0c;用于高质量的三维辐射场合成。现有的方法通常难以生成具有细致纹理和几何细节的三维模型&#xff0c;并且容易出…...

@SpringBootTest 报错: UnsatisfiedDependencyException

Spring Boot Test 报错: UnsatisfiedDependencyException 在使用 SpringBootTest 测试时&#xff0c;出现 UnsatisfiedDependencyException 报错&#xff0c;原因和解决方法如下。 报错原因分析 1. Spring 存在涉及 Bean 没有被添加 Spring Boot 测试中&#xff0c;默认会加…...

mysql、postgresql、oceanbase调优

一、mysql 1、my.cnf [mysqld_safe] log-error=/data/mysql/log/mysql.log pid-file=/data/mysql/run/mysqld.pid[client] socket=/data/mysql/run/mysql.sock default-character-set=utf8[mysqld] basedir=/usr/local/mysql tmpdir=/data/mysql/tmp datadir=/data/mysql/dat…...

MySQL 数据库事务实践

引言 在现代应用程序开发中&#xff0c;确保数据库操作的完整性和一致性至关重要。MySQL 提供了强大的事务管理功能&#xff0c;允许开发者以原子性、一致性、隔离性和持久性&#xff08;ACID&#xff09;的方式处理数据。本文将通过详细的解释和实际示例&#xff0c;带你深入…...

VScode、Windsurf、Cursor 中 R 语言相关快捷键设置

前言 在生物信息学数据分析中&#xff0c;R语言是一个不可或缺的工具。为了提高R语言编程效率&#xff0c;合理设置快捷键显得尤为重要。本文介绍在VSCode Windsurf Cursor 中一些实用的R语言快捷键设置&#xff0c;让非 Rstudio 的 IDE 用起来得心应手&#x1f611; 操作种…...

tcpdump编译

https://github.com/westes/flex/releases/download/v2.6.4/flex-2.6.4.tar.gz tar -zxvf flex-2.6.4.tar.gz ./configure CFLAGS-D_GNU_SOURCE make sudo make installwget http://ftp.gnu.org/gnu/bison/bison-3.2.1.tar.gz ./configure make sudo make install以上两个库是…...

Linux下禁止root远程登录访问

开始讲故事 Long long ago&#xff0c; Linux远程访问方式有telnet、ssh两种协议&#xff1b;有人可能还会说vnc和rdp协议方式&#xff0c;后面这两种主要是可视化桌面场景下的&#xff0c;并非主流。 时过境迁&#xff0c;telnet因安全性低逐渐被禁用淘汰&#xff0c;最后就…...

算法刷题Day16: BM41 输出二叉树的右视图

题目链接 描述 思路&#xff1a; 递归构造二叉树在Day15有讲到。复习一下&#xff0c;就是使用递归构建左右子树。将中序和前序一分为二。 接下来是找出每一层的最右边的节点&#xff0c;可以利用队列层次遍历。 利用队列长度记录当前层有多少个节点&#xff0c;每次从队列里…...

登录授权的实现:json web token + redis + springboot

文章目录 引言I token实现思路传统JWT TOKEN认证方式改进的JWT TOKEN认证方式redis设计II java代码实现登录接口退出登录接口登录之后接口(token解析和校验)III 常见问题400引言 应用场景: 登录认证 I token实现思路 传统JWT TOKEN认证方式 RESTful API TOKEN认证方式:…...

yolov,coco,voc标记的睡岗检测数据集,可识别在桌子上趴着睡,埋头睡觉,座椅上靠着睡,平躺着睡等多种睡姿的检测,6549张图片

yolov&#xff0c;coco,voc标记的睡岗检测数据集&#xff0c;可识别在桌子上趴着睡&#xff0c;埋头睡觉&#xff0c;座椅上靠着睡&#xff0c;平躺着睡等多种睡姿的检测&#xff0c;6549张图片 数据集分割 6549总图像数 训练组91&#xff05; 5949图片 有效集9&#x…...

数据库表的CRUD

SQL语句&#xff08;Structured Query Language&#xff09;是用于与关系型数据库进行交互的语言。下面是几个常用的SQL语句&#xff1a; 创建表&#xff1a; CREATE TABLE table_name ( column1 datatype, column2 datatype, column3 datatype, ... ); 插入数据&#xff1a; …...

Proxy与Reflect

监听对象操作 在Object中&#xff0c;可以通过defineProperty中的get&#xff0c;set进行监听&#xff0c; Proxy基本使用 有两个参数&#xff0c;第一个是要代理的对象&#xff0c;第二个是捕获器&#xff0c;在不知道捕获器使用哪个之前可以先传个空对象。就会启用默认的捕获…...

【安卓开发】【Android Studio】启动时报错“Unable to access Android SDK add-on list”

一、问题描述 在启动Android Studio时&#xff0c;软件报错&#xff1a;Unable to access Android SDK add-on list&#xff0c;报错截图如下&#xff1a; 二、原因及解决方法 初步推测是由于网络节点延迟&#xff0c;无法接入谷歌导致的。点击Cancel取消即可。...

【C语言篇】C 语言总复习(下):点亮编程思维,穿越代码的浩瀚星河

我的个人主页 我的专栏&#xff1a;C语言&#xff0c;希望能帮助到大家&#xff01;&#xff01;&#xff01;点赞❤ 收藏❤ 在C语言的世界里&#xff0c;结构体和联合体以及文件操作都是非常重要且实用的知识板块&#xff0c;掌握它们能帮助我们更高效地组织数据以及与外部文…...

AI技术架构:从基础设施到应用

人工智能&#xff08;AI&#xff09;的发展&#xff0c;正以前所未有的速度重塑我们的世界。了解AI技术架构&#xff0c;不仅能帮助我们看懂 AI 的底层逻辑&#xff0c;还能掌握其对各行业变革的潜力与方向。 一、基础设施层&#xff1a;AI 技术的坚实地基 基础设施层是 AI 技…...

centos7的yum镜像源设置

sudo yum repolist 查看镜像源连接情况&#xff0c;not found即为连接失败 sudo cp -r /etc/yum.repos.d /etc/yum.repos.d.backup 备份镜像源文件 sudo nano /etc/yum.repos.d/CentOS-Base.repo 进入镜像源文件编辑内容 # CentOS-Base.repo # # The mirror system uses the…...

Qt6开发自签名证书的https代理服务器

目标&#xff1a;制作一个具备类似Fiddler、Burpsuit、Wireshark的https协议代理抓包功能&#xff0c;但是集成到自己的app内&#xff0c;这样无需修改系统代理设置&#xff0c;使用QWebengineview通过自建的代理服务器&#xff0c;即可实现https包的实时监测、注入等自定义功能…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

ubuntu搭建nfs服务centos挂载访问

在Ubuntu上设置NFS服务器 在Ubuntu上&#xff0c;你可以使用apt包管理器来安装NFS服务器。打开终端并运行&#xff1a; sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享&#xff0c;例如/shared&#xff1a; sudo mkdir /shared sud…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...