当前位置: 首页 > news >正文

(二)丶RabbitMQ的六大核心

一丶什么是MQ

        Message Queue(消息队列)简称MQ,是一种应用程序对应用程序的消息通信机制。在MQ中,消息以队列形式存储,以便于异步传输,在MQ中,发布者(生产者)将消息放入队列,而消费者从队列中读取并处理这些消息,这种设计允许生产者和消费者之间解耦,提高系统的响应速度和吞吐量,MQ常用于解耦系统之间的依赖关系,提高系统的稳定性和可扩展性,MQ还支持消峰,即以稳定的系统资源应对突发的流量冲剂,然而使用MQ也可能带来一些挑战,如:系统可用性降低、系统复杂度提高、以及消息一致性问题等;

二丶常见MQ有哪些

        目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、Kafka、ZeroMQ、MetaMQ等,也有直接使用Redis充当消息队列的案列,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ的产品特征等,综合考虑;

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是Erlang语言编写,而集群和故障转移是构建在开放电信平台框架上的。所有的主要变成语言均有与代理接口通讯的客户端库。

三丶RabbitMQ六种消息模式

        1.Simple Work Queue(简单工作队列):常见的一对一模式,一条消息由一个消费者进行消费。如果有多个消费者,默认是使用轮询的方式将消息分配消费者

        2.Work Queues(工作队列模式):也叫公屏队列,能者多劳的消息队列模型。队列必须收到来自消费者的手动ACK(消息确认机制)才可以继续往消费者发送消息。

        3.Publish/Subscribe(发布订阅模式):一条消息被多和消费者消费。

        4.Ruoting(路由模式):有选择的接收消息。

        5.Topics(主题模式):通过一定的规则来选择性的接收消息

        6.RPC模式:发布者发布消息,并且通过RPC方式等待结果。

(1)Simple Work Queue(简单工作队列)

        消息生产后将消息放入队列,消息的消费者(consumer)监听消息队列嘛,如果队列中有消息就消费掉,消息被消费后自动从消息队列中删除。(也可能出现异常)

/*** @Description:获取RabbitMQ连接* @Author: xy丶*/
public class RabbitMQConnection {public final static String RABBITMQ_SERVER_HOST = "192.168.0.122";public final static int RABBITMQ_SERVER_PORT = 5672;public final static String VIRTUAL_HOST = "/XY_HOST";public static Connection getConnection() throws IOException, TimeoutException {//1、创建连接ConnectionFactory factory = new ConnectionFactory();//2、设置主机名factory.setHost(RABBITMQ_SERVER_HOST);//3、设置通讯端口,默认是5672,不专门设置也可以factory.setPort(RABBITMQ_SERVER_PORT);//4、设置账号和密码factory.setUsername("admin");factory.setPassword("admin");//4、设置Virtual Hostfactory.setVirtualHost(VIRTUAL_HOST);//5、创建连接return factory.newConnection();}}

