当前位置: 首页 > news >正文

某某会员小程序后端性能优化

背景

某某会员小程序后台提供开放平台能力,为三方油站提供会员积分、优惠劵等api。当用户在油站加油,油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢,三方调用发放积分接口性能极低,耗时30s+;

接口情况

发放积分接口业务负责,且用存储过程写的业务,改动风险极大

数据库情况

优惠卷等表,数据量800w+,甚至存在单表3000w+

优化方案

数据库数据归档

归档交易、用户优惠劵等表历史数据,比如归档三个月前的数据(根据实际情况补充归档条件,如用户优惠劵没使用或没过期的数据不能归档)
优化效果:存储过程耗时从30s降低到7s,但是作为Toc用途接口性能远远不达标,优化数据库sql或许能进一步降低响应时间,但是存储过程复杂优化费时费力风险大

方案描述风险工作量难度是否能解决性能问题是否解决并发冲突影响使用技术
方案1java重写存储过程业务一定程度能解决yes改动点多,业务影响大java + orm
方案2保证存储过程全局串行执行noyes接口性能会降低分布式锁
方案3异步下存储过程全局串行执行yesyesrabbitmq+分布式锁+自旋锁

线程池异步化分析

接口中存储串行调用改为异步调用,

使用线程池异步化存在问题

开始简单使用线程池异步化,但是出现锁表的情况(原因存储过程没有保证原子性,并且其中大量使用临时表,并发下出现竞争锁表),而SqlServer自带的死锁检查机制杀死事务导致发放积分失败

线程池+分布式锁

异步线程【不能保证分布式环境的全局顺序执行】,使用分布式锁能保证同一个时间只有一个存储过程执行
问题:但是并发情况会将存储过程执行堆积在线程池,并发过大存在OOM风险,或者处理丢失风险

rabbitmq异步改造

可行性验证报告结论

验证通过点如下:

  1. 测试rabbitmq发送/接收消息【通过】
  2. 测试并发下分布式锁+自旋锁保证业务串行执行【通过】
  3. 测试并发下分布式锁+自旋锁+mq保证业务串行执行【通过】
  4. 测试业务幂等性保证不重复消费【通过】
  5. 测试手动ack兼容原来配置保证可靠性【通过】

当前项目rabbitmq使用方式问题分析

配置发下

spring.rabbitmq.host=172.18.229.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=totaltest
spring.rabbitmq.password=totaltest
spring.rabbitmq.virtual-host=/totaltest/
spring.rabbitmq.publisher-confirms=false

该配置没有

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

若是不配默认为

spring.rabbitmq.listener.direct.acknowledge-mode=auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto

rabbitmq消费者ack机制问题分析

spring.rabbitmq.listener.direct.acknowledge-mode是用于配置Spring Boot应用中RabbitMQ消息监听器的确认模式。确认模式决定了在消费者处理消息后如何通知RabbitMQ服务器来确认消息的接收情况。
该配置有以下几种可选的值:

  1. AUTO: 在这种模式下,消费者处理消息后,RabbitMQ会自动确认消息。这意味着消息一旦被消费者接收,就会立即从队列中删除。这是默认的确认模式。
  2. MANUAL: 在这种模式下,消费者需要显式地发送确认消息来告知RabbitMQ服务器消息已经被成功处理。这意味着消费者可以在处理消息后决定是否要确认消息。通常在需要进行消息处理的事务性操作时使用这种模式。
  3. NONE: 在这种模式下,消费者不会发送任何确认消息,也不会被要求发送确认消息。这意味着消息会在被传递给消费者之后立即被视为已经被确认。

问题:项目中该配置使用的模式配置,以为着没有手动ack,即消费者接收到消息,消息就会从mq中删除,若是消费者消费异常,则消息丢失不可追溯复原

rabbitmq生产者ack机制问题分析

项目中配置如下

spring.rabbitmq.publisher-confirms=false

spring.rabbitmq.publisher-confirms是Spring Boot中用于配置RabbitMQ生产者消息确认的属性。它用于控制是否启用RabbitMQ的发布确认机制,以确保消息成功发送到Exchange。
当spring.rabbitmq.publisher-confirms属性设置为true时,表示启用了RabbitMQ的发布确认机制。在这种情况下,当生产者发送消息到Exchange后,RabbitMQ会发送一个确认消息给生产者,告知消息是否成功到达Exchange。生产者可以根据收到的确认消息来判断消息是否成功发送,从而进行相应的处理。
当spring.rabbitmq.publisher-confirms属性设置为false时,表示禁用了RabbitMQ的发布确认机制。在这种情况下,生产者发送消息到Exchange后,不会收到确认消息,也无法得知消息是否成功到达Exchange。
通常情况下,建议将spring.rabbitmq.publisher-confirms属性设置为true,以确保消息的可靠发送。当然,具体是否启用发布确认机制,还取决于业务场景和对消息可靠性的要求。

