当前位置: 首页 > news >正文

【RabbitMQ五】——RabbitMQ路由模式(Routing)

RabbitMQ路由模式

  • 前言
  • RabbitMQ模式的基本概念
    • 为什么要使用Rabbitmq 路由模式
    • RabbitMQ路由模式组成元素
  • 路由模式完整代码
    • Pom文件引入RabbtiMQ依赖
    • RabbitMQ工具类
    • 生产者
    • 消费者1
    • 消费者2
    • 运行结果截图

前言

通过本篇博客能够简单使用RabbitMQ的路由模式。
本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。

RabbitMQ模式的基本概念

路由模式是根据Routing Key有条件的将消息筛选后发送给消费者,消费者只接受筛选之后的消息
路由模式的核心是:
配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,再由对应的消费者进行消费

为什么要使用Rabbitmq 路由模式

由于发布订阅模式是无条件将所有消息分发给所有消费者,路由模式可以根据条件(Routing Key)将消息筛选之后发送给消费者。

应用场景:
例如:有一个股票分析机构,每天都会有一些独家的股票分析报告。对于其他一些应用平台,想要每天都到这家股票分析机构提供的百度的独家股票分析报告,对于另外一些应用平台想要收到谷歌的独家股票分析报告,就可以使用路由模式。

RabbitMQ路由模式组成元素

在这里插入图片描述
P:生产者,向交换机发送消息的是否需要指定routing key
X:交换机,接收生产者发送的消息,需要指定交换机的类型为direct,并且将消息发送给与routing key匹配的队列
C1:消费者1,它所在队列指定了需要routing key为error的信息
C2:消费者2,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式完整代码

**业务场景:**生产者为日志分发平台,分发info、warning、error级别的日志,消费者1只接受日志级别为error的日志,消费者2接收全部日志。

Pom文件引入RabbtiMQ依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>

RabbitMQ工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : RabbitMQUtils* @description : [rabbitmq工具类]* @createTime : [2023/1/17 8:49]* @updateUser : [WangWei]* @updateTime : [2023/1/17 8:49]* @updateRemark : [描述说明本次修改内容]*/
public class RabbitMQUtils {/** @version V1.0* Title: getConnection* @author Wangwei* @description 创建rabbitmq连接* @createTime  2023/1/17 8:52* @param []* @return com.rabbitmq.client.Connection*/public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setPort(5672);factory.setVirtualHost("虚拟主机");factory.setUsername("用户名");factory.setPassword("密码");//创建连接Connection connection=factory.newConnection();return  connection;}/** @version V1.0* Title: getChannel* @author Wangwei* @description 创建信道* @createTime  2023/1/17 8:55* @param []* @return com.rabbitmq.client.Channel*/public static Channel getChannel() throws IOException, TimeoutException {Connection connection=getConnection();Channel channel=connection.createChannel();return channel;}
}

生产者

import com.rabbitmq.client.Channel;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logschannel.exchangeDeclare(EXCHANGE_NAME,"direct");//声明routingKeyString severityInfo="info";String severityError="error";String severityWarning="warning";//循环发送2条消息for (int i = 0; i <2 ; i++) {String msg="路由模式info:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for (int i = 0; i <2 ; i++) {String msg="路由模式error:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for (int i = 0; i <2 ; i++) {String msg="路由模式warning:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityWarning,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}}
}

