Kafka - 消息乱序问题的常见解决方案和实现
文章目录
- 概述
- 一、MQ消息乱序问题分析
- 1.1 相同topic内的消息乱序
- 1.2 不同topic的消息乱序
- 二、解决方案
- 方案一: 顺序消息
- Kafka
- 1. Kafka 顺序消息的实现
- 1.1 生产者:确保同一业务主键的消息发送到同一个分区
- 1.2 消费者:顺序消费消息
- 2. Kafka 顺序消息实现的局限性
- 3. 小结
- RocketMQ
- 1. 使用 RocketMQ 实现顺序消费
- 1.1 生产者:发送顺序消息
- 1.2 消费者:顺序消费消息
- 2. RocketMQ 顺序消息的局限性
- 3. 小结
- 方案二: 前置检测(Pre-check)
- 前置检测的方案
- 方案1: 使用辅助表进行前置检测
- 1.1 方案设计
- 1.2 数据库表设计
- 1.3 消费者前置检测代码实现
- 方案2: 使用序列号/时间戳进行顺序检查
- 2.1 方案设计
- 2.2 消费者前置检测代码实现
- 3. 小结
- 方案三: 状态机
- 1. 状态机的设计思路
- 2. 状态机的实现步骤
- 3. 设计与实现
- 3.1 状态机设计
- 3.1.1 定义状态
- 3.1.2 定义事件
- 3.1.3 状态机逻辑
- 3.1.4 使用状态机处理消息
- 4. 运行流程
- 5. 小结
- 监控与报警
- 伪实现
- 总结

