当前位置: 首页 > news >正文

【SpringBoot整合RabbitMQ(上)】

一、简单的生产者-消费者

1.1、创建连接工具类获取信道

public class RabbitMqUtils {public static Channel getChannel() throws IOException, TimeoutException {//创建一个链接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 链接RabbitMQ的队列factory.setHost("192.168.116.3");factory.setVirtualHost("/test");//用户名factory.setUsername("admin");//密码factory.setPassword("123");//创建链接Connection connection = factory.newConnection();//获取链接当中的信道Channel channel = connection.createChannel();return channel;}
}

 1.2、创建生产者

步骤:

①生产者连接RabbitMQ服务端,获取信道(即连接)

②由信道声明一个队列(调用queueDeclare方法),其中参数表示:

1.队列名称
2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
5.其他参数(后续使用时说明)
③控制台输入消息内容并发送,发送调用basicPublish方法,其中参数表示:
1.发送到哪个交换机(入门采用默认交换机,后续使用时详细说明其用法)
2.路由的key值是哪个 本次是队列名称
3.其他参数信息(后续使用时说明)
4.发送消息的消息体

public class Task01 {//队列的名称public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息发送完毕:" + message);}}
}

 1.3、创建消费者

步骤:

①消费者连接RabbitMQ服务端,获取信道(即连接)

②信道调用basicConsume方法接收消息,其中参数表示:
1.消费哪个队列
2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
3.当一个消息发送过来后的回调接口,即接收到消息如何处理
4.消费者取消消费的回调

public class Worker01 {//队列的名称public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println("工作线程2接收到的消息:" + new String(message.getBody()));};/**取消消费的回调*/CancelCallback cancelCallback = (consumeTag) -> {System.out.println(consumeTag + "消费者取消消息消费接口回调逻辑");};System.out.println("T2等待接收消息......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

 二、轮训分发-生产者消费者代码

       轮训分发顾名思义就是生产者将消息发送到MQ当中时,由多个消费者来处理这些消息,消费者1一条,消费者2一条,以此类推,直到MQ当中的消息消费完。

2.1、生产者

此处采用消息持久化策略(添加MessageProperties.PERSISTENT_TEXT_PLAIN),即将队列和队列当中的信息持久化到磁盘当中。

public class Task {/*** 队列名称*/private static final String QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();boolean durable = true;//表示开启消息持久化channel.queueDeclare(QUEUE_NAME,durable,false,false,null);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.nextLine();//设置生产者发送的消息为持久化消息(要求保存到磁盘上)channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息:" + message);}}
}

2.2、消费者1

此处将消费者1设置为手动应答,自动应答则是消费者一旦接收到消息就告知队列,队列则将该消息从队列中删除,手动应答是为了确保消息确确实实被处理掉了才告知队列进行删除,以防处理时出现问题导致消息未被完全处理就丢弃的情况。

public class Consumer01 {/*** 队列名称*/private static final String QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println("C1接收到的消息:" + new String(message.getBody(),"UTF-8"));//手动应答/*** 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息* 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};/*** 采用手动应答*/boolean autoAck = false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

2.3、消费者2

消费者2和消费者1一样采取手动应答,确保消息的正确处理。

public class Consumer02 {/*** 队列名称*/private static final String QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println("C2接收到的消息:" + new String(message.getBody(),"UTF-8"));//手动应答/*** 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息* 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};/*** 采用手动应答*/boolean autoAck = false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

 2.4、关于自动应答和手动应答

