系列六、Springboot操作RocketMQ
一、同步消息
1.1、发送&接收简单消息
1.1.1、发送简单消息
/*** 测试发送简单消息*/
@Test
public void sendSimpleMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主题发送一个简单消息log.info("测试发送简单消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:39:18.296 INFO 14700 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送简单消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_SIMPLE"},"msgId":"7F000001396C18B4AAC230D9778C0000","offsetMsgId":"C0A8B58A00002A9F0000000000029FA6","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.1.2、接收简单消息
/*** @Description: 消费者消息消息,就添加一个监听*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_SIMPLE",consumerGroup = "BOOT_TOPIC_SIMPLE_GROUP",messageModel = MessageModel.CLUSTERING)
public class MySimpleMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?* 没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收简单消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 11:39:18.293 INFO 6044 --- [_SIMPLE_GROUP_2] o.star.listener.MySimpleMessageListener : 接收简单消息message:我是一个简单消息
1.2、发送&接收对象消息
1.2.1、发送对象消息
/*** 测试发送对象消息*/
@Test
public void sendObjectMessage() {Order order = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");// 往BOOT_TOPIC_OBJ主题发送一个订单对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_OBJ", order);log.info("测试发送对象消息result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:42:57.879 INFO 35812 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送对象消息result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_OBJ"},"msgId":"7F0000018BE418B4AAC230DCD14D0000","offsetMsgId":"C0A8B58A00002A9F000000000002A0C2","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.2.2、接收对象消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_OBJ", consumerGroup = "BOOT_TOPIC_OBJ_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyObjectMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收对象消息message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 13:44:35.458 INFO 35816 --- [PIC_OBJ_GROUP_1] o.star.listener.MyObjectMessageListener : 接收对象消息message:{"orderSn":"f5c39c2e86f74649b9582e5e50c500ff","userId":1,"description":"小米2s,为发烧而生"}
1.3、发送&接收集合消息
1.3.1、发送集合消息
/*** 测试发送集合消息*/
@Test
public void sendCollectionMessage() {Order order1 = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");Order order2 = new Order(UUID.randomUUID().toString().replace("-", ""), 2, "小米3s,为发烧而生,你值得拥有");List<Order> orders = Arrays.asList(order1, order2);// 往[BOOT_TOPIC_COLLECTION]主题发送集合对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_COLLECTION", orders);log.info("测试发送集合消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 13:50:25.053 INFO 28696 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送集合消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_COLLECTION"},"msgId":"7F000001701818B4AAC2315181130000","offsetMsgId":"C0A8B58A00002A9F000000000002A21F","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
1.3.2、接收集合消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_COLLECTION", consumerGroup = "BOOT_TOPIC_COLLECTION_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyCollectionMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?* 没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收集合消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 13:50:39.726 INFO 30076 --- [LECTION_GROUP_1] o.s.l.MyCollectionMessageListener : 接收集合消息message:[{"orderSn":"141bb7c6535b472d83a5099a43422d04","userId":1,"description":"小米2s,为发烧而生"},{"orderSn":"d2b41a75e087455e8910c1cba84f830f","userId":2,"description":"小米3s,为发烧而生,你值得拥有"}]
二、异步消息
发送&接收异步消息
发送异步消息
/*** 测试发送异步消息** @throws Exception*/
@Test
public void sendASyncSimpleMessage() throws Exception {rocketMQTemplate.asyncSend("BOOT_TOPIC_ASYNC", "我是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult result) {log.info("测试发送异步消息 result:{}", JSON.toJSONString(result));}@Overridepublic void onException(Throwable throwable) {log.info("测试发送异步消息 error:{}", throwable.getMessage());}});log.info("我先执行");// 挂起JVM不让方法结束System.in.read();
}
// 控制台打印结果
2023-08-10 14:02:21.125 INFO 30988 --- [ main] cketmqSpringbootProducerApplicationTests : 我先执行
2023-08-10 14:02:21.793 INFO 30988 --- [ublicExecutor_1] cketmqSpringbootProducerApplicationTests : 测试发送异步消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"BOOT_TOPIC_ASYNC"},"msgId":"7F000001790C18B4AAC2315C70D50000","offsetMsgId":"C0A8B58A00002A9F000000000002A3FC","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收异步消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ASYNC",consumerGroup = "BOOT_TOPIC_ASYNC_GROUP",messageModel = MessageModel.CLUSTERING)
public class MyASyncMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收异步消息 message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:02:36.334 INFO 10676 --- [C_ASYNC_GROUP_1] o.star.listener.MyASyncMessageListener : 接收异步消息 message:我是一个异步消息
三、单向消息
发送&接收单向消息
发送单向消息
/*** 适用场景:适用于不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送*/
@Test
public void sendOnewayMessage() {// 发送单向消息,没有返回值和结果rocketMQTemplate.sendOneWay("BOOT_TOPIC_ONE_WAY", "我是一个单向消息");
}
接收单向消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ONE_WAY", consumerGroup = "BOOT_TOPIC_ONE_WAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyOnewayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收单向消息 message:{}",new String(messageExt.getBody()));}}
// 控制台打印结果
2023-08-10 14:07:23.965 INFO 32740 --- [ONE_WAY_GROUP_1] o.star.listener.MyOnewayMessageListener : 接收单向消息 message:我是一个单向消息
四、延迟消息
发送&接收延迟消息
发送延迟消息
/*** 测试发送延迟消息*/
@Test
public void sendDelayMessage() {Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();/*** 设定消息的延迟等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)* 1s: 等级1* 5s: 等级2* 10s:等级3* 30s:等级4* 1m: 等级5* 发送一个延迟消息,延迟等级为4级,也就是30s后被监听消费* 注意事项:RocketMQ不支持任意时间的延时,只支持上述的延迟规则*/SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_DELAY", message, 2000, 4);log.info("测试发送延迟消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:13:03.859 INFO 4804 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送延迟消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_DELAY"},"msgId":"7F00000112C418B4AAC231663CE60000","offsetMsgId":"C0A8B58A00002A9F000000000002A634","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收延迟消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_DELAY", consumerGroup = "BOOT_TOPIC_DELAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyDelayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收延迟消息 message:{}",new String(messageExt.getBody()));}}// 控制台打印结果
2023-08-10 14:13:33.860 INFO 26112 --- [C_DELAY_GROUP_1] o.star.listener.MyDelayMessageListener : 接收延迟消息 message:我是一个延迟消息
五、顺序消息
发送&接收顺序消息
发送顺序消息
/*** 发送顺序消息,控制流程:下订单==》发短信==》物流* 测试发送顺序消息*/
@Test
public void sendOrderlyMessage() {// 顺序消息,发送者将一组消息都发送至同一个队列,消费者需要单线程进行消费List<Order> orders = Arrays.asList(new Order("aaa", 1, "下订单"),new Order("aaa", 1, "发短信"),new Order("aaa", 1, "物流"),new Order("bbb", 2, "下订单"),new Order("bbb", 2, "发短信"),new Order("bbb", 2, "物流"));orders.forEach(order -> {// 发送,一般都是以json的方式进行处理SendResult result = rocketMQTemplate.syncSendOrderly("BOOT_TOPIC_ORDERLY", JSON.toJSONString(order), order.getOrderSn());log.info("订单id:{},队列id:{},结果:{}",order.getUserId(),result.getMessageQueue().getQueueId(),result.getSendStatus());});
}
// 控制台打印结果
2023-08-10 14:23:02.023 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.026 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.028 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.029 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.030 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.031 INFO 33668 --- [ main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
接收顺序消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ORDERLY",consumerGroup = "BOOT_TOPIC_ORDERLY_GROUP",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式,单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class MyOrderlyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {Order order = JSON.parseObject(new String(messageExt.getBody()),Order.class);log.info("接收顺序消息order:{}",order);}
}
// 控制台打印结果
2023-08-10 14:23:46.714 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=下订单)
2023-08-10 14:23:46.714 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=下订单)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=发短信)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=物流)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=发短信)
2023-08-10 14:23:46.901 INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=物流)
六、带Tag的消息
发送&接收带Tag的消息
发送带Tag消息
/*** 测试发送带Tag的消息*/
@Test
public void sendTagMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_TAG:TagA", "我是一个带Tag的消息");log.info("测试发送带Tag的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:34:07.806 INFO 30388 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送带Tag的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_TAG"},"msgId":"7F00000176B418B4AAC2317986310000","offsetMsgId":"C0A8B58A00002A9F000000000002B029","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收带Tag消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_TAG",consumerGroup = "BOOT_TOPIC_TAG_GROUP",messageModel = MessageModel.CLUSTERING,selectorType = SelectorType.TAG, // tag过滤模式selectorExpression = "TagA || TagB"
)
public class MyTagMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Tag标签的消息 result:{}", new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:34:27.260 INFO 10868 --- [PIC_TAG_GROUP_1] org.star.listener.MyTagMessageListener : 接收带Tag标签的消息 result:我是一个带Tag的消息
七、带Key的消息
发送&接收带Key的消息
发送带Key的消息
/*** 测试发送带Key的消息*/
@Test
public void sendKeyMessage() {// key写在消息头里边Message<String> message = MessageBuilder.withPayload("我是一个带Key的消息").setHeader(RocketMQHeaders.KEYS, "STAR").build();SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_KEY", message);log.info("测试发送带Key的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:45:42.963 INFO 26148 --- [ main] cketmqSpringbootProducerApplicationTests : 测试发送带Key的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_KEY"},"msgId":"7F000001662418B4AAC2318421A80000","offsetMsgId":"C0A8B58A00002A9F000000000002B275","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
接收带Key的消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_KEY",consumerGroup = "BOOT_TOPIC_KEY_GROUP",messageModel = MessageModel.CLUSTERING
)
public class MyKeyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Key的消息:{},Key:{}",new String(messageExt.getBody()), messageExt.getKeys());}
}
// 控制台打印结果
2023-08-10 14:45:42.963 INFO 23040 --- [PIC_KEY_GROUP_1] org.star.listener.MyKeyMessageListener : 接收带Key的消息:我是一个带Key的消息,Key:STAR
八、集群模式的消息
发送&接收集群模式的消息
发送集群消息
/*** 测试消息消费模式-集群模式* 此种方式消费者会采取轮询的方式进行消费*/
@Test
public void modeForClusterSendMessage() {for (int i = 1; i <= 10; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_CLUSTER", "我是第" + i + "个消息");log.info("集群模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}// 控制台打印结果
2023-08-10 14:58:14.203 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.207 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.211 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.213 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.216 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.218 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.220 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.222 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.224 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.227 INFO 13416 --- [ main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
接收集群消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 14:58:14.209 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第1个消息
2023-08-10 14:58:14.212 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第2个消息
2023-08-10 14:58:14.212 INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener1 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第3个消息
2023-08-10 14:58:14.216 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第4个消息
2023-08-10 14:58:14.217 INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第5个消息
2023-08-10 14:58:14.220 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第6个消息
2023-08-10 14:58:14.222 INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener1 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第7个消息
2023-08-10 14:58:14.223 INFO 7300 --- [CLUSTER_GROUP_4] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第8个消息
2023-08-10 14:58:14.226 INFO 7300 --- [CLUSTER_GROUP_5] o.s.listener.MyClusterMessageListener3 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第9个消息
2023-08-10 14:58:14.229 INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener2 : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第10个消息
九、广播模式的消息
发送&接收广播模式的消息
发送广播模式的消息
/*** 测试消息消费模式-广播模式* 此种方式每一个消费者都会消费一次消息*/
@Test
public void modeForBroadcastingSendMessage() {for (int i = 1; i <= 5; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_BROADCASTING", "我是第" + i + "个消息");log.info("广播模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}
// 控制台打印结果
2023-08-10 15:12:10.081 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
2023-08-10 15:12:10.084 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:1,结果:SEND_OK
2023-08-10 15:12:10.086 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:2,结果:SEND_OK
2023-08-10 15:12:10.088 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:3,结果:SEND_OK
2023-08-10 15:12:10.090 INFO 37728 --- [ main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
接收广播模式的消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener2 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}// 控制台打印结果
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089 INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener3 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener2 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092 INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener1 : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第5个消息,队列id:0
相关文章:
系列六、Springboot操作RocketMQ
一、同步消息 1.1、发送&接收简单消息 1.1.1、发送简单消息 /*** 测试发送简单消息*/ Test public void sendSimpleMessage() {SendResult result rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主…...
【jupyter异常错误】Kernel started:No module named ipykernel_launcher
尝试过的方案 pip install ipykernel 执行之后提示已经安装,但是执行代码依然报错 解决方案 python -m pip install ipykernel -U --force-reinstall 相当于是强制重新安装 安装成功后没有报错 注:根本原因应该是原来安装的包存在问题,虽然检测出来已经存在…...
使用langchain与你自己的数据对话(五):聊天机器人
之前我已经完成了使用langchain与你自己的数据对话的前四篇博客,还没有阅读这四篇博客的朋友可以先阅读一下: 使用langchain与你自己的数据对话(一):文档加载与切割使用langchain与你自己的数据对话(二):向量存储与嵌入使用langc…...
爬虫与搜索引擎优化:通过Python爬虫提升网站搜索排名
作为一名专业的爬虫程序员,我深知网站的搜索排名对于业务的重要性。在如今竞争激烈的网络世界中,如何让自己的网站在搜索引擎结果中脱颖而出,成为关键。今天,和大家分享一些关于如何通过Python爬虫来提升网站的搜索排名的技巧和实…...
2024软考系统架构设计师论文写作要点
一、写作注意事项 系统架构设计师的论文题目对于考生来说,是相对较难的题目。一方面,考生需要掌握论文题目中的系统架构设计的专业知识;另一方面,论文的撰写需要结合考生自身的项目经历。因此,如何将自己的项目经历和专业知识有机…...
【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承
【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承 依赖范围 依赖传递 依赖排除 依赖原则 依赖继承 依赖范围 在Maven中,依赖范围(Dependency Scope)用于控制依赖项在编译、测试和运行时的可见性和可用性。通过指定适当的依赖…...
数组slice、splice字符串substr、split
一、定义 这篇文章主要对数组操作的两种方法进行介绍和使用,包括:slice、splice。对字符串操作的两种方法进行介绍和使用,包括:substr、split (一)、数组 slice:可以操作的数据类型有:数组字符串 splice:数组 操作数组…...
程序漏洞:安全威胁的隐患
在当今数字化时代,计算机程序是现代社会的核心基石。然而,随着技术的进步,程序漏洞也成为了一个不可忽视的问题。程序漏洞可能导致数据泄露、系统崩溃、恶意攻击和经济损失等一系列问题。本文将深入探讨程序漏洞的定义、分类、影响和预防措施…...
0基础学C#笔记09:希尔排序法
文章目录 前言一、希尔排序的思想二、使用步骤总结 前言 希尔排序可以说是插入排序的一种变种。无论是插入排序还是冒泡排序,如果数组的最大值刚好是在第一位,要将它挪到正确的位置就需要 n - 1 次移动。也就是说,原数组的一个元素如果距离它…...
DOCKER的容器
1. 什么是Container(容器) 要有Container首先要有Image,也就是说Container是通过image创建的。 Container是在原先的Image之上新加的一层,称作Container layer,这一层是可读可写的(Image是只读的࿰…...
跳跃游戏——力扣55
文章目录 题目描述解法一 贪心题目描述 解法一 贪心 bool canJump(vector<int>& nums){int n=nums....
将本地项目上传至gitee的详细步骤
将本地项目上传至gitee的详细步骤 1.在gitee上创建以自己项目名称命名的空项目2.进入想上传的项目的文件夹,然后右键点击3. 初始化本地环境,把该项目变成可被git管理的仓库4.添加该项目下的所有文件5.使用如下命令将文件添加到仓库中去6.将本地代码库与远…...
iOS开发-导航栏UINavigationBar隐藏底部线及透明度
iOS 导航栏UINavigationBar隐藏底部线及透明度 苹果官方给出的解释: 如果你不调用方法设置一张背景图片的话,那就给你默认一张,然后同时还有一张阴影图片被默认设置上去,这就是导航栏上1px黑线的由来。 解决办法: 方…...
题目:2520.统计能整除数字的位数
题目来源: leetcode题目,网址:2520. 统计能整除数字的位数 - 力扣(LeetCode) 解题思路: 逐位判断即可。 解题代码: class Solution {public int countDigits(int num) {int res0;int ori…...
matplotlib 笔记 注释annotate
在图中的特定位置添加文本注释、箭头和连接线,以便更清晰地解释图形中的数据或信息 主要参数 text文本内容xy箭头指向的目标点的坐标xytext注释文本的坐标arrowprops 一个字典,指定注释箭头的属性,如颜色、箭头样式等 没有arrowprops的时候…...
Windows 无法安装到这个硬盘。选中的磁盘具有MBR分区。在EFI系统上,Windows只能安装到GPT磁盘
Windows无法安装到这个磁盘,选中的磁盘具有MBR分区表的解决方法 - 知乎 (zhihu.com) Windows无法安装到这个磁盘 选中的磁盘具有MBR分区表 - 知乎 (zhihu.com) 选中的磁盘具有MBR分区表,在EFI系统上,windows只能安装到GPT磁盘_选中的磁盘具有mbr分区表…...
学C的第三十三天【C语言文件操作】
相关代码gitee自取: C语言学习日记: 加油努力 (gitee.com) 接上期: 学C的第三十二天【动态内存管理】_高高的胖子的博客-CSDN博客 1 . 为什么要使用文件 以前面写的通讯录为例,当通讯录运行起来的时候,可以给通讯录中增加、删…...
线性表的基本操作及在顺序存储及链式存储的实现
目录 线性表的基本操作:线性表的在顺序存储上的实现 线性表的基本操作: 一个数据结构的基本操作是指其最核心、最基本的操作。其他较复杂的操作可通过其基本操作来实现。线性表的主要操作如下 - InitList(&L):初始化表。构造一个空的线性表- Length…...
合宙Air724UG LuatOS-Air script lib API--nvm
nvm Table of Contents nvm nvm.init(defaultCfgFile, burnSave) nvm.set(k, v, r, s) nvm.sett(k, kk, v, r, s) nvm.flush() nvm.get(k) nvm.gett(k, kk) nvm.restore() nvm.remove() nvm 模块功能:参数管理 nvm.init(defaultCfgFile, burnSave) 初始化参数存储管…...
springboot单元测试的详细介绍
当开发一个复杂的应用程序时,确保代码的正确性和稳定性至关重要。在这方面,单元测试是一个不可或缺的工具,它可以帮助开发人员验证代码的各个部分是否按预期工作。Spring Boot提供了丰富的测试支持,使编写和执行单元测试变得更加容…...
Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
push [特殊字符] present
push 🆚 present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中,push 和 present 是两种不同的视图控制器切换方式,它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...
【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...
Netty自定义协议解析
目录 自定义协议设计 实现消息解码器 实现消息编码器 自定义消息对象 配置ChannelPipeline Netty提供了强大的编解码器抽象基类,这些基类能够帮助开发者快速实现自定义协议的解析。 自定义协议设计 在实现自定义协议解析之前,需要明确协议的具体格式。例如,一个简单的…...
