当前位置: 首页 > news >正文

RabbitMQ开启消息发送确认和消费手动确认

开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。
通过配置 publisher-confirm-type: correlatedpublisher-returns: true开启生产者确认消息。

server:port: 8014spring:rabbitmq:username: adminpassword: 123456dynamic: true
#    port: 5672
#    host: 192.168.49.9addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672publisher-confirm-type: correlatedpublisher-returns: trueapplication:name: shushandatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://ip/shushanusername: rootpassword: hikari:minimum-idle: 10maximum-pool-size: 20idle-timeout: 50000max-lifetime: 540000connection-test-query: select 1connection-timeout: 600000

RabbitConfig :

package com.kexuexiong.shushan.common.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("confirmCallback  data: " + correlationData);log.info("confirmCallback ack :" + ack);log.info("confirmCallback cause :" + cause);});rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));return rabbitTemplate;}
}

AckReceiver 手动确认消费者:

package com.kexuexiong.shushan.common.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] messageBody = message.getBody();try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {Map<String, String> msg = (Map<String, String>) inputStream.readObject();log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);log.info("header msg :"+message.getMessageProperties().getHeaders());if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){channel.basicNack(deliveryTag,false,false);}else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){channel.basicAck(deliveryTag, true);}else {channel.basicAck(deliveryTag, true);}} catch (Exception e) {channel.basicReject(deliveryTag, false);log.error(e.getMessage());}}
}

通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。

package com.kexuexiong.shushan.common.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate AckReceiver ackReceiver;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.setConcurrentConsumers(2);simpleMessageListenerContainer.setMaxConcurrentConsumers(2);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPICsimpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);simpleMessageListenerContainer.setMessageListener(ackReceiver);return simpleMessageListenerContainer;}}
package com.kexuexiong.shushan.controller.mq;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/callback/sendDirectMessage")public String sendDirectMessageCallback(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);return "ok";}@GetMapping("/callback/lonelyDirectExchange")public String lonelyDirectExchange(){String msgId = UUID.randomUUID().toString();String msg = "demo msg ,kexuexiong";String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");Map<String,Object> map = new HashMap();map.put("msgId",msgId);map.put("msg",msg);map.put("createTime",createTime);rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);return "ok";}
}

测试:

发送dirct消息 找不到交换机情况
在这里插入图片描述

2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)

ack 为false。

发送dirct消息 找不到队列
在这里插入图片描述

2023-10-10T17:05:55.851+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]

ACK为true,replyText=NO_ROUTE。

相关文章:

RabbitMQ开启消息发送确认和消费手动确认

开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认&#xff08;ACK&#xff09;和消费者通过手动确认或者丢弃消费的消息。 通过配置 publisher-confirm-type: correlated 和publisher-returns: true开启生产者确认消息。 server:port: 8014spring:rabbitmq:username: …...

嵌入式系统开发【深入浅出】 GPIO 类设备的驱动程序

目录 GPIO管脚的输出功能相当于控制、输入相当于检测 使用GPIO基本流程 对于某一个管脚来说最多有几种功能&#xff1f; 拓展 【定时器与系统定时器】 决定定时长短的因素: 普通定时器 系统定时器 STM32F103RBT6的时钟源有哪五种 sysclk 的时钟频率由哪个时钟源提供基…...

项目管理必备的22个公式

大家好&#xff0c;我是老原。 趁着国庆时间比较空闲&#xff0c;给你们整理了一些项目管理必备的计算公式&#xff0c;一共22个。 每一个公式都给你们标注了适用情况和使用方法&#xff0c;为了方便你们理解&#xff0c;也加了一些例子&#xff0c;保准你看了就会。 觉得不…...

ccache加速编译速度

ccache https://gitee.com/lixiaoxmm/ccache.git 依赖hiredis、zstd(zstd的cmakelists.txt在build/cmake目录下) 下载mingw,https://www.mingw-w64.org/downloads/#w64devkit hiredis、zstd使用mingw编译 cmake -G “MinGW Makefiles” cmakecache.txt手动修改去掉网络下载,…...

Apache POI使用

1.导入坐标 <!-- poi --><dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>${poi}</version></dependency><dependency><groupId>org.apache.poi</groupId><a…...

UNIQUE VISION Programming Contest 2023 Autumn(AtCoder Beginner Contest 323)

A - Weak Beats 链接 : A - Weak Beats 思路 : 模拟即可,如果在偶数位上出现了非0得元素&#xff0c;直接输出"No"后返回即可&#xff0c;循环顺利结束的话&#xff0c;就直接输出"Yes"; 代码 : #include<bits/stdc.h> #define IOS ios::sy…...

