当前位置: 首页 > news >正文

RabbitMQ发布确认模式

目录

一、发布确认原理

二、发布确认的策略

(一)开启发布确认的方法

(二)单个确认模式

(三)批量确认模式

(四)异步确认模式

(五)如何处理异步未确认消息

(六)总结


一、发布确认原理

        生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的
消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队
列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传
给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
        confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

二、发布确认的策略

(一)开启发布确认的方法

        Channel channel = connection.createChannel();channel.confirmSelect();

(二)单个确认模式

        这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
        这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。
    public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();String queue_name = UUID.randomUUID().toString();channel.queueDeclare(queue_name, false, false, false, null);long begin = System.currentTimeMillis();for (Integer i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("", queue_name,null,  message.getBytes());channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发送1000条消息成功,耗时为:" + (end - begin) + "ms");}

(三)批量确认模式

        上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地
提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现
问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种
方案仍然是同步的,也一样阻塞消息的发布
    public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();String queue_name = UUID.randomUUID().toString();channel.queueDeclare(queue_name, false, false, false, null);long begin = System.currentTimeMillis();for (Integer i = 0; i < MESSAGE_COUNT; i++) {if(i % 100 == 0) channel.waitForConfirms();String message = "消息" + i;channel.basicPublish("", queue_name,null,  message.getBytes());}long end = System.currentTimeMillis();System.out.println("发送1000条消息成功,耗时为:" + (end - begin) + "ms");}

(四)异步确认模式

        异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
下面就让我们来详细讲解异步确认是怎么实现的。

    public static void publishMessageAsync() throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();String queue_name = UUID.randomUUID().toString();channel.queueDeclare(queue_name, false, false, false, null);ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();// 异步监听broker传递过来的消息确认回调通知// 确认回调消息ConfirmCallback ackCallback = (deliveryTag, multiple) -> {if(multiple) {// 找出该序号前面所有的消息进行清空ConcurrentNavigableMap<Long, String> confirmed =outstandingConfirms.headMap(deliveryTag, true);//清除该部分未确认消息confirmed.clear();} else  {// 不是批量的话就清除单条消息outstandingConfirms.remove(deliveryTag);}System.out.println(deliveryTag + "消息发送成功");};// 失败回调消息ConfirmCallback nackCallback = (deliveryTag, multiple) -> {// 获取序列号并输出String message = outstandingConfirms.get(deliveryTag);System.out.println("发布的消息"+message+"未被确认,序列号"+deliveryTag);};// 设置确认监听器,两个,一个监听成功的,一个监听失败的channel.addConfirmListener(ackCallback, nackCallback);long begin = System.currentTimeMillis();for (Integer i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("", queue_name,null,  message.getBytes());// 获取序列号和消息内容,并且把消息放入队列outstandingConfirms.put(channel.getNextPublishSeqNo(), message);}long end = System.currentTimeMillis();System.out.println("发送1000条消息成功,耗时为:" + (end - begin) + "ms");}

(五)如何处理异步未确认消息

        最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传
递。

(六)总结

