系列六、Springboot操作RocketMQ
一、同步消息
1.1、发送&接收简单消息
1.1.1、发送简单消息
/*** 测试发送简单消息*/
@Test
public void sendSimpleMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主题发送一个简单消息log.info("测试发送简单消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:39:18.296 INFO 14700 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送简单消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_SIMPLE"},"msgId":"7F000001396C18B4AAC230D9778C0000","offsetMsgId":"C0A8B58A00002A9F0000000000029FA6","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.1.2、接收简单消息
/*** @Description: 消费者消息消息,就添加一个监听*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_SIMPLE",consumerGroup = "BOOT_TOPIC_SIMPLE_GROUP",messageModel = MessageModel.CLUSTERING)
public class MySimpleMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?* 没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收简单消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 11:39:18.293 INFO 6044 --- [_SIMPLE_GROUP_2] o.star.listener.MySimpleMessageListener : 接收简单消息message:我是一个简单消息
1.2、发送&接收对象消息
1.2.1、发送对象消息
/*** 测试发送对象消息*/
@Test
public void sendObjectMessage() {Order order = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");// 往BOOT_TOPIC_OBJ主题发送一个订单对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_OBJ", order);log.info("测试发送对象消息result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:42:57.879 INFO 35812 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送对象消息result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_OBJ"},"msgId":"7F0000018BE418B4AAC230DCD14D0000","offsetMsgId":"C0A8B58A00002A9F000000000002A0C2","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.2.2、接收对象消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_OBJ", consumerGroup = "BOOT_TOPIC_OBJ_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyObjectMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收对象消息message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 13:44:35.458 INFO 35816 --- [PIC_OBJ_GROUP_1] o.star.listener.MyObjectMessageListener : 接收对象消息message:{"orderSn":"f5c39c2e86f74649b9582e5e50c500ff","userId":1,"description":"小米2s,为发烧而生"}
1.3、发送&接收集合消息
1.3.1、发送集合消息
/*** 测试发送集合消息*/
@Test
public void sendCollectionMessage() {Order order1 = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");Order order2 = new Order(UUID.randomUUID().toString().replace("-", ""), 2, "小米3s,为发烧而生,你值得拥有");List<Order> orders = Arrays.asList(order1, order2);// 往[BOOT_TOPIC_COLLECTION]主题发送集合对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_COLLECTION", orders);log.info("测试发送集合消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 13:50:25.053 INFO 28696 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送集合消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_COLLECTION"},"msgId":"7F000001701818B4AAC2315181130000","offsetMsgId":"C0A8B58A00002A9F000000000002A21F","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.3.2、接收集合消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_COLLECTION", consumerGroup = "BOOT_TOPIC_COLLECTION_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyCollectionMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?* 没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收集合消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 13:50:39.726 INFO 30076 --- [LECTION_GROUP_1] o.s.l.MyCollectionMessageListener : 接收集合消息message:[{"orderSn":"141bb7c6535b472d83a5099a43422d04","userId":1,"description":"小米2s,为发烧而生"},{"orderSn":"d2b41a75e087455e8910c1cba84f830f","userId":2,"description":"小米3s,为发烧而生,你值得拥有"}]
二、异步消息
发送&接收异步消息
发送异步消息
/*** 测试发送异步消息** @throws Exception*/
@Test
public void sendASyncSimpleMessage() throws Exception {rocketMQTemplate.asyncSend("BOOT_TOPIC_ASYNC", "我是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult result) {log.info("测试发送异步消息 result:{}", JSON.toJSONString(result));}@Overridepublic void onException(Throwable throwable) {log.info("测试发送异步消息 error:{}", throwable.getMessage());}});log.info("我先执行");// 挂起JVM不让方法结束System.in.read();
}
// 控制台打印结果
2023-08-10 14:02:21.125 INFO 30988 --- [ main] cketmqSpringbootProducerApplicationTests : 我先执行
2023-08-10 14:02:21.793 INFO 30988 --- [ublicExecutor_1] cketmqSpringbootProducerApplicationTests : 测试发送异步消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"BOOT_TOPIC_ASYNC"},"msgId":"7F000001790C18B4AAC2315C70D50000","offsetMsgId":"C0A8B58A00002A9F000000000002A3FC","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收异步消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ASYNC",consumerGroup = "BOOT_TOPIC_ASYNC_GROUP",messageModel = MessageModel.CLUSTERING)
public class MyASyncMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收异步消息 message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:02:36.334 INFO 10676 --- [C_ASYNC_GROUP_1] o.star.listener.MyASyncMessageListener : 接收异步消息 message:我是一个异步消息
三、单向消息
发送&接收单向消息
发送单向消息
/*** 适用场景:适用于不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送*/
@Test
public void sendOnewayMessage() {// 发送单向消息,没有返回值和结果rocketMQTemplate.sendOneWay("BOOT_TOPIC_ONE_WAY", "我是一个单向消息");
}
接收单向消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ONE_WAY", consumerGroup = "BOOT_TOPIC_ONE_WAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyOnewayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收单向消息 message:{}",new String(messageExt.getBody()));}}
// 控制台打印结果
2023-08-10 14:07:23.965 INFO 32740 --- [ONE_WAY_GROUP_1] o.star.listener.MyOnewayMessageListener : 接收单向消息 message:我是一个单向消息
四、延迟消息
发送&接收延迟消息
发送延迟消息
/*** 测试发送延迟消息*/
@Test
public void sendDelayMessage() {Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();/*** 设定消息的延迟等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)* 1s: 等级1* 5s: 等级2* 10s:等级3* 30s:等级4* 1m: 等级5* 发送一个延迟消息,延迟等级为4级,也就是30s后被监听消费* 注意事项:RocketMQ不支持任意时间的延时,只支持上述的延迟规则*/SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_DELAY", message, 2000, 4);log.info("测试发送延迟消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:13:03.859 INFO 4804 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送延迟消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_DELAY"},"msgId":"7F00000112C418B4AAC231663CE60000","offsetMsgId":"C0A8B58A00002A9F000000000002A634","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收延迟消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_DELAY", consumerGroup = "BOOT_TOPIC_DELAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyDelayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收延迟消息 message:{}",new String(messageExt.getBody()));}}// 控制台打印结果
2023-08-10 14:13:33.860 INFO 26112 --- [C_DELAY_GROUP_1] o.star.listener.MyDelayMessageListener : 接收延迟消息 message:我是一个延迟消息
五、顺序消息
发送&接收顺序消息
发送顺序消息
/*** 发送顺序消息,控制流程:下订单==》发短信==》物流* 测试发送顺序消息*/
@Test
public void sendOrderlyMessage() {// 顺序消息,发送者将一组消息都发送至同一个队列,消费者需要单线程进行消费List<Order> orders = Arrays.asList(new Order("aaa", 1, "下订单"),new Order("aaa", 1, "发短信"),new Order("aaa", 1, "物流"),new Order("bbb", 2, "下订单"),new Order("bbb", 2, "发短信"),new Order("bbb", 2, "物流"));orders.forEach(order -> {// 发送,一般都是以json的方式进行处理SendResult result = rocketMQTemplate.syncSendOrderly("BOOT_TOPIC_ORDERLY", JSON.toJSONString(order), order.getOrderSn());log.info("订单id:{},队列id:{},结果:{}",order.getUserId(),result.getMessageQueue().getQueueId(),result.getSendStatus());});
}
// 控制台打印结果
2023-08-10 14:23:02.023 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.026 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.028 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.029 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.030 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.031 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
接收顺序消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ORDERLY",consumerGroup = "BOOT_TOPIC_ORDERLY_GROUP",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式,单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class MyOrderlyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {Order order = JSON.parseObject(new String(messageExt.getBody()),Order.class);log.info("接收顺序消息order:{}",order);}
}
// 控制台打印结果
2023-08-10 14:23:46.714 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=下订单)
2023-08-10 14:23:46.714 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=下订单)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=发短信)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=物流)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=发短信)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=物流)
六、带Tag的消息
发送&接收带Tag的消息
发送带Tag消息
/*** 测试发送带Tag的消息*/
@Test
public void sendTagMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_TAG:TagA", "我是一个带Tag的消息");log.info("测试发送带Tag的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:34:07.806 INFO 30388 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送带Tag的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_TAG"},"msgId":"7F00000176B418B4AAC2317986310000","offsetMsgId":"C0A8B58A00002A9F000000000002B029","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收带Tag消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_TAG",consumerGroup = "BOOT_TOPIC_TAG_GROUP",messageModel = MessageModel.CLUSTERING,selectorType = SelectorType.TAG, // tag过滤模式selectorExpression = "TagA || TagB"
)
public class MyTagMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Tag标签的消息 result:{}", new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:34:27.260 INFO 10868 --- [PIC_TAG_GROUP_1] org.star.listener.MyTagMessageListener : 接收带Tag标签的消息 result:我是一个带Tag的消息
七、带Key的消息
发送&接收带Key的消息
发送带Key的消息
/*** 测试发送带Key的消息*/
@Test
public void sendKeyMessage() {// key写在消息头里边Message<String> message = MessageBuilder.withPayload("我是一个带Key的消息").setHeader(RocketMQHeaders.KEYS, "STAR").build();SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_KEY", message);log.info("测试发送带Key的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:45:42.963 INFO 26148 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送带Key的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_KEY"},"msgId":"7F000001662418B4AAC2318421A80000","offsetMsgId":"C0A8B58A00002A9F000000000002B275","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收带Key的消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_KEY",consumerGroup = "BOOT_TOPIC_KEY_GROUP",messageModel = MessageModel.CLUSTERING
)
public class MyKeyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Key的消息:{},Key:{}",new String(messageExt.getBody()), messageExt.getKeys());}
}
// 控制台打印结果
2023-08-10 14:45:42.963 INFO 23040 --- [PIC_KEY_GROUP_1] org.star.listener.MyKeyMessageListener : 接收带Key的消息:我是一个带Key的消息,Key:STAR
八、集群模式的消息
发送&接收集群模式的消息
发送集群消息
/*** 测试消息消费模式-集群模式* 此种方式消费者会采取轮询的方式进行消费*/
@Test
public void modeForClusterSendMessage() {for (int i = 1; i <= 10; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_CLUSTER", "我是第" + i + "个消息");log.info("集群模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}// 控制台打印结果
2023-08-10 14:58:14.203 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.207 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.211 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.213 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.216 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.218 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.220 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.222 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.224 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.227 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
接收集群消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 14:58:14.209 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第1个消息
2023-08-10 14:58:14.212 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第2个消息
2023-08-10 14:58:14.212 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener1 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第3个消息
2023-08-10 14:58:14.216 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第4个消息
2023-08-10 14:58:14.217 INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第5个消息
2023-08-10 14:58:14.220 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第6个消息
2023-08-10 14:58:14.222 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener1 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第7个消息
2023-08-10 14:58:14.223 INFO 7300 --- [CLUSTER_GROUP_4] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第8个消息
2023-08-10 14:58:14.226 INFO 7300 --- [CLUSTER_GROUP_5] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第9个消息
2023-08-10 14:58:14.229 INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第10个消息
九、广播模式的消息
发送&接收广播模式的消息
发送广播模式的消息
/*** 测试消息消费模式-广播模式* 此种方式每一个消费者都会消费一次消息*/
@Test
public void modeForBroadcastingSendMessage() {for (int i = 1; i <= 5; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_BROADCASTING", "我是第" + i + "个消息");log.info("广播模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}
// 控制台打印结果
2023-08-10 15:12:10.081 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
2023-08-10 15:12:10.084 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:1,结果:SEND_OK
2023-08-10 15:12:10.086 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:2,结果:SEND_OK
2023-08-10 15:12:10.088 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:3,结果:SEND_OK
2023-08-10 15:12:10.090 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
接收广播模式的消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener2 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}// 控制台打印结果
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第5个消息,队列id:0
相关文章:
系列六、Springboot操作RocketMQ
一、同步消息 1.1、发送&接收简单消息 1.1.1、发送简单消息 /*** 测试发送简单消息*/ Test public void sendSimpleMessage() {SendResult result rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主…...
【jupyter异常错误】Kernel started:No module named ipykernel_launcher
尝试过的方案 pip install ipykernel 执行之后提示已经安装,但是执行代码依然报错 解决方案 python -m pip install ipykernel -U --force-reinstall 相当于是强制重新安装 安装成功后没有报错 注:根本原因应该是原来安装的包存在问题,虽然检测出来已经存在…...

使用langchain与你自己的数据对话(五):聊天机器人
之前我已经完成了使用langchain与你自己的数据对话的前四篇博客,还没有阅读这四篇博客的朋友可以先阅读一下: 使用langchain与你自己的数据对话(一):文档加载与切割使用langchain与你自己的数据对话(二):向量存储与嵌入使用langc…...

爬虫与搜索引擎优化:通过Python爬虫提升网站搜索排名
作为一名专业的爬虫程序员,我深知网站的搜索排名对于业务的重要性。在如今竞争激烈的网络世界中,如何让自己的网站在搜索引擎结果中脱颖而出,成为关键。今天,和大家分享一些关于如何通过Python爬虫来提升网站的搜索排名的技巧和实…...

2024软考系统架构设计师论文写作要点
一、写作注意事项 系统架构设计师的论文题目对于考生来说,是相对较难的题目。一方面,考生需要掌握论文题目中的系统架构设计的专业知识;另一方面,论文的撰写需要结合考生自身的项目经历。因此,如何将自己的项目经历和专业知识有机…...

【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承
【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承 依赖范围 依赖传递 依赖排除 依赖原则 依赖继承 依赖范围 在Maven中,依赖范围(Dependency Scope)用于控制依赖项在编译、测试和运行时的可见性和可用性。通过指定适当的依赖…...

数组slice、splice字符串substr、split
一、定义 这篇文章主要对数组操作的两种方法进行介绍和使用,包括:slice、splice。对字符串操作的两种方法进行介绍和使用,包括:substr、split (一)、数组 slice:可以操作的数据类型有:数组字符串 splice:数组 操作数组…...
程序漏洞:安全威胁的隐患
在当今数字化时代,计算机程序是现代社会的核心基石。然而,随着技术的进步,程序漏洞也成为了一个不可忽视的问题。程序漏洞可能导致数据泄露、系统崩溃、恶意攻击和经济损失等一系列问题。本文将深入探讨程序漏洞的定义、分类、影响和预防措施…...

0基础学C#笔记09:希尔排序法
文章目录 前言一、希尔排序的思想二、使用步骤总结 前言 希尔排序可以说是插入排序的一种变种。无论是插入排序还是冒泡排序,如果数组的最大值刚好是在第一位,要将它挪到正确的位置就需要 n - 1 次移动。也就是说,原数组的一个元素如果距离它…...

DOCKER的容器
1. 什么是Container(容器) 要有Container首先要有Image,也就是说Container是通过image创建的。 Container是在原先的Image之上新加的一层,称作Container layer,这一层是可读可写的(Image是只读的࿰…...

跳跃游戏——力扣55
文章目录 题目描述解法一 贪心题目描述 解法一 贪心 bool canJump(vector<int>& nums){int n=nums....

将本地项目上传至gitee的详细步骤
将本地项目上传至gitee的详细步骤 1.在gitee上创建以自己项目名称命名的空项目2.进入想上传的项目的文件夹,然后右键点击3. 初始化本地环境,把该项目变成可被git管理的仓库4.添加该项目下的所有文件5.使用如下命令将文件添加到仓库中去6.将本地代码库与远…...
iOS开发-导航栏UINavigationBar隐藏底部线及透明度
iOS 导航栏UINavigationBar隐藏底部线及透明度 苹果官方给出的解释: 如果你不调用方法设置一张背景图片的话,那就给你默认一张,然后同时还有一张阴影图片被默认设置上去,这就是导航栏上1px黑线的由来。 解决办法: 方…...
题目:2520.统计能整除数字的位数
题目来源: leetcode题目,网址:2520. 统计能整除数字的位数 - 力扣(LeetCode) 解题思路: 逐位判断即可。 解题代码: class Solution {public int countDigits(int num) {int res0;int ori…...

matplotlib 笔记 注释annotate
在图中的特定位置添加文本注释、箭头和连接线,以便更清晰地解释图形中的数据或信息 主要参数 text文本内容xy箭头指向的目标点的坐标xytext注释文本的坐标arrowprops 一个字典,指定注释箭头的属性,如颜色、箭头样式等 没有arrowprops的时候…...
Windows 无法安装到这个硬盘。选中的磁盘具有MBR分区。在EFI系统上,Windows只能安装到GPT磁盘
Windows无法安装到这个磁盘,选中的磁盘具有MBR分区表的解决方法 - 知乎 (zhihu.com) Windows无法安装到这个磁盘 选中的磁盘具有MBR分区表 - 知乎 (zhihu.com) 选中的磁盘具有MBR分区表,在EFI系统上,windows只能安装到GPT磁盘_选中的磁盘具有mbr分区表…...

学C的第三十三天【C语言文件操作】
相关代码gitee自取: C语言学习日记: 加油努力 (gitee.com) 接上期: 学C的第三十二天【动态内存管理】_高高的胖子的博客-CSDN博客 1 . 为什么要使用文件 以前面写的通讯录为例,当通讯录运行起来的时候,可以给通讯录中增加、删…...
线性表的基本操作及在顺序存储及链式存储的实现
目录 线性表的基本操作:线性表的在顺序存储上的实现 线性表的基本操作: 一个数据结构的基本操作是指其最核心、最基本的操作。其他较复杂的操作可通过其基本操作来实现。线性表的主要操作如下 - InitList(&L):初始化表。构造一个空的线性表- Length…...
合宙Air724UG LuatOS-Air script lib API--nvm
nvm Table of Contents nvm nvm.init(defaultCfgFile, burnSave) nvm.set(k, v, r, s) nvm.sett(k, kk, v, r, s) nvm.flush() nvm.get(k) nvm.gett(k, kk) nvm.restore() nvm.remove() nvm 模块功能:参数管理 nvm.init(defaultCfgFile, burnSave) 初始化参数存储管…...
springboot单元测试的详细介绍
当开发一个复杂的应用程序时,确保代码的正确性和稳定性至关重要。在这方面,单元测试是一个不可或缺的工具,它可以帮助开发人员验证代码的各个部分是否按预期工作。Spring Boot提供了丰富的测试支持,使编写和执行单元测试变得更加容…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG
TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码:HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...

rm视觉学习1-自瞄部分
首先先感谢中南大学的开源,提供了很全面的思路,减少了很多基础性的开发研究 我看的阅读的是中南大学FYT战队开源视觉代码 链接:https://github.com/CSU-FYT-Vision/FYT2024_vision.git 1.框架: 代码框架结构:readme有…...

Win系统权限提升篇UAC绕过DLL劫持未引号路径可控服务全检项目
应用场景: 1、常规某个机器被钓鱼后门攻击后,我们需要做更高权限操作或权限维持等。 2、内网域中某个机器被钓鱼后门攻击后,我们需要对后续内网域做安全测试。 #Win10&11-BypassUAC自动提权-MSF&UACME 为了远程执行目标的exe或者b…...

RushDB开源程序 是现代应用程序和 AI 的即时数据库。建立在 Neo4j 之上
一、软件介绍 文末提供程序和源码下载 RushDB 改变了您处理图形数据的方式 — 不需要 Schema,不需要复杂的查询,只需推送数据即可。 二、Key Features ✨ 主要特点 Instant Setup: Be productive in seconds, not days 即时设置 :在几秒钟…...