RabbitMQ开启消息发送确认和消费手动确认
开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。
通过配置 publisher-confirm-type: correlated 和publisher-returns: true开启生产者确认消息。
server:port: 8014spring:rabbitmq:username: adminpassword: 123456dynamic: true
# port: 5672
# host: 192.168.49.9addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672publisher-confirm-type: correlatedpublisher-returns: trueapplication:name: shushandatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://ip/shushanusername: rootpassword: hikari:minimum-idle: 10maximum-pool-size: 20idle-timeout: 50000max-lifetime: 540000connection-test-query: select 1connection-timeout: 600000
RabbitConfig :
package com.kexuexiong.shushan.common.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("confirmCallback data: " + correlationData);log.info("confirmCallback ack :" + ack);log.info("confirmCallback cause :" + cause);});rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));return rabbitTemplate;}
}
AckReceiver 手动确认消费者:
package com.kexuexiong.shushan.common.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] messageBody = message.getBody();try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {Map<String, String> msg = (Map<String, String>) inputStream.readObject();log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);log.info("header msg :"+message.getMessageProperties().getHeaders());if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){channel.basicNack(deliveryTag,false,false);}else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){channel.basicAck(deliveryTag, true);}else {channel.basicAck(deliveryTag, true);}} catch (Exception e) {channel.basicReject(deliveryTag, false);log.error(e.getMessage());}}
}
通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。
package com.kexuexiong.shushan.common.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate AckReceiver ackReceiver;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.setConcurrentConsumers(2);simpleMessageListenerContainer.setMaxConcurrentConsumers(2);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPICsimpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);simpleMessageListenerContainer.setMessageListener(ackReceiver);return simpleMessageListenerContainer;}}
package com.kexuexiong.shushan.controller.mq;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/callback/sendDirectMessage")public String sendDirectMessageCallback(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);return "ok";}@GetMapping("/callback/lonelyDirectExchange")public String lonelyDirectExchange(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);return "ok";}
}
测试:
发送dirct消息 找不到交换机情况

2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-10T17:04:58.492+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
ack 为false。
发送dirct消息 找不到队列

