系列六、Springboot操作RocketMQ
一、同步消息
1.1、发送&接收简单消息
1.1.1、发送简单消息
/*** 测试发送简单消息*/
@Test
public void sendSimpleMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主题发送一个简单消息log.info("测试发送简单消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:39:18.296 INFO 14700 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送简单消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_SIMPLE"},"msgId":"7F000001396C18B4AAC230D9778C0000","offsetMsgId":"C0A8B58A00002A9F0000000000029FA6","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.1.2、接收简单消息
/*** @Description: 消费者消息消息,就添加一个监听*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_SIMPLE",consumerGroup = "BOOT_TOPIC_SIMPLE_GROUP",messageModel = MessageModel.CLUSTERING)
public class MySimpleMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?* 没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收简单消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 11:39:18.293 INFO 6044 --- [_SIMPLE_GROUP_2] o.star.listener.MySimpleMessageListener : 接收简单消息message:我是一个简单消息
1.2、发送&接收对象消息
1.2.1、发送对象消息
/*** 测试发送对象消息*/
@Test
public void sendObjectMessage() {Order order = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");// 往BOOT_TOPIC_OBJ主题发送一个订单对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_OBJ", order);log.info("测试发送对象消息result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:42:57.879 INFO 35812 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送对象消息result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_OBJ"},"msgId":"7F0000018BE418B4AAC230DCD14D0000","offsetMsgId":"C0A8B58A00002A9F000000000002A0C2","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.2.2、接收对象消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_OBJ", consumerGroup = "BOOT_TOPIC_OBJ_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyObjectMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收对象消息message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 13:44:35.458 INFO 35816 --- [PIC_OBJ_GROUP_1] o.star.listener.MyObjectMessageListener : 接收对象消息message:{"orderSn":"f5c39c2e86f74649b9582e5e50c500ff","userId":1,"description":"小米2s,为发烧而生"}
1.3、发送&接收集合消息
1.3.1、发送集合消息
/*** 测试发送集合消息*/
@Test
public void sendCollectionMessage() {Order order1 = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");Order order2 = new Order(UUID.randomUUID().toString().replace("-", ""), 2, "小米3s,为发烧而生,你值得拥有");List<Order> orders = Arrays.asList(order1, order2);// 往[BOOT_TOPIC_COLLECTION]主题发送集合对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_COLLECTION", orders);log.info("测试发送集合消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 13:50:25.053 INFO 28696 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送集合消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_COLLECTION"},"msgId":"7F000001701818B4AAC2315181130000","offsetMsgId":"C0A8B58A00002A9F000000000002A21F","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.3.2、接收集合消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_COLLECTION", consumerGroup = "BOOT_TOPIC_COLLECTION_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyCollectionMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?* 没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收集合消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 13:50:39.726 INFO 30076 --- [LECTION_GROUP_1] o.s.l.MyCollectionMessageListener : 接收集合消息message:[{"orderSn":"141bb7c6535b472d83a5099a43422d04","userId":1,"description":"小米2s,为发烧而生"},{"orderSn":"d2b41a75e087455e8910c1cba84f830f","userId":2,"description":"小米3s,为发烧而生,你值得拥有"}]
二、异步消息
发送&接收异步消息
发送异步消息
/*** 测试发送异步消息** @throws Exception*/
@Test
public void sendASyncSimpleMessage() throws Exception {rocketMQTemplate.asyncSend("BOOT_TOPIC_ASYNC", "我是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult result) {log.info("测试发送异步消息 result:{}", JSON.toJSONString(result));}@Overridepublic void onException(Throwable throwable) {log.info("测试发送异步消息 error:{}", throwable.getMessage());}});log.info("我先执行");// 挂起JVM不让方法结束System.in.read();
}
// 控制台打印结果
2023-08-10 14:02:21.125 INFO 30988 --- [ main] cketmqSpringbootProducerApplicationTests : 我先执行
2023-08-10 14:02:21.793 INFO 30988 --- [ublicExecutor_1] cketmqSpringbootProducerApplicationTests : 测试发送异步消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"BOOT_TOPIC_ASYNC"},"msgId":"7F000001790C18B4AAC2315C70D50000","offsetMsgId":"C0A8B58A00002A9F000000000002A3FC","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收异步消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ASYNC",consumerGroup = "BOOT_TOPIC_ASYNC_GROUP",messageModel = MessageModel.CLUSTERING)
public class MyASyncMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收异步消息 message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:02:36.334 INFO 10676 --- [C_ASYNC_GROUP_1] o.star.listener.MyASyncMessageListener : 接收异步消息 message:我是一个异步消息
三、单向消息
发送&接收单向消息
发送单向消息
/*** 适用场景:适用于不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送*/
@Test
public void sendOnewayMessage() {// 发送单向消息,没有返回值和结果rocketMQTemplate.sendOneWay("BOOT_TOPIC_ONE_WAY", "我是一个单向消息");
}
接收单向消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ONE_WAY", consumerGroup = "BOOT_TOPIC_ONE_WAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyOnewayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收单向消息 message:{}",new String(messageExt.getBody()));}}
// 控制台打印结果
2023-08-10 14:07:23.965 INFO 32740 --- [ONE_WAY_GROUP_1] o.star.listener.MyOnewayMessageListener : 接收单向消息 message:我是一个单向消息
四、延迟消息
发送&接收延迟消息
发送延迟消息
/*** 测试发送延迟消息*/
@Test
public void sendDelayMessage() {Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();/*** 设定消息的延迟等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)* 1s: 等级1* 5s: 等级2* 10s:等级3* 30s:等级4* 1m: 等级5* 发送一个延迟消息,延迟等级为4级,也就是30s后被监听消费* 注意事项:RocketMQ不支持任意时间的延时,只支持上述的延迟规则*/SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_DELAY", message, 2000, 4);log.info("测试发送延迟消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:13:03.859 INFO 4804 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送延迟消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_DELAY"},"msgId":"7F00000112C418B4AAC231663CE60000","offsetMsgId":"C0A8B58A00002A9F000000000002A634","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收延迟消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_DELAY", consumerGroup = "BOOT_TOPIC_DELAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyDelayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收延迟消息 message:{}",new String(messageExt.getBody()));}}// 控制台打印结果
2023-08-10 14:13:33.860 INFO 26112 --- [C_DELAY_GROUP_1] o.star.listener.MyDelayMessageListener : 接收延迟消息 message:我是一个延迟消息
五、顺序消息
发送&接收顺序消息
发送顺序消息
/*** 发送顺序消息,控制流程:下订单==》发短信==》物流* 测试发送顺序消息*/
@Test
public void sendOrderlyMessage() {// 顺序消息,发送者将一组消息都发送至同一个队列,消费者需要单线程进行消费List<Order> orders = Arrays.asList(new Order("aaa", 1, "下订单"),new Order("aaa", 1, "发短信"),new Order("aaa", 1, "物流"),new Order("bbb", 2, "下订单"),new Order("bbb", 2, "发短信"),new Order("bbb", 2, "物流"));orders.forEach(order -> {// 发送,一般都是以json的方式进行处理SendResult result = rocketMQTemplate.syncSendOrderly("BOOT_TOPIC_ORDERLY", JSON.toJSONString(order), order.getOrderSn());log.info("订单id:{},队列id:{},结果:{}",order.getUserId(),result.getMessageQueue().getQueueId(),result.getSendStatus());});
}
// 控制台打印结果
2023-08-10 14:23:02.023 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.026 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.028 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.029 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.030 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.031 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
接收顺序消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ORDERLY",consumerGroup = "BOOT_TOPIC_ORDERLY_GROUP",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式,单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class MyOrderlyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {Order order = JSON.parseObject(new String(messageExt.getBody()),Order.class);log.info("接收顺序消息order:{}",order);}
}
// 控制台打印结果
2023-08-10 14:23:46.714 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=下订单)
2023-08-10 14:23:46.714 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=下订单)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=发短信)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=物流)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=发短信)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=物流)
六、带Tag的消息
发送&接收带Tag的消息
发送带Tag消息
/*** 测试发送带Tag的消息*/
@Test
public void sendTagMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_TAG:TagA", "我是一个带Tag的消息");log.info("测试发送带Tag的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:34:07.806 INFO 30388 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送带Tag的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_TAG"},"msgId":"7F00000176B418B4AAC2317986310000","offsetMsgId":"C0A8B58A00002A9F000000000002B029","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收带Tag消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_TAG",consumerGroup = "BOOT_TOPIC_TAG_GROUP",messageModel = MessageModel.CLUSTERING,selectorType = SelectorType.TAG, // tag过滤模式selectorExpression = "TagA || TagB"
)
public class MyTagMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Tag标签的消息 result:{}", new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:34:27.260 INFO 10868 --- [PIC_TAG_GROUP_1] org.star.listener.MyTagMessageListener : 接收带Tag标签的消息 result:我是一个带Tag的消息
七、带Key的消息
发送&接收带Key的消息
发送带Key的消息
/*** 测试发送带Key的消息*/
@Test
public void sendKeyMessage() {// key写在消息头里边Message<String> message = MessageBuilder.withPayload("我是一个带Key的消息").setHeader(RocketMQHeaders.KEYS, "STAR").build();SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_KEY", message);log.info("测试发送带Key的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:45:42.963 INFO 26148 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送带Key的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_KEY"},"msgId":"7F000001662418B4AAC2318421A80000","offsetMsgId":"C0A8B58A00002A9F000000000002B275","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收带Key的消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_KEY",consumerGroup = "BOOT_TOPIC_KEY_GROUP",messageModel = MessageModel.CLUSTERING
)
public class MyKeyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Key的消息:{},Key:{}",new String(messageExt.getBody()), messageExt.getKeys());}
}
// 控制台打印结果
2023-08-10 14:45:42.963 INFO 23040 --- [PIC_KEY_GROUP_1] org.star.listener.MyKeyMessageListener : 接收带Key的消息:我是一个带Key的消息,Key:STAR
八、集群模式的消息
发送&接收集群模式的消息
发送集群消息
/*** 测试消息消费模式-集群模式* 此种方式消费者会采取轮询的方式进行消费*/
@Test
public void modeForClusterSendMessage() {for (int i = 1; i <= 10; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_CLUSTER", "我是第" + i + "个消息");log.info("集群模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}// 控制台打印结果
2023-08-10 14:58:14.203 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.207 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.211 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.213 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.216 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.218 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.220 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.222 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.224 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.227 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
接收集群消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 14:58:14.209 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第1个消息
2023-08-10 14:58:14.212 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第2个消息
2023-08-10 14:58:14.212 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener1 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第3个消息
2023-08-10 14:58:14.216 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第4个消息
2023-08-10 14:58:14.217 INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第5个消息
2023-08-10 14:58:14.220 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第6个消息
2023-08-10 14:58:14.222 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener1 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第7个消息
2023-08-10 14:58:14.223 INFO 7300 --- [CLUSTER_GROUP_4] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第8个消息
2023-08-10 14:58:14.226 INFO 7300 --- [CLUSTER_GROUP_5] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第9个消息
2023-08-10 14:58:14.229 INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第10个消息
九、广播模式的消息
发送&接收广播模式的消息
发送广播模式的消息
/*** 测试消息消费模式-广播模式* 此种方式每一个消费者都会消费一次消息*/
@Test
public void modeForBroadcastingSendMessage() {for (int i = 1; i <= 5; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_BROADCASTING", "我是第" + i + "个消息");log.info("广播模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}
// 控制台打印结果
2023-08-10 15:12:10.081 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
2023-08-10 15:12:10.084 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:1,结果:SEND_OK
2023-08-10 15:12:10.086 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:2,结果:SEND_OK
2023-08-10 15:12:10.088 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:3,结果:SEND_OK
2023-08-10 15:12:10.090 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
接收广播模式的消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener2 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}// 控制台打印结果
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第5个消息,队列id:0
相关文章:
系列六、Springboot操作RocketMQ
一、同步消息 1.1、发送&接收简单消息 1.1.1、发送简单消息 /*** 测试发送简单消息*/ Test public void sendSimpleMessage() {SendResult result rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主…...
【jupyter异常错误】Kernel started:No module named ipykernel_launcher
尝试过的方案 pip install ipykernel 执行之后提示已经安装,但是执行代码依然报错 解决方案 python -m pip install ipykernel -U --force-reinstall 相当于是强制重新安装 安装成功后没有报错 注:根本原因应该是原来安装的包存在问题,虽然检测出来已经存在…...
使用langchain与你自己的数据对话(五):聊天机器人
之前我已经完成了使用langchain与你自己的数据对话的前四篇博客,还没有阅读这四篇博客的朋友可以先阅读一下: 使用langchain与你自己的数据对话(一):文档加载与切割使用langchain与你自己的数据对话(二):向量存储与嵌入使用langc…...
爬虫与搜索引擎优化:通过Python爬虫提升网站搜索排名
作为一名专业的爬虫程序员,我深知网站的搜索排名对于业务的重要性。在如今竞争激烈的网络世界中,如何让自己的网站在搜索引擎结果中脱颖而出,成为关键。今天,和大家分享一些关于如何通过Python爬虫来提升网站的搜索排名的技巧和实…...
2024软考系统架构设计师论文写作要点
一、写作注意事项 系统架构设计师的论文题目对于考生来说,是相对较难的题目。一方面,考生需要掌握论文题目中的系统架构设计的专业知识;另一方面,论文的撰写需要结合考生自身的项目经历。因此,如何将自己的项目经历和专业知识有机…...
【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承
【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承 依赖范围 依赖传递 依赖排除 依赖原则 依赖继承 依赖范围 在Maven中,依赖范围(Dependency Scope)用于控制依赖项在编译、测试和运行时的可见性和可用性。通过指定适当的依赖…...
数组slice、splice字符串substr、split
一、定义 这篇文章主要对数组操作的两种方法进行介绍和使用,包括:slice、splice。对字符串操作的两种方法进行介绍和使用,包括:substr、split (一)、数组 slice:可以操作的数据类型有:数组字符串 splice:数组 操作数组…...
程序漏洞:安全威胁的隐患
在当今数字化时代,计算机程序是现代社会的核心基石。然而,随着技术的进步,程序漏洞也成为了一个不可忽视的问题。程序漏洞可能导致数据泄露、系统崩溃、恶意攻击和经济损失等一系列问题。本文将深入探讨程序漏洞的定义、分类、影响和预防措施…...
0基础学C#笔记09:希尔排序法
文章目录 前言一、希尔排序的思想二、使用步骤总结 前言 希尔排序可以说是插入排序的一种变种。无论是插入排序还是冒泡排序,如果数组的最大值刚好是在第一位,要将它挪到正确的位置就需要 n - 1 次移动。也就是说,原数组的一个元素如果距离它…...
DOCKER的容器
1. 什么是Container(容器) 要有Container首先要有Image,也就是说Container是通过image创建的。 Container是在原先的Image之上新加的一层,称作Container layer,这一层是可读可写的(Image是只读的࿰…...
跳跃游戏——力扣55
文章目录 题目描述解法一 贪心题目描述 解法一 贪心 bool canJump(vector<int>& nums){int n=nums....
将本地项目上传至gitee的详细步骤
将本地项目上传至gitee的详细步骤 1.在gitee上创建以自己项目名称命名的空项目2.进入想上传的项目的文件夹,然后右键点击3. 初始化本地环境,把该项目变成可被git管理的仓库4.添加该项目下的所有文件5.使用如下命令将文件添加到仓库中去6.将本地代码库与远…...
iOS开发-导航栏UINavigationBar隐藏底部线及透明度
iOS 导航栏UINavigationBar隐藏底部线及透明度 苹果官方给出的解释: 如果你不调用方法设置一张背景图片的话,那就给你默认一张,然后同时还有一张阴影图片被默认设置上去,这就是导航栏上1px黑线的由来。 解决办法: 方…...
题目:2520.统计能整除数字的位数
题目来源: leetcode题目,网址:2520. 统计能整除数字的位数 - 力扣(LeetCode) 解题思路: 逐位判断即可。 解题代码: class Solution {public int countDigits(int num) {int res0;int ori…...
matplotlib 笔记 注释annotate
在图中的特定位置添加文本注释、箭头和连接线,以便更清晰地解释图形中的数据或信息 主要参数 text文本内容xy箭头指向的目标点的坐标xytext注释文本的坐标arrowprops 一个字典,指定注释箭头的属性,如颜色、箭头样式等 没有arrowprops的时候…...
Windows 无法安装到这个硬盘。选中的磁盘具有MBR分区。在EFI系统上,Windows只能安装到GPT磁盘
Windows无法安装到这个磁盘,选中的磁盘具有MBR分区表的解决方法 - 知乎 (zhihu.com) Windows无法安装到这个磁盘 选中的磁盘具有MBR分区表 - 知乎 (zhihu.com) 选中的磁盘具有MBR分区表,在EFI系统上,windows只能安装到GPT磁盘_选中的磁盘具有mbr分区表…...
学C的第三十三天【C语言文件操作】
相关代码gitee自取: C语言学习日记: 加油努力 (gitee.com) 接上期: 学C的第三十二天【动态内存管理】_高高的胖子的博客-CSDN博客 1 . 为什么要使用文件 以前面写的通讯录为例,当通讯录运行起来的时候,可以给通讯录中增加、删…...
线性表的基本操作及在顺序存储及链式存储的实现
目录 线性表的基本操作:线性表的在顺序存储上的实现 线性表的基本操作: 一个数据结构的基本操作是指其最核心、最基本的操作。其他较复杂的操作可通过其基本操作来实现。线性表的主要操作如下 - InitList(&L):初始化表。构造一个空的线性表- Length…...
合宙Air724UG LuatOS-Air script lib API--nvm
nvm Table of Contents nvm nvm.init(defaultCfgFile, burnSave) nvm.set(k, v, r, s) nvm.sett(k, kk, v, r, s) nvm.flush() nvm.get(k) nvm.gett(k, kk) nvm.restore() nvm.remove() nvm 模块功能:参数管理 nvm.init(defaultCfgFile, burnSave) 初始化参数存储管…...
springboot单元测试的详细介绍
当开发一个复杂的应用程序时,确保代码的正确性和稳定性至关重要。在这方面,单元测试是一个不可或缺的工具,它可以帮助开发人员验证代码的各个部分是否按预期工作。Spring Boot提供了丰富的测试支持,使编写和执行单元测试变得更加容…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
WebRTC调研
WebRTC是什么,为什么,如何使用 WebRTC有什么优势 WebRTC Architecture Amazon KVS WebRTC 其它厂商WebRTC 海康门禁WebRTC 海康门禁其他界面整理 威视通WebRTC 局域网 Google浏览器 Microsoft Edge 公网 RTSP RTMP NVR ONVIF SIP SRT WebRTC协…...
Vue 3 + WebSocket 实战:公司通知实时推送功能详解
📢 Vue 3 WebSocket 实战:公司通知实时推送功能详解 📌 收藏 点赞 关注,项目中要用到推送功能时就不怕找不到了! 实时通知是企业系统中常见的功能,比如:管理员发布通知后,所有用户…...
数据结构:泰勒展开式:霍纳法则(Horner‘s Rule)
目录 🔍 若用递归计算每一项,会发生什么? Horners Rule(霍纳法则) 第一步:我们从最原始的泰勒公式出发 第二步:从形式上重新观察展开式 🌟 第三步:引出霍纳法则&…...
第14节 Node.js 全局对象
JavaScript 中有一个特殊的对象,称为全局对象(Global Object),它及其所有属性都可以在程序的任何地方访问,即全局变量。 在浏览器 JavaScript 中,通常 window 是全局对象, 而 Node.js 中的全局…...
Qt 按钮类控件(Push Button 与 Radio Button)(1)
文章目录 Push Button前提概要API接口给按钮添加图标给按钮添加快捷键 Radio ButtonAPI接口性别选择 Push Button(鼠标点击不放连续移动快捷键) Radio Button Push Button 前提概要 1. 之前文章中所提到的各种跟QWidget有关的各种属性/函数/方法&#…...