rabbitmq消息可靠性问题分析

通过上诉【rabbitmq生产者ack机制问题分析】和【rabbitmq消费者ack机制问题分析】
可知当前项目中消息没有保证消息可靠性,rabbitmq宕机恢复、消费者消费异常都会导致消息丢失,导致业务完整性缺失

rabbitmq配置最小改动方案

上诉问题若想得到解决需项目中rabbitmq配置,会影响到原来所有使用mq的地方,避免影响范围较大
解决方案:新增消费者类似,通过设置不同的消费者来实现接收指定的消息需要手动 ack

测试rabbitmq配置发送接收消息【通过】

rabbitmq和springboot对应版本:3. Reference
创建虚拟host
image.png
创建测试 交换机和queues

Exchange:exchange-1
Queue:queue-1
key:springboot.*

image.png

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

image.png

发送消息

package com.bfxy.springboot.producer;import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import com.bfxy.springboot.entity.Order;@Component
public class RabbitSender {private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSender.class);//自动注入RabbitTemplate模板类@Autowiredprivate RabbitTemplate rabbitTemplate;  //回调函数: confirm确认final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.err.println("correlationData: " + correlationData);System.err.println("ack: " + ack);if(!ack){System.err.println("异常处理....");}}};//回调函数: return返回final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,String exchange, String routingKey) {System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);}};//发送消息方法调用: 构建Message消息public void send(Object message, Map<String, Object> properties)  {LOGGER.info("消息内容:{}",message);LOGGER.info("properties:{}",properties);try {MessageHeaders mhs = new MessageHeaders(properties);Message msg = MessageBuilder.createMessage(message, mhs);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 时间戳 全局唯一CorrelationData correlationData = new CorrelationData("1234567890");rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);}catch (Exception e){LOGGER.error("发送消息异常,message:{}",message);}}//发送消息方法调用: 构建自定义对象消息public void sendOrder(Order order)  {LOGGER.info("订单消息内容:{}",order);try {rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 时间戳 全局唯一CorrelationData correlationData = new CorrelationData("0987654321");rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);}catch (Exception e){LOGGER.error("订单发送消息异常,message:{}",order);}}}

测试代码

package com.bfxy.springboot;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import com.bfxy.springboot.entity.Order;
import com.bfxy.springboot.producer.RabbitSender;@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {@Testpublic void contextLoads() {}@Autowiredprivate RabbitSender rabbitSender;private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Testpublic void testSender1() throws Exception {Map<String, Object> properties = new HashMap<>();properties.put("number", "12345");properties.put("send_time", simpleDateFormat.format(new Date()));rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);}@Testpublic void testSender2() throws Exception {Order order = new Order("001", "第一个订单");rabbitSender.sendOrder(order);}}

接收消息

package com.bfxy.springboot.conusmer;import java.util.Map;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Component
public class RabbitReceiver {/*** 用于标识方法是一个RabbitMQ消息的监听方法,用于监听指定的队列,并在接收到消息时调用该方法进行处理。* 可以指定队列、交换机、路由键等属性,用于配置消息监听的相关信息。* 通常与@RabbitHandler一起使用,将消息监听和消息处理方法关联起来。*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1",  durable="true"),exchange = @Exchange(value = "exchange-1",  durable="true",  type= "topic",  ignoreDeclarationExceptions = "true"),key = "springboot.*" ))/*** 用于标识方法是一个RabbitMQ消息的处理方法。* 通常与@RabbitListener一起使用,用于指定具体的消息处理方法。* 通过@RabbitHandler注解标识的方法可以处理多个不同类型的消息,通过方法参数的类型来区分不同的消息类型。*/@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端Payload: " + message.getPayload());Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}/*** * 	spring.rabbitmq.listener.order.queue.name=queue-2spring.rabbitmq.listener.order.queue.durable=truespring.rabbitmq.listener.order.exchange.name=exchange-1spring.rabbitmq.listener.order.exchange.durable=truespring.rabbitmq.listener.order.exchange.type=topicspring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=truespring.rabbitmq.listener.order.key=springboot.** @param order* @param channel* @param headers* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandlerpublic void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,  Channel channel,@Headers Map<String, Object> headers) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端order: " + order.getId());Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}}