消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : ConsumerOne* @description : [消费者1]* @createTime : [2023/2/1 9:39]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:39]* @updateRemark : [描述说明本次修改内容]*/
public class ConsumerOne {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {RabbitMQUtils.getConnection();Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"direct");String queueName = channel.queueDeclare().getQueue();//声明routingKey (error)String severityError="error";//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失//queueName绑定了direct_logs交换机并且绑定了routingKeychannel.queueBind(queueName, EXCHANGE_NAME,severityError );//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : ConsumerTwo* @description : [消费者2]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class ConsumerTwo {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {RabbitMQUtils.getConnection();Channel channel = RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logschannel.exchangeDeclare(EXCHANGE_NAME,"direct");//创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称String queueName = channel.queueDeclare().getQueue();//声明routingKey (info,error,warning)String severityInfo="info";String severityError="error";String severityWarning="warning";//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失//queueName绑定了direct_logs交换机并且绑定了3个routingKeychannel.queueBind(queueName, EXCHANGE_NAME,severityInfo );channel.queueBind(queueName, EXCHANGE_NAME,severityError );channel.queueBind(queueName, EXCHANGE_NAME,severityWarning );//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}

运行结果截图

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

相关文章:

【RabbitMQ五】——RabbitMQ路由模式(Routing)

RabbitMQ路由模式前言RabbitMQ模式的基本概念为什么要使用Rabbitmq 路由模式RabbitMQ路由模式组成元素路由模式完整代码Pom文件引入RabbtiMQ依赖RabbitMQ工具类生产者消费者1消费者2运行结果截图前言 通过本篇博客能够简单使用RabbitMQ的路由模式。 本篇博客主要是博主通过官网…...

【C语言】宏定义 结构体 枚举变量的用法

目录 一、数据类型 二、C语言宏定义 三、C语言typedef重命名 四、 #define与typedef的区别 五、结构体 六、枚举变量 补充学习一点STM32的必备基础知识 一、数据类型 二、C语言宏定义 关键字&#xff1a;#define 用途&#xff1a;用一个字符串代替一个数字&#xff0c;…...

锁升级之Synchronized

Synchronized JVM系统锁一个对象里如果有多个synchronized方法&#xff0c;同一时刻&#xff0c;只要有一个线程去调用其中的一个synchronized方法&#xff0c;其他线程只能等待&#xff01;锁的是当前对象&#xff0c;对象被锁定后&#xff0c;其他线程都不能访问当前对象的其…...

基于nodejs+vue疫情网课管理系统

疫情网课也都将通过计算机进行整体智能化操作,对于疫情网课管理系统所牵扯的管理及数据保存都是非常多的,例如管理员&#xff1a;首页、个人中心、学生管理、教师管理、班级管理、课程分类管理、课程表管理、课程信息管理、作业信息管理、请假信息管理、上课签到管理、论坛交流…...

Zabbix 构建监控告警平台(三)

Zabbix User parametersZabbix Trigger1.Zabbix User parameters 1.1即自定义KEY 注意&#xff1a;mysql安装在被监测主机 [rootlocalhost ~]# yum -y install mariadb-server mariadb [rootlocalhost ~]# systemctl start mariadb [rootlocalhost ~]# mysqladmin -uroot statu…...

Linux系统之dool命令行工具的基本使用

Linux系统之dool命令行工具的基本使用一、dool命令行工具介绍二、本地系统环境检查1.检查系统版本2.检查系统内核版本三、下载dool软件包1.创建下载目录2.下载dool四、安装dool1.安装python32.安装dool五、dool的命令帮助六、dool的基本使用1.直接使用dool监控系统2.监控cpu和网…...

LeetCode-2335-装满杯子需要的最短总时长

1、堆 我们可以维护一个堆&#xff0c;首先我们将数组中不为0的数全部加入堆中&#xff0c;而后进行循环。当堆不为空时&#xff0c;我们将堆顶元素出堆并减一&#xff0c;而后观察是否还能继续出堆&#xff0c;若能则出堆&#xff0c;否则跳过&#xff0c;最后我们将处理后的…...

npm ERR! code ELIFECYCLE解决方案,npm犯错!myweb@1.0.0构建脚本失败。

1.问题npm ERR! code ELIFECYCLEnpm ERR! errno 1npm ERR! myweb1.0.0 build: webpack --config config/webpack.config.jsnpm ERR! Exit status 1npm ERR!npm ERR! Failed at the myweb1.0.0 build script.npm犯错!代码ELIFECYCLEnpm犯错!errno 1npm犯错!myweb1.0.0 build: we…...

最小二乘支持向量机”在学习偏微分方程 (PDE) 解方面的应用(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 本代码说明了“最小二乘支持向量机”在学习偏微分方程 &#xff08;PDE&#xff09; 解方面的应用。提供了一个示例&#xff0c…...

ISYSTEM调试实践8-winIDEA Analyzer功能1

前面几篇介绍了ISYSTEM的基本调试界面和功能&#xff0c;相比我之前用过的IDE&#xff0c;除了几种断点方式和脚本功能以外&#xff0c;应该都是比较简单&#xff0c;稍微操作一下就可以直接上手&#xff0c;后续我将介绍winIDEA的Analyzer 功能。 1 Analyzer简介 iSYSTEM An…...

每日学术速递2.11

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.IR、cs.MM 1.A Comprehensive Survey on Multimodal Recommender Systems: Taxonomy, Evaluation, and Future Directions 标题&#xff1a;关于多模态推荐系统的综合调查&#xff1a;分…...

宝塔搭建实战php开源likeadmin通用管理admin端vue3源码(二)

大家好啊&#xff0c;我是测评君&#xff0c;欢迎来到web测评。 上一期给大家分享了server端的部署方式&#xff0c;今天来给大家分享admin端在本地搭建&#xff0c;与打包发布到宝塔的方法。感兴趣的朋友可以自行下载学习。 技术架构 vscode node16 vue3 elementPlus vit…...

网络基础-虚拟化工具-网桥

系列文章目录 本系列文章主要是回顾和学习工作中常用的网络基础命令&#xff0c;在此记录以便于回顾。 该篇文章主要是讲解虚拟化的工具网桥相关的概念和常用命令 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录系…...

剑指 Offer 14- II. 剪绳子 II

剑指 Offer 14- II. 剪绳子 II 给你一根长度为 n 的绳子&#xff0c;请把绳子剪成整数长度的 m 段&#xff08;m、n都是整数&#xff0c;n>1并且m>1&#xff09;&#xff0c;每段绳子的长度记为 k[0],k[1]…k[m - 1] 。请问 k[0]k[1]…*k[m - 1] 可能的最大乘积是多少&a…...

English Learning - Day55 作业打卡 2023.2.9 周四

English Learning - Day55 作业打卡 2023.2.9 周四引言1. Jim 在看电视的时候他的老婆正在做饭。2. 他刚睡着电话就响了。3. 我正在想事情&#xff0c;这时忽然有人从后面抓我胳膊。4. 我们总是边吃火锅边唱歌。5. 他一听说出了事故&#xff0c;马上就来了现场。6. He entered …...

pixhawk2.4.8-地面站配置-APM固件

文章目录一、硬件准备二、软件准备1 已实飞测试2 MP地面站 任意版本下载&#xff1a;3 APM固件 任意版本下载&#xff1a;三、飞控校准1 刷固件2 机架选择3 加速度计校准4 指南针校准5 遥控器校准6 飞行模式7 紧急断电&无头模式8 基础参数设置9 电流计校准10 电调校准11 起…...

golang 通道类型

文章目录一、什么是通道类型二、通道产生的原因三、声明channel四、创建channel五、channel相关操作1、发送值2、接收值3、关闭通道3.1 注意3.2 特点四、通道类型1、无缓冲通道2、有缓冲通道五、单向通道一、什么是通道类型 Go 语言中的通道&#xff08;channel&#xff09;是一…...

并发、并行、吞吐量、延迟、响应时间 含义理解

并发、并行、吞吐量、延迟、响应时间 知识点了解 1. 响应时间(RT) 理解&#xff1a;响应时间是指系统对请求作出响应的时间。例如一个正在运行的服务&#xff0c;服务内程序接受到参数请求开始&#xff0c;到程序计算完&#xff0c;并将结果返回出去结束&#xff0c;这段时间…...

HTTP 和 HTTPS 的区别

文章目录前言一、HTTP 与 HTTPS 的基本概念HTTPHTTPS二、HTTP 和 HTTPS协议的区别前言 浏览网站时&#xff0c;我们会发现网址有两种格式&#xff0c;一种以http://开头&#xff0c;一种https://开头。好像这两种格式差别不大&#xff0c;只多了一个s&#xff0c;实际上他们有…...

微搭低代码从入门到精通07-基础布局组件

低码开发不同于传统开发&#xff0c;传统开发我们通常需要编写前端代码和后端代码。前端代码由HTML、CSS和JavaScript组成&#xff0c;后端代码我们通常要用后端语言比如Java来编写接口。 低码开发的特点是可视化开发&#xff0c;在编辑器中通过组件的拖拽来完成页面的编制。如…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

【kafka】Golang实现分布式Masscan任务调度系统

要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

CMake控制VS2022项目文件分组

我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

PostgreSQL——环境搭建

一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在&#xff0…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...