       消费者

/*** @Description:简单工作队列模式消费者* @Author: xy丶*/
@Slf4j
public class Consumer {private final static String SIMPLE_WORK_QUEUE= "simple_work_queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//获取通道Channel channel = connection.createChannel();channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(SIMPLE_WORK_QUEUE, true, consumer);QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());log.info("Consumer reception "+message);}
}

生产者

/*** @Description:简单工作队列模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {private final static String SIMPLE_WORK_QUEUE = "simple_work_queue";/*** 简单工作队列模式* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();// 声明队列 String var1, 是否持久化// boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者// boolean var3, 是否自动删除// boolean var4, 消费完删除// Map<String, Object> var5 其他属性channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);// 消息内容 String var1, 是否持久化// boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者// boolean var3, 是否自动删除// boolean var4, 消费完删除// Map<String, Object> var5 其他属性String message = "Hello Word!!!";channel.basicPublish("", SIMPLE_WORK_QUEUE,null,message.getBytes());log.info("Producer send "+message);//最后关闭通关和连接channel.close();connection.close();}
       (2).Work Queues(工作队列模式) 创建生产者两个消费者看看效果

        生产者

package com.puwang.MQ.workQueue;import com.puwang.MQ.config.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;/*** @Description:工作队列模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {private final static String QUEUE_WORK = "QUEUE_WORK";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_WORK, false, false, false, null);for(int i = 0; i < 20; i++){String message = "娃哈哈" + i;channel.basicPublish("", QUEUE_WORK, null, message.getBytes());System.out.println("send=============="+message);Thread.sleep(i*10);}channel.close();connection.close();}
}
@Slf4j
public class WorkQueueConsumer1 {private final static  String QUEUE_WORK = "QUEUE_WORK";/*** 结果:** 1、一条消息只会被一个消费者接收;** 2、rabbit采用轮询的方式将消息是平均发送给消费者的;** 3、消费者在处理完某条消息后,才会收到下一条消息。* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_WORK, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);//关于手工确认 待之后有时间研究下channel.basicConsume(QUEUE_WORK, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());log.info("[消费者1] Received1 '"+message+"'");Thread.sleep(10);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}@Slf4j
public class WorkQueueConsumer2 {private final static  String QUEUE_WORK = "QUEUE_WORK";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_WORK, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);//关于手工确认 待之后有时间研究下channel.basicConsume(QUEUE_WORK, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());log.info("[消费者2] Received1 '"+message+"'");Thread.sleep(10);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
       (3).Publish/Subscribe(发布订阅模式)

        生产者

/*** 订阅模式 生产者* 订阅模式:一个生产者发送的消息会被多个消费者获取。* 消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费* 相关场景:邮件群发,群聊天,广播(广告)* @Description:发布订阅模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";/*** 交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费* 相关场景:邮件群发,群聊天,广播(广告)* @param args*/public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKeychannel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//发送消息for (int i = 0; i < 10; i++) {String message = "哇哈哈哈!!!"+i;log.info("send message:" + message);//发送消息channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE, "", null, message.getBytes("utf-8"));Thread.sleep(100 * i);}channel.close();connection.close();}
}

消费者

/*** @Description:发布订阅模式消费者* @Author: xy丶*/
@Slf4j
public class PublishSubscribeConsumer1 {//交换机名称private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";//队列名称private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//声明队列channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);//将队列绑定到交换机channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");log.info("[PublishSubscribeConsumer1] Receive message:" + message);try {//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] done");//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);}
}/*** @Description:发布订阅模式消费者* @Author: xy丶*/
@Slf4j
public class PublishSubscribeConsumer1 {//交换机名称private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";//队列名称private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//声明队列channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);//将队列绑定到交换机channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");log.info("[PublishSubscribeConsumer1] Receive message:" + message);try {//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] done");//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);}
}/*** @Description:发布订阅模式消费者* @Author: xy丶*/
@Slf4j
public class PublishSubscribeConsumer2 {//交换机名称private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";//队列名称private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//声明队列channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);//将队列绑定到交换机channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");log.info("[PublishSubscribeConsumer2] Receive message:" + message);try {//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] done");//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);}
}

       (4).Ruoting(路由模式)

消费者

/*** 1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息* 2)根据业务功能定义路由字符串* 3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;* 客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误* @Description:发布订阅模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {//路由交换机名称static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";//队列名称1 发送static final String QUEUE_SEND = "queue_send";//队列名称2  接收static final String QUEUE_RECEIVE = "queue_receive";public static void main(String[] args) throws Exception {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout,toppic,direct,headers*/channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE, "direct");/*** 声明(创建)队列* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_SEND,true,false,false,null);channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);/*** 队列绑定交换机* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接*/channel.queueBind(QUEUE_SEND,ROUTING_DIRECT_EXCHANGE,"send");channel.queueBind(QUEUE_RECEIVE,ROUTING_DIRECT_EXCHANGE,"receive");//发送消息String message = "路由模式:routing key 为 send";/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"send",null,message.getBytes());log.info("已发送消息:"+message);//发送消息message = "路由模式:routing key 为 receive";/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"receive",null,message.getBytes());log.info("已发送消息:"+message);//关闭资源channel.close();connection.close();}
}

