RabbitMQ发布确认模式
目录
一、发布确认原理
二、发布确认的策略
(一)开启发布确认的方法
(二)单个确认模式
(三)批量确认模式
(四)异步确认模式
(五)如何处理异步未确认消息
(六)总结
一、发布确认原理
二、发布确认的策略
(一)开启发布确认的方法
Channel channel = connection.createChannel();channel.confirmSelect();
(二)单个确认模式
public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();String queue_name = UUID.randomUUID().toString();channel.queueDeclare(queue_name, false, false, false, null);long begin = System.currentTimeMillis();for (Integer i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("", queue_name,null, message.getBytes());channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发送1000条消息成功,耗时为:" + (end - begin) + "ms");}
(三)批量确认模式
public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();String queue_name = UUID.randomUUID().toString();channel.queueDeclare(queue_name, false, false, false, null);long begin = System.currentTimeMillis();for (Integer i = 0; i < MESSAGE_COUNT; i++) {if(i % 100 == 0) channel.waitForConfirms();String message = "消息" + i;channel.basicPublish("", queue_name,null, message.getBytes());}long end = System.currentTimeMillis();System.out.println("发送1000条消息成功,耗时为:" + (end - begin) + "ms");}
(四)异步确认模式
public static void publishMessageAsync() throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();String queue_name = UUID.randomUUID().toString();channel.queueDeclare(queue_name, false, false, false, null);ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();// 异步监听broker传递过来的消息确认回调通知// 确认回调消息ConfirmCallback ackCallback = (deliveryTag, multiple) -> {if(multiple) {// 找出该序号前面所有的消息进行清空ConcurrentNavigableMap<Long, String> confirmed =outstandingConfirms.headMap(deliveryTag, true);//清除该部分未确认消息confirmed.clear();} else {// 不是批量的话就清除单条消息outstandingConfirms.remove(deliveryTag);}System.out.println(deliveryTag + "消息发送成功");};// 失败回调消息ConfirmCallback nackCallback = (deliveryTag, multiple) -> {// 获取序列号并输出String message = outstandingConfirms.get(deliveryTag);System.out.println("发布的消息"+message+"未被确认,序列号"+deliveryTag);};// 设置确认监听器,两个,一个监听成功的,一个监听失败的channel.addConfirmListener(ackCallback, nackCallback);long begin = System.currentTimeMillis();for (Integer i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("", queue_name,null, message.getBytes());// 获取序列号和消息内容,并且把消息放入队列outstandingConfirms.put(channel.getNextPublishSeqNo(), message);}long end = System.currentTimeMillis();System.out.println("发送1000条消息成功,耗时为:" + (end - begin) + "ms");}
(五)如何处理异步未确认消息
(六)总结
单独发布消息同步等待确认,简单,但吞吐量非常有限。批量发布消息批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
public static void main(String[] args) throws Exception {
// publishMessageIndividually(); // 同步确认发送消息, 耗时为:769ms
// publishMessageBatch(); // 批量确认发送消息,耗时为:84mspublishMessageAsync(); // 异步确认发送消息, 耗时为:43ms}
相关文章:
RabbitMQ发布确认模式
目录 一、发布确认原理 二、发布确认的策略 (一)开启发布确认的方法 (二)单个确认模式 (三)批量确认模式 (四)异步确认模式 (五)如何处理异步未确认消…...
零基础的人如何入门 Python ?看完这篇文章你就懂了
第一部分:编程环境准备 零基础入门Python的话我不建议用IDE,IDE叫集成开发环境,这东西一般是专业程序员用来实战开发用的,好处很多,比如:调试、语法高亮、项目管理、代码跳转、智能提示、自动完成、单元测…...
Atcoder abc257 E
E - Addition and Multiplication 2 题意: 给你一个数字n表示你现在拥有的金额 然后给你1~9每个经营额所需要的成本, 设总经营额为x, 当前使用的经营额为y, 则每一次使用经营额时都有x10*xy 问, 如何在使用不大于成本数量的金额下, 使得经营额最高 例如: 5 5 4 3 8 1 6 7 …...
模拟退火算法改进
import numpy as np import matplotlib.pyplot as plt import math import random from scipy.stats import norm from mpl_toolkits.mplot3d import Axes3D # 目标函数 def Function(x, y): return -20 * np.exp(-0.2*np.sqrt(0.5*(x*xy*y)))\ -np.exp(0.5*(n…...
SpringBoot+HttpClient+JsonPath提取A接口返回值作为参数调用B接口
前言 在做java接口自动化中,我们常常需要依赖多个接口,A接口依赖B,C,D接口的响应作为请求参数;或者URL中的参数是从其他接口中提取返回值作获取参数这是必不可少的。那么怎么实现呢?下面就来介绍多业务依赖…...
JUC 之 CompletableFuture
——CompletableFuture Future Future 接口(FutureTask 实现类) 定义了操作异步任务执行的一些方法,如获取异步的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕例如: 主线程让一个子线程去执行任务&…...
7-vue-1
谈谈你对MVVM的理解 为什么要有这些模式,目的:职责划分、分层(将Model层、View层进行分类)借鉴后端思想,对于前端而已,就是如何将数据同步到页面上 MVC模式 代表:Backbone underscore jquer…...
OpenAPI SDK组件介绍
背景 公司成立以来,积累了数以万计的可复用接口。上层的SaaS业务,原则上要复用这些接口开发自己的业务,为了屏蔽调用接口的复杂性,基础服务开发了apisdk组件,定义了一套声明OpenAPI的注解、注解解析器,实例…...
【Java】Synchronized锁原理和优化
一、synchronized介绍 synchronized中文意思是同步,也称之为”同步锁“。 synchronized的作用是保证在同一时刻, 被修饰的代码块或方法只会有一个线程执行,以达到保证并发安全的效果。 synchronized是Java中解决并发问题的一种最常用的方法…...
西北工业大学2020-2021学年大物(I)下期末试题选填解析
2 位移电流。磁效应服从安培环路,热效应不服从焦耳-楞次定律。注意,它是变化的电场而非磁场产生。3 又考恒定磁场中安培环路定理。4感生电场5 麦克斯韦速率分布函数。6 相同的高温热源和低温热源之间的一切可逆热机的工作效率相等,无论工质如…...
PHP - ChatGpt API 接入 ,代码,亲测!(最简单!)
由于最近ChatGpt 大火,但是门槛来说是对于大家最头疼的环节, 我自己也先开发了一个个人小程序!大家可以访问使用下, 由此ChatGpt 有一个API 可以仅供大伙对接 让我来说下资质: 1:首先要搞得到一个 ChatGp…...
物联网MQTT协议简单介绍
物联网曾被认为是继计算机、互联网之后,信息技术行业的第三次浪潮。随着基础通讯设施的不断完善,尤其是 5G 的出现,进一步降低了万物互联的门槛和成本。物联网本身也是 AI 和区块链应用很好的落地场景之一,各大云服务商也在纷纷上…...
Dubbo 源码解读:负载均衡策略
概览 org.apache.dubbo包下META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance中内部spi实现类有以下几种: randomorg.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance roundrobinorg.apache.dubbo.rpc.cluster.loadbalance.RoundRobinL…...
吃瓜教程笔记—Task04
神经网络 知识点 M-P神经元 模型如图所示: 神经元的工作机理:神经元接收来到n个其他神经元传递过来的输入信号,这些输入信号通过带权重的连接进行传递,神经元接收到的总输入值将与神经元的阈值进行比较,然后通过…...
进程地址空间(虚拟地址空间)
目录 引入问题 测试代码 引入地址空间 故事1: 故事二: 解决问题 为什么有虚拟地址空间 扩展 扩展1(没有地址空间,OS如何工作) 扩展2 (代码只读深入了解) 扩展3(malloc本质…...
【项目精选】基于Vue + ECharts的数据可视化系统的设计与实现(论文+源码+视频)
今天给小伙伴们推荐一款超优秀的全新Vue3.0大数据系统Vue3-bigData。 点击下载源码 vue3-bigdata 基于vue3.0echarts构建的可视化大屏图表展示系统。包括各种可视化图表及Vue3新API使用。 功能 柱状图、饼图、词云图、漏斗图 水球图、折线图 仪表盘、雷达图 矩形树图、关系…...
JavaScript Window Screen
文章目录JavaScript Window ScreenWindow ScreenWindow Screen 可用宽度Window Screen 可用高度JavaScript Window Screen window.screen 对象包含有关用户屏幕的信息。 Window Screen window.screen对象在编写时可以不使用 window 这个前缀。 一些属性: screen…...
【双重注意机制:肺癌:超分】
Dual attention mechanism network for lung cancer images super-resolution (肺癌图像超分辨率的双重注意机制网络) 目前,肺癌的发病率和死亡率均居世界恶性肿瘤之首。提高肺部薄层CT的分辨率对于肺癌筛查的早期诊断尤为重要。针对超分辨…...
各种中间件的使用
init background 这一部分我们学习一些常用的, 但是不需要深入理解的中间件 , 例如kafka ,分布式文件系统。 summary Content what is kafka? What time to used it ? 其实消息队列就是解决系统之间复杂交互例如聊天系统和交易系统, …...
Systemverilog覆盖率的合并和计算方式
在systemverilog中,对于一个covergroup来说,可能会有多个instance,我们可能需要对这些instance覆盖率进行操作。 只保存covergroup type的覆盖率,不需要保存instance-specified的覆盖率coverage type和instance-specified的覆盖率…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...
sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...
深入理解Optional:处理空指针异常
1. 使用Optional处理可能为空的集合 在Java开发中,集合判空是一个常见但容易出错的场景。传统方式虽然可行,但存在一些潜在问题: // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...
关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...
