当前位置: 首页 > news >正文

Rabbit消息的可靠性

生产者重连

 消费者重试

 

 

Confirm模式简介

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

    1. 具体代码设置

yml 

server:port: 8080spring:application:name: confirm-learn1rabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodepublisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式my:exchangeName: exchange.confirm.1queueName: queue.confirm.1

配置类

package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueName}")private String queueName;@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}@Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}@Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("info");}}

写法一

 配置回调类

package com.powernode.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("关联id为:{}",correlationData.getId()+"");if (ack){log.info("消息正确的达到交换机");return;}//ack =false 没有到达交换机log.error("消息没有到达交换机,原因为:{}",cause);}
}

 发送消息类

package com.powernode.service;import com.powernode.config.MyConfirmCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfirmCallBack confirmCallBack;@PostConstruct //构造方法后执行它,相当于初始化作用public void init(){rabbitTemplate.setConfirmCallback(confirmCallBack);}public void sendMsg(){Message message= MessageBuilder.withBody("hello world".getBytes()).build();CorrelationData correlationData=new CorrelationData(); //关联数据correlationData.setId("order_123456"); //发送订单信息rabbitTemplate.convertAndSend("exchange.confirm.1","info",message,correlationData);log.info("消息发送完毕,发送时间为:{}",new Date());}
}

 启动类

package com.powernode;import com.powernode.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplication
public class Application implements ApplicationRunner {@Resourceprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
}

 方法二

利用lambda 可以省掉配置回调类

package com.powernode.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService{@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstruct //构造方法后执行它,相当于初始化作用public void init(){rabbitTemplate.setConfirmCallback(//lambda 表达式(correlationData, ack, cause)->{log.info("关联id为:{}",correlationData.getId()+"");if (ack){log.info("消息正确的达到交换机");return;}//ack =false 没有到达交换机log.error("消息没有到达交换机,原因为:{}",cause);});}public void sendMsg(){Message message= MessageBuilder.withBody("hello world".getBytes()).build();CorrelationData correlationData=new CorrelationData(); //关联数据correlationData.setId("order_123456"); //发送订单信息rabbitTemplate.convertAndSend("exchange.confirm.4dddd","info",message,correlationData);log.info("消息发送完毕,发送时间为:{}",new Date());}}

  1. RabbitMQ消息Return模式

  1. 消息可靠性投递

rabbitmq 整个消息投递的路径为:

producer —> exchange —> queue —> consumer

>> 消息从 producer 到 exchange 则会返回一个 confirmCallback;

>> 消息从 exchange –> queue 投递失败则会返回一个 returnCallback;

我们可以利用这两个callback控制消息的可靠性投递;

开启 确认模式;

使用rabbitTemplate.setConfirmCallback设置回调函数,当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理;

注意配置文件中,开启 退回模式;

spring.rabbitmq.publisher-returns: true

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到

queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;

yml

server:port: 8080spring:application:name: ttl-learn1rabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodepublisher-returns: true #开启return模式my:exchangeName: exchange.return.1queueName: queue.return.1

配置类

package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueName}")private String queueName;@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}@Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}@Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("info");}
}

方式一 

发送消息类

package com.powernode.service;import com.powernode.config.MyReturnCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyReturnCallBack myReturnCallBack;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack); //设置回调}public void sendMsg(){Message message= MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.return.1","info1111",message);log.info("消息发送完毕,发送时间为:{}",new Date());}
}

      回调配置类

package com.powernode.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 当消息从交换机 没有正确地 到达队列,则会触发该方法* 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法** @param returned*/
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",returnedMessage.getReplyText());}
}

方式二 lambda

