当前位置: 首页 > news >正文

Spring Boot整合RabbitMQ之路由模式(Direct)

RabbitMQ中的路由模式(Direct模式)应该是在实际工作中运用的比较多的一种模式了,这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey,在配置上,交换机类型需要注入DirectExchange类型的交换机bean对象。在交换机和队列的绑定过程中,绑定关系需要在绑定一个路由key。由于在实际的工作中不大可能会用自动确认的模式,所以我们在整合路由模式的过程中,依然采用发送消息双确认机制和消费端手动确认的机制来保证消息的准确送达与消息防丢失。

1. 添加配置

在配置文件中,配置rabbitmq的相关账号信息,开启消息发送回调机制,配置文件其实和发布订阅模式是一样的。配置详情如下:

server:port: 10001spring:application:name: springboot-rabbitmq-s1rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: admin# 发送者开启 return 确认机制publisher-returns: true# 发送者开启 confirm 确认机制publisher-confirm-type: correlated

2. 创建配置类

    创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:
package com.study.rabbitmq.config;import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author alen* @DATE 2022/6/7 23:50*/
@Slf4j
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "direct-order-exchange";public static final String SMS_QUEUE = "sms-direct-queue";public static final String EMAIL_QUEUE = "email-direct-queue";public static final String WECHAT_QUEUE = "wechat-direct-queue";/*** 1.* 声明交换机* @return*/@Beanpublic DirectExchange directExchange() {/*** directExchange的参数说明:* 1. 交换机名称* 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除* 3. 是否自动删除 false:不自动删除 true:自动删除*/return new DirectExchange(EXCHANGE_NAME, true, false);}/*** 2.* 声明队列* @return*/@Beanpublic Queue smsQueue() {/*** Queue构造函数参数说明* 1. 队列名* 2. 是否持久化 true:持久化 false:不持久化*/return new Queue(SMS_QUEUE, true);}@Beanpublic Queue emailQueue() {return new Queue(EMAIL_QUEUE, true);}@Beanpublic Queue wechatQueue() {return new Queue(WECHAT_QUEUE, true);}/*** 3.* 队列与交换机绑定*/@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");}@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");}@Beanpublic Binding wechatBinding() {return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");}/*** 将自定义的RabbitTemplate对象注入bean容器** @param connectionFactory* @return*/@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启消息推送结果回调rabbitTemplate.setMandatory(true);//设置ConfirmCallback回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("==============ConfirmCallback start ===============");log.info("回调数据:{}", correlationData);log.info("确认结果:{}", ack);log.info("返回原因:{}", cause);log.info("==============ConfirmCallback end =================");}});//设置ReturnCallback回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("==============ReturnCallback start ===============");log.info("发送消息:{}", JSONUtil.toJsonStr(message));log.info("结果状态码:{}", replyCode);log.info("结果状态信息:{}", replyText);log.info("交换机:{}", exchange);log.info("路由key:{}", routingKey);log.info("==============ReturnCallback end =================");}});return rabbitTemplate;}
}

3. 消费者配置

    在消费者项目的配置文件中开启手动确认,配置详情如下:
server:port: 10002spring:application:name: springboot-rabbitmq-s2rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: adminlistener:simple:# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 autoacknowledge-mode: manual

4. 创建消费者