        自动应答可以保证高吞吐量,但是保证不了数据传输安全性。一方面,当采用自动应答时,队列发送完消息,当消息还未被消费者接收到时消费者一旦断开连接,此条消息则会丢失。另一方面,例如消费者端并没有设置传递消息的数量限制,队列持续的发消息给消费端,然而消费者端处理消息的能力低下,导致消息大量积压,最终导致内存耗尽,此时消费者线程会被操作系统杀死,那么这些消息依然没有被消费就丢失了。手动应答可以完美的规避自动应答的弊端,但是,手动应答的吞吐量并不高,因此采用哪种应答方式需要根据实际应用场景进行权衡。

消息应答的方法有两种:

Channel.basicAck(用于肯定确认,告知队列消息已被处理,可以删除了)
Channel.basicNack(用于否定确认,告知队列消息未被正常处理,需要重新发送该消息)
Channel.basicReject(用于否定确认,区别在于:告知队列不处理该消息了直接拒绝,可以将其丢弃了 )
此外,手动应答(basicAck)时的第二个参数Multiple的取值有两种,true和false:
true表示批量应答,即将信道上未被处理的消息一并应答。
false表示不采用批量应答,仅应答此条消息。

三、消息重新入队

参考二当中的代码,引入线程沉睡表示消息处理的效率,让消费者1处理一条消息仅需1s,消费者2处理一条消息需要30s,生产者发送AA\BB\CC\DD四条消息,按照上述逻辑,消费者1会打印AA\CC,消费者2对打印BB\DD,当生产者发送完DD时就将消费者2线程杀死。观察消费者1的打印台情况,可以观察到DD则由消费者1进行消费了,当MQ未收到消费者2对于DD的应答,DD这条消息依然存放在MQ当中,由于消费者2断开了连接,此时MQ就会将DD发送到消费者1的信道当中,由消费者1处理DD这条消息。

 生产者:

消费者1:

 消费者2:

 四、不公平分发

参考三当中的线程沉睡,我们类比成消费者1的处理能力高效,消费者2的处理能力低下,此处采用不公平分发,即能者多劳,让闲着的线程处理更多的消息。在消费者1和消费者2当中分别引入如下代码:

        /*** 设置不公平分发*/int prefetchCount = 1;channel.basicQos(prefetchCount);

生产者发送AA\BB\CC\DD四条消息,消费者1和消费者2打印台执行效果和三当中的效果一样,唯一区别在于我们此时并没有强制杀死消费者2线程。

五、预取值

预取值是对不公平分发的进一步优化,所谓预取值,举个例子,此时队列当中有1\2\3\4\5\6\7\8,共8条消息,设置消费者1的预取值为5,消费者2的预取值为2,表示将这8条消息当中的5条发送给消费者1的信道当中,将2条发送给消费者2的信道当中,消费者1每应答一条消息,队列才会将下一条消息发往消费者1的信道,类似的,消费者2也是如此。这样做的目的在于假设我们可以预估到两个消费者的处理能力,根据他们的能力让他们处理对标能力的消息。预取值的大小我们一般是不会准确把握的,只有通过反复的测试才会把握到一个接近处理能力的预取值。观察两个消费者的打印台输出情况:

设置消费者1预取值及其打印台:

        /*** 预取值*/int prefetchCount = 5;channel.basicQos(prefetchCount);

设置消费者2预取值及其打印台:

        /*** 预取值*/int prefetchCount = 2;channel.basicQos(prefetchCount);

 六、发布确认模式

发布确认模式针对生产者,生产者将信道设置为confirm模式,此模式下,生产者发送的消息将需要RabbitMQ进行确认收到,防止生产者发送的消息丢失。发布确认模式有三种策略,分别为单个确认模式、批量确认模式、异步批量确认模式。针对以上三种策略,分别实现其生产者发送消息,观察其发送效率。

将三种模式封装成三个方法,并由主函数调用:

    /*** 队列名称*/private static final String QUEUE_NAME = "confirm";/*** 批量发送消息条数*/private static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();channel.confirmSelect();//开启发布确认模式channel.queueDeclare(QUEUE_NAME,true,false,false,null);/** 发布确认模式* 1、单个确认* 2、批量确认* 3、异步批量确认*/publishMessageIndividually(channel);//25432publishMessageBatch(channel);//391asynchronousConfirmMessage(channel);//22}

6.1、单个确认模式

单个确认模式即生产者发送一条,MQ收到后回应一条,之后生产者继续发送下一条。