Docker 网络管理

Docker 网络实现原理 Docker使用Linux桥接&#xff0c;在宿主机虚拟一个Docker容器网桥(docker0)&#xff0c;Docker启动一个容器时会根据Docker网桥的网段分配给容器一个IP地址&#xff0c;称为Container-IP&#xff0c;同时Docker网桥是每个容器的默认网关。因为在同一宿主机…...

网络安全国家队-安防思考与实践

按照工信部“三同步”安全建设的统一要求&#xff0c;本项目的实施应具备符合等级保护要求的安全防护措施&#xff08;主要为传输控制、防火墙隔离、入侵检测、安全审计等网络安全措施&#xff1b;操作系统安全、数据库安全、防病毒管理、安全审计等基础系统安全措施&#xff0…...

epoll 定时器

参考&#xff1a; Linux下使用epoll监听定时器-CSDN博客 但是这个用的是gettimeofday。 本人使用的是 #include <stdlib.h> #include<stdio.h> #include <sys/timerfd.h> #include <sys/epoll.h> #include <unistd.h> #include <sys/time.…...

BUUCTF Java逆向解密 1

Class文件是Java编译后的二进制字节码文件。 我这里使用的是jadx-gui&#xff0c;直接将class文件拖进去即可 package defpackage;import java.util.ArrayList; import java.util.Scanner;/* renamed from: Reverse reason: default package */ /* loaded from: Reverse.clas…...

BUUCTF [MRCTF2020]Ez_bypass1

这道题全程我都是用bp做的 拿到题目 我们查看页面源代码得到 代码审计 我们要用get传入id和gg两个参数&#xff0c;id和gg的值要求不能相等&#xff0c;但是id和gg的md5强比较必须相等 if(isset($_GET[gg])&&isset($_GET[id])) {$id$_GET[id];$gg$_GET[gg];if (md5($…...

深入理解强化学习——强化学习和有监督学习

分类目录&#xff1a;《深入理解强化学习》总目录 通过前文的介绍&#xff0c;我们现在应该已经对强化学习的基本数学概念有了一定的了解。这里我们回过头来再看看一般的有监督学习和强化学习的区别。以图片分类为例&#xff0c;有监督学习&#xff08;Supervised Learning&…...

设计模式 - 结构型模式考点篇:装饰者模式(概念 | 案例实现 | 优缺点 | 使用场景)

目录 一、结构型模式 1.1、装饰者模式 1.1.1、概念 1.1.2、案例实现 1.1.3、优缺点 1.1.4、使用场景 一、结构型模式 1.1、装饰者模式 1.1.1、概念 装饰者模式就是指在不改变现有对象结构的情况下&#xff0c;动态的给该对象增加一些职责&#xff08;增加额外功能&#…...

计算机竞赛 题目:基于深度学习的手势识别实现

文章目录 1 前言2 项目背景3 任务描述4 环境搭配5 项目实现5.1 准备数据5.2 构建网络5.3 开始训练5.4 模型评估 6 识别效果7 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学习的手势识别实现 该项目较为新颖&#xff0c;适合作为竞赛课题…...

手撕各种排序

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大一&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;掌握每种排序的方法&#xff0c;理解每种排序利弊…...

视频号的链接在哪,视频号视频链接地址获取办法!

不少人问视频号的链接在哪里可以获取&#xff0c;本质的在腾讯微信中目前视频号的链接是无法获取的&#xff0c;但好事多磨今天就分享一个第三方的视频号视频链接地址获取办法&#xff0c;希望对你有所帮助&#xff01; 1&#xff1a;在微信客户端中&#xff0c;我们可以通过搜…...

深度学习笔记之优化算法(六)RMSprop算法的简单认识

深度学习笔记之优化算法——RMSProp算法的简单认识 引言回顾&#xff1a;AdaGrad算法AdaGrad算法与动量法的优化方式区别AdaGrad算法的缺陷 RMProp算法关于AdaGrad问题的优化方式RMSProp的算法过程描述 RMSProp示例代码 引言 上一节对 AdaGrad \text{AdaGrad} AdaGrad算法进行…...

10架构管理之公司整体技术架构

一句话导读 公司的整体技术架构一般是公司的架构组、架构管理部、技术委员会等部门负责&#xff0c;需要对公司整体的技术架构进行把控和管理&#xff0c;确保信息系统的稳定性和可靠性&#xff0c;避免因技术架构不合理而导致的系统崩溃和数据丢失等问题&#xff0c;为公司的业…...

联邦学习综述

