当前位置: 首页 > news >正文

Rabbit SpringBoot高级用法

Rabbit高级用法

  • 一、Rabbit Springboot集成
    • 1.1 引入依赖
    • 1.2 添加配置
    • 1.3 添加Config
    • 1.4 编写Consumer
    • 1.5 发送消息
  • 二、Rabbit 高级用法
    • 2.1 消息发送前置处理器
    • 2.2 消息发送确认机制
    • 2.3 消息接收后处理器
    • 2.4 事务消息

一、Rabbit Springboot集成

1.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 添加配置

server:port: 8080
spring:application:name: rabbitmq-testrabbitmq:host: 127.0.0.1port: 5672username: testpassword: testvirtual-host: /publisher-returns: true

1.3 添加Config

@Configuration
@EnableConfigurationProperties(MqProperties.class)
public class MqConfig {@Autowiredprivate MqProperties mqProperties;@Beanpublic MessageConverter messageConverter() {// 设置消息转换器return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory());// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(mqProperties.getHost());connectionFactory.setPort(mqProperties.getPort());connectionFactory.setUsername(mqProperties.getUsername());connectionFactory.setPassword(mqProperties.getPassword());connectionFactory.setVirtualHost(mqProperties.getVirtualHost());connectionFactory.setPublisherReturns(mqProperties.getPublisherReturns());connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Beanpublic Queue queue() {return new Queue("test-queue", true, false, false);}@Beanpublic Exchange exchange() {return new DirectExchange("direct-exchange-test", true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with("direct-key").noargs();}
}

1.4 编写Consumer

@Component
public class DirectConsumer extends MessageListenerAdapter {private static final Logger logger = LoggerFactory.getLogger(DirectConsumer.class);@Autowiredprivate MessageConverter messageConverter;@Override@RabbitListener(queues = {"test-queue"}, ackMode = "MANUAL")public void onMessage(Message message, Channel channel) throws Exception {try {Map<String, String> msg = (Map<String, String>) messageConverter.fromMessage(message);// 获取 correlation idString id = (String) message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);// String msg = new String(message.getBody(), StandardCharsets.UTF_8);logger.info("directConsumer>>>>>>message={}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {logger.error("directConsumer>>>>>>exception", e);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

1.5 发送消息

@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{data}")public void test(@PathVariable(value = "data", required = false) String data) {Map<String, String> msg = new HashMap<>();msg.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));if (StringUtils.isEmpty(data)) {data = String.valueOf(System.currentTimeMillis());}msg.put("data", "this is data:" + data);rabbitTemplate.convertAndSend("direct-exchange-test", "direct-key", msg, new CorrelationData(UUID.randomUUID().toString()));}
}

二、Rabbit 高级用法

2.1 消息发送前置处理器

RabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {....})

@Bean
public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate();// 添加消息前置处理器。rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {logger.info("-----------------rabbitmq before postProcess message={}", JSON.toJSONString(message));return message;}});rabbitTemplate.setConnectionFactory(connectionFactory());// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;
}

通过前置处理器,可以修改消息、保存消息,设置通用header等。

2.2 消息发送确认机制

设置消息发送ConfirmCallback,消息发送成功 / 失败都会调用当前方法。


@Bean
public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {@Overridepublic CorrelationData postProcess(Message message, CorrelationData correlationData) {logger.info("-----------------rabbitmq correlationData postProcess message={}, correlationData={}",JSON.toJSONString(message), JSON.toJSONString(correlationData));return correlationData;}});rabbitTemplate.setConnectionFactory(connectionFactory());// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());// 消息确认,需要配置 spring.rabbitmq.publisher-confirm-type = correlatedrabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info("setConfirmCallback>>>>>>correlationData={} ack={}, cause={}", JSON.toJSONString(correlationData), ack, cause);}});
//开启mandatory模式(开启失败回调)rabbitTemplate.setMandatory(true);//添加失败回调方法rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {logger.info("setReturnCallback>>>>>消息发送队列不可达, message:{}, exchange:{}, routingKey:{}, 原因:{}", message, exchange, routingKey, replyText);});return rabbitTemplate;
}