单独发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
    public static void main(String[] args) throws Exception {
//        publishMessageIndividually();  // 同步确认发送消息, 耗时为:769ms
//        publishMessageBatch();  // 批量确认发送消息,耗时为:84mspublishMessageAsync();  // 异步确认发送消息, 耗时为:43ms}

相关文章:

RabbitMQ发布确认模式

目录 一、发布确认原理 二、发布确认的策略 &#xff08;一&#xff09;开启发布确认的方法 &#xff08;二&#xff09;单个确认模式 &#xff08;三&#xff09;批量确认模式 &#xff08;四&#xff09;异步确认模式 &#xff08;五&#xff09;如何处理异步未确认消…...

零基础的人如何入门 Python ?看完这篇文章你就懂了

第一部分&#xff1a;编程环境准备 零基础入门Python的话我不建议用IDE&#xff0c;IDE叫集成开发环境&#xff0c;这东西一般是专业程序员用来实战开发用的&#xff0c;好处很多&#xff0c;比如&#xff1a;调试、语法高亮、项目管理、代码跳转、智能提示、自动完成、单元测…...

Atcoder abc257 E

E - Addition and Multiplication 2 题意: 给你一个数字n表示你现在拥有的金额 然后给你1~9每个经营额所需要的成本, 设总经营额为x, 当前使用的经营额为y, 则每一次使用经营额时都有x10*xy 问, 如何在使用不大于成本数量的金额下, 使得经营额最高 例如: 5 5 4 3 8 1 6 7 …...

模拟退火算法改进

import numpy as np import matplotlib.pyplot as plt import math import random from scipy.stats import norm from mpl_toolkits.mplot3d import Axes3D # 目标函数 def Function(x, y): return -20 * np.exp(-0.2*np.sqrt(0.5*(x*xy*y)))\ -np.exp(0.5*(n…...

SpringBoot+HttpClient+JsonPath提取A接口返回值作为参数调用B接口

前言 在做java接口自动化中&#xff0c;我们常常需要依赖多个接口&#xff0c;A接口依赖B&#xff0c;C&#xff0c;D接口的响应作为请求参数&#xff1b;或者URL中的参数是从其他接口中提取返回值作获取参数这是必不可少的。那么怎么实现呢&#xff1f;下面就来介绍多业务依赖…...

JUC 之 CompletableFuture

——CompletableFuture Future Future 接口&#xff08;FutureTask 实现类&#xff09; 定义了操作异步任务执行的一些方法&#xff0c;如获取异步的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕例如&#xff1a; 主线程让一个子线程去执行任务&…...

7-vue-1

谈谈你对MVVM的理解 为什么要有这些模式&#xff0c;目的&#xff1a;职责划分、分层&#xff08;将Model层、View层进行分类&#xff09;借鉴后端思想&#xff0c;对于前端而已&#xff0c;就是如何将数据同步到页面上 MVC模式 代表&#xff1a;Backbone underscore jquer…...

OpenAPI SDK组件介绍

背景 公司成立以来&#xff0c;积累了数以万计的可复用接口。上层的SaaS业务&#xff0c;原则上要复用这些接口开发自己的业务&#xff0c;为了屏蔽调用接口的复杂性&#xff0c;基础服务开发了apisdk组件&#xff0c;定义了一套声明OpenAPI的注解、注解解析器&#xff0c;实例…...

【Java】Synchronized锁原理和优化

一、synchronized介绍 synchronized中文意思是同步&#xff0c;也称之为”同步锁“。 synchronized的作用是保证在同一时刻&#xff0c; 被修饰的代码块或方法只会有一个线程执行&#xff0c;以达到保证并发安全的效果。 synchronized是Java中解决并发问题的一种最常用的方法…...

西北工业大学2020-2021学年大物(I)下期末试题选填解析

2 位移电流。磁效应服从安培环路&#xff0c;热效应不服从焦耳-楞次定律。注意&#xff0c;它是变化的电场而非磁场产生。3 又考恒定磁场中安培环路定理。4感生电场5 麦克斯韦速率分布函数。6 相同的高温热源和低温热源之间的一切可逆热机的工作效率相等&#xff0c;无论工质如…...

PHP - ChatGpt API 接入 ,代码,亲测!(最简单!)

由于最近ChatGpt 大火&#xff0c;但是门槛来说是对于大家最头疼的环节&#xff0c; 我自己也先开发了一个个人小程序&#xff01;大家可以访问使用下&#xff0c; 由此ChatGpt 有一个API 可以仅供大伙对接 让我来说下资质&#xff1a; 1&#xff1a;首先要搞得到一个 ChatGp…...

物联网MQTT协议简单介绍

物联网曾被认为是继计算机、互联网之后&#xff0c;信息技术行业的第三次浪潮。随着基础通讯设施的不断完善&#xff0c;尤其是 5G 的出现&#xff0c;进一步降低了万物互联的门槛和成本。物联网本身也是 AI 和区块链应用很好的落地场景之一&#xff0c;各大云服务商也在纷纷上…...

Dubbo 源码解读:负载均衡策略

概览 org.apache.dubbo包下META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance中内部spi实现类有以下几种&#xff1a; randomorg.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance roundrobinorg.apache.dubbo.rpc.cluster.loadbalance.RoundRobinL…...

吃瓜教程笔记—Task04

神经网络 知识点 M-P神经元 模型如图所示&#xff1a;  神经元的工作机理&#xff1a;神经元接收来到n个其他神经元传递过来的输入信号&#xff0c;这些输入信号通过带权重的连接进行传递&#xff0c;神经元接收到的总输入值将与神经元的阈值进行比较&#xff0c;然后通过…...

进程地址空间(虚拟地址空间)

目录 引入问题 测试代码 引入地址空间 故事1&#xff1a; 故事二&#xff1a; 解决问题 为什么有虚拟地址空间 扩展 扩展1&#xff08;没有地址空间&#xff0c;OS如何工作&#xff09; 扩展2 &#xff08;代码只读深入了解&#xff09; 扩展3&#xff08;malloc本质…...

【项目精选】基于Vue + ECharts的数据可视化系统的设计与实现(论文+源码+视频)

今天给小伙伴们推荐一款超优秀的全新Vue3.0大数据系统Vue3-bigData。 点击下载源码 vue3-bigdata 基于vue3.0echarts构建的可视化大屏图表展示系统。包括各种可视化图表及Vue3新API使用。 功能 柱状图、饼图、词云图、漏斗图 水球图、折线图 仪表盘、雷达图 矩形树图、关系…...

JavaScript Window Screen

文章目录JavaScript Window ScreenWindow ScreenWindow Screen 可用宽度Window Screen 可用高度JavaScript Window Screen window.screen 对象包含有关用户屏幕的信息。 Window Screen window.screen对象在编写时可以不使用 window 这个前缀。 一些属性&#xff1a; screen…...

【双重注意机制:肺癌:超分】

Dual attention mechanism network for lung cancer images super-resolution &#xff08;肺癌图像超分辨率的双重注意机制网络&#xff09; 目前&#xff0c;肺癌的发病率和死亡率均居世界恶性肿瘤之首。提高肺部薄层CT的分辨率对于肺癌筛查的早期诊断尤为重要。针对超分辨…...

各种中间件的使用

init background 这一部分我们学习一些常用的&#xff0c; 但是不需要深入理解的中间件 &#xff0c; 例如kafka ,分布式文件系统。 summary Content what is kafka? What time to used it ? 其实消息队列就是解决系统之间复杂交互例如聊天系统和交易系统&#xff0c; …...

Systemverilog覆盖率的合并和计算方式

在systemverilog中&#xff0c;对于一个covergroup来说&#xff0c;可能会有多个instance&#xff0c;我们可能需要对这些instance覆盖率进行操作。 只保存covergroup type的覆盖率&#xff0c;不需要保存instance-specified的覆盖率coverage type和instance-specified的覆盖率…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

Bean 作用域有哪些?如何答出技术深度?

导语&#xff1a; Spring 面试绕不开 Bean 的作用域问题&#xff0c;这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开&#xff0c;结合典型面试题及实战场景&#xff0c;帮你厘清重点&#xff0c;打破模板式回答&#xff0c…...

在 Spring Boot 中使用 JSP

jsp&#xff1f; 好多年没用了。重新整一下 还费了点时间&#xff0c;记录一下。 项目结构&#xff1a; pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...

Java详解LeetCode 热题 100(26):LeetCode 142. 环形链表 II(Linked List Cycle II)详解

文章目录 1. 题目描述1.1 链表节点定义 2. 理解题目2.1 问题可视化2.2 核心挑战 3. 解法一&#xff1a;HashSet 标记访问法3.1 算法思路3.2 Java代码实现3.3 详细执行过程演示3.4 执行结果示例3.5 复杂度分析3.6 优缺点分析 4. 解法二&#xff1a;Floyd 快慢指针法&#xff08;…...