2.12日学习打卡----初学RocketMQ(三)
2.12日学习打卡
目录:
- 2.12日学习打卡
- 一. RocketMQ高级特性(续)
- 消息重试
- 延迟消息
- 消息查询
- 二.RocketMQ应用实战
- 生产端发送同步消息
- 发送异步消息
- 单向发送消息
- 顺序发送消息
- 消费顺序消息
- 全局顺序消息
- 延迟消息
- 事务消息
- 消息查询
一. RocketMQ高级特性(续)
消息重试
生产端重试
例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。
// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);
消费端重试
同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。
-
顺序消息重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ
会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用
会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必
保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的
发生 -
无序消息重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费
消息失败时,可以通过设置返回状态达到消息重试的结果。- 最大重试次数
消息消费失败后,可被消息队列RocketMQ重复投递的最大
次数。
TCP协议无序消息重试时间间隔:
- 消费失败后重新配置方式
- 需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
- 返回 Null
- 抛出异常
延迟消息
Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并且会根据
delayTimeLevel存入特定的queue,queueId = delayTimeLevel –1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。 - 最大重试次数
//
org/apache/rocketmq/store/config/MessageStore
Config.java
private String messageDelayLevel = "1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h";
代码测试
生产者
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class DelayMessageProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer=new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;for(int i=0; i<20;i++){message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());//设置延迟时间0-17 0表示2秒 大于18都是2小时message.setDelayTimeLevel(i);producer.send(message);}producer.shutdown();}
}
消费者
package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;public class DelayMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");consumer.setNamesrvAddr("192.168.66.100:9876");System.out.println("==========================================");//设置消息重试次数consumer.setMaxReconsumeTimes(5);//设置可以批量处理consumer.setConsumeMessageBatchMaxSize(1);//订阅主题consumer.subscribe("tp_demo_3","*");consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {System.out.println(System.currentTimeMillis()/1000);for(MessageExt message:list){System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
运行结果
消息查询
在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。
//返回结果
SendResult [sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,messageQueue=MessageQueue [topic=TopicA,
brokerName=broker-a, queueId=0],queueOffset=0]
-
按MessageId查询消息
Message Id 是消息发送后,在Broker端生成的,其包含了
Broker的地址、偏移信息,并且会把Message Id作为结果的一
部分返回。Message Id中属于精确匹配,代表唯一一条消息,
查询效率更高。 -
按照Message Key查询消息
消息的key是开发人员在发送消息之前自行指定的,通常把具有
业务含义,区分度高的字段作为消息的key,如用户id,订单id
等。 -
按照Unique Key查询消息
除了开发人员指定的消息key,生产者在发送发送消息之前,会
自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一
代表一条消息
消息在消息队列RocketMQ中存储的时间默认为3天(不建议修
改),即只能查询从消息发送时间算起3天内的消息,三种查询方式
的特点和对比如下表所述:
二.RocketMQ应用实战
生产端发送同步消息
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {//实例化消息生产者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
运行结果
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...
Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。
SendStatus:发送的标识。成功,失败等
Queue:相当于是Topic的分区;用于并行发送和接收消息
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;public class AsyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();//消息失败重试次数producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;// 根据消息数量实例化倒计时计算器final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);//创建消息for(int i=0;i<messageCount;i++){final int index=i;Message msg=new Message("topic_demo","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 等待5scountDownLatch.await(5, TimeUnit.SECONDS);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别
package com.jjy.produce;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
消息发送时的权衡
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 | 使用场景 |
---|---|---|---|---|
同步发送 | 快 | 有 | 可靠 | 邮件、短信、推送 |
异步发送 | 快 | 有 | 可靠 | 视频转码 |
单向发送 | 最快 | 无 | 可能丢失 | 日志收集 |
顺序发送消息
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;public class OrderProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();// 获取指定主题的MQ列表final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");Message message = null;MessageQueue messageQueue = null;for (int i = 0; i < 100; i++) {// 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序// 发送到同一个MQmessageQueue = messageQueues.get(i % 8);//创建message对象//发送创建订单消息message = new Message("tp_demo_11", ("hello rocketmq order create - " + i).getBytes());producer.send(message, messageQueue);//发送付款订单消息message = new Message("tp_demo_11", ("hello rocketmq order pay - " + i).getBytes());producer.send(message, messageQueue);//发送订单送货消息message = new Message("tp_demo_11", ("hello rocketmq order delivery - " + i).getBytes());producer.send(message, messageQueue);}producer.shutdown();}
}
消费顺序消息
package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class OrderConsumer {public static void main(String[] args) throws MQClientException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");consumer.setNamesrvAddr("192.168.66.100:9876");//订阅主题consumer.subscribe("tp_demo_11", "*");//最小消费线程数consumer.setConsumeThreadMin(1);//最大消费线程数consumer.setConsumeThreadMax(1);//一次拉取的消息数量consumer.setPullBatchSize(1);//一次消费的消息数量consumer.setConsumeMessageBatchMaxSize(1);// 使用有序消息监听器consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println(msg.getTopic() + "\t" +msg.getQueueId() + "\t" +new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}
全局顺序消息
生产者
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;public class GlobalOrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message = null;for(int i=0;i<100;i++){message=new Message("tp_demo_11",("全局有序消息...."+i).getBytes());producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get((Integer)0);}},1);}producer.shutdown();}
}
消费者
package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class GlobalConsumer {public static void main(String[] args) throws MQClientException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");consumer.setNamesrvAddr("192.168.66.100:9876");//订阅主题consumer.subscribe("tp_demo_11", "*");//最小消费线程数consumer.setConsumeThreadMin(1);//最大消费线程数consumer.setConsumeThreadMax(1);//一次拉取的消息数量consumer.setPullBatchSize(1);//一次消费的消息数量consumer.setConsumeMessageBatchMaxSize(1);consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}
延迟消息
生产者
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class DelayMessageProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer=new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;for(int i=0; i<20;i++){message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());// 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);producer.send(message);}producer.shutdown();}
}
消费者
package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;public class DelayMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");consumer.setNamesrvAddr("192.168.66.100:9876");System.out.println("==========================================");//设置消息重试次数consumer.setMaxReconsumeTimes(5);//设置可以批量处理consumer.setConsumeMessageBatchMaxSize(1);//订阅主题consumer.subscribe("tp_demo_3","*");consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {System.out.println(System.currentTimeMillis()/1000);for(MessageExt message:list){System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
事务消息
生产者
package com.jjy.produce;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public class TransactionProducer {public static void main(String[] args) throws MQClientException {TransactionListener listener=new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务System.out.println("执行本地事务......");// return LocalTransactionState.COMMIT_MESSAGE;//使用下面事务回滚 消费端无法接收到消息了try {Thread.sleep(100000);} catch (InterruptedException e) {e.printStackTrace();}return LocalTransactionState.ROLLBACK_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 该方法用于获取本地事务执行的状态。System.out.println("检查本地事务的状态:" + messageExt);return LocalTransactionState.COMMIT_MESSAGE;}};//创建事务消息生产者TransactionMQProducer producer=new TransactionMQProducer("producer_grp_01");//设置事务监听器producer.setTransactionListener(listener);producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;message=new Message("tp_demo_11","hello translation message".getBytes());producer.sendMessageInTransaction(message,"{\" name\":\"zhansan\"}");}
}
消费者
package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionMsgConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");consumer.setNamesrvAddr("192.168.66.100:9876");consumer.subscribe("tp_demo_11", "*");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}
消息查询
package com.itbaizhan.consumer;public class QueryingMessageDemo {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");//设置nameserver地址consumer.setNamesrvAddr("192.168.66.100:9876");//设置消息监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//根据messageId查询消息MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");System.out.println(message);System.out.println(message.getMsgId());consumer.shutdown();}
}
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
相关文章:

2.12日学习打卡----初学RocketMQ(三)
2.12日学习打卡 目录: 2.12日学习打卡一. RocketMQ高级特性(续)消息重试延迟消息消息查询 二.RocketMQ应用实战生产端发送同步消息发送异步消息单向发送消息顺序发送消息消费顺序消息全局顺序消息延迟消息事务消息消息查询 一. RocketMQ高级特…...

<网络安全>《35 网络攻防专业课<第一课 - 网络攻防准备>》
1 主要内容 认识黑客 认识端口 常见术语与命令 网络攻击流程 VMWare虚拟环境靶机搭建 2 认识黑客 2.1 白帽、灰帽和黑帽黑客 白帽黑客是指有能力破坏电脑安全但不具恶意目的黑客。 灰帽黑客是指对于伦理和法律态度不明的黑客。 黑帽黑客经常用于区别于一般(正面…...

【实战】一、Jest 前端自动化测试框架基础入门(一) —— 前端要学的测试课 从Jest入门到TDD BDD双实战(一)
文章目录 一、前端要学的测试课1.前端要学的测试2.前端工程化的一部分3.前端自动化测试的例子4.前端为什么需要自动化测试?5.课程涵盖内容6.前置技能7.学习收获 二、Jest 前端自动化测试框架基础入门1. 自动化测试背景及原理前端自动化测试产生的背景及原理 2.前端自…...

蓝桥杯Java组备赛(二)
题目1 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);int n sc.nextInt();int max Integer.MIN_VALUE;int min Integer.MAX_VALUE;double sum 0;for(int i0;i<n;i) {int x sc.nextInt()…...

人力资源智能化管理项目(day10:首页开发以及上线部署)
学习源码可以看我的个人前端学习笔记 (github.com):qdxzw/humanResourceIntelligentManagementProject 首页-基本结构和数字滚动 安装插件 npm i vue-count-to <template><div class"dashboard"><div class"container"><!-- 左侧内…...

Conda管理Python不同版本教程
Conda管理Python不同版本教程 目录 0.前提 1.conda常用命令 2.conda设置国内源(以添加清华源为例,阿里云源同样) 3.conda管理python库 4.其它 不太推荐 pyenv管理Python不同版本教程(本人另一篇博客,姊妹篇&…...

free pascal:fpwebview 组件通过 JSBridge 调用本机TTS
从 https://github.com/PierceNg/fpwebview 下载 fpwebview-master.zip 简单易用。 先请看 \fpwebview-master\README.md cd \lazarus\projects\fpwebview-master\demo\js_bidir 学习 js_bidir.lpr ,编写 js_bind_speak.lpr 如下,通过 JSBridge 调用本…...

数据结构——单链表专题
目录 1. 链表的概念及结构2. 实现单链表初始化尾插头插尾删头删查找在指定位置之前插入数据在指定位置之后插入数据删除指定位之前的节点删除指定位置之后pos节点销毁链表 3. 完整代码test.cSList.h 4. 链表的分类 1. 链表的概念及结构 在顺序表中存在一定的问题: …...

Linux:开源世界的王者
在科技世界中,Linux犹如一位低调的王者,统治着开源世界的半壁江山。对于许多技术爱好者、系统管理员和开发者来说,Linux不仅仅是一个操作系统,更是一种信仰、一种哲学。 一、开源的魅力 Linux的最大魅力在于其开源性质。与封闭的…...

⭐北邮复试刷题103. 二叉树的锯齿形层序遍历 (力扣每日一题)
103. 二叉树的锯齿形层序遍历 给你二叉树的根节点 root ,返回其节点值的 锯齿形层序遍历 。(即先从左往右,再从右往左进行下一层遍历,以此类推,层与层之间交替进行)。 示例 1:输入:…...

文件上传漏洞--Upload-labs--Pass07--点绕过
一、什么是点绕过 在Windows系统中,Windows特性会将文件后缀名后多余的点自动删除,在网页源码中,通常使用 deldot()函数 对点进行去除,若发现网页源代码中没有 deldot() 函数,则可能存在 点绕过漏洞。通过点绕过漏洞&…...

MySQL高级特性篇(1)-JSON数据类型的应用
MySQL是一种常用的关系型数据库管理系统,它提供了多种数据类型,其中包括JSON数据类型。JSON(JavaScript Object Notation)是一种常用的数据交换格式,它以键值对的形式组织数据,并支持嵌套和数组结构。MySQL…...

如何用Qt实现一个无标题栏、半透明、置顶(悬浮)的窗口
在Qt框架中,要实现一个无标题栏、半透明、置顶(悬浮)的窗口,需要一些特定的设置和技巧。废话不多说,下面我将以DrawClient软件为例,介绍一下实现这种效果的四个要点。 要点一:移除标题栏&#…...

ViT: transformer在图像领域的应用
文章目录 1. 概要2. 方法3. 实验3.1 Compare with SOTA3.2 PRE-TRAINING DATA REQUIREMENTS3.3 SCALING STUDY3.4 自监督学习 4. 总结参考 论文: An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale 代码:https://github.com…...

Sora 的工作原理(及其意义)
原文:How Sora Works (And What It Means) 作者: DAN SHIPPER OpenAI 的新型文本到视频模型为电影制作开启了新篇章 DALL-E 提供的插图。 让我们先明确一点,我们不会急急忙忙慌乱。我们不会预测乌托邦或预言灾难。我们要保持冷静并... 你…...

Java学习笔记2024/2/16
知识点 面向对象 题目1(完成) 定义手机类,手机有品牌(brand),价格(price)和颜色(color)三个属性,有打电话call()和sendMessage()两个功能。 请定义出手机类,类中要有空参、有参构造方法,set/get方法。 …...

XLNet做文本分类
import torch from transformers import XLNetTokenizer, XLNetForSequenceClassification from torch.utils.data import DataLoader, TensorDataset # 示例文本数据 texts ["This is a positive example.", "This is a negative example.", "Anot…...

Swift 5.9 新 @Observable 对象在 SwiftUI 使用中的陷阱与解决
概览 在 Swift 5.9 中,苹果为我们带来了全新的可观察框架 Observation,它是观察者开发模式在 Swift 中的一个全新实现。 除了自身本领过硬以外,Observation 框架和 SwiftUI 搭配起来也能相得益彰,事倍功半。不过 Observable 对象…...

分享一个学英语的网站
名字叫:公益大米网 Freerice 这个网站是以做题的形式来记忆单词,题干是一个单词,给出4个选项,需要选出其中最接近题干单词的选项。 答对可以获得10粒大米,网站的创办者负责捐赠。如图 触发某些条件&a…...

【动态规划】【C++算法】2742. 给墙壁刷油漆
作者推荐 【数位dp】【动态规划】【状态压缩】【推荐】1012. 至少有 1 位重复的数字 本文涉及知识点 动态规划汇总 LeetCode2742. 给墙壁刷油漆 给你两个长度为 n 下标从 0 开始的整数数组 cost 和 time ,分别表示给 n 堵不同的墙刷油漆需要的开销和时间。你有…...

【后端高频面试题--设计模式上篇】
🚀 作者 :“码上有前” 🚀 文章简介 :后端高频面试题 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 往期精彩内容 【后端高频面试题–设计模式上篇】 【后端高频面试题–设计模式下篇】 【后端高频…...

P3141 [USACO16FEB] Fenced In P题解
题目 如果此题数据要小一点,那么我们可以用克鲁斯卡尔算法通过,但是这个数据太大了,空间会爆炸,时间也会爆炸。 我们发现,如果用 MST 做,那么很多边的边权都一样,我们可以整行整列地删除。 我…...

Android Compose 一个音视频APP——Magic Music Player
Magic Music APP Magic Music APP Magic Music APP概述效果预览-视频资源功能预览Library歌曲播放效果预览歌曲播放依赖注入设置播放源播放进度上一首&下一首UI响应 歌词歌词解析解析成行逐行解析 视频播放AndroidView引入Exoplayer自定义Exoplayer样式横竖屏切换 歌曲多任…...

Nginx实战:安装搭建
目录 前言 一、yum安装 二、编译安装 1.下载安装包 2.解压 3.生成makefile文件 4.编译 5.安装执行 6.执行命令软连接 7.Nginx命令 前言 nginx的安装有两种方式: 1、yum安装:安装快速,但是无法在安装的时候带上想要的第三方包 2、…...

Qt之条件变量QWaitCondition详解(从使用到原理分析全)
QWaitCondition内部实现结构图: 相关系列文章 C之Pimpl惯用法 目录 1.简介 2.示例 2.1.全局配置 2.2.生产者Producer 2.3.消费者Consumer 2.4.测试例子 3.原理分析 3.1.源码介绍 3.2.辅助函数CreateEvent 3.3.辅助函数WaitForSingleObject 3.4.QWaitCo…...

OpenSource - 一站式自动化运维及自动化部署平台
文章目录 orion-ops 是什么重构特性快速开始技术栈功能预览添砖加瓦License orion-ops 是什么 orion-ops 一站式自动化运维及自动化部署平台, 使用多环境的概念, 提供了机器管理、机器监控报警、Web终端、WebSftp、机器批量执行、机器批量上传、在线查看日志、定时调度任务、应…...

【后端高频面试题--设计模式下篇】
🚀 作者 :“码上有前” 🚀 文章简介 :后端高频面试题 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 后端高频面试题--设计模式下篇 往期精彩内容设计模式总览模板方法模式怎么理解模板方法模式模板方…...

这才是大学生该做的副业,别再痴迷于游戏了!
感谢大家一直以来的支持和关注,尤其是在我的上一个公众号被关闭后,仍然选择跟随我的老粉丝们,你们的支持是我继续前行的动力。为了回馈大家长期以来的陪伴,我决定分享一些实用的干货,这些都是我亲身实践并且取得成功的…...

Ubuntu20.04 安装jekyll
首先使根据官方文档安装:Jekyll on Ubuntu | Jekyll • Simple, blog-aware, static sites 如果没有报错,就不用再继续看下去了。 我这边在执行gem install jekyll bundler时报错,所以安装了rvm,安装rvm可以参考这篇文章Ubuntu …...

AWK语言
一. awk awk:报告生成器,格式化输出。 在 Linux/UNIX 系统中,awk 是一个功能强大的编辑工具,逐行读取输入文本,默认以空格或tab键作为分隔符作为分隔,并按模式或者条件执行编辑命令。而awk比较倾向于将一行…...