概述
在分布式系统中,消息队列(MQ)作为实现系统解耦和异步通信的重要工具,广泛应用于各种业务场景。然而,消息消费时出现的乱序问题,常常会对业务逻辑的正确执行和系统稳定性产生不良影响。
接下来我们将详细探讨MQ消息乱序问题的根源,并提供一系列在实际应用中可行的解决方案,包括顺序消息、前置检测、状态机等方式
一、MQ消息乱序问题分析
1.1 相同topic内的消息乱序
- 并发消费:为了提高消息处理吞吐量,通常会配置多个消费者实例来并发消费同一个队列中的消息。然而,由于消费者实例的性能差异,可能导致消息的消费顺序与发送顺序不一致。
- 消息分区:MQ系统通常采用分区化设计,当同一业务逻辑的消息分发到不同的分区时,可能出现乱序。
- 网络延迟与抖动:消息在传输过程中可能会受到网络延迟和抖动的影响,导致消息到达消费者端的顺序与发送顺序不一致。
- 消息重试与故障恢复:当消费者处理消息失败或出现故障时,重试机制或故障恢复操作不当,也可能导致消息乱序。
1.2 不同topic的消息乱序
例如,系统A在01:00时向TopicA发送了消息msgA-01:00
,而系统B在01:01时向TopicB发送了消息msgB-01:01
。消费者无法预设msgA-01:00
必然先于msgB-01:01
被接收。消息系统中的分区策略、消费者的处理能力、网络等因素共同导致无法确保消息遵循严格的先进先出(FIFO)原则。
二、解决方案
为了应对消息乱序问题,有几种常见的解决方案,包括顺序消息、前置检测、状态机等。
方案一: 顺序消息
顺序消息是通过确保同一业务主键的消息发送到同一分区,从而保证消息的顺序性。
Kafka
以 Kafka 为例,虽然它不保证全局消息顺序,但可以通过合理的分区策略和消息键来确保消息的局部顺序性。
下面是使用 Kafka 作为消息队列(MQ)时,如何实现顺序消息的解决方案。通过使用 Kafka 的分区策略和消息键(key),可以确保同一业务主键的消息发送到同一个分区,从而保证消息的顺序性。
1. Kafka 顺序消息的实现
1.1 生产者:确保同一业务主键的消息发送到同一个分区
通过指定消息的 key,Kafka 会确保具有相同 key 的消息发送到同一个分区。这样,即使多个消费者并行消费,也能保证消息在同一个分区内的顺序。
生产者代码实现
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class OrderProducer {private final KafkaProducer<String, String> producer;private final String topic;public OrderProducer(String topic) {this.topic = topic;Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());this.producer = new KafkaProducer<>(properties);}public void sendOrderMessage(String orderId, String orderMessage) {// 使用订单ID作为消息的 key,以确保同一订单的消息发送到同一个分区ProducerRecord<String, String> record = new ProducerRecord<>(topic, orderId, orderMessage);producer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.println("Message sent: " + metadata);}});}public void close() {producer.close();}public static void main(String[] args) {OrderProducer orderProducer = new OrderProducer("order-topic");// 发送顺序消息,确保同一订单的消息被发送到同一分区orderProducer.sendOrderMessage("order123", "Order Created");orderProducer.sendOrderMessage("order123", "Order Paid");orderProducer.sendOrderMessage("order123", "Order Shipped");// 发送另一个订单的消息orderProducer.sendOrderMessage("order456", "Order Created");orderProducer.sendOrderMessage("order456", "Order Paid");orderProducer.close();}
}
- 在生产者端,通过
ProducerRecord
发送消息时,设置了消息的 key 为订单 ID(orderId
)。Kafka 会使用该 key 来确定消息发送到哪个分区,从而确保同一订单的所有消息都会被发送到同一个分区,保证顺序。 producer.send()
方法的回调函数用来处理消息发送的异步结果。
1.2 消费者:顺序消费消息
消费者使用 MessageListener
或 Consumer
来消费消息。Kafka 默认会根据分区消费顺序保证同一分区内消息的顺序。我们只需要保证同一个业务的消息被路由到同一个分区,消费者就能顺序消费这些消息。
消费者代码实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class OrderConsumer {private final KafkaConsumer<String, String> consumer;private final String topic;public OrderConsumer(String topic) {this.topic = topic;Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "order-consumer-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("auto.offset.reset", "earliest");this.consumer = new KafkaConsumer<>(properties);}public void consumeMessages() {consumer.subscribe(Collections.singletonList(topic));while (true) {consumer.poll(1000).forEach(record -> {// 处理顺序消息System.out.println("Consumed message: " + record.key() + " - " + record.value());});}}public void close() {consumer.close();}public static void main(String[] args) {OrderConsumer orderConsumer = new OrderConsumer("order-topic");// 消费消息,确保同一个订单的消息顺序消费orderConsumer.consumeMessages();}
}
-
消费者通过
KafkaConsumer
从指定的 topic 中拉取消息。在这种实现中,消息会按照 Kafka 内部的消费机制被顺序消费。 -
consumer.poll()
方法定期从 Kafka 中拉取消息,并根据key
分配到相应的分区进行消费。 -
Kafka 的分区是顺序消费的,即每个分区内的消息按照生产者发送的顺序消费。因此,通过确保同一订单的消息使用相同的
key
,就能保证同一分区内消息的消费顺序。
2. Kafka 顺序消息实现的局限性
- 局部顺序保证:Kafka 只能保证同一分区内的消息顺序,对于跨分区的消息并不保证顺序。因此,确保同一业务的消息发送到同一分区非常关键。
- 性能与吞吐量:为了提高系统的吞吐量和并发能力,Kafka 会对 topic 进行分区。分区数过多可能影响顺序性,但可以通过合理设计业务键来平衡性能和顺序性要求。
3. 小结
通过使用 Kafka 的分区和消息键机制,我们可以确保同一业务主键的消息在同一分区内顺序消费。这种方法适用于需要保证顺序性的场景,如订单处理等。生产者确保消息按照业务主键路由到同一分区,消费者则按分区顺序消费消息,从而避免消息乱序的问题。
RocketMQ
在使用 RocketMQ 作为消息队列时,确保消息的顺序消费可以通过 顺序消息(Ordered Message)的特性来实现。RocketMQ 支持两种类型的顺序消费:局部顺序(确保同一消息队列内的消息顺序)和 全局顺序(通过单一队列保证全局顺序,但在高并发情况下可能会影响性能)。
1. 使用 RocketMQ 实现顺序消费
1.1 生产者:发送顺序消息
生产者通过指定消息的 key 来确保具有相同 key 的消息被发送到同一个消息队列,从而保证顺序性。RocketMQ 支持发送顺序消息的 API,通过 MessageQueueSelector
来指定消息发送到哪个队列。
生产者代码实现
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class OrderProducer {private DefaultMQProducer producer;public OrderProducer(String groupName) throws Exception {// 创建生产者实例producer = new DefaultMQProducer(groupName);producer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址producer.start();}public void sendOrderMessage(String orderId, String orderMessage) throws Exception {// 创建消息实例Message message = new Message("OrderTopic", "OrderTag", orderMessage.getBytes(RemotingHelper.DEFAULT_CHARSET));// 使用订单ID作为消息的key,确保同一订单的消息发送到同一队列SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {String orderId = (String) arg;int queueIndex = orderId.hashCode() % mqs.size(); // 根据订单ID选择队列return mqs.get(queueIndex);}}, orderId);System.out.println("Message sent: " + sendResult);}public void close() {producer.shutdown();}public static void main(String[] args) throws Exception {OrderProducer producer = new OrderProducer("order-group");// 发送顺序消息,确保同一订单的消息被发送到同一队列producer.sendOrderMessage("order123", "Order Created");producer.sendOrderMessage("order123", "Order Paid");producer.sendOrderMessage("order123", "Order Shipped");producer.sendOrderMessage("order456", "Order Created");producer.sendOrderMessage("order456", "Order Paid");producer.close();}
}
- 生产者通过
MessageQueueSelector
来确保相同 key 的消息被发送到相同的队列。这里我们使用orderId
作为消息的 key,通过计算orderId.hashCode()
来决定消息发送到哪个队列。确保同一个订单的消息发送到同一个队列,从而在消费时保持顺序性。 SendResult
会返回发送结果,包括消息发送的状态。
1.2 消费者:顺序消费消息
在消费者端,RocketMQ 提供了 MessageListenerOrderly 接口来实现顺序消费。该接口保证在同一队列内,消息会按照发送的顺序被消费。
消费者代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeConcurrentlyContext;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;public class OrderConsumer {private DefaultMQPushConsumer consumer;public OrderConsumer(String groupName) throws Exception {// 创建消费者实例consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag}public void consumeMessages() throws Exception {// 设置顺序消费监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {// 消费顺序消息System.out.println("Consumed message: " + new String(msg.getBody()));}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumer consumer = new OrderConsumer("order-consumer-group");// 开始消费顺序消息consumer.consumeMessages();}
}
-
消费者使用
MessageListenerOrderly
来实现顺序消费。该接口保证了消费者在同一消息队列内按顺序消费消息。 -
消费者在接收到消息后,会依次消费并输出消息内容。
-
RocketMQ 是基于消息队列的,每个队列内的消息是顺序消费的,即使有多个消费者,也只会有一个消费者消费某个队列的消息。通过将同一 key 的消息发送到同一个队列,可以确保这些消息按照顺序被消费。
-
需要注意的是,RocketMQ 保证的是 局部顺序,即同一队列内的消息按照发送顺序消费。对于多个队列和多个消费者,只有同一个队列内的消息顺序是保证的。
2. RocketMQ 顺序消息的局限性
- 局部顺序保证:RocketMQ 只能保证同一队列内的消息顺序,对于多个队列之间的消息没有顺序保证。
- 性能影响:如果需要保证全局顺序,可能需要将所有消息都发送到同一个队列,这会影响性能,导致吞吐量下降。通常需要在性能和顺序性之间进行权衡。
3. 小结
通过使用 RocketMQ 的 MessageQueueSelector
和 MessageListenerOrderly,我们可以保证同一业务的消息在同一队列内顺序消费。这种方式适用于需要保证顺序的场景,如订单处理、支付等高可靠性的业务系统。生产者通过业务主键选择队列,消费者则顺序消费消息,确保数据一致性和业务流程的正确执行。
方案二: 前置检测(Pre-check)
前置检测(Pre-check)在消息队列消费中,常用于确保消息消费的顺序性,防止因为消息乱序导致的数据不一致或业务错误。其核心思想是在消息消费之前进行验证,确保前置条件满足才继续消费当前消息。
在消费者处理消息之前,进行前置条件检查,确保上一条消息已成功消费。这可以通过消息辅助表来实现,或者在消息中附带序列号、时间戳等信息进行验证。
前置检测的方案
前置检测主要包括以下几种常见方法:
-
使用辅助表进行状态检查:通过创建一个辅助表(如状态表或消息表),记录消息的状态,消费者可以通过查询该表来验证上一个消息是否已经成功处理,确保消息按顺序消费。
-
使用序列号/时间戳进行顺序检查:在消息中包含序列号或时间戳,消费者根据这些信息判断当前消息是否按预期顺序到达。如果不符合顺序,则将当前消息暂时缓存,等待前一个消息处理完成。
-
利用死信队列处理无序消息:当消息的顺序不符合预期时,可以将这些消息暂时放入死信队列(DLQ)中,待前置消息消费成功后再重新消费。
方案1: 使用辅助表进行前置检测
假设在处理订单相关的消息时,我们希望确保订单的状态始终按照正确的顺序处理,比如,Order Created
应该在 Order Paid
前消费。
1.1 方案设计
- 设计一个
order_status
表,记录订单的处理状态。 - 消费者在处理消息前,查询这个表,确保订单的前置状态已经处理完毕。
- 消费失败时,可以将消息放入死信队列或重试。
1.2 数据库表设计
CREATE TABLE order_status (order_id VARCHAR(255) PRIMARY KEY,status VARCHAR(255) NOT NULL,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- 状态示例
-- 订单创建:CREATED
-- 订单支付:PAID
-- 订单完成:COMPLETED
1.3 消费者前置检测代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;
import java.util.List;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class OrderConsumerWithPreCheck {private static final String DB_URL = "jdbc:mysql://localhost:3306/order_db";private static final String DB_USER = "root";private static final String DB_PASSWORD = "password";private DefaultMQPushConsumer consumer;public OrderConsumerWithPreCheck(String groupName) throws Exception {consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag}// 检查订单状态public boolean checkOrderStatus(String orderId, String expectedStatus) throws Exception {try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {String query = "SELECT status FROM order_status WHERE order_id = ?";try (PreparedStatement statement = connection.prepareStatement(query)) {statement.setString(1, orderId);ResultSet rs = statement.executeQuery();if (rs.next()) {String currentStatus = rs.getString("status");return expectedStatus.equals(currentStatus); // 比对期望状态}}}return false; // 订单未找到,默认返回 false}public void consumeMessages() throws Exception {consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String orderId = msg.getKeys(); // 假设订单ID存储在消息的keys字段String currentStatus = new String(msg.getBody());// 检查前置状态,确保当前状态是顺序的try {if ("OrderCreated".equals(currentStatus)) {if (!checkOrderStatus(orderId, "CREATED")) {System.out.println("Order not created yet, skipping message: " + orderId);continue; // 如果前置状态不符合,跳过该消息}} else if ("OrderPaid".equals(currentStatus)) {if (!checkOrderStatus(orderId, "PAID")) {System.out.println("Order not paid yet, skipping message: " + orderId);continue;}}// 消费消息逻辑System.out.println("Processing order message: " + orderId + " - " + currentStatus);// 更新状态或其他业务逻辑} catch (Exception e) {e.printStackTrace();}}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumerWithPreCheck consumer = new OrderConsumerWithPreCheck("order-consumer-group");consumer.consumeMessages();}
}
方案2: 使用序列号/时间戳进行顺序检查
在这种方法中,我们为每个消息分配一个 序列号 或 时间戳,并通过对比当前消息的序列号和前一条消息的序列号来确保消息按顺序消费。如果序列号不符合预期,消费者会将该消息缓存,等待前置消息的消费完成。
2.1 方案设计
- 消息中包含一个 sequenceId 或 timestamp 字段。
- 消费者检查当前消息的 sequenceId,如果当前消息的 sequenceId 小于等于上一个已消费消息的 sequenceId,则跳过当前消息。
2.2 消费者前置检测代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class OrderConsumerWithSequenceCheck {private DefaultMQPushConsumer consumer;private AtomicInteger lastSequenceId = new AtomicInteger(0); // 记录最后处理的序列号public OrderConsumerWithSequenceCheck(String groupName) throws Exception {consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag}// 检查消息的序列号,确保顺序性public boolean checkSequenceId(int currentSequenceId) {return currentSequenceId == lastSequenceId.incrementAndGet();}public void consumeMessages() throws Exception {consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {int sequenceId = Integer.parseInt(new String(msg.getBody())); // 消息中的序列号if (!checkSequenceId(sequenceId)) {System.out.println("Out of order message, skipping message with sequence: " + sequenceId);continue; // 如果消息序列号不符合顺序,则跳过}// 消费消息逻辑System.out.println("Processing message with sequence ID: " + sequenceId);// 进行相应的业务处理}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumerWithSequenceCheck consumer = new OrderConsumerWithSequenceCheck("order-consumer-group");consumer.consumeMessages();}
}
3. 小结
前置检测方案的核心是通过验证当前消息的处理条件(如订单的状态或消息的序列号),确保前置条件满足后再继续处理当前消息。此方案能有效防止由于消息乱序导致的数据不一致或业务错误,适用于需要严格保证处理顺序的场景。
- 数据库检查:通过查询数据库记录来验证消息的处理顺序。
- 序列号检查:通过消息中的序列号或时间戳验证消息是否按顺序到达。
方案三: 状态机
可以利用状态机来管理消息的消费顺序和状态。状态机的核心思想是定义系统的不同状态,以及触发状态变更的事件,从而确保消息在正确的状态下被处理。
通过引入状态机,我们能够:
- 通过状态转移机制保证消息按顺序消费。
- 在状态转移过程中,避免非法的状态变更和消息丢失。
1. 状态机的设计思路
在处理消息时,可以将消息消费的过程视为一系列状态的变换。每个消息会根据其当前状态决定是否可以进行处理。
-
定义状态:
- 定义消息消费的不同状态,例如
PENDING
(待处理)、PROCESSING
(处理中)、PROCESSED
(已处理)。 - 每个消息在处理过程中会从一个状态转移到另一个状态。
- 定义消息消费的不同状态,例如
-
定义事件:
- 每个消息可能触发一个事件,事件可以是消息的到达或者某些外部条件的变化。
- 通过事件来决定状态的转移。
-
处理顺序:
- 确保某些消息必须在特定的顺序下处理。比如,某个状态的消息必须先处理完成,才能处理下一个状态的消息。
2. 状态机的实现步骤
- 状态定义:使用枚举类(
enum
)定义消息的状态。 - 事件定义:根据消息到达的顺序或其他外部条件触发不同的事件。
- 状态机实现:根据当前状态和事件的触发来决定状态转移。
3. 设计与实现
假设我们有一个订单处理系统,订单的状态可能为以下几种:
ORDER_CREATED
:订单已创建ORDER_PAID
:订单已支付ORDER_SHIPPED
:订单已发货ORDER_COMPLETED
:订单已完成
我们希望确保消息的消费顺序是按顺序进行的,即订单创建 -> 支付 -> 发货 -> 完成。
3.1 状态机设计
3.1.1 定义状态
首先定义订单状态的枚举类型 OrderState
:
public enum OrderState {ORDER_CREATED, // 订单已创建ORDER_PAID, // 订单已支付ORDER_SHIPPED, // 订单已发货ORDER_COMPLETED // 订单已完成
}
3.1.2 定义事件
根据业务需求,定义事件触发的条件。比如:
ORDER_CREATED_EVENT
:订单创建事件ORDER_PAID_EVENT
:订单支付事件ORDER_SHIPPED_EVENT
:订单发货事件ORDER_COMPLETED_EVENT
:订单完成事件
3.1.3 状态机逻辑
使用一个状态机类来管理状态的转换。状态机会根据当前状态和触发的事件来进行状态转换。
import java.util.HashMap;
import java.util.Map;public class OrderStateMachine {// 订单状态private OrderState currentState;// 状态转移规则,基于当前状态和事件决定下一个状态private final Map<OrderState, Map<String, OrderState>> transitionTable;public OrderStateMachine() {// 初始化状态为 ORDER_CREATEDthis.currentState = OrderState.ORDER_CREATED;// 初始化状态转移规则this.transitionTable = new HashMap<>();// 设置转移规则// 从 ORDER_CREATED 可以转到 ORDER_PAIDaddTransition(OrderState.ORDER_CREATED, "ORDER_CREATED_EVENT", OrderState.ORDER_PAID);// 从 ORDER_PAID 可以转到 ORDER_SHIPPEDaddTransition(OrderState.ORDER_PAID, "ORDER_PAID_EVENT", OrderState.ORDER_SHIPPED);// 从 ORDER_SHIPPED 可以转到 ORDER_COMPLETEDaddTransition(OrderState.ORDER_SHIPPED, "ORDER_SHIPPED_EVENT", OrderState.ORDER_COMPLETED);}// 添加状态转换规则private void addTransition(OrderState fromState, String event, OrderState toState) {transitionTable.putIfAbsent(fromState, new HashMap<>());transitionTable.get(fromState).put(event, toState);}// 处理事件并转换状态public boolean handleEvent(String event) {Map<String, OrderState> transitions = transitionTable.get(currentState);if (transitions != null && transitions.containsKey(event)) {OrderState nextState = transitions.get(event);System.out.println("State transition: " + currentState + " -> " + nextState);this.currentState = nextState; // 执行状态转移return true;} else {System.out.println("Invalid event for the current state: " + currentState);return false;}}// 获取当前状态public OrderState getCurrentState() {return currentState;}
}
3.1.4 使用状态机处理消息
假设我们在消息队列中有不同的订单消息,需要按顺序消费。我们将消费者与状态机结合使用,确保消息按照正确的顺序消费。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;public class OrderConsumerWithStateMachine {private static final String TOPIC = "OrderTopic";private static final String GROUP = "OrderConsumerGroup";private DefaultMQPushConsumer consumer;private OrderStateMachine stateMachine;public OrderConsumerWithStateMachine() {consumer = new DefaultMQPushConsumer(GROUP);stateMachine = new OrderStateMachine();try {consumer.setNamesrvAddr("localhost:9876");consumer.subscribe(TOPIC, "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String event = new String(msg.getBody());System.out.println("Received message: " + event);// 根据消息的内容触发状态机事件if ("ORDER_CREATED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_CREATED_EVENT");} else if ("ORDER_PAID_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_PAID_EVENT");} else if ("ORDER_SHIPPED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_SHIPPED_EVENT");} else if ("ORDER_COMPLETED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_COMPLETED_EVENT");}System.out.println("Current state: " + stateMachine.getCurrentState());}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();System.out.println("Order consumer started");} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {new OrderConsumerWithStateMachine();}
}
4. 运行流程
- 消费者会根据事件(如
ORDER_CREATED_EVENT
,ORDER_PAID_EVENT
等)处理消息。 - 消费者会触发状态机,状态机会根据当前状态和事件来进行状态转换。
- 如果消息的顺序不正确(例如
ORDER_PAID_EVENT
在ORDER_CREATED_EVENT
之前到达),状态机会拒绝处理,并打印Invalid event for the current state
。
5. 小结
- 状态机可以帮助管理消息的消费顺序,确保在处理消息时遵循正确的流程和业务逻辑。
- 通过定义状态和事件,状态机提供了一个清晰的框架来管理复杂的消息处理过程。
- 结合消息队列,状态机可以有效地控制消息的顺序消费,避免乱序带来的问题。
监控与报警
建立系统的监控和报警机制,及时发现并处理消息错乱等异常情况。通过设定阈值或检测规则,监控系统的消息流转,确保及时响应并纠正问题。
- 定期监控消息队列的消费进度,若发现消费滞后或消息顺序异常,自动报警。
- 通过日志和统计信息,捕获异常并自动触发处理流程。
伪实现
public class MessageMonitor {private static final Logger logger = LoggerFactory.getLogger(MessageMonitor.class);public void monitorMessageQueue() {// 假设有一个队列监控机制boolean isOutOfOrder = checkMessageOrder();if (isOutOfOrder) {logger.error("Message order error detected, triggering alert!");// 触发报警或采取恢复措施}}private boolean checkMessageOrder() {// 检查消息顺序是否正常return false; // 假设没有乱序}
}
总结
MQ消息乱序是分布式系统中常见的挑战,直接影响到系统的稳定性和业务一致性。我们可以通过顺序消息、前置检测、状态机等解决方案, 保证消息的顺序性,提高系统的可靠性和用户体验。
相关文章:

Kafka - 消息乱序问题的常见解决方案和实现
文章目录 概述一、MQ消息乱序问题分析1.1 相同topic内的消息乱序1.2 不同topic的消息乱序 二、解决方案方案一: 顺序消息Kafka1. Kafka 顺序消息的实现1.1 生产者:确保同一业务主键的消息发送到同一个分区1.2 消费者:顺序消费消息 2. Kafka 顺…...
【golang】匿名内部协程,值传递与参数传递
代码例子 下面代码的区别是直接调用循环变量,这里使用的就是这个变量的引用,而不是将参数的副本传递给协程执行 for task : range taskChan {wg.Add(1)go func() {defer wg.Done()task.Do() // 使用外部循环变量}() }func DistributeTasks(taskChan &…...

Jenkins与SonarQube持续集成搭建及坑位详解
Jenkins和SonarQube都是软件开发过程中常用的工具,它们在代码管理、构建、测试和质量管理方面发挥着重要作用。以下是关于Jenkins与SonarQube的作用及整合步骤环境搭建的详细解释: 一、Jenkins与SonarQube的作用 Jenkins: Jenkins是一个开源的持续集成和交付工具,它可以帮…...

.NET6 WebAPI从基础到进阶--朝夕教育
1、环境准备 1. Visual Studio 2022 2. .NET6 平台支持 3. Internet Information Services 服务器( IIS ) 4. Linux 服务器 【 CentOS 系统】 ( 跨平台部署使用 ) 5. Linux 服务器下的 Docker 容器( Docker 部署使用) …...

购物车案例--分模块存储数据,发送请求数据渲染,底部总计数量和价格
shift鼠标右键,打开powershell,新建项目 自定义 只有一个页面,不涉及路由,勾选vuex,css,babel 无需保存预设 回车项目开始创建 项目用vscode打开 将src里的内容全部清空 将第七天的课程准备代码复制粘贴到src中 刷新页面&…...

PCIe学习笔记
PCIE高速串行数据总线 当拿到一块板子 比如你要用到PCIE 首先要看这块板子的原理图 一般原理图写的是 PCI express 表示PCIE 以下是Netfpga为例下的PCIE插口元件原理图 
The Rise and Potential of Large Language ModelBased Agents:A Survey---讨论
讨论 论法学硕士研究与Agent研究的互利性 近年来,随着激光诱导金属化技术的发展,激光诱导金属化与化学剂交叉领域的研究取得了长足的进步,促进了这两个领域的发展。在此,我们期待着LLM研究和Agent研究相互提供的一些益处和发展机…...
C语言:const的用法
有时候我们希望定义这样一种变量,它的值不能被改变,在整个作用域中都保持固定。例如,用一个变量来表示班级的最大人数,或者表示缓冲区的大小。为了满足这一要求,可以使用 const 关键字对变量加以限定: con…...

Redis - 集合 Set 及代码实战
Set 类型 定义:类似 Java 中的 HashSet 类,key 是 set 的名字,value 是集合中的值特点 无序元素唯一查找速度快支持交集、并集、补集功能 常见命令 命令功能SADD key member …添加元素SREM key member …删除元素SCARD key获取元素个数SI…...

LabVIEW面向对象编程有什么特点?
LabVIEW面向对象编程(OOP)的特点主要体现在它如何结合传统面向对象编程(OOP)的理念与LabVIEW的图形化编程模式,提供灵活的抽象和模块化的功能。以下是LabVIEW面向对象编程的几个主要特点: 1. 类&#x…...
配置Nginx自签名SSL证书,支持HTTPS
配置Nginx自签名SSL证书的流程 生成一个SSL自签名证书客户端机器信任这个自签名证书修改RHEL服务器的Nginx配置在客户机用curl测试HTTPS 生成一个SSL自签名证书 在RHEL服务器上, 用openssl命令生成一个自签名证书 openssl genrsa -out server.key 2048 #生成一个2048位的RS…...
使用Spring Boot、VUE实现SSE长连接:跟踪文件上传和任务进度
使用Spring Boot实现SSE长连接:跟踪文件上传和任务进度 文章目录 使用Spring Boot实现SSE长连接:跟踪文件上传和任务进度什么是SSE?使用场景前端库选择安装event-source-polyfill1. 创建SSE连接2. 关闭SSE连接3. 结合Vue.js使用 使用Spring B…...

计算机网络技术基础:3.计算机网络的拓扑结构
网络拓扑结构是指用传输媒体互连各种设备的物理布局,即用什么方式把网络中的计算机等设备连接起来。将工作站、服务站等网络设备抽象为点,称为“节点”;将通信线路抽象为线,称为“链路”。由节点和链路构成的抽象结构就是网络拓扑…...

go-zero(十二)消息队列
go zero 消息队列 在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。 go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。 一、…...

会议通知:人工智能通识教育与实践发展暨和鲸科技AI通识课解决方案发布会
今年秋季学期起,全国多所高校面向本科生开设人工智能通识课。 当前人工智能通识课程的建设进展主要分为三种情况: 全市统筹,由某头部高校牵头建设市级人工智能通识课,以北京市、天津市为代表; 已于秋季学期按照课程…...
UDS自动化测试-Service 0x27(CAPL调用dll实现key计算)
文章目录 关联文章一、CANoe加载诊断数据库cdd、dll文件二、CAPLdiagGenerateKeyFromSeed关联文章 UDS - 深论Security Access Service 27服务-安全访问状态转换 CDD文件——CANdelaStudio Vector——CAPL语言设计 CANoe诊断测试 相信读者基于Diagnostic/ISO TP Confighratio…...
订单编号如何实现
背景 常见的订单编号是带有一些信息的,比如说创建日期例如:本案例中的订单日期 自增编号日期可以使用格式化字符串,自增则可以使用redis来实现 代码实现 redis就有自增的方法 每天的key都是不一样的,且过期时间设置为1天 // 生成…...

Vue3 大事件管理系统
Vue3 项目实战: 🆗好久没有更新blog,最近在找工作,还有准备考试,哎,😶🌫️爆炸的大环境🥲 内卷开始🌯🌯 本篇文章涉及的技术栈速通链接&#x…...

IOS通过WDA自动化中遇到的问题
IOS自动化遇到的问题 搭建WDA环境中遇到的问题1、XCode unsupport iphone xxx.2、创建Bundle Identifier出现问题:Communication with Apple failed3、创建Bundle Identifier出现问题:Automatic signing failed \Signing certificate is invalid4、创建B…...
单独测试 pyautogui 的鼠标点击功能,确保它能够在当前环境中正常工作,鼠标自动点击的录制回放功能
感谢您提供的详细日志信息。根据您的反馈,问题可能出在 pyautogui 没有正确获取鼠标焦点或无法在预期的位置执行点击操作。我们将采取以下步骤来进一步诊断和解决这个问题: 1. **确保 pyautogui 正确执行点击操作**: - 我们将添加更多的调…...

2025年上海市“星光计划”第十一届职业院校技能大赛 网络安全赛项技能操作模块样题
2025年上海市“星光计划”第十一届职业院校技能大赛 网络安全赛项技能操作模块样题 (二)模块 A:安全事件响应、网络安全数据取证、应用安全、系统安全任务一:漏洞扫描与利用:任务二:Windows 操作系统渗透测试 :任务三&…...
后端解决跨域问题的三种方案:注解配置 vs 全局配置 vs 过滤器配置(附完整代码详解)
文章目录 一、引言:跨域问题的本质与解决方案分类解决方案分类二、方案一:`WebMvcConfigurer` 全局配置(推荐)1. 核心代码(你提供的 `CorsConfig` 示例)2. 代码详解3. 优点4. 注意事项三、方案二:`CorsFilter` 过滤器配置(传统方式)1. 核心代码(你提供的 `ResourcesC…...

【Redis】笔记|第10节|京东HotKey实现多级缓存架构
缓存架构 京东HotKey架构 代码结构 代码详情 功能点:(如代码有错误,欢迎讨论纠正) 多级缓存,先查HotKey缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新…...

Redis常见使用场景解析
1. 数据库缓存 Redis 作为典型的 Key-Value 型内存数据库,数据缓存是其最广为人知的应用场景。使用 Redis 缓存数据操作简便,通常将序列化后的对象以 string 类型存储。但在实际应用中,需注意以下关键要点: Key 设计:必须确保不同对象的 Key 具有唯一性,且尽量缩短长度,…...

从以物换物到DeFi:交易的演变与Arbitrum的DeFi生态
交易的本质:从以物换物到现代金融 交易是人类社会经济活动的核心,是通过交换资源(如货物、服务或货币)满足各方需求的行为。其本质是价值交换,旨在实现资源的优化配置。交易的历史可以追溯到人类文明的起源࿰…...

附加模块--Qt Shader Tools功能及架构解析
Qt 6.0 引入了全新的 Shader Tools 模块,为着色器管理提供了现代化、跨平台的解决方案。 一、主要功能 核心功能 跨平台着色器编译 支持 GLSL、HLSL 和 MetalSL 着色器语言 可在运行时或构建时进行着色器编译 自动处理不同图形API的着色器变体 SPIR-V 支持 能…...

Xilinx IP 解析之 Block Memory Generator v8.4 ——02-如何配置 IP(仅 Native 接口)
相关文章: Xilinx IP 解析之 Block Memory Generator v8.4 ——01-手册重点解读(仅Native RAM) – 徐晓康的博客 Xilinx IP 解析之 Block Memory Generator v8.4 ——02-如何配置 IP(仅 Native RAM) – 徐晓康的博客 V…...

【原创】基于视觉模型+FFmpeg+MoviePy实现短视频自动化二次编辑+多赛道
AI视频处理系统功能总览 🎯 系统概述 这是一个智能短视频自动化处理系统,专门用于视频搬运和二次创作。系统支持多赛道配置,可以根据不同的内容类型(如"外国人少系列"等)应用不同的处理策略。 Ἵ…...
Vue前端篇——Vue 3的watch深度解析
📌 前言 在 Vue.js 的世界中,“数据驱动”是其核心理念之一。而在这一理念下,watch 扮演着一个非常关键的角色。它允许我们监听响应式数据的变化,并在其发生变化时执行特定的业务逻辑。 本文将通过实际代码示例,深入…...
2. 库的操作
2.1 创建数据库 语法: CREATE DATABASE [IF NOT EXISTS] db_name [create_specification [, create_specification] ...] create_specification: [DEFAULT] CHARACTER SET charset_name # 字符集: 存储编码 [DEFAULT] COLLATE collation_name # 校验集: 比较/选择/读…...