消费者


/***路由消费者*/
@Slf4j
public class RoutingConsumer1 {//路由交换机名称static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";//队列名称1 发送static final String QUEUE_SEND = "queue_send";public static void main(String[] args) throws Exception {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");/*** 声明(创建)队列* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_SEND,true,false,false,null);//队列绑定交换机channel.queueBind(QUEUE_SEND, ROUTING_DIRECT_EXCHANGE,"send");//创建消费这,并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,*          消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keylog.info("路由key为:" + envelope.getRoutingKey());//交换机log.info("交换机为:" + envelope.getExchange());//消息idlog.info("消息id为:" + envelope.getDeliveryTag());//收到的消息log.info("消费者1-接收到的消息为:" + new String(body, "utf8"));}};/*** 监听消息* 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_SEND, true, consumer);}/***路由消费者*/
@Slf4j
public class RoutingConsumer2 {//路由交换机名称static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";//队列名称1 发送static final String QUEUE_RECEIVE = "queue_receive";public static void main(String[] args) throws Exception {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");/*** 声明(创建)队列* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);//队列绑定交换机channel.queueBind(QUEUE_RECEIVE, ROUTING_DIRECT_EXCHANGE,"receive");//创建消费这,并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,*          消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keylog.info("路由key为:" + envelope.getRoutingKey());//交换机log.info("交换机为:" + envelope.getExchange());//消息idlog.info("消息id为:" + envelope.getDeliveryTag());//收到的消息log.info("消费者2-接收到的消息为:" + new String(body, "utf8"));}};/*** 监听消息* 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_RECEIVE, true, consumer);}
}
       (5).Topics(主题模式/路由模式的一种)

生产者

/*** 跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系* 消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;**要求* Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。* 分隔符* “*(星号)”:可以代替一个单词* “#(井号)”:可以替代零个或多个单词* @Description:主题模式* @Author: xy丶*/
@Slf4j
public class TopicProducer {public static final String TOPIC_EXCHANGE = "topic_exchange";public static final String TOPIC_QUEUE_ONE = "topic_queue_one";public static final String TOPIC_QUEUE_TWO = "topic_queue_two";public static void main(String[] args) throws Exception {//声明用作全局变量的队列变量和交换价变量//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(TOPIC_QUEUE_ONE,true,false,false,null);channel.queueDeclare(TOPIC_QUEUE_TWO,true,false,false,null);//声明交换机channel.exchangeDeclare(TOPIC_EXCHANGE, "topic",true);//绑定队列channel.queueBind(TOPIC_QUEUE_ONE,TOPIC_EXCHANGE,"*.orange.*");channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"*.*.rabbit");channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"lazy.#");//发生消息for (int i = 0; i <10 ; i++) {String msg="goodnight!My love world===>"+i;channel.basicPublish(TOPIC_EXCHANGE,"ag.we.rabbit",null,msg.getBytes());}}
}

消费者

@Slf4j
public class TopicCustomer1 {public static final String TOPIC_QUEUE_ONE="topic_queue_one";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {log.info("TopicCustomer1=====>:"+new String(body));}};channel.basicConsume(TOPIC_QUEUE_ONE,true,consumer);}
}@Slf4j
public class TopicCustomer2 {public static final String TOPIC_QUEUE_TWO="topic_queue_two";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {log.info("TopicCustomer22=====>:"+new String(body));}};channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);}
}@Slf4j
public class TopicCustomer3 {public static final String TOPIC_QUEUE_TWO = "topic_queue_two";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {log.info("TopicCustomer2=====>:"+new String(body));}};channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);}
}

       (6).RPC模式

相关文章:

(二)丶RabbitMQ的六大核心