《Advances and Open Problems in Federated Learning》 选题&#xff1a;Published 10 December 2019-Computer Science-Found. Trends Mach. Learn. 联邦学习定义 联邦学习是一种机器学习设置&#xff0c;其中多个客户端在中央服务器或服务提供商的协调下协作解决机器学习…...

几行cmd命令,轻松将java文件打包成jar文件

1. 在任意目录下建立一个.java文件 2. 在当前目录下使用cmd命令&#xff1a; javac filename编译 如果报错则使用此命令javac -encoding UTF-8 filename 3.此时已成功生成.class文件 4. 可以手动添加MANIFEST.MF文件 Manifest-Version: 1.0 Main-Class: fileName 5.直接一…...

PlotJuggler颜色映射终极指南:如何创建惊艳的数据可视化效果

PlotJuggler颜色映射终极指南&#xff1a;如何创建惊艳的数据可视化效果 【免费下载链接】PlotJuggler The Time Series Visualization Tool that you deserve. 项目地址: https://gitcode.com/gh_mirrors/pl/PlotJuggler PlotJuggler是一款功能强大的时间序列数据可视化…...

当AI走进柴米油盐:我们的生活正在发生怎样的改变?

当清晨的AI闹钟根据你的睡眠周期轻声唤醒&#xff0c;通勤导航提前规避了突发拥堵的路段&#xff0c;办公软件里的AI一键生成了会议纪要与数据报表&#xff0c;回家路上智能家电已提前调好室温与灯光&#xff0c;睡前AI陪练帮孩子巩固了当天的知识点&#xff0c;也为独居的父母…...

Qwen3-ForcedAligner-0.6B在字幕制作中的落地应用:SRT自动导出全流程

Qwen3-ForcedAligner-0.6B在字幕制作中的落地应用&#xff1a;SRT自动导出全流程 1. 引言&#xff1a;告别手动打轴&#xff0c;让字幕制作快10倍 如果你做过视频字幕&#xff0c;一定体会过手动打轴的痛苦。一集45分钟的视频&#xff0c;台词稿早就准备好了&#xff0c;但你…...

只要一行代码,瞬间搭建 Web 服务器 python -m http.server 8000

只要一行代码,瞬间搭建 Web 服务器 python -m http.server 8000 目录 只要一行代码,瞬间搭建 Web 服务器 python -m http.server 8000 1. 核心机制:内置的 `http.server` 模块 2. 为什么它能“求生”,但不能“生产”? 🚀 并发处理能力 (Concurrency) 🛡️ 安全性 (Se…...

效率翻倍,一键生成企业级vue3+ts+pinia项目脚手架,告别重复环境配置

最近在搭建一个企业级中后台管理系统时&#xff0c;发现从零开始配置Vue3项目环境特别耗时。传统方式需要手动安装各种依赖、配置代码规范、设计目录结构&#xff0c;经常因为版本兼容问题卡住半天。后来尝试用InsCode(快马)平台生成项目脚手架&#xff0c;效率直接翻倍&#x…...

Cadence Allegro 17.4进阶技巧:PCB Editor中高效调整丝印的三大步骤

1. 丝印调整的核心价值与准备工作 在PCB设计流程中&#xff0c;丝印调整往往被新手工程师视为"收尾环节"&#xff0c;但实际它直接影响着后续生产的可制造性和产品维护的便利性。Cadence Allegro 17.4的PCB Editor模块提供了完整的丝印处理工具链&#xff0c;我经手…...

静息态fMRI分析避坑指南:DPARSFA预处理中那些容易踩的‘雷’(附解决方案)

静息态fMRI分析实战避坑手册&#xff1a;DPARSFA预处理中的7个致命陷阱与修复方案 当你熬夜跑完DPARSFA预处理流程&#xff0c;满心期待地点开结果图时——突然发现ReHo图像像被泼了墨水&#xff0c;fALFF数值全部溢出&#xff0c;或是软件弹出一串看不懂的报错代码。这种崩溃…...

基于历史数据的加密货币交易系统策略验证实践指南

基于历史数据的加密货币交易系统策略验证实践指南 【免费下载链接】node-binance-trader &#x1f4b0; Cryptocurrency Trading Strategy & Portfolio Management Development Framework for Binance. &#x1f916; 项目地址: https://gitcode.com/gh_mirrors/no/node-…...

重新定义AI助手体验:突破Cursor Pro限制的5个技术方案

重新定义AI助手体验&#xff1a;突破Cursor Pro限制的5个技术方案 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached your tri…...

付费内容访问难题如何破解?开源工具的创新解决方案

付费内容访问难题如何破解&#xff1f;开源工具的创新解决方案 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在数字内容付费阅读日益普及的今天&#xff0c;如何合法合规地获取所需…...