通过 new RabbitTemplate.ConfirmCallback() 中的 confirm(CorrelationData correlationData, boolean ack, String cause) 判断消息是否发送成功。

  • ack=true:发送成功
  • ack=false:发送失败

2.3 消息接收后处理器

消息接收前执行。
方式1:

/**
* 添加SimpleRabbitListenerContainerFactory 
* 通过 @RabbitListener(queues = {"test-queue"}, containerFactory = "containerFactory", ackMode = "MANUAL") 设置
*/
@Bean("containerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 并发消费者数,默认为 1factory.setConcurrentConsumers(5);// 最大并发消费者数,默认为 1factory.setMaxConcurrentConsumers(10);// 拒绝未确认的消息并重新将它们放回队列,默认为 truefactory.setDefaultRequeueRejected(false);// 容器启动时是否自动启动,默认为 truefactory.setAutoStartup(true);// 消息确认模式,默认为 AUTOfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 每个消费者在一次请求中预获取的消息数,默认为 1factory.setPrefetchCount(5);// 从队列中接收消息的超时时间,默认为 0,表示没有超时限制factory.setReceiveTimeout(0L);// 与容器一起使用的事务管理器。默认情况下,容器不会使用事务// factory.setTransactionManager(platformTransactionManager());// 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息factory.setMessageConverter(messageConverter);// 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutorfactory.setTaskExecutor(new SimpleAsyncTaskExecutor());// 重试失败的消息之前等待的时间,默认为 5000 毫秒factory.setRecoveryInterval(5000L);// 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 truefactory.setMissingQueuesFatal(false);// 监听器容器连接工厂factory.setConnectionFactory(connectionFactory());// 设置后置处理器factory.setAfterReceivePostProcessors(new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));return message;}});return factory;
}

方式2:

@Bean
public SimpleMessageListenerContainer listenerContainer(@Qualifier("directConsumer") DirectConsumer directConsumer) {String queueName = "test-queue";SimpleMessageListenerContainer simpleMessageListenerContainer =new SimpleMessageListenerContainer(connectionFactory());simpleMessageListenerContainer.setQueueNames(queueName);simpleMessageListenerContainer.setMessageListener(directConsumer);return simpleMessageListenerContainer;
}@Autowired(required = false)
private List<AbstractMessageListenerContainer> simpleMessageListenerContainers;@PostConstruct
public void init() {if (CollectionUtils.isEmpty(simpleMessageListenerContainers)) {return;}for (AbstractMessageListenerContainer simpleMessageListenerContainer : simpleMessageListenerContainers) {simpleMessageListenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));return message;}});// 设置手动 ACKsimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);}
}

2.4 事务消息

不建议使用 rabbitmq 事务消息,对性能非常影响。建议通过消息发送确认机制实现事务。

相关文章:

Rabbit SpringBoot高级用法

Rabbit高级用法 一、Rabbit Springboot集成1.1 引入依赖1.2 添加配置1.3 添加Config1.4 编写Consumer1.5 发送消息 二、Rabbit 高级用法2.1 消息发送前置处理器2.2 消息发送确认机制2.3 消息接收后处理器2.4 事务消息 一、Rabbit Springboot集成 1.1 引入依赖 <dependency…...

找不到vcruntime140.dll,无法继续执行代码?多种解决方法解析

找不到vcruntime140.dll,无法继续执行代码&#xff1f;当你在尝试运行某个程序时&#xff0c;突然弹出一条错误提示框&#xff0c;告诉你无法继续执行代码&#xff0c;因为找不到vcruntime140.dll。这个问题很常见&#xff0c;但是它可能会让你感到困惑和疑惑。这篇文章将详细介…...