一丶什么是MQ Message Queue(消息队列&#xff09;简称MQ&#xff0c;是一种应用程序对应用程序的消息通信机制。在MQ中&#xff0c;消息以队列形式存储&#xff0c;以便于异步传输&#xff0c;在MQ中&#xff0c;发布者&#xff08;生产者&#xff09;将消息放入队列&#xff…...

微信小程序实现上下手势滑动切换

效果图 思路 实现一个微信小程序的复合滚动页面&#xff0c;主要通过Swiper组件实现垂直方向的轮播功能&#xff0c;每个轮播项内部使用Scroll-View组件来展示可垂直滚动的长内容&#xff0c;如图片和文本。 代码 <!-- wxml --> <view class"swiper-container…...

详解命令docker run -d --name container_name -e TZ=Asia/Shanghai your_image

docker run 是Docker的主要命令&#xff0c;用于从镜像启动一个新的容器。下面详细解释并举例说明 -d, --name, -e TZ 参数的用法&#xff1a; -d 或 --detach&#xff1a; 这个标志告诉Docker以守护进程&#xff08;后台&#xff09;模式运行容器。这意味着当你执行 docker ru…...

javaEE7

1. <% page pageEncoding"UTF-8"%><% page import"java.io.*"%> <% page import"java.util.*"%> <% page import"java.math.*"%> <html> <head><title>网站计数器</title></head&…...

int与integer的区别

int和integer都是用来表示整数的数据类型&#xff0c;但有一些细微的区别。 int是Java中的基本数据类型&#xff0c;它可以存储整数值。int类型在内存中占4个字节&#xff0c;范围为-2,147,483,648到2,147,483,647。int类型使用最频繁&#xff0c;因为它的存储空间较小&#x…...

Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

Golang实现Redis分布式锁&#xff08;Lua脚本可重入自动续期&#xff09; 1 概念 应用场景 Golang自带的Lock锁单机版OK&#xff08;存储在程序的内存中&#xff09;&#xff0c;分布式不行 分布式锁&#xff1a; 简单版&#xff1a;redis setnx》加锁设置过期时间需要保证原…...

音乐播放器-C#实现

音乐播放器-C#实现 目录 一、 代码介绍 二、 音乐播放器-C#实现 三、 音乐播放器-C#实现 四、 音乐播放器-C#实现 五、 音乐播放器-C#实现 代码介绍 代码中使用了.NET框架中的System.Media命名空间来处理音频文件的播放和控制。这段代码创建了一个简单的音乐播放器界…...

如何本地搭建hMailServer邮件服务

文章目录 前言1. 安装hMailServer2. 设置hMailServer3. 客户端安装添加账号4. 测试发送邮件5. 安装cpolar6. 创建公网地址7. 测试远程发送邮件8. 固定连接公网地址9. 测试固定远程地址发送邮件 前言 hMailServer 是一个邮件服务器,通过它我们可以搭建自己的邮件服务,通过cpola…...

裸机编程的几种模式、架构与缺陷。

大多数嵌入式的初学者都是从单片机裸机编程开始的&#xff0c;对于初学者来说&#xff0c;裸机编程更加直观、简单&#xff0c;代码所见及所得&#xff0c;调试也非常方便&#xff0c;区别于使用操作系统需要先了解大量的操作系统基础知识&#xff0c;调度的基本常识&#xff0…...

TSINGSEE青犀视频AI方案:数据+算力+算法,人工智能的三大基石

背景分析 随着信息技术的迅猛发展&#xff0c;人工智能&#xff08;AI&#xff09;已经逐渐渗透到我们生活的各个领域&#xff0c;从智能家居到自动驾驶&#xff0c;从医疗诊断到金融风控&#xff0c;AI的应用正在改变着我们的生活方式。而数据、算法和算力&#xff0c;正是构成…...

Linux认识与学习BASH

Linux认识与学习BASH 认识BASH这个Shellshell是什么系统的合法shell与/etc/shells功能Bash Shell的功能查询命令是否为Bash shell 的内置命令(type)命令的执行与快速编辑按钮 shell的变量功能什么是变量&#xff1f;变量的使用与设置&#xff1a;echo、变量设置规则、unset环境…...

Python JSON 序列化以及反序列化 文件读写

Python JSON 序列化以及反序列化 JSON (JavaScript Object Notation) 是一种轻量级的文本数据存储格式。JSON 数据通常存储在字符串中&#xff0c;即JSON字符串&#xff0c;其实就是一字符串&#xff0c;只是带有一定的格式&#xff0c;可以被解析。本文使用的 Python 版本为3…...

Spring MVC 返回JSON数据

1. 前置准备 1.1 导入jackson依赖 <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.0</version> </dependency>1.2 添加json数据转化器 EnableWebMvc …...

前端基础——HTML傻瓜式入门(1)

该文章Github地址&#xff1a;https://github.com/AntonyCheng/html-notes 在此介绍一下作者开源的SpringBoot项目初始化模板&#xff08;Github仓库地址&#xff1a;https://github.com/AntonyCheng/spring-boot-init-template & CSDN文章地址&#xff1a;https://blog.c…...

【AI】如何创建自己的自定义ChatGPT

如何创建自己的自定义ChatGPT 目录 如何创建自己的自定义ChatGPT大型语言模型(LLM)GPT模型ChatGPTOpenAI APILlamaIndexLangChain参考推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课本文将记录如何使用OpenAI GPT-3.5模型、LlamaIndex和LangChain创建自己的…...

电子科技大学链时代工作室招新题C语言部分---题号E

1. 题目 这道题大概的意思是说&#xff0c;一座城市中被埋了许多雷&#xff08;用一个只含0和1的字符串表示城市&#xff0c;1代表有雷&#xff0c;0代表无雷&#xff09;。 你作为一个排雷兵&#xff0c;需要花最少的钱引爆所有的雷来使城市中不再有雷&#xff08;太逆天了&a…...

K8S CNI

OCI概念 OCI&#xff0c;Open Container Initiative&#xff0c;开放容器标准&#xff0c;是一个轻量级&#xff0c;开放的治理结构&#xff08;项目&#xff09;&#xff0c;在 Linux 基金会的支持下成立&#xff0c;致力于围绕容器格式和运行时创建开放的行业标准。 OCI 项目…...

Python数据分析实验一:Python数据采集与存储

目录 一、实验目的与要求二、实验过程三、主要程序清单和运行结果1、爬取 “中国南海网” 站点上的相关信息2、爬取天气网站上的北京的历史天气信息 四、程序运行结果五、实验体会 一、实验目的与要求 1、目的&#xff1a; 理解抓取网页数据的一般处理过程&#xff1b;熟悉应用…...

丘一丘正则表达式

正则表达式(regular expression,regex,RE) 正则表达式是一种用来简洁表达一组字符串的表达式正则表达式是一种通用的字符串表达框架正则表达式是一种针对字符串表达“简洁”和“特征”思想的工具正则表达式可以用来判断某字符串的特征归属 正则表达式常用操作符 操作符说明实…...

工业物联网平台在水务环保、暖通制冷、电力能源等行业的应用

随着科技的不断发展&#xff0c;工业物联网平台作为连接物理世界与数字世界的桥梁&#xff0c;正逐渐成为推动各行业智能化转型的关键力量。在水务环保、暖通制冷、电力能源等行业&#xff0c;工业物联网平台的应用尤为广泛&#xff0c;对于提升运营效率、降低能耗、优化管理等…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...

tomcat指定使用的jdk版本

说明 有时候需要对tomcat配置指定的jdk版本号&#xff0c;此时&#xff0c;我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...

C++实现分布式网络通信框架RPC(2)——rpc发布端

有了上篇文章的项目的基本知识的了解&#xff0c;现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...

如何配置一个sql server使得其它用户可以通过excel odbc获取数据

要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据&#xff0c;你需要完成以下配置步骤&#xff1a; ✅ 一、在 SQL Server 端配置&#xff08;服务器设置&#xff09; 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到&#xff1a;SQL Server 网络配…...

Linux 下 DMA 内存映射浅析

序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存&#xff0c;但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程&#xff0c;可以参考这篇文章&#xff0c;我觉得写的非常…...