    //单个确认public static void publishMessageIndividually(Channel channel) throws Exception{//开始时间long startTime = System.currentTimeMillis();//批量的发消息 单个发布确认for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));//单个消息就立即进行发布确认boolean flg = channel.waitForConfirms();if (!flg){System.out.println("第" + i + "条消息发送失败~");}}//结束时间long endTime = System.currentTimeMillis();long spendTime = endTime - startTime;System.out.println("单个确认模式发布1000条消息耗时:" + spendTime + " ms");}

6.2、批量确认模式

批量确认模式即发送一批消息后一起确认是否收到,与上述单个确认模式相比要大大减少耗时,但是一旦某条消息未被收到,我们将很难排查到是哪条消息。

    //批量确认public static void publishMessageBatch(Channel channel) throws Exception{//开始时间long startTime = System.currentTimeMillis();//批量确认消息大小int confirmSize = 100;//每100条确认一次//批量的发送消息 批量发布确认for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));//判断达到100条消息的时候批量的确认一次if (((i + 1) % confirmSize) == 0){boolean flg = channel.waitForConfirms();if (!flg){System.out.println("第" + (i - 99) + "条~第" + i + "条持久化失败~" );}else {System.out.println("第" + (i - 99) + "条~第" + i + "条持久化成功~" );}}}//结束时间long endTime = System.currentTimeMillis();long spendTime = endTime - startTime;System.out.println("批量确认模式发布1000条消息耗时:" + spendTime + " ms");}

6.3、异步批量确认模式

异步批量确认模式完美的规避了上两种确认模式的弊端,异步批量确认模式中生产者只负责发送消息,而无需关心消息是否发送到队列当中并进行了持久化,由MQ当中的broker负责告知消息的发送结果,生产者只需要在代码当中准备一个监听器,监听broker的消息,broker告知监听器哪些消息发送成功了,哪些消息发送失败了,我们准备一个线程安全的数据容器即可,将发送的消息放到这个容器当中。监听器当中的参数ackCallback和nackCallback会分别对容器当中的数据进行处理。ackCallback有两手准备,multiple为true时表示批量应答,此时将容器当中Key小于该deliverTag的移到临时的容器当中,反之为false时,表示只应答一条,则从容器中移除该数据。

    //异步确认public static void asynchronousConfirmMessage(Channel channel) throws Exception{/*** 线程安全有序的哈希表 适用于高并发的情况下* 1.轻松的将序号和消息进行关联* 2.可以轻松的批量删除条目 只要给到序号* 3.支持高并发(多线程)*/ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();//消息确认成功回调函数/*** 1、消息的标记* 2、是否为批量确认*/ConfirmCallback ackCallback = (deliveryTag,multiple) -> {//删除已经确认的消息 剩下的就是未确认的消息if (multiple){ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);confirmed.clear();}else {outstandingConfirms.remove(deliveryTag);}};//消息确认失败回调函数/*** 1、消息的标记* 2、是否为批量确认*/ConfirmCallback nackCallback = (deliveryTag,multiple) -> {String message = outstandingConfirms.get(deliveryTag);//3--->打印未确认的消息都有哪些System.out.println("未确认的消息:序号->" + deliveryTag + " 消息体:" + message);};//准备消息的监听器 监听哪些消息成功了 哪些消息失败了/*** 1、监听哪些消息成功了* 2、监听哪些消息失败了*/channel.addConfirmListener(ackCallback,nackCallback);//异步通知//开始时间long startTime = System.currentTimeMillis();//批量发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";//此处记录所有要发送的消息outstandingConfirms.put(channel.getNextPublishSeqNo(),message);channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));}//结束时间long endTime = System.currentTimeMillis();long spendTime = endTime - startTime;System.out.println("异步确认模式发布1000条消息耗时:" + spendTime + " ms");}

七、交换机

7.1、Exchanges概念

RabbitMQ 消息传递模型的核心思想是 : 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机 (exchange) ,交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

7.2、发布/订阅模式(fanout类型)

假设有这么一个场景,在网购系统当中,某家店铺需要将新产品进行推广,该产品面向中老年人退出,因此就需要将推广消息发送给该群体用户。中年群众和老年群众作为两种消费者群体,如何做到只发布一次消息就能做到这两种消费者都能收到该消息?此时,MQ就为我们提供了发布/订阅模式。

消息发送方:

public class EmitLog {/*** 交换机名称*/private static final String EXCHANGE_NAME = "LOGS";/*** 交换机类型*/private static final String EXCHANGE_TYPE = "fanout";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明一个交换机,交换机名称、交换机类型channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));System.out.println("发送方发送消息:" + message);}}
}