自然语言处理实战项目8- BERT模型的搭建,训练BERT实现实体抽取识别的任务

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下自然语言处理实战项目8- BERT模型的搭建&#xff0c;训练BERT实现实体抽取识别的任务。BERT模型是一种用于自然语言处理的深度学习模型&#xff0c;它可以通过训练来理解单词之间的上下文关系&#xff0c;从而为下游…...

pdf怎么合并在一起?软件操作更高效

PDF格式已经成为了许多文档和表格的首选格式。然而&#xff0c;当你需要合并多个PDF文件时&#xff0c;可能会遇到一些麻烦&#xff0c;在本篇文章中&#xff0c;我们将向您介绍一种简单易用的方法来合并PDF文件。 以下是可以用来合并PDF文件的软件&#xff1a; - PDF转换器&a…...

Junit常见用法

一.Junit的含义 Junit是一种Java编程语言的单元测试框架。它提供了一些用于编写和运行测试的注释和断言方法&#xff0c;并且可以方便地执行测试并生成测试报告。Junit是开源的&#xff0c;也是广泛使用的单元测试框架之一 二.Junit项目的创建 &#xff08;1&#xff09;先创…...

c++—内存管理、智能指针、内存池

1. 内存分析诊断工具&#xff1a;valgrind&#xff1b; 2. 内存管理的两种方式&#xff1a; ①用户管理&#xff1a;自己申请的&#xff0c;自己用&#xff0c;自己回收&#xff1b;效率高&#xff0c;但容易导致内存泄漏&#xff1b; ②系统管理&#xff1a;系统自动回收垃圾…...

JAVA使用HTTP代码示例

以下是使用Java发送HTTP请求的示例代码&#xff1a; java import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; public class HttpExample { public static void main(String[] args) { try { …...

【网络协议详解】——电子邮件系统协议(学习笔记)

目录 &#x1f552; 1. 电子邮件系统概述&#x1f552; 2. 简单邮件传送协议SMTP&#x1f552; 3. SMTP协议的命令和响应&#x1f558; 3.1 命令&#x1f564; 3.1.1 HELO&#x1f564; 3.1.2 MAIL FROM&#x1f564; 3.1.3 RCPT TO&#x1f564; 3.1.4 DATA&#x1f564; 3.1.…...

年度发布 | MeterSphere一站式开源持续测试平台发布v2.10 LTS版本

2023年5月25日&#xff0c;MeterSphere一站式开源持续测试平台正式发布v2.10 LTS版本。这是继2022年5月发布v1.20 LTS版本后&#xff0c;MeterSphere开源项目发布的第三个LTS&#xff08;Long Term Support&#xff09;版本。MeterSphere开源项目组将对MeterSphere v2.10 LTS版…...

从 OceanBase 迁移数据到 DolphinDB

OceanBase 是一款金融级分布式关系数据库&#xff0c;具有数据强一致、高可用、高性能、在线扩展、高度兼容 SQL标准和主流关系数据库、低成本等特点&#xff0c;但是其学习成本较高&#xff0c;且缺乏金融计算函数以及流式增量计算的功能。 DolphinDB 是一款国产的高性能分布…...

淘宝商品列表数据接口(支持价格、销量排序)

淘宝商品列表数据接口是淘宝提供的一种可以获取淘宝商品信息的接口。通过该接口&#xff0c;可以获取到具有一定规则的商品信息&#xff0c;例如按照价格排序、按照销量排序等。接口返回的数据格式为JSON格式&#xff0c;可以方便地处理数据。 我们可以通过调用淘宝提供的API&…...

Android 11 版本变更总览

Android 11 版本 Android 11 总览重大隐私权变更行为变更&#xff1a;所有应用行为变更&#xff1a;以 Android 11 为目标平台的应用功能和 API 概览Intent系统广播 intent&#xff08;API 级别 30&#xff09;通用应用 intent&#xff08;API 级别 30&#xff09; Android 11 …...

传染病学模型 | Matlab实现基于SIS传染病模型模拟城市内人口的互相感染及城市人口流动所造成的传染