断点测试

测试分布式锁+自旋锁测试串行执行【通过】

测试并发分布式锁顺序执行业务代码

package com.bfxy.springboot;import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationTests.class);@AutowiredRedissonClient redissonClient;@Testpublic void contextLoads() {long startTime = System.currentTimeMillis();for (int i = 1;i<5;i++){int finalI = i;CompletableFuture.runAsync(()->{bizLock(String.valueOf(finalI));});}while (true){}}private void bizLock(String taskName) {RLock lock = redissonClient.getLock("my-lock");boolean locked = false;try {while (!locked) {locked = lock.tryLock();if (locked) {try {biz(3000, taskName);System.out.println("----------------");} finally {lock.unlock();}} else {// 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁Thread.sleep(100);}}} catch (Exception e) {e.printStackTrace();}}private void biz(Integer time,String taskName) throws Exception{long startTime = System.currentTimeMillis();LOGGER.info("任务序号={},任务执行开始时间={}",taskName,startTime);Thread.sleep(time);long endtime = System.currentTimeMillis();LOGGER.info("任务序号={},任务执行结束时间={}",taskName,startTime);LOGGER.info("任务序号={},任务执行消耗时间={}",taskName,(endtime-startTime));}
}

执行日志如下:【测试结果并发串行执行(同一时间只有一个任务执行)】

2024-07-10 13:46:49.587  INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests     : 任务序号=1,任务执行开始时间=1720590409587
2024-07-10 13:46:52.601  INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests     : 任务序号=1,任务执行结束时间=1720590409587
2024-07-10 13:46:52.601  INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests     : 任务序号=1,任务执行消耗时间=3014
----------------
2024-07-10 13:46:52.665  INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests     : 任务序号=4,任务执行开始时间=1720590412665
2024-07-10 13:46:55.678  INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests     : 任务序号=4,任务执行结束时间=1720590412665
2024-07-10 13:46:55.678  INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests     : 任务序号=4,任务执行消耗时间=3013
----------------
2024-07-10 13:46:55.759  INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests     : 任务序号=2,任务执行开始时间=1720590415759
2024-07-10 13:46:58.761  INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests     : 任务序号=2,任务执行结束时间=1720590415759
2024-07-10 13:46:58.761  INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests     : 任务序号=2,任务执行消耗时间=3002

压力测试对比资源使用情况

结论使用线程池比较消耗资源,特别是内存,一点并发上来可能oom

压测前

image.png

200并发

image.png

1000并发

image.png

2000并发

image.png

测试分布式锁+自旋锁+mq全局串行执行【通过】

使用线程池控制会导致请求积压到线程池消耗cpu和内存资源,使用mq能有效削峰限流(减小服务器资源消耗),线上部署了两个节点即并发为2

消费者代码

	/*** * 	spring.rabbitmq.listener.order.queue.name=queue-2spring.rabbitmq.listener.order.queue.durable=truespring.rabbitmq.listener.order.exchange.name=exchange-1spring.rabbitmq.listener.order.exchange.durable=truespring.rabbitmq.listener.order.exchange.type=topicspring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=truespring.rabbitmq.listener.order.key=springboot.** @param order* @param channel* @param headers* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandlerpublic void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,  Channel channel,@Headers Map<String, Object> headers) throws Exception {//System.err.println("--------------------------------------");//System.err.println("消费端order: " + order.getId());Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);onLockOrderMessage(order);//手工ACKchannel.basicAck(deliveryTag, false);}@AutowiredRedissonClient redissonClient;private void onLockOrderMessage(com.bfxy.springboot.entity.Order order) {RLock lock = redissonClient.getLock("my-lock");boolean locked = false;try {while (!locked) {locked = lock.tryLock();if (locked) {try {long startTime = System.currentTimeMillis();String id = order.getId();LOGGER.info("订单序号={},订单执行开始时间={}",id,startTime);Thread.sleep(7000);long endtime = System.currentTimeMillis();LOGGER.info("订单序号={},订单执行结束时间={}",id,startTime);LOGGER.info("订单序号={},订单执行消耗时间={}",id,(endtime-startTime));System.out.println("----------------");} finally {lock.unlock();}} else {// 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁Thread.sleep(100);}}} catch (Exception e) {e.printStackTrace();}}

生产者代码

	@Testpublic void testSender3() throws Exception {for (int i = 1;i<=50;i++){int finalI = i;CompletableFuture.runAsync(()->{Order order = new Order(String.valueOf(finalI), "第"+finalI+"个订单");rabbitSender.sendOrder(order);});System.err.println("发送消息订单:"+finalI);if (i%5==0){Thread.sleep(1000);}}}

启动两个消费者【验证全局串行:同一时间只有一个业务执行】

记录消费日志验证是否串行

  1. 通过日志可知:单个消费者消费顺序执行
  2. 验证消费者1和2直接业务串行

消费者2:15:18:22 到 15:18:50 之间没有接收到消息【串行执行】
image.png
验证消费1:15:18:22 到 15:18:50时间段消息情况【串行执行】
image.png

业务幂等性保障测试【通过】

mq接收到消息会将消息中的uid放入redis,当重复消费时会进行判断,保障业务幂等性

	@Testpublic void time() {// 获取字符串对象String key = "myKey";String value = "Hello, Redis!";RBucket<String> bucket = redissonClient.getBucket(key);bucket.set(value, 30, TimeUnit.SECONDS); // 设置失效时间为10秒}

image.png

幂等逻辑

// 判断key是否存在
if(bucket.isExists()){LOGGER.error("重复消费,id={}",id);// 重复消息不执行业务逻辑跳出直接ackbreak;
}else {marker(id);
}

重复消费情况:进入断点表示重复执行break会跳过业务代码
image.png

rabbitmq配置生效测试

原项目配置【自动ack测试】-【通过】

测试自动ack是否生效

package com.bfxy.springboot.config;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;@Configuration
public class RabbitMQConfig {@Value("${spring.rabbitmq.host}")private String addresses;@Value("${spring.rabbitmq.port}")private String port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Value("${spring.rabbitmq.publisher-confirms}")private boolean publisherConfirms;@Bean/** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());return rabbitTemplate;}@Beanpublic RabbitTemplate manualAckRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 配置手动ACKrabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setAcknowledgeMode(AcknowledgeMode.MANUAL);return rabbitTemplate;}@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses + ":" + port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);/** 如果要进行消息回调,则这里必须要设置为true */connectionFactory.setPublisherConfirms(publisherConfirms);return connectionFactory;}@Bean("mqContainerFactory")@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}}

