当前位置: 首页 > news >正文

RabbitMQ相关问题

文章目录

  • 避免重复消费(保证消息幂等性)
  • 消息积压
    • 上线更多的消费者,进行正常消费
    • 惰性队列
    • 消息缓存
    • 延时队列
  • RabbitMQ如何保证消息的有序性?
  • RabbitMQ消息的可靠性、延时队列
  • 如何实现数据库与缓存数据一致?
  • 开启消费者多线程消费

避免重复消费(保证消息幂等性)

  • 方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)

  • 方式2: 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可

  • 方式3: rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的

在这里插入图片描述

发送消息

	@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息*/public void sendMessage() {// 创建消费对象,并指定 全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)MessageProperties messageProperties = new MessageProperties ();messageProperties.setMessageId (UUID.randomUUID ().toString ());messageProperties.setContentType ("text/plain");messageProperties.setContentEncoding ("utf-8");Message message = new Message ("hello,message idempotent!".getBytes (), messageProperties);System.out.println ("生产消息:" + message.toString ());rabbitTemplate.convertAndSend (EXCHANGE_NAME, ROUTE_KEY, message);}

消费消息

 /*** 消费消息** @param message* @param channel* @throws IOException*/@RabbitHandler//org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。//注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器@RabbitListener(queues = "message_idempotent_queue")@Transactionalpublic void handler(Message message, Channel channel) throws IOException {/*** 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。*/// 获取消息IdString messageId = message.getMessageProperties ().getMessageId ();if (StringUtils.isBlank (messageId)) {logger.info ("获取消费ID为空!");return;}MessageIdempotent messageIdempotent = null;Optional<MessageIdempotent> list = messageIdempotentRepository.findById (messageId);if (list.isPresent ()) {messageIdempotent = list.get ();}// 如果找不到,则进行消费此消息if (null == messageIdempotent) {//获取消费内容String msg = new String (message.getBody (), StandardCharsets.UTF_8);logger.info ("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg);//手动ACKchannel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);//存入到表中,标识该消息已消费MessageIdempotent idempotent = new MessageIdempotent ();idempotent.setMessageId (messageId);idempotent.setMessageContent (msg);messageIdempotentRepository.save (idempotent);} else {//如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;logger.error ("该消息已消费,无须重复消费!");}}

消息积压

在这里插入图片描述

上线更多的消费者,进行正常消费

线上突发问题,要临时扩容,增加消费端的数量

考虑到消费者的处理能力,增加配置!!!

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

simple代表简单队列模型

惰性队列

	//基于@Bean声明lazy-queue@Beanpublic Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy() //开启x-queue-mode为lazy.build();}//基于@RabbitListener声明LazyQueue@RabbitListener(queuesToDeclare = {@Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy"))})public void listenLazyQueue(String msg) {System.out.println("接收到lazy.queue的消息:【" + msg + "】");}

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out性能比较稳定

消息缓存

使用Redis的List或ZSET做接收消息缓存,写一个程序 按照消费者处理时间定时从Redis取消息发送到MQ
在这里插入图片描述

延时队列

设置消息过期时间,过期后转入死信队列,写一个程序 处理死信消息(重新如队列或者 即使处理或记录到数据库延后处理)

在这里插入图片描述

RabbitMQ如何保证消息的有序性?

RabbitMQ是队列存储天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证

因此,要保证消息的有序性,需要做的下面几点:

  • 保证消息发送的有序性
  • 保证一组有序的消息都发送到同一个队列
  • 保证一个队列只包含一个消费者
    在这里插入图片描述

这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式消费

RabbitMQ消息的可靠性、延时队列

RabbitMQ消息可靠性、延时队列

如何实现数据库与缓存数据一致?

实现方案有下面几种:

  • 本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
  • 跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以通过MQ通知服务A服务A修改Redis缓存数据
  • 通用方案:使用Canal框架,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据

开启消费者多线程消费

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SpringRabbitListener {/*** @RabbitListener:加了该注解的方法表示该方法是一个消费者 concurrency:并发数量。* 其他属性和注解想了解的话,自己按Ctrl点进去看*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Queue1"),exchange = @Exchange(value = "Exchange1"),key = "key1"),concurrency = "10")public void process1(Message message) throws Exception {System.out.println("Queue1:" + new String(message.getBody()));}}
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class RabbitmqConfig {@Bean("batchQueueRabbitListenerContainerFactory")public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory ();factory.setConnectionFactory (connectionFactory);factory.setMessageConverter (new Jackson2JsonMessageConverter ());//确认方式,manual为手动ack.factory.setAcknowledgeMode (AcknowledgeMode.MANUAL);//每次处理数据数量,提高并发量//factory.setPrefetchCount(250);//设置线程数//factory.setConcurrentConsumers(30);//最大线程数//factory.setMaxConcurrentConsumers(50);/* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */factory.setConnectionFactory (connectionFactory);factory.setConcurrentConsumers (1);factory.setPrefetchCount (1);//factory.setDefaultRequeueRejected(true);//使用自定义线程池来启动消费者。factory.setTaskExecutor (taskExecutor ());return factory;}@Bean("correctTaskExecutor")@Primarypublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();// 设置核心线程数executor.setCorePoolSize (100);// 设置最大线程数executor.setMaxPoolSize (100);// 设置队列容量executor.setQueueCapacity (0);// 设置线程活跃时间(秒)executor.setKeepAliveSeconds (300);// 设置默认线程名称executor.setThreadNamePrefix ("thread-file-queue");// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown (true);return executor;}
}

相关文章:

RabbitMQ相关问题

文章目录避免重复消费(保证消息幂等性)消息积压上线更多的消费者&#xff0c;进行正常消费惰性队列消息缓存延时队列RabbitMQ如何保证消息的有序性&#xff1f;RabbitMQ消息的可靠性、延时队列如何实现数据库与缓存数据一致&#xff1f;开启消费者多线程消费避免重复消费(保证消…...

操作系统 三(存储管理)

一、 存储系统的“金字塔”层次结构设计原理&#xff1a;cpu自身运算速度很快。内存、外存的访问速度受到限制各层次存储器的特点&#xff1a;1&#xff09;主存储器&#xff08;主存/内存/可执行存储器&#xff09;保存进程运行时的程序和数据&#xff0c;内存的访问速度远低于…...

day34 贪心算法 | 860、柠檬水找零 406、根据身高重建队列 452、用最少数量的箭引爆气球

题目 860、柠檬水找零 在柠檬水摊上&#xff0c;每一杯柠檬水的售价为 5 美元。 顾客排队购买你的产品&#xff0c;&#xff08;按账单 bills 支付的顺序&#xff09;一次购买一杯。 每位顾客只买一杯柠檬水&#xff0c;然后向你付 5 美元、10 美元或 20 美元。你必须给每个…...

使用canvas给上传的整张图片添加平铺的水印

写在开头 哈喽&#xff0c;各位倔友们又见面了&#xff0c;本章我们继续来分享一个实用小技巧&#xff0c;给图片加水印功能&#xff0c;水印功能的目的是为了保护网站或作者版权&#xff0c;防止内容被别人利用或白嫖。 但是网络中&#xff0c;是没有绝对安全的&#xff0c;…...

[安装之4] 联想ThinkPad 加装固态硬盘教程

方案&#xff1a;保留原有的机械硬盘&#xff0c;再加装一个固态硬盘作为系统盘。由于X250没有光驱&#xff0c;这样就无法使用第二个2.5寸的硬盘。还好&#xff0c;X250留有一个M.2接口&#xff0c;这样&#xff0c;就可以使用NGFF M.2接口的固态硬盘。不过&#xff0c;这种接…...

Java数据类型、基本与引用数据类型区别、装箱与拆箱、a=a+b与a+=b区别

文章目录1.Java有哪些数据类型2.Java中引用数据类型有哪些&#xff0c;它们与基本数据类型有什么区别&#xff1f;3.Java中的自动装箱与拆箱4.为什么要有包装类型&#xff1f;5.aab与ab有什么区别吗?1.Java有哪些数据类型 8种基本数据类型&#xff1a; 6种数字类型(4个整数型…...

GoLang设置gofmt和goimports自动格式化

目录 设置gofmt gofmt介绍 配置gofmt 设置goimports goimports介绍 配置goimports 设置gofmt gofmt介绍 Go语言的开发团队制定了统一的官方代码风格&#xff0c;并且推出了 gofmt 工具&#xff08;gofmt 或 go fmt&#xff09;来帮助开发者格式化他们的代码到统一的风格…...

【k8s】如何搭建搭建k8s服务器集群(Kubernetes)

搭建k8s服务器集群 服务器搭建环境随手记 文章目录搭建k8s服务器集群前言&#xff1a;一、前期准备&#xff08;所有节点&#xff09;1.1所有节点&#xff0c;关闭防火墙规则&#xff0c;关闭selinux&#xff0c;关闭swap交换&#xff0c;打通所有服务器网络&#xff0c;进行p…...

DIDL4_前向传播与反向传播(模型参数的更新)

前向传播与反向传播前向传播与反向传播的作用前向传播及公式前向传播范例反向传播及公式反向传播范例小结前向传播计算图前向传播与反向传播的作用 在训练神经网络时&#xff0c;前向传播和反向传播相互依赖。 对于前向传播&#xff0c;我们沿着依赖的方向遍历计算图并计算其路…...

链表学习之链表划分

链表解题技巧 额外的数据结构&#xff08;哈希表&#xff09;&#xff1b;快慢指针&#xff1b;虚拟头节点&#xff1b; 链表划分 将单向链表值划分为左边小、中间相等、右边大的形式。中间值为pivot划分值。 要求&#xff1a;调整之后节点的相对次序不变&#xff0c;时间复…...

(考研湖科大教书匠计算机网络)第五章传输层-第一、二节:传输层概述及端口号、复用分用等概念

获取pdf&#xff1a;密码7281专栏目录首页&#xff1a;【专栏必读】考研湖科大教书匠计算机网络笔记导航 文章目录一&#xff1a;传输层概述&#xff08;1&#xff09;概述&#xff08;2&#xff09;从计算机网络体系结构角度看传输层&#xff08;3&#xff09;传输层意义二&am…...

C#:Krypton控件使用方法详解(第七讲) ——kryptonHeader

今天介绍的Krypton控件中的kryptonHeader&#xff0c;下面开始介绍这个控件的属性&#xff1a;控件的样子如上图所示&#xff0c;从上面控件外观来看&#xff0c;这个控件有三部分组成。第一部分是前面的图片&#xff0c;第二部分是kryptonHeader1文本&#xff0c;第三部分是控…...

5年软件测试工程师分享的自动化测试经验,一定要看

今天给大家分享一个华为的软件测试工程师分享的关于自动化测试的经验及干货。真的后悔太晚找他要了&#xff0c; 纯干货。一定要看完&#xff01; 1.什么是自动化测试&#xff1f; 用程序测试程序&#xff0c;用代码取代思考&#xff0c;用脚本运行取代手工测试。自动化测试涵…...

什么是猜疑心理?小猫测试网科普小作文

什么是猜疑心理&#xff1f;猜疑心理是说一个人心中想法偏离了客观事实&#xff0c;牵强附会&#xff0c;往往是指不好的一面&#xff0c;对别人的一言一行都充满了不良的解读&#xff0c;认为这些对自己都有针对性&#xff0c;目的性&#xff0c;对自己都是不利的。猜疑心理重…...

Redis命令行对常用数据结构String、list、set、zset、hash等增删改查操作

1.Redis命令的小套路 - NX&#xff1a;not exist - EX&#xff1a;expire - M&#xff1a;multi 2.基本操作 ①切换数据库 Redis默认有16个数据库。 115 # Set the number of databases. The default database is DB 0, you can select 116 # a different one on a per-con…...

mycobot 使用教程

(1) 树莓派4B ubuntu系统调整swap空间与使SD卡快速扩容参考&#xff1a;https://www.bilibili.com/read/cv14825069https://blog.csdn.net/weixin_45824920/article/details/114381292?spm1001.2101.3001.6650.1&utm_mediumdistribute.pc_relevant.none-task-blog-2%7Edef…...

JVM学习总结,虚拟机性能监控、故障处理工具:jps、jstat、jinfo、jmap、Visual VM、jstack等

上篇&#xff1a;JVM学习总结&#xff0c;全面介绍运行时数据区域、各类垃圾收集器的原理使用、内存分配回收策略 参考资料&#xff1a;《深入理解Java虚拟机》第三版 文章目录三&#xff0c;虚拟机性能监控、故障处理工具1&#xff09;jps&#xff1a;虚拟机进程状况工具2&…...

指针笔记(指针数组和指向数组的指针,数组中a和a的区别等)

指针数组和指向数组的指针 int *p[4]和int (*p)[4]有何区别&#xff1f; 前者是一个指针数组&#xff0c;数组大小为4&#xff0c;每一个元素都是一个指向int的指针 后者是指向int[4]类型数组的指针 以上代码若运行会报如下错误 main函数中定义的a数组本质是一个指向int[2]的…...

MySQL ---基础概念

目录 餐前小饮&#xff1a;什么是服务器&#xff1f;什么是数据库服务器&#xff1f; 一、数据库服务软件 1. 常见数据库产品 2.如何开启和停止MySQL服务 二、数据库术语及语法 1.数据库术语 2.SQL语法结构 3.SQL 语法要点 三、SQL分类 1.数据定义语言&#xff08;D…...

【基础】Flink -- ProcessFunction

Flink -- ProcessFunction处理函数概述处理函数基本处理函数 ProcessFunction按键分区处理函数 KeyedProcessFunction定时器与定时服务基于处理时间的分区处理函数基于事件时间的分区处理函数窗口处理函数 ProcessWindowFunction应用案例 -- Top N处理函数概述 为了使代码拥有…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

从零实现富文本编辑器#5-编辑器选区模型的状态结构表达

先前我们总结了浏览器选区模型的交互策略&#xff0c;并且实现了基本的选区操作&#xff0c;还调研了自绘选区的实现。那么相对的&#xff0c;我们还需要设计编辑器的选区表达&#xff0c;也可以称为模型选区。编辑器中应用变更时的操作范围&#xff0c;就是以模型选区为基准来…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧

上周三&#xff0c;HubSpot宣布已构建与ChatGPT的深度集成&#xff0c;这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋&#xff0c;但同时也存在一些关于数据安全的担忧。 许多网络声音声称&#xff0c;这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...