2023-10-10T17:05:55.851+08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-10T17:05:55.852+08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]
ACK为true,replyText=NO_ROUTE。
相关文章:
RabbitMQ开启消息发送确认和消费手动确认
开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。 通过配置 publisher-confirm-type: correlated 和publisher-returns: true开启生产者确认消息。 server:port: 8014spring:rabbitmq:username: …...
嵌入式系统开发【深入浅出】 GPIO 类设备的驱动程序
目录 GPIO管脚的输出功能相当于控制、输入相当于检测 使用GPIO基本流程 对于某一个管脚来说最多有几种功能? 拓展 【定时器与系统定时器】 决定定时长短的因素: 普通定时器 系统定时器 STM32F103RBT6的时钟源有哪五种 sysclk 的时钟频率由哪个时钟源提供基…...
项目管理必备的22个公式
大家好,我是老原。 趁着国庆时间比较空闲,给你们整理了一些项目管理必备的计算公式,一共22个。 每一个公式都给你们标注了适用情况和使用方法,为了方便你们理解,也加了一些例子,保准你看了就会。 觉得不…...
ccache加速编译速度
ccache https://gitee.com/lixiaoxmm/ccache.git 依赖hiredis、zstd(zstd的cmakelists.txt在build/cmake目录下) 下载mingw,https://www.mingw-w64.org/downloads/#w64devkit hiredis、zstd使用mingw编译 cmake -G “MinGW Makefiles” cmakecache.txt手动修改去掉网络下载,…...
Apache POI使用
1.导入坐标 <!-- poi --><dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>${poi}</version></dependency><dependency><groupId>org.apache.poi</groupId><a…...
UNIQUE VISION Programming Contest 2023 Autumn(AtCoder Beginner Contest 323)
A - Weak Beats 链接 : A - Weak Beats 思路 : 模拟即可,如果在偶数位上出现了非0得元素,直接输出"No"后返回即可,循环顺利结束的话,就直接输出"Yes"; 代码 : #include<bits/stdc.h> #define IOS ios::sy…...
Docker 网络管理
Docker 网络实现原理 Docker使用Linux桥接,在宿主机虚拟一个Docker容器网桥(docker0),Docker启动一个容器时会根据Docker网桥的网段分配给容器一个IP地址,称为Container-IP,同时Docker网桥是每个容器的默认网关。因为在同一宿主机…...
网络安全国家队-安防思考与实践
按照工信部“三同步”安全建设的统一要求,本项目的实施应具备符合等级保护要求的安全防护措施(主要为传输控制、防火墙隔离、入侵检测、安全审计等网络安全措施;操作系统安全、数据库安全、防病毒管理、安全审计等基础系统安全措施࿰…...
epoll 定时器
参考: Linux下使用epoll监听定时器-CSDN博客 但是这个用的是gettimeofday。 本人使用的是 #include <stdlib.h> #include<stdio.h> #include <sys/timerfd.h> #include <sys/epoll.h> #include <unistd.h> #include <sys/time.…...
BUUCTF Java逆向解密 1
Class文件是Java编译后的二进制字节码文件。 我这里使用的是jadx-gui,直接将class文件拖进去即可 package defpackage;import java.util.ArrayList; import java.util.Scanner;/* renamed from: Reverse reason: default package */ /* loaded from: Reverse.clas…...
BUUCTF [MRCTF2020]Ez_bypass1
这道题全程我都是用bp做的 拿到题目 我们查看页面源代码得到 代码审计 我们要用get传入id和gg两个参数,id和gg的值要求不能相等,但是id和gg的md5强比较必须相等 if(isset($_GET[gg])&&isset($_GET[id])) {$id$_GET[id];$gg$_GET[gg];if (md5($…...
深入理解强化学习——强化学习和有监督学习
分类目录:《深入理解强化学习》总目录 通过前文的介绍,我们现在应该已经对强化学习的基本数学概念有了一定的了解。这里我们回过头来再看看一般的有监督学习和强化学习的区别。以图片分类为例,有监督学习(Supervised Learning&…...
设计模式 - 结构型模式考点篇:装饰者模式(概念 | 案例实现 | 优缺点 | 使用场景)
目录 一、结构型模式 1.1、装饰者模式 1.1.1、概念 1.1.2、案例实现 1.1.3、优缺点 1.1.4、使用场景 一、结构型模式 1.1、装饰者模式 1.1.1、概念 装饰者模式就是指在不改变现有对象结构的情况下,动态的给该对象增加一些职责(增加额外功能&#…...
计算机竞赛 题目:基于深度学习的手势识别实现
文章目录 1 前言2 项目背景3 任务描述4 环境搭配5 项目实现5.1 准备数据5.2 构建网络5.3 开始训练5.4 模型评估 6 识别效果7 最后 1 前言 🔥 优质竞赛项目系列,今天要分享的是 基于深度学习的手势识别实现 该项目较为新颖,适合作为竞赛课题…...
手撕各种排序
> 作者简介:დ旧言~,目前大一,现在学习Java,c,c,Python等 > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:掌握每种排序的方法,理解每种排序利弊…...
视频号的链接在哪,视频号视频链接地址获取办法!
不少人问视频号的链接在哪里可以获取,本质的在腾讯微信中目前视频号的链接是无法获取的,但好事多磨今天就分享一个第三方的视频号视频链接地址获取办法,希望对你有所帮助! 1:在微信客户端中,我们可以通过搜…...
深度学习笔记之优化算法(六)RMSprop算法的简单认识
深度学习笔记之优化算法——RMSProp算法的简单认识 引言回顾:AdaGrad算法AdaGrad算法与动量法的优化方式区别AdaGrad算法的缺陷 RMProp算法关于AdaGrad问题的优化方式RMSProp的算法过程描述 RMSProp示例代码 引言 上一节对 AdaGrad \text{AdaGrad} AdaGrad算法进行…...
10架构管理之公司整体技术架构
一句话导读 公司的整体技术架构一般是公司的架构组、架构管理部、技术委员会等部门负责,需要对公司整体的技术架构进行把控和管理,确保信息系统的稳定性和可靠性,避免因技术架构不合理而导致的系统崩溃和数据丢失等问题,为公司的业…...
联邦学习综述
《Advances and Open Problems in Federated Learning》 选题:Published 10 December 2019-Computer Science-Found. Trends Mach. Learn. 联邦学习定义 联邦学习是一种机器学习设置,其中多个客户端在中央服务器或服务提供商的协调下协作解决机器学习…...
几行cmd命令,轻松将java文件打包成jar文件
1. 在任意目录下建立一个.java文件 2. 在当前目录下使用cmd命令: javac filename编译 如果报错则使用此命令javac -encoding UTF-8 filename 3.此时已成功生成.class文件 4. 可以手动添加MANIFEST.MF文件 Manifest-Version: 1.0 Main-Class: fileName 5.直接一…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