注释手动ack
image.png

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=total-api
spring.rabbitmq.connection-timeout=15000

发送消息,没ack前控制台信息
image.png
等待一会,自动ack的消息从rabbitmq中删除了
image.png

新增配置【手动ack测试】-【通过】

rabbitmq如何实现接受指定的消息要手动ack,其他消息自动ack

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> manualAckListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> autoAckListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> manualAckListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> autoAckListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
}

消费者代码指定手动ack,并注释手动ackimage.png
查看rabbitmq中消息是否被删除,预期消息不会删除
image.png
放开手动ack注释,再次测试
image.png

兜底保证方案

消息处理可能失败,处理失败的消息记录到broker_message_log表中

-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (`message_id` varchar(128) NOT NULL, -- 消息唯一ID`message` varchar(4000) DEFAULT NULL, -- 消息内容`try_count` int(4) DEFAULT '0', -- 重试次数`status` varchar(10) DEFAULT '', -- 消息投递状态  0 投递中 1 投递成功   2 投递失败`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  --下一次重试时间 或 超时时间`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --创建时间`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --更新时间PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

通过定时任务重新执行失败的消息
执行点设计

  1. 定时任务重目标业务方法(该方式要将业务封装某个class的某个方法中,失败时会录入表中)
  2. 发送mq在消费中执行

相关文章:

某某会员小程序后端性能优化

背景 某某会员小程序后台提供开放平台能力&#xff0c;为三方油站提供会员积分、优惠劵等api。当用户在油站加油&#xff0c;油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢&#xff0c;三方调用发放积分接口性能极低&#xff0c;耗时30s&#xff1b; 接口情况…...

Docker:基础概念、架构与网络模式详解

1.Docker的基本概念 1.1.什么是docker Docker是一个用于开发,交付和运行应用程序的开放平台.docker使您能够将应用程序域基础框架分开,以便你可以快速开发交付软件.使用docker,你可以管理你的基础架构以管理应用程序相同的方式.通过利用docker用于交付,测试和部署代码的方法,你…...

全国大学生数据建模比赛c题——基于蔬菜类商品的自动定价与补货决策的研究分析

基于蔬菜类商品的自动定价与补货决策的研究分析 摘要 商超蔬菜不易保存&#xff0c;其质量会随着销售时间的增加而变差&#xff0c;影响商超收益&#xff0c;因此&#xff0c;基于各蔬菜品类的历史销售数据&#xff0c;制定合理的销售策略和补货决策对商超的营收十分关键。本文…...

【漏洞复现】飞企互联-FE企业运营管理平台 uploadAttachmentServlet—文件上传漏洞

声明&#xff1a;本文档或演示材料仅用于教育和教学目的。如果任何个人或组织利用本文档中的信息进行非法活动&#xff0c;将与本文档的作者或发布者无关。 一、漏洞描述 企互联-FE企业运营管理平台是一个利用云计算、人工智能、大数据、物联网和移动互联网等现代技术构建的云…...

基于深度学习的语言生成

基于深度学习的语言生成&#xff08;NLG, Natural Language Generation&#xff09;是一种利用深度学习模型生成自然语言文本的技术。它在智能写作、自动摘要、对话系统、机器翻译等领域有广泛应用。以下是对这一领域的系统介绍&#xff1a; 1. 任务和目标 语言生成的主要任务…...

Kafka Rebalance详解

作者&#xff1a;耀灵 1.rebalance概览 rebalance中文含义为再平衡。它本质上是一组协议&#xff0c;规定了一个 consumer group 是如何达成一致来分配订阅 topic 的所有分区的。比方说Consumer Group A 有3个consumer 实例&#xff0c;它要消费一个拥有6个分区的topic&#…...

在 Markdown 编辑器中插入 空格 Space 和 空行 Enter

1. 空格 Space &#xA0;2.空行 Enter <br/>...

js逆向-webpack-python

网站&#xff08;base64&#xff09;&#xff1a;aHR0cHM6Ly93d3cuY29pbmdsYXNzLmNvbS96aA 案例响应解密爬取&#xff08;webpack&#xff09; 1、找到目标url 2、进行入口定位&#xff08;此案例使用 ‘decrypt(’ 关键字搜索 &#xff09; 3、找到位置进行分析 --t 为 dat…...

Python精神病算法和自我认知异类数学模型

&#x1f3af;要点 &#x1f3af;空间不确定性和动态相互作用自我认知异类模型 | &#x1f3af;精神病神经元算法推理 | &#x1f3af;集体信念催化个人行动力数学模型 | &#x1f3af;物种基因进化关系网络算法 | &#x1f3af;电路噪声低功耗容错解码算法 &#x1f4dc;和-…...

npm install 报错:PhantomJS not found on PATH

npm install 报错&#xff1a;PhantomJS not found on PATH 整体报错内容 npm ERR! code 1 npm ERR! path G:\work-learn\open-coding\bruno\node_modules\phantomjs-prebuilt npm ERR! command failed npm ERR! command C:\Windows\system32\cmd.exe /d /s /c node install.…...

【C++进阶学习】第六弹——set和map——体会用C++来构建二叉搜索树

set和map基础&#xff1a;【C进阶学习】第五弹——二叉搜索树——二叉树进阶及set和map的铺垫-CSDN博客 前言&#xff1a; 在上篇的学习中&#xff0c;我们已经学习了如何使用C语言来实现二叉搜索树&#xff0c;在C中&#xff0c;我们是有现成的封装好的类模板来实现二叉搜索树…...

sqlmap确定目标/实操

安装kali&#xff0c;kali自带sqlmap&#xff0c;在window系统中跟linux系统操作有区别 sqlmap是一款自动化SQL工具&#xff0c;打开kali终端&#xff0c;输入sqlmap&#xff0c;出现以下界面&#xff0c;就说明sqlmap可用。 sqlmap确定目标 一、sqlmap直连数据库 1、直连数据库…...

Java笔试|面试 —— 对多态性的理解

谈谈对多态性的理解&#xff1a; 一个事物的多种形态&#xff08;编译和运行时状态不一致性&#xff09; 实现机制&#xff1a;通过继承、重写和向上转型&#xff08;Object obj new 子类()&#xff09;来实现。 1.广义上的理解 子类对象的多态性&#xff0c;方法的重写&am…...

从RL的专业角度解惑 instruct GPT的目标函数

作为早期chatGPT背后的核心技术&#xff0c;instruct GPT一直被业界奉为里程碑式的著作。但是这篇论文关于RL的部分确写的非常模糊&#xff0c;几乎一笔带过。当我们去仔细审查它的目标函数的时候&#xff0c;心中不免有诸多困惑。特别是作者提到用PPO来做强化学习&#xff0c;…...

location匹配的优先级和重定向

nginx的重定向&#xff08;rewrite&#xff09; location 匹配 location匹配的就是后面的uri /wordpress 192.168.233.10/wordpress location匹配的分类和优先级 1.精确匹配 location / 对字符串进行完全匹配&#xff0c;必须完全符合 2.正则匹配 ^-前缀级别&#xff…...

观察矩阵(View Matrix)、投影矩阵(Projection Matrix)、视口矩阵(Window Matrix)及VPM矩阵及它们之间的关系

V表示摄像机的观察矩阵&#xff08;View Matrix&#xff09;&#xff0c;它的作用是把对象从世界坐标系变换到摄像机坐标系。因此&#xff0c;对于世界坐标系下的坐标值worldCoord(x0, y0, z0)&#xff0c;如果希望使用观察矩阵VM将其变换为摄像机坐标系下的坐标值localCoord(x…...

谷粒商城学习笔记-19-快速开发-逆向生成所有微服务基本CRUD代码

文章目录 一&#xff0c;使用逆向工程步骤梳理1&#xff0c;修改逆向工程的application.yml配置2&#xff0c;修改逆向工程的generator.properties配置3&#xff0c;以Debug模式启动逆向工程4&#xff0c;使用逆向工程生成代码5&#xff0c;整合生成的代码到对应的模块中 二&am…...

时序预测 | Matlab实现TCN-Transformer的时间序列预测

时序预测 | Matlab实现TCN-Transformer的时间序列预测 目录 时序预测 | Matlab实现TCN-Transformer的时间序列预测效果一览基本介绍程序设计 效果一览 基本介绍 基于TCN-Transformer模型的时间序列预测&#xff0c;可以用于做光伏发电功率预测&#xff0c;风速预测&#xff0c;…...

没想到MySQL 9.0这么拉胯

MySQL 7月1号发布了9.0版本&#xff0c;然而没想到并没有引起大家的狂欢&#xff0c;反而是来自DBA圈子的一篇吐槽&#xff0c;尤其是PG界吐槽更厉害。 难道MySQL现在真的这么拉胯了&#xff1f;本着好奇的态度&#xff0c;我也去下载了MySQL9.0的手册看了一下。确实有点让我大…...

开源 Wiki 系统 InfoSphere 2024.01.1 发布

推荐一套基于 SpringBoot 开发的简单、易用的开源权限管理平台&#xff0c;建议下载使用: https://github.com/devlive-community/authx 推荐一套为 Java 开发人员提供方便易用的 SDK 来与目前提供服务的的 Open AI 进行交互组件&#xff1a;https://github.com/devlive-commun…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

接口自动化测试:HttpRunner基础

相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具&#xff0c;支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议&#xff0c;涵盖接口测试、性能测试、数字体验监测等测试类型…...

群晖NAS如何在虚拟机创建飞牛NAS

套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...

深度解析云存储:概念、架构与应用实践

在数据爆炸式增长的时代&#xff0c;传统本地存储因容量限制、管理复杂等问题&#xff0c;已难以满足企业和个人的需求。云存储凭借灵活扩展、便捷访问等特性&#xff0c;成为数据存储领域的主流解决方案。从个人照片备份到企业核心数据管理&#xff0c;云存储正重塑数据存储与…...

ffmpeg(三):处理原始数据命令

FFmpeg 可以直接处理原始音频和视频数据&#xff08;Raw PCM、YUV 等&#xff09;&#xff0c;常见场景包括&#xff1a; 将原始 YUV 图像编码为 H.264 视频将 PCM 音频编码为 AAC 或 MP3对原始音视频数据进行封装&#xff08;如封装为 MP4、TS&#xff09; 处理原始 YUV 视频…...

LINUX编译vlc

下载 VideoLAN / VLC GitLab 选择最新的发布版本 准备 sudo apt install -y xcb bison sudo apt install -y autopoint sudo apt install -y autoconf automake libtool编译ffmpeg LINUX FFMPEG编译汇总&#xff08;最简化&#xff09;_底部的附件列表中】: ffmpeg - lzip…...

(12)-Fiddler抓包-Fiddler设置IOS手机抓包

1.简介 Fiddler不但能截获各种浏览器发出的 HTTP 请求&#xff0c;也可以截获各种智能手机发出的HTTP/ HTTPS 请求。 Fiddler 能捕获Android 和 Windows Phone 等设备发出的 HTTP/HTTPS 请求。同理也可以截获iOS设备发出的请求&#xff0c;比如 iPhone、iPad 和 MacBook 等苹…...