package com.powernode.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService{@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(//使用lambda表达式message->{log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",message.getReplyText());}); //设置回调}public void sendMsg(){Message message= MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.return.4","info",message);log.info("消息发送完毕,发送时间为:{}",new Date());}}

RabbitMQ交换机详细属性

3.1具体参数

1、Name:交换机名称;就是一个字符串

2、Type:交换机类型,direct, topic, fanout, headers四种

3、Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在;

4、Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机;

5、Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。

6、Arguments:只有一个取值alternate-exchange,表示备用交换机;

3.2代码演示

结论1:没发消息之前不会创建交换机和对列

结论2:发消息后,如果交换机不存在,才开始创建交换机,如果队列不存在,则创建新的对列

结论3:创建交换机或者队列完成后再重新创建,如果修改交换机或队列参数则会报错

406错误(inequivalent arg 'durable' for exchange 'exchange.durability' in vhost 'powernode': received 'false' but current is 'true', class-id=40, method-id=10))

结论4:设置持久化为false ,重启rabbitmq-server,则交换机丢失,实验durable参数,先看下控制台,然后重启rabbitmq-server

结论5:实验自动删除为 true ,从控制台上手动解绑,会发现自动删除

3.3 备用交换机

3.3.1 备用交换机使用场景

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。

3.3.2 主要代码和注意事项

备用交换机示例如下:

注意:备用交换机一般使用fanout交换机

测试时:指定一个错误路由

重点:普通交换机设置参数绑定到备用交换机

Map<String, Object> arguments = new HashMap<>();

//指定当前正常的交换机的备用交换机是谁

arguments.put("alternate-exchange", EXCHANGE_ALTERNATE);

//DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)

return new DirectExchange(EXCHANGE, true, false, arguments);

//return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();

3.3.3 参考配置代码

yml

server:port: 8080spring:application:name: ttl-learn1rabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodemy:exchangeNormalName: exchange.normal.alternate  #正常交换机exchangeAlternateName: exchange.alternate.1 #备用交换机queueNormalName: queue.normal.alternate  #正常队列queueAlternateName: queue.alternate.1  # 备用队列

配置类 

package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.exchangeAlternateName}")private String exchangeAlternateName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.queueAlternateName}")private String queueAlternateName;@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder // 默认为持久化的,默认不自动删除.directExchange(exchangeNormalName) // 交换机的名字.alternate(exchangeAlternateName) //设置备用交换机 alternate-exchange.build();}@Beanpublic Queue queueNormal(){return QueueBuilder.durable(queueNormalName).build();}@Beanpublic Binding binding(DirectExchange normalExchange,Queue queueNormal){return BindingBuilder.bind(queueNormal).to(normalExchange).with("info");}@Bean //备用交换机public FanoutExchange alternateExchange(){return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();}@Beanpublic Queue alternateQueue(){return QueueBuilder.durable(queueAlternateName).build();}@Beanpublic Binding bindingAlternate(FanoutExchange alternateExchange,Queue alternateQueue){return BindingBuilder.bind(alternateQueue).to(alternateExchange);}
}

3.3.4 参考发送消息代码

@Service

public class MessageService {

    @Resource

    private RabbitTemplate rabbitTemplate;

    /**

     * 发送消息

     */

public void sendMessage() {

//我们故意写错路由key,由于我们正常交换机设置了备用交换机,所以该消息就会进入备用交换机
//从而进入备用对列,我们可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理
//如果正常交换机没有设置备用交换机,则该消息会被抛弃。

        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info1223", "hello");

        System.out.println("消息发送完毕......");

    }

}

RabbitMQ队列详细属性

4.1 具体参数

Type:队列类型

Name:队列名称,就是一个字符串,随便一个字符串就可以;

Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;

Auto delete: 是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;

Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;

基本上不设置它,设置成false

Arguments:队列的其他属性,例如指定DLX(死信交换机等);

1、x-expires:Number

当Queue(队列)在指定的时间未被访问,则队列将被自动删除;

2、x-message-ttl:Number

发布的消息在队列中存在多长时间后被取消(单位毫秒);

3、x-overflow:String

设置队列溢出行为,当达到队列的最大长度时,消息会发生什么,有效值为Drop HeadReject Publish

4、x-max-length:Number

队列所能容下消息的最大长度,当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法;

5、 x-single-active-consumer:默认为false

激活单一的消费者,也就是该队列只能有一个消息者消费消息;

6、x-max-length-bytes:Number

限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法;

7、x-dead-letter-exchange:String

指定队列关联的死信交换机,有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;

8.x-dead-letter-routing-key:String

指定死信交换机的路由键,一般和6一起定义;

9.x-max-priority:Number

如果将一个队列加上优先级参数,那么该队列为优先级队列;

(1)、给队列加上优先级参数使其成为优先级队列

x-max-priority=10【0-255取值范围】

(2)、给消息加上优先级属性

通过优先级特性,将一个队列实现插队消费;

MessageProperties messageProperties=new MessageProperties();
messageProperties.setPriority(8);

10、x-queue-mode:String(理解下即可)

队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用,如果未设置,则队列将保留内存缓存以尽可能快地传递消息;

11、x-queue-master-locator:String(用的较少,不讲)

在集群模式下设置队列分配到的主节点位置信息;

每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;

每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave。

基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;

关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator,有三种可供选择的策略:

(1)min-masters:选择master queue数最少的那个服务节点host;

(2)client-local:选择与client相连接的那个服务节点host;