消费者1:

public class ReceviceLogs01 {/*** 交换机名称*/private static final String EXCHANGE_NAME = "LOGS";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明一个队列 临时队列/*** 生成一个临时队列 队列名是随机的* 当消费者与队列断开连接时  队列自动删除*/String queueName = channel.queueDeclare().getQueue();/*** 绑定交换机与队列*/channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("接收方01等待接收消息......");/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println(new String(message.getBody()));};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}

消费者2:

public class ReceviceLogs02 {/*** 交换机名称*/private static final String EXCHANGE_NAME = "LOGS";/*** 交换机类型*/private static final String EXCHANGE_TYPE = "fanout";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明一个队列 临时队列/*** 生成一个临时队列 队列名是随机的* 当消费者与队列断开连接时  队列自动删除*/String queueName = channel.queueDeclare().getQueue();/*** 绑定交换机与队列*/channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("接收方02等待接收消息......");/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println(new String(message.getBody()));};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}

 执行结果:

 fanout交换机说明:此类型的交换机可以将接收到的消息发送给所有与它绑定的队列。参考我们上面所说的轮训分发,轮训分发是将队列当中的消息发送给消费者,一条消息仅能由一个消费者进行消费,如果使用默认的交换机,则需要发送多次消息,由队列再分发给每个消费者。

总结:只要该队列和此交换机绑定,无论routing-key是什么,都会接收到生产者发送的消息。

7.3、直接交换机(Direct类型)

假设有这么一个场景,同样是网购系统,在下单时我们需要将交易金额1W以上的订单交给某支付机构A处理,5000-1W的交给支付机构B处理,5000元以下的也交给支付机构A处理。怎么做?支付机构A和支付机构B分别对应消费者1和消费者2,订单为生产者,生产者(网购系统)将订单信息发送至交换机,交换机根据生产者提供的交易金额(routing-key)决定由哪个消费者(支付机构)去处理。这就是直接交换机的用处,可以指定哪个消费者去消费此条消息。

生产者:

public class DirectLogs {/*** 交换机名称*/private static final String EXCHANGE_NAME = "direct_logs";/*** 交换机类型*/public static final String EXCHANGE_TYPE = "direct";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));}}
}

消费者1:

public class ReceiveLogsDirect01 {/*** 交换机名称*/private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明一个队列channel.queueDeclare("console",false,false,false,null);/*** 路由绑定*/channel.queueBind("console",EXCHANGE_NAME,"info");channel.queueBind("console",EXCHANGE_NAME,"warning");/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println(new String(message.getBody()));};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume("console",true,deliverCallback,cancelCallback);}
}

消费者2:

public class ReceiveLogsDirect02 {/*** 交换机名称*/private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明一个队列channel.queueDeclare("disk",false,false,false,null);/*** 路由绑定*/channel.queueBind("disk",EXCHANGE_NAME,"error");/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println(new String(message.getBody()));};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume("disk",true,deliverCallback,cancelCallback);}
}

 避坑:队列一旦与direct类型的交换机绑定,则生产者发送消息时则不能使用队列名称来指定发送到哪个队列,只能使用routing-key来进行队列选择。