文章目录 效果一览基本描述模型介绍程序设计参考资料效果一览 基本描述 传染病学模型 | Matlab实现基于SIS传染病模型模拟城市内人口的互相感染及城市人口流动所造成的传染 模型介绍 SIS模型是一种基本的传染病学模型,用于描述一个人群中某种传染病的传播情况。SIS模型假设每个…...

物联网技术如何改变我们的生活:一位资深物联网专家的见解

物联网&#xff08;IoT&#xff09;是指通过网络互联的物理设备、车辆、建筑物以及其他物品&#xff0c;这些物品都内置了传感器、执行器、软件和网络连接器&#xff0c;使它们能够收集和交换数据。物联网技术已经在各个领域产生了深远的影响&#xff0c;包括家庭、医疗、交通、…...

node.js+vue.js大学生在线选课系统的设计与实现93pul

本次设计任务是要设计一个选课系统的设计与实现&#xff0c;通过这个系统能够满足用户对选课信息的需求。系统的主要功能包括&#xff1a;个人中心、学生管理、教师管理、选课信息管理等功能。 管理员可以根据系统给定的账号进行登录&#xff0c;登录后可以进入选课系统的设计与…...

华为OD机试真题 Java 实现【寻找符合要求的最长子串】【2023Q1 200分】

一、题目描述 给定一个字符串 s ,找出这样一个子串: 该子串中的任意一个字符最多出现2次;该子串不包含指定某个字符;请你找出满足该条件的最长子串的长度。 二、输入描述 第一行为要求不包含的指定字符,为单个字符,取值范围[0-9a-zA-Z]。 第二行为字符串s,每个字符范…...

接口测试工具Postman接口测试图文教程

目录 一、前言 二、Postman安装和使用 三、请求方式 四、资金记录接口实例演示 一、前言 在前后端分离开发时&#xff0c;后端工作人员完成系统接口开发后&#xff0c;需要与前端人员对接&#xff0c;测试调试接口&#xff0c;验证接口的正确性可用性。而这要求前端开发进…...

视频编辑软件:迅捷视频工具箱

这是一款功能强大、易于使用的视频编辑工具&#xff0c;支持视频剪辑、视频转换、音频转换、视频压缩、视频水印、字幕贴图等实用功能&#xff0c;可以帮助你制作出高质量的视频作品。&#xff08;传送门&#xff1a;https://www.xunjiepdf.com/xjspgjx&#xff09; 功能简介 …...

网络知识点之-HTTP协议

超文本传输协议&#xff08;Hyper Text Transfer Protocol&#xff0c;HTTP&#xff09;是一个简单的请求-响应协议&#xff0c;它通常运行在TCP之上。它指定了客户端可能发送给服务器什么样的消息以及得到什么样的响应。请求和响应消息的头以ASCII形式给出&#xff1b;而消息内…...

K类函数和KL类函数

Class K \mathcal{K} K function- K \mathcal{K} K类函数 Definition: A continuous function α : [ 0 , a ) → [ 0 , ∞ ) \alpha:[0,a)\rightarrow[0,\infin) α:[0,a)→[0,∞) is said belong to class K \mathcal{K} K if it strictly increasing and α ( 0 ) 0 …...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

基于Java+VUE+MariaDB实现(Web)仿小米商城

仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意&#xff1a;运行前…...

毫米波雷达基础理论(3D+4D)

3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文&#xff1a; 一文入门汽车毫米波雷达基本原理 &#xff1a;https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障

关键领域软件测试的"安全密码"&#xff1a;Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力&#xff0c;从金融交易到交通管控&#xff0c;这些关乎国计民生的关键领域…...

Vue 模板语句的数据来源

&#x1f9e9; Vue 模板语句的数据来源&#xff1a;全方位解析 Vue 模板&#xff08;<template> 部分&#xff09;中的表达式、指令绑定&#xff08;如 v-bind, v-on&#xff09;和插值&#xff08;{{ }}&#xff09;都在一个特定的作用域内求值。这个作用域由当前 组件…...