(3)random:随机分配;

4.2 参考代码

@Configuration

public class RabbitConfig {

    public static final String EXCHANGE = "exchange";

    public static final String QUEUE = "queue";

    public static final String KEY = "info";

    QueueBuilder builder;

    @Bean

    public DirectExchange directExchange() {

        return ExchangeBuilder.directExchange(EXCHANGE).build();

    }

    @Bean

    public Queue queue() {

        Map<String, Object> arguments = new HashMap<>();

        //arguments.put("x-expires", 5000);

        //arguments.put("x-max-length", 5);

        //arguments.put("x-overflow", "reject-publish");

        arguments.put("x-single-active-consumer", false); //TODO ???

        //arguments.put("x-max-length-bytes", 20); // 单位是字节

        //arguments.put("x-max-priority", 10); // 0-255 //表示把当前声明的这个队列设置成了优先级队列,那么该队列它允许消息插队

        //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM内存的使用,如果未设置,队列将保留内存缓存以尽可能快地传递消息;

        //有时候我们把这种队列叫:惰性队列

        //arguments.put("x-queue-mode", "lazy");

        //设置队列版本。默认为版本1。

        //版本1有一个基于日志的索引,它嵌入了小消息。

        //版本2有一个不同的索引,可以在许多场景中提高内存使用率和性能,并为以前嵌入的消息提供了按队列存储。

        //arguments.put("x-queue-version", 2);

        // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

        //arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());

        //-------------------------

        //arguments.put("x-expires", 10000); //自动过期,10秒

        //arguments.put("x-message-ttl", 10000); //自动过期,10秒,不会删除队列

        //QueueBuilder 类里面有定义,设置队列溢出行为,当达到队列的最大长度时消息会发生什么,有效值是drop-head、reject-publish

        //arguments.put("x-max-length", 5);

        //arguments.put("x-overflow", QueueBuilder.Overflow.dropHead.getValue());

        //表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

        //arguments.put("x-single-active-consumer", true);

        // x-max-length-bytes,队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

        //arguments.put("x-max-length-bytes", 10);

        //参数是1到255之间的正整数,表示队列应该支持的最大优先级,数字越大代表优先级越高,没有设置priority优先级字段,那么priority字段值默认为0;如果优先级队列priority属性被设置为比x-max-priority大,那么priority的值被设置为x-max-priority的值。

        //arguments.put("x-max-priority", 10);

        //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

        //arguments.put("x-queue-mode", "lazy");

        arguments.put("x-queue-version", 2);

        // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

        arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());

        //---------------------------------------------

        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)

        return new Queue(QUEUE, true, false, false, arguments);

    }

    @Bean

    public Binding binding(DirectExchange directExchange, Queue queue) {

        return BindingBuilder.bind(queue).to(directExchange).with(KEY);

    }

}

实验durable 参数 重启rabbitmq-server,队列丢失

实验autodelete参数:加入接收者,发现停掉服务,那么久没有消费者了,对列就会自动删除

消息可靠性投递

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;

如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。

确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

解决方案:

  1. 、队列持久化

代码:

QueueBuilder.durable(QUEUE).build();

  1. 、交换机持久化

代码:

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

  1. 、消息持久化

代码:

默认持久化

 MessageProperties messageProperties = new MessageProperties();

//设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码

 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

  1. 、集群,镜像队列,高可用

  1. 确保消息从队列正确地投递到消费者

采用消息消费时的手动ack确认机制来保证;

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

开启手动ack消息消费确认


spring.rabbitmq.listener.simple.acknowledge-mode=manual

 yml

server:port: 8080spring:application:name: rabbit-12-reliabilityrabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodepublisher-confirm-type: correlated # 开启发布者的确认模式publisher-returns: true # 开启发布者的return模式listener:simple:acknowledge-mode: manual # 开始消费者的手动确认模式my:exchangeName: exchange.reliabilityqueueName: queue.reliability

 配置类

package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueName}")private String queueName;@Beanpublic DirectExchange directExchange(){//默认就是持久化的return ExchangeBuilder.directExchange(exchangeName).build();}@Beanpublic Queue queue(){//队列持久化return QueueBuilder.durable(queueName).build();}@Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("info");}
}

消息类

package com.powernode.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class MessageService {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 构造方法执行后自动执行*/@PostConstructpublic void init(){//开启生产者的确定模式rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(!ack){log.error("消息没有到达交换机,原因为:{}",cause);//TODO 重发消息或者记录错误日志}});rabbitTemplate.setReturnsCallback(returnedMessage->{log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}",returnedMessage.getReplyText());//TODO 记录错误日志,给程序员发短信或者或者邮件});}public void sendMsg(){MessageProperties messageProperties=new MessageProperties();//设置单条消息的持久化,默认就是持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message= MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.reliability","info",message);log.info("消息发送完毕,发送时间为:{}",new Date());}
}

发送消息

package com.powernode;import com.powernode.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplication
public class Application implements ApplicationRunner {@Resourceprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
}

 消费者消费消息类(手动确认)

package com.powernode.message;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class ReceiveMessage {@RabbitListener(queues = {"queue.reliability"})public void receiveMsg(Message message, Channel channel){//获取消息的唯一标识long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收到的消息为:{}",new String(message.getBody()));// TODO 插入订单等
//            int a=10/0;//手动确认channel.basicAck(deliveryTag,false);} catch (Exception e) {log.error("消息处理出现问题");try {channel.basicNack(deliveryTag,false,true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}}
}

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;

如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

  1. 消息的幂等性

消息消费时的幂等性(消息不被重复消费)

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

以接口幂等性举例:

接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;

注册接口;

发送短信验证码接口;

比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;

如何避免消息的重复消费问题?(消息消费时的幂等性)

全局唯一ID + Redis

生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

具体代码参考以下代码;

参考代码:

  //1、把消息的唯一ID写入redis

        boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中该key不存在,那么就设置,存在就不设置

        if (flag) { //key不存在返回true

            //相当于是第一次消费该消息

            //TODO 处理业务

            System.out.println("正常处理业务....." + orders.getId());

        }

相关文章:

Rabbit消息的可靠性

生产者重连 消费者重试 Confirm模式简介 消息的confirm确认机制&#xff0c;是指生产者投递消息后&#xff0c;到达了消息服务器Broker里面的exchange交换机&#xff0c;则会给生产者一个应答&#xff0c;生产者接收到应答&#xff0c;用来确定这条消息是否正常的发送到Broker…...

Java中的网络编程是什么?

Java中的网络编程是指使用Java编程语言进行网络通信的过程和技术。它允许Java程序在互联网或局域网上进行数据交换、通信和传输。 Java提供了许多类和接口&#xff0c;用于实现网络编程。主要的网络编程相关的类在java.net包中可以找到。以下是一些常用的类和接口&#xff1a;…...

Oracle 常用命令大全

数据库 ----数据库启动 & 关闭 启动数据库 SQL> startup nomount; SQL> alter database mount; SQL> alter database open;关闭数据库 SQL> shutdown immediate&#xff1b;更多内容请参考&#xff1a;Oracle数据库启动和关闭 ----连接数据库 登陆普通用…...

Mysql 开启ssl连接

本文是针对Mysql 5.7版本以上数据库 1. 检查当前SSL / TLS状态 我们将使用-h指定IPv4本地环回接口,以强制客户端与TCP连接,而不是使用本地套接字文件。 这将允许我们检查TCP连接的SSL状态: mysql -u root -p -h 127.0.0.1键入以下内容以显示SSL / TLS变量的状态: SHOW …...

Java Stream流对List集合进行分页

有一种情况&#xff0c;我们有时不便在数据库层面进行分页。我们知道Mybatis的startPage();方法也是对数据库进行limit操作&#xff0c;有没有一种方式&#xff0c;只对List集合进行分页呢&#xff1f; 当然有&#xff0c;我们可以使用Stream流的方式对List集合进行操作&#…...

Docker(二)、linux环境Docker的部署以及构建镜像

linux环境Docker的部署以及构建镜像 一、docker部署1、快速部署常用的命令&#xff1a;1.1、demo-部署tomcat1.2、tomcat容器内部结构1.2.1、每个tomcat容器&#xff0c;都包含三个组件1.2.2、在容器内部执行命令 1.3、容器生命周期 二、Dockerfile构建镜像1、demo-Dockerfile自…...

GEE错误——Image.select: Pattern ‘MDF‘ did not match any bands

问题 ImageCollection (Error) Collection query aborted after accumulating over 5000 elements. ImageCollection (268 elements) Mean DOD550: Layer error: ImageCollection.reduce: Error in map(ID=MCD19A2_A2001001_h15v17_061_2022161165308_01): Image.select: Patte…...

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS基础(四)

开始吧&#xff0c;做时间的主人&#xff01; 把时间分给睡眠&#xff0c;分给书籍&#xff0c;分给运动&#xff0c; 分给花鸟树木和山川湖海&#xff0c; 分给你对这个世界的热爱&#xff0c; 而不是将自己浪费在无聊的人和事上。 思维导图 函数 为什么需要函数 <!DO…...

mysql超级聚合with rollup

超级聚合&#xff0c;是在group by的基础上&#xff0c;再次进行聚合。 它再次聚合的列&#xff0c;是select中没有用到聚合函数的列。 文章目录 例子1解释例子2表以及数据 例子1 mysql> SELECT year, country, product, SUM(profit) AS profitFROM salesGROUP BY year, c…...

浅谈电动汽车充电桩设计与应用研究

安科瑞 华楠 摘要&#xff1a;目前&#xff0c;随着我国社会经济的快速发展&#xff0c;我国的各个领域都取得了突破性的发展&#xff0c;尤其是在电动汽车充电桩的设计方法&#xff0c;新型的电动汽车充电桩设计已经广泛的受到了人民群众的青睐与认可&#xff0c;而这种发展前…...

tensorflow Windows安装说明

TensorFlow官网教程 Tensorflow 2.10是最后一个在本地windows上支持GPU的版本。从2.11版本开始&#xff0c;需要在windows WLS2&#xff08;适用于 Linux 的 Windows 子系统&#xff09;上安装才能使用GPU。 在anaconda shell控制台中,切换至虚拟环境, 安装TensorFlow 这是用…...

【Leetcode热题】打卡 day11——20(更新至11)

1、合并两个有序链表 - 链表 暴力 / 递归 21. 合并两个有序链表 &#xff08;1&#xff09;暴力 class Solution {public ListNode mergeTwoLists(ListNode l1, ListNode l2) {ListNode dummynew ListNode();ListNode curdummy;while(l1!null&&l2!null){if(l1.val&l…...

linux使用操作[3]

文章目录 版权声明环境变量$符号自行设置环境变量 上传、下载rz、sz命令 压缩、解压tar命令压缩tar解压zip 命令压缩文件unzip 命令解压文件 版权声明 本博客的内容基于我个人学习黑马程序员课程的学习笔记整理而成。我特此声明&#xff0c;所有版权属于黑马程序员或相关权利人…...

梦想让生活得以忍受-寄语机器视觉工程师

我&#xff0c;曾梦想梦想走天涯&#xff0c;看看这世界的繁华&#xff0c;年少的心总有些轻狂&#xff0c;如今四海为家。 大家都听过这首歌&#xff0c;迎来很多打工人的共鸣&#xff0c;著名作家海明威曾说&#xff0c;“一个人可以被打败&#xff0c;但不可以被毁灭”&…...

linux 设置打开文件数

可以使用下面的文件进行设置 /etc/security/limits.d/90-nproc.conf 先来看/etc/security/limits.d/90-nproc.conf 配置文件&#xff1a; [root ~]# cat /etc/security/limits.d/90-nproc.conf # Default limit for number of users processes to prevent # accidental fork…...

MySQL基础篇-约束

目录 1.约束概述 2.分类 3.测试user表的约束情况 主键约束 非空约束及唯一约束 检查约束 默认约束 4.外键约束 外键约束的语法 外键约束的删除/更新行为 小结 1.约束概述 MySQL约束&#xff08;Constraints&#xff09;是用于确保表中数据完整性和一致性的规则。它们定…...

系统工程知识体系(SEBoK)

介绍 《系统工程知识体系》&#xff08;SEBoK&#xff09;是以一种理念设计的&#xff0c;即如果工程师有一个实时更新、实用的指南&#xff0c;他们就能做出更优秀的工作。如果你以前没有使用过这个资源&#xff0c;也没有关系&#xff1b;因为已经有一个完整的指南供你参考&…...

Spring DI (Dependency Injection)

What Is DI? 当一个类需要依赖另一个对象&#xff0c;把另一个对象实例化之后注入给这个对象的过程我们称之为DI # Create an object dependency in traditional programming public class Store {private Item item;public Store() {item new ItemImpl1(); } }# Using …...

Spring Boot : ORM 框架 JPA 与连接池 Hikari

数据库方面我们选用 Mysql &#xff0c; Spring Boot 提供了直接使用 JDBC 的方式连接数据库&#xff0c;毕竟使用 JDBC 并不是很方便&#xff0c;需要我们自己写更多的代码才能使用&#xff0c;一般而言在 Spring Boot 中我们常用的 ORM 框架有 JPA 和 Mybaties &#xff0c;本…...

Wireshark抓包分析ICMP协议

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 分析目的&#xff1a;分析ICMP协议的数据格式、报文…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...