分别创建三个消费者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer来监听对应的队列,有消息后进行消费,三个消费者大同小异,分别如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;/*** @Author alen* @DATE 2022/6/10 22:54*/
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {//标记消费者逻辑执行方法@RabbitHandlerpublic void emailMessage(String msg, Channel channel, Message message) throws IOException {try {log.info("Email direct --接收到消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;/*** @Author alen* @DATE 2022/6/10 22:55*/
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {@RabbitHandlerpublic void smsMessage(String msg, Channel channel, Message message) throws IOException {try {log.info("sms direct --接收到消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;/*** @Author chaoxian.wu* @DATE 2022/6/10 22:55*/
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {@RabbitHandlerpublic void wechatlMessage(String msg, Channel channel, Message message) throws IOException {try {log.info("wechat direct --接收到消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

以上就是全部的代码部分,接下来我们在进入测试,看看实际效果如何,先发布一个routingKey=sms的消息,查看是不是只有对应的一个队列中接收到消息,消息发送详情:

package com.study.rabbitmq;import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid contextLoads() {for (long i = 1; i < 2; i++) {//交换机名称String exchangeName = "direct-order-exchange";//路由keyString routingKey = "sms";Order order = buildOrder(i);orderService.createOrder(order, routingKey, exchangeName);}}private Order buildOrder(long id) {Order order = new Order();order.setRequestId(id);order.setUserId(id);order.setOrderNo(UUID.randomUUID().toString());order.setAmount(10L);order.setGoodsNum(1);order.setTotalAmount(10L);return order;}
}

我们登录rabbitmq管理后台查看下,只有sms-direct-queue这个队列有一条消息,效果如下:

我们启动消费者,看下是不是只有监听了sms-direct-queue这个队列的消费者有消费日志,效果如下:

再发一条routingKey=email的消息,消费的日志,效果图示如下

到此其实已经springboot整合rabbitmq的路由模式结束了,这种模式在工作中还是比较常见的,我们演示的是单点的效果,实际工作中,不大可能会使用服务单点部署,现在都讲究服务的高可用,就得服务集群部署,又会涉及到消息重复消费的问题需要处理,我个人觉得,遇到重复消费问题,我第一时间想到的就是分布式锁,哈哈~。但是锁什么呢?肯定是消息中的具备唯一性的属性。来达到防止消息的重复消费。

整个过程中,其实还存在一个小问题没有验证,就是ReturnCallback回调机制没有触发,因为这个得发生在交换机将消息发送到队列的时候失败才会触发,那么我们就发送一个不存在的routingKey就可以触发了,我们发送一个routingKey=duanxin的消息,这个肯定不会发送成功,我们通过断点来看看效果,效果如下:

然后我们常见的就全部整合完成了,当然,开启了双确认机制,虽然我们可以检测到消息投送的结果,然后可以针对投送失败的结果进行预警。但是开启了这个操作,就必然会对消息的处理效率产生影响。所以还得根据实际业务场景而定是否需要使用这个确认机制。

相关文章:

Spring Boot整合RabbitMQ之路由模式(Direct)

RabbitMQ中的路由模式&#xff08;Direct模式&#xff09;应该是在实际工作中运用的比较多的一种模式了&#xff0c;这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey&#xff0c;在配置上&#xff0c;交换机类型需要注入DirectExchange类型的交换机bean对象。…...

行式存储与列式存储

1.概述 数据处理大致可分为两大类&#xff0c;联机事务处理OLTP(on-line transaction processing) 和联机分析处理OLAP(on-line analytical processing)。 OLTP是传统关系型数据库的主要应用&#xff0c;用来执行一些基本的、日常的事务处理&#xff0c;比如数据库记录的增、删…...

windows上sqlserver的ldf日志文件和数据mdf文件分别放到不同的磁盘

之前我的windows上已安装好了sqlserver2017&#xff0c;有一个名为TestDb的数据库。ldf文件和mdf文件都一起放在D:\Database目录下。现在需要把ldf日志文件到E盘的database目录下。 重要的事情先说三遍 先停止网关&#xff08;例如nginx&#xff09;并备份数据库 先停止网关&am…...

vue3+uni——watch监听props中的数据(组件参数接收与传递defineProps、defineEmits)

案例说明 A页面引用的子组件B A页面 <template><view>//引用组件<serviceOrder change"change" :list"list" :current"type"></serviceOrder></view> </template><script setup>import serviceOrd…...

mybatis与spring集成与spring aop集成pagehelper插件

Mybatis与Spring的集成 Mybatis是一款轻量级的ORM框架&#xff0c;而Spring是一个全栈式的框架&#xff0c;二者的结合可以让我们更加高效地进行数据持久化操作。 Mybatis与Spring的集成主要有两种方式&#xff1a;使用Spring的Mybatis支持和使用Mybatis的Spring支持。 使用…...

Mybatis基础

...

TypeScript-- 配置Typescript环境(1)ts 转js,tsc --watch 实时编译

文章目录 安装Typescript判断是否有运行权限编写第一Typescript文件手动编译Ts文件转Js文件实时编译 安装Typescript npm install -g typescript 判断是否有运行权限 命令行运行 tsc -v 遇到了权限问题 用管理员打开window自带的powershell 运行如下指令即可&#xff1a; Set-…...

Dockerfile快速搭建自己专属的LAMP环境,生成镜像lamp:v1.1,并推送到私有仓库

环境&#xff1a; CentOS 7 Linux 3.10.0-1160.el7.x86_64 具体要求如下&#xff1a; &#xff08;1&#xff09;基于centos:6基础镜像&#xff1b; &#xff08;2&#xff09;指定作者信息&#xff1b; &#xff08;3&#xff09;安装httpd、mysql、mysql-server、php、ph…...

Lottery抽奖项目学习第二章第一节:环境、配置、规范

Lottery抽奖项目学习第二章第一节&#xff1a;环境、配置、规范 环境、配置、规范 下面以DDD架构和设计模式落地实战的方式&#xff0c;进行讲解和实现分布式抽奖系统的代码开发&#xff0c;那么这里会涉及到很多DDD的设计思路和设计模式应用&#xff0c;以及互联网大厂开发中…...

OpenCV之reshape函数

函数原型&#xff1a; /** brief Changes the shape and/or the number of channels of a 2D matrix without copying the data.The method makes a new matrix header for \*this elements. The new matrix may have a different sizeand/or different number of channels. A…...

【JavaEE】Spring事务-@Transactional参数介绍-事务的隔离级别以及传播机制

【JavaEE】Spring事务&#xff08;2&#xff09; 文章目录 【JavaEE】Spring事务&#xff08;2&#xff09;1. Transactional 参数介绍1.1 value 和 transactionManager1.2 timeout1.3 readOnly1.4 后面四个1.5 isolation 与 propagation 2. Spring 事务隔离级别 - isolation2.…...

微信小程序canvas type=2d生成海报保存到相册、文字换行溢出显示...、文字删除线、分享面板

一、简介 做个简单的生成二维码海报分享&#xff0c;我做的时候也找简单的方法看能不能实现页面直接截图那种生成图片&#xff0c;原生小程序不支持&#xff0c;不多介绍下面有全部代码有注释、参数自行替换运行看看&#xff0c;还有需要优化的地方&#xff0c;有问题可以咨询…...

C++卷积神经网络

C卷积神经网络 #include"TP_NNW.h" #include<iostream> #pragma warning(disable:4996) using namespace std; using namespace mnist;float* SGD(Weight* W1, Weight& W5, Weight& Wo, float** X) {Vector2 ve(28, 28);float* temp new float[10];V…...

go 读取yaml映射到struct

安装 go get gopkg.in/yaml.v3创建yaml Mysql:Host: 192.168.214.134Port: 3306UserName: wwPassword: wwDatabase: go_dbCharset: utf8mb4ParseTime: trueLoc: LocalListValue:- haha- test- vv JWTSecret: nidaye定义结构体 type Mysql struct {Host string yaml:&…...

Redis 10 大数据类型

1. which 10 1. redis字符串 2. redis 列表 3. redis哈希表 4. redis集合 5. redis有序集合 6. redis地理空间 7. redis基数统计 8. redis位图 9. redis位域 10. redis流 2. 获取redis常见操作指令 官网英文&#xff1a;https://redis.io/commands 官网中文&#xff1a;https:/…...

优化生产流程:数字化工厂中的OPC UA分布式IO模块应用

背景 近年来&#xff0c;为了提升在全球范围内的竞争力&#xff0c;制造企业希望自己工厂的机器之间协同性更强&#xff0c;自动化设备采集到的数据能够发挥更大的价值&#xff0c;越来越多的传统型工业制造企业开始加入数字化工厂建设的行列&#xff0c;实现智能制造。 数字化…...

Elasticsearch(十四)搜索---搜索匹配功能⑤--全文搜索

一、前言 不同于之前的term。terms等结构化查询&#xff0c;全文搜索首先对查询词进行分析&#xff0c;然后根据查询词的分词结果构建查询。这里所说的全文指的是文本类型数据&#xff08;text类型&#xff09;,默认的数据形式是人类的自然语言&#xff0c;如对话内容、图书名…...

已解决Gradle错误:“Unable to load class ‘org.gradle.api.plugins.MavenPlugin‘”

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…...

windows中安装sqlite

1. 下载文件 官网下载地址&#xff1a;https://www.sqlite.org/download.html 下载sqlite-dll-win64-x64-3430000.zip和sqlite-tools-win32-x86-3430000.zip文件&#xff08;32位系统下载sqlite-dll-win32-x86-3430000.zip&#xff09;。 2. 安装过程 解压文件 解压上一步…...

前端面试:【系统设计与架构】前端架构模式的演进

前端架构模式在现代Web开发中扮演着关键角色&#xff0c;它们帮助我们组织和管理前端应用的复杂性。本文将介绍一些常见的前端架构模式&#xff0c;包括MVC、MVVM、Flux和Redux&#xff0c;以及它们的演进和应用。 1. MVC&#xff08;Model-View-Controller&#xff09;&#x…...

ARM Cortex-M嵌入式通用头文件sarmfsw深度解析

1. sarmfsw项目概述sarmfsw&#xff08;ARM-based Common Headers&#xff09;是一个面向ARM Cortex-M系列微控制器的轻量级、跨平台通用头文件集合。它并非传统意义上的功能库&#xff0c;而是一套经过工程验证的类型定义&#xff08;typedefs&#xff09;、宏&#xff08;mac…...

foobar2000界面美化终极指南:3步打造你的专属音乐播放器

foobar2000界面美化终极指南&#xff1a;3步打造你的专属音乐播放器 【免费下载链接】foobox-cn DUI 配置 for foobar2000 项目地址: https://gitcode.com/GitHub_Trending/fo/foobox-cn 还在为foobar2000那套单调乏味的默认界面感到困扰吗&#xff1f;今天我要为你介绍…...

Zabbix 6.0部署避坑指南:为什么你的Ubuntu安装总卡在数据库初始化这一步?

Zabbix 6.0部署避坑指南&#xff1a;为什么你的Ubuntu安装总卡在数据库初始化这一步&#xff1f; 如果你正在Ubuntu上部署Zabbix 6.0&#xff0c;却反复在数据库初始化这一步失败&#xff0c;这篇文章就是为你准备的。不同于常规的安装教程&#xff0c;我们将聚焦于那些看似简…...

6大维度深度测评:如何挑选最可靠的开源付费墙绕过工具?

6大维度深度测评&#xff1a;如何挑选最可靠的开源付费墙绕过工具&#xff1f; 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在数字阅读时代&#xff0c;优质内容的付费壁垒逐渐形成…...

如何通过InstantClick事件回调实现精准的性能监控:开发者必备指南

如何通过InstantClick事件回调实现精准的性能监控&#xff1a;开发者必备指南 【免费下载链接】instantclick InstantClick makes following links in your website instant. 项目地址: https://gitcode.com/gh_mirrors/in/instantclick InstantClick是一款能让网站链接…...

Zemax光学设计(三)——从艾里斑到系统分辨率:衍射极限的实战解析

1. 艾里斑&#xff1a;光学的终极像素 当你用手机拍夜景时&#xff0c;为什么远处的路灯总变成模糊的光团&#xff1f;这背后隐藏着光学系统的基本限制——艾里斑。我在设计微型内窥镜镜头时&#xff0c;曾花了三周时间优化像差&#xff0c;最终却发现图像清晰度卡在一个无法突…...

终极指南:Redaxios参数序列化完全掌握,自定义查询字符串生成逻辑如此简单

终极指南&#xff1a;Redaxios参数序列化完全掌握&#xff0c;自定义查询字符串生成逻辑如此简单 【免费下载链接】redaxios The Axios API, as an 800 byte Fetch wrapper. 项目地址: https://gitcode.com/gh_mirrors/re/redaxios Redaxios是一个轻量级的Fetch封装库&a…...

整理‌ 主流国产AI龙虾的核心能力对比表(支持平台/部署方式/适用场景)腾讯WorkBuddy‌ ‌阿里JVS Claw 百度DuMate

根据当前的资料&#xff0c;腾讯WorkBuddy和百度的DuMate当前有一定一定量的免费额度&#xff0c;大家可以用起来&#xff01; 主流国产AI龙虾的核心能力对比表 五款主流国产AI龙虾的核心能力对比表已整理完成&#xff0c;涵盖支持平台、部署方式与适用场景三大维度&#xff…...

影墨·今颜模型API接口开发与调用全指南

影墨今颜模型API接口开发与调用全指南 你是不是已经成功部署了影墨今颜模型&#xff0c;看着它能在本地生成惊艳的图片&#xff0c;心里正盘算着怎么把它变成一个能对外服务的“产品”&#xff1f;比如&#xff0c;让公司的设计团队直接调用&#xff0c;或者集成到自己的应用里…...

运算放大器入门难?这篇超详细运算放大器原理与应用指南帮你轻松上手!

1. 运算放大器到底是什么&#xff1f; 第一次接触运算放大器时&#xff0c;我也被这个专业名词吓到了。但后来发现&#xff0c;它其实就是个"超级放大镜"——能把微弱的电信号放大成千上万倍。想象一下医生用的听诊器&#xff0c;它能将微弱的心跳声放大到清晰可闻&a…...