7.4、Topic交换机

Topic交换机的应用场景是非广泛,fanout交换机和direct交换机的功能都可以使用Topic交换机实现。发送到topic交换机的消息不能具有任意的  routing_key —— 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。binding key也必须采用相同的形式。topic交换机背后的逻辑 类似于direct交换机——使用特定 routing key 发送的消息将被传递到与匹配binding key绑定的所有队列。但是binding key有两个重要的特殊情况:

1.*( 星号 ) 可以代替一个单词
2.#( 井号 ) 可以替代零个或多个单词

7.4.1、实用案例:

生产者:

public class TopicExchangePro {/*** 交换机名称*/private static final String EXCHANGE_NAME = "topic_logs";/*** 交换机类型*/private static final String EXCHANGE_TYPE = "topic";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明一个交换机*/channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String rountingKey = scanner.next();System.out.println("输入的rountingKey为:" + rountingKey);String message = scanner.next();System.out.println("输入消息体的为:" + message);channel.basicPublish(EXCHANGE_NAME,rountingKey,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息体:" + message + "已发送给rountKey为:" + rountingKey + "的队列");}}
}

消费者1:

public class TopicExchangeCon01 {/*** 交换机名称*/private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明一个队列*/channel.queueDeclare("receive1",false,false,false,null);/*** 路由绑定*/channel.queueBind("receive1",EXCHANGE_NAME,"*.*.pink");channel.queueBind("receive1",EXCHANGE_NAME,"green.#");/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{String rountingKey = message.getEnvelope().getRoutingKey();System.out.println("receive1接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume("receive1",true,deliverCallback,cancelCallback);}
}

消费者2:

public class TopicExchangeCon02 {/*** 交换机名称*/private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明一个队列*/channel.queueDeclare("receive2",false,false,false,null);/*** 路由绑定*/channel.queueBind("receive2",EXCHANGE_NAME,"*.red.*");/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{String rountingKey = message.getEnvelope().getRoutingKey();System.out.println("receive2接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume("receive2",true,deliverCallback,cancelCallback);}
}

7.4.2、结果分析及测试:

消费者1routing-key:

*.*.pink
green.#

 两种匹配规则,路由key为(什么什么pink)可以发往该队列,路由key为(green什么什么什么...)也可以发往该队列。

消费者2routing-key:

*.red.*

匹配规则:路由key是(什么red什么)可以发往该队列。

测试用例:

dog.is.pink 消费者1
green.like.pink 消费者1
green.red.dog 消费者1和消费者2
green.cat 消费者1
house.red.pink 消费者1和消费者2
we.red.family 消费者2

测试结果:

 生产者:

 消费者1:

消费者2:

 避坑:当路由key匹配到同一个队列时(即路由key匹配到同一队列的多个routing-key时),只会给队列发送一次消息。

值得注意的是:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了。如果队列绑定键当中没有#*出现,那么该队列绑定类型就是 direct

相关文章:

【SpringBoot整合RabbitMQ(上)】

一、简单的生产者-消费者 1.1、创建连接工具类获取信道 public class RabbitMqUtils {public static Channel getChannel() throws IOException, TimeoutException {//创建一个链接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP 链接RabbitMQ的队列facto…...

Linux 设备驱动程序(二)

系列文章目录 Linux 内核设计与实现 深入理解 Linux 内核&#xff08;一&#xff09; 深入理解 Linux 内核&#xff08;二&#xff09; Linux 设备驱动程序&#xff08;一&#xff09; Linux 设备驱动程序&#xff08;二&#xff09; Linux设备驱动开发详解 文章目录 系列文章目…...

性价比提升15%,阿里云发布第八代企业级计算实例g8a和性能增强型实例g8ae

5 月 17 日&#xff0c;2023 阿里云峰会常州站上&#xff0c;阿里云正式发布第八代企业级计算实例 g8a 以及性能增强性实例 g8ae。两款实例搭载第四代 AMD EPYC 处理器&#xff0c;标配阿里云 eRDMA 大规模加速能力&#xff0c;网络延时低至 8 微秒。其中&#xff0c;g8a 综合性…...

Unity VR开发教程 OpenXR+XR Interaction Toolkit 番外(一)用 Grip 键, Trigger 键和摇杆控制手部动画

文章目录 &#x1f4d5;制作手部动画&#x1f4d5;设置 Animation Controller&#x1f4d5;添加触摸摇杆的 Input Action&#x1f4d5;代码部分 在大部分 VR 游戏中&#xff0c;手部的动画通常是由手柄的三个按键来控制的。比如 Grip 键控制中指、无名指、小拇指的弯曲&#xf…...

H.265/HEVC编码原理及其处理流程的分析

H.265/HEVC编码原理及其处理流程的分析 H.265/HEVC编码的框架图&#xff0c;查了很多资料都没搞明白&#xff0c;各个模块的处理的分析网上有很多&#xff0c;很少有把这个流程串起来的。本文的主要目的是讲清楚H.265/HEVC视频编码的处理流程&#xff0c;不涉及复杂的计算过程。…...

数据结构初阶--链表OJⅡ

目录 前言相交链表思路分析代码实现 环形链表思路分析代码实现 环形链表Ⅱ思路分析代码实现 复制带随机指针的链表思路分析代码实现 前言 本篇文章承接上篇博客&#xff0c;继续对部分经典链表OJ题进行讲解 相交链表 先来看题目描述 思路分析 这道题我们还是首先来判断一…...

离职or苟住?

面对不太好的大环境&#xff0c;我们什么时候该离职&#xff0c;什么时候不应该离职呢&#xff1f;分享几个观点&#xff0c;希望对你有所启发。 以前就有大佬讲过&#xff0c;离职无非是两个原因&#xff0c;一是因为薪资不到位&#xff0c;二是因为受委屈了&#xff0c;总之&…...

微服务之以nacos注册中心,以gateway路由转发服务调用实例(第一篇)

实现以nacos为注册中心,网关路由转发调用 项目版本汇总项目初始化新建仓库拉取仓库项目父工程pom初始化依赖版本选择pom文件如下 网关服务构建pom文件启动类配置文件YMLnacos启动新建命名空间配置网关yml(nacos)网关服务启动 用户服务构建pom文件启动类配置文件YML新增url接口配…...

主成分分析(PCA)直观理解与数学推导

近期在完成信息论的作业&#xff0c;发现网上的资料大多是直观解释&#xff0c;对其中的数学原理介绍甚少&#xff0c;并且只介绍了向量降维&#xff0c;而没有介绍向量重构的问题&#xff08;重构指的是&#xff1a;根据降维后的低维向量来恢复原始向量&#xff09;&#xff0…...

什么是合伙企业?普通合伙和有限合伙区别?

1.什么是合伙企业? 合伙企业是指由各合伙人订立合伙协议&#xff0c;共同出资&#xff0c;共同经营&#xff0c;共享收益&#xff0c;共担风险&#xff0c;并对企业债务承担无限连带责任的营利性组织。合伙企业一般无法人资格&#xff0c;不缴纳企业所得税&#xff0c;缴纳个…...

系统结构考点之不明白的点

系统结构考点系列 计算机系统结构的定义计算机组成的定义计算机实现的定义计算系统的定量设计&#xff1f;1. 哈夫曼压缩原理2. Amdahl定律3. cpu性能公式4. 程序访问局部性定理 ​ 这样的题已经不多了&#xff0c;主要是要了解下概念。打下一个好的基础。 2023年4月份成绩已经…...

Android中AIDL的简单使用(Hello world)

AIDL&#xff1a;Android Interface Definition Language&#xff08;Android接口定义语言&#xff09; 作用&#xff1a;跨进程通讯。如A应用调用B应用提供的接口 代码实现过程简述&#xff1a; A应用创建aidl接口&#xff0c;并且创建一个Service来实现这个接口&#xff08…...

ZED使用指南(五)Camera Controls

七、其他 1、相机控制 &#xff08;1&#xff09;选择视频模式 左右视频帧同步&#xff0c;以并排格式作为单个未压缩视频帧流式传输。 在ZED Explorer或者使用API可以改变视频的分辨率和帧率。 &#xff08;2&#xff09;选择输出视图 ZED能以不同的格式输出图像&#xf…...

wrk泛洪攻击监控脚本

wrk泛洪攻击介绍 WRK泛洪攻击&#xff08;WRK Flood Attack&#xff09;是一种基于WRK工具进行的DDoS攻击&#xff08;分布式拒绝服务攻击&#xff09;。WRK是一个高度并行的HTTP负载生成器&#xff0c;可以模拟大量用户访问一个网站&#xff0c;从而导致该网站服务器瘫痪或失效…...

软件I2C读写MPU6050代码

1、硬件电路 SCL引到了STM32的PB10号引脚&#xff0c;SDA引到了PB11号引脚软件I2C协议&#xff1a; 用普通GPIO口&#xff0c;手动反转电平实现协议&#xff0c;不需要STM32内部的外设资源支持&#xff0c;故端口是可以任意指定MPU605在SCL和SDA自带了两个上拉电阻&#xff0c;…...

销售/回收DSOS254A是德keysight MSOS254A混合信号示波器

Agilent DSOS254A、Keysight MSOS254A、 混合信号示波器&#xff0c;2.5 GHz&#xff0c;20 GSa/s&#xff0c;4 通道&#xff0c;16 数字通道。 ​Infiniium S 系列示波器 信号保真度方面树立新标杆 500 MHz 至 8 GHz 出色的信号完整性使您可以看到真实显示的信号&#xff1…...

RIDGID里奇金属管线检测仪故障定位仪维修SR-20KIT

里奇RIDGID管线定位仪/检测仪/探测仪维修SR-20 SR-24 SR-60 美国里奇SeekTech SR-20管线定位仪对于初次使用定位仪的用户或经验丰富的用户&#xff0c;都同样可以轻易上手使用SR-20。SR-20提供许多设置和参数&#xff0c;使得大多数复杂的定位工作变得很容易。此外&#xff0c…...

NodeJs之调试

关于调试 当我们只专注于前端的时候&#xff0c;我们习惯性F12&#xff0c;这会给我们带来安全与舒心的感觉。 但是当我们使用NodeJs来开发后台的时候&#xff0c;我想噩梦来了。 但是也别泰国担心&#xff0c;NodeJs的调试是很不方便&#xff01;这是肯定的。 但是还好&…...

Java面试知识点(全)- Java并发-多线程JUC二-原子类/锁

Java面试知识点(全) 导航&#xff1a; https://nanxiang.blog.csdn.net/article/details/130640392 注&#xff1a;随时更新 JUC原子类 什么是CAS CAS的全称为Compare-And-Swap&#xff0c;直译就是对比交换。是一条CPU的原子指令&#xff0c;其作用是让CPU先进行比较两个值…...

CSS--移动web基础

01-移动 Web 基础 谷歌模拟器 模拟移动设备&#xff0c;方便查看页面效果 屏幕分辨率 分类&#xff1a; 物理分辨率&#xff1a;硬件分辨率&#xff08;出厂设置&#xff09;逻辑分辨率&#xff1a;软件 / 驱动设置 结论&#xff1a;制作网页参考 逻辑分辨率 视口 作用&a…...

Admin.Net中的消息通信SignalR解释

定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

使用Spring AI和MCP协议构建图片搜索服务

目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式&#xff08;本地调用&#xff09; SSE模式&#xff08;远程调用&#xff09; 4. 注册工具提…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...