Kafka - 消息乱序问题的常见解决方案和实现
文章目录
- 概述
- 一、MQ消息乱序问题分析
- 1.1 相同topic内的消息乱序
- 1.2 不同topic的消息乱序
- 二、解决方案
- 方案一: 顺序消息
- Kafka
- 1. Kafka 顺序消息的实现
- 1.1 生产者:确保同一业务主键的消息发送到同一个分区
- 1.2 消费者:顺序消费消息
- 2. Kafka 顺序消息实现的局限性
- 3. 小结
- RocketMQ
- 1. 使用 RocketMQ 实现顺序消费
- 1.1 生产者:发送顺序消息
- 1.2 消费者:顺序消费消息
- 2. RocketMQ 顺序消息的局限性
- 3. 小结
- 方案二: 前置检测(Pre-check)
- 前置检测的方案
- 方案1: 使用辅助表进行前置检测
- 1.1 方案设计
- 1.2 数据库表设计
- 1.3 消费者前置检测代码实现
- 方案2: 使用序列号/时间戳进行顺序检查
- 2.1 方案设计
- 2.2 消费者前置检测代码实现
- 3. 小结
- 方案三: 状态机
- 1. 状态机的设计思路
- 2. 状态机的实现步骤
- 3. 设计与实现
- 3.1 状态机设计
- 3.1.1 定义状态
- 3.1.2 定义事件
- 3.1.3 状态机逻辑
- 3.1.4 使用状态机处理消息
- 4. 运行流程
- 5. 小结
- 监控与报警
- 伪实现
- 总结
概述
在分布式系统中,消息队列(MQ)作为实现系统解耦和异步通信的重要工具,广泛应用于各种业务场景。然而,消息消费时出现的乱序问题,常常会对业务逻辑的正确执行和系统稳定性产生不良影响。
接下来我们将详细探讨MQ消息乱序问题的根源,并提供一系列在实际应用中可行的解决方案,包括顺序消息、前置检测、状态机等方式

一、MQ消息乱序问题分析
1.1 相同topic内的消息乱序
- 并发消费:为了提高消息处理吞吐量,通常会配置多个消费者实例来并发消费同一个队列中的消息。然而,由于消费者实例的性能差异,可能导致消息的消费顺序与发送顺序不一致。
- 消息分区:MQ系统通常采用分区化设计,当同一业务逻辑的消息分发到不同的分区时,可能出现乱序。
- 网络延迟与抖动:消息在传输过程中可能会受到网络延迟和抖动的影响,导致消息到达消费者端的顺序与发送顺序不一致。
- 消息重试与故障恢复:当消费者处理消息失败或出现故障时,重试机制或故障恢复操作不当,也可能导致消息乱序。
1.2 不同topic的消息乱序
例如,系统A在01:00时向TopicA发送了消息msgA-01:00,而系统B在01:01时向TopicB发送了消息msgB-01:01。消费者无法预设msgA-01:00必然先于msgB-01:01被接收。消息系统中的分区策略、消费者的处理能力、网络等因素共同导致无法确保消息遵循严格的先进先出(FIFO)原则。
二、解决方案
为了应对消息乱序问题,有几种常见的解决方案,包括顺序消息、前置检测、状态机等。
方案一: 顺序消息
顺序消息是通过确保同一业务主键的消息发送到同一分区,从而保证消息的顺序性。
Kafka
以 Kafka 为例,虽然它不保证全局消息顺序,但可以通过合理的分区策略和消息键来确保消息的局部顺序性。
下面是使用 Kafka 作为消息队列(MQ)时,如何实现顺序消息的解决方案。通过使用 Kafka 的分区策略和消息键(key),可以确保同一业务主键的消息发送到同一个分区,从而保证消息的顺序性。
1. Kafka 顺序消息的实现
1.1 生产者:确保同一业务主键的消息发送到同一个分区
通过指定消息的 key,Kafka 会确保具有相同 key 的消息发送到同一个分区。这样,即使多个消费者并行消费,也能保证消息在同一个分区内的顺序。
生产者代码实现
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class OrderProducer {private final KafkaProducer<String, String> producer;private final String topic;public OrderProducer(String topic) {this.topic = topic;Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());this.producer = new KafkaProducer<>(properties);}public void sendOrderMessage(String orderId, String orderMessage) {// 使用订单ID作为消息的 key,以确保同一订单的消息发送到同一个分区ProducerRecord<String, String> record = new ProducerRecord<>(topic, orderId, orderMessage);producer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.println("Message sent: " + metadata);}});}public void close() {producer.close();}public static void main(String[] args) {OrderProducer orderProducer = new OrderProducer("order-topic");// 发送顺序消息,确保同一订单的消息被发送到同一分区orderProducer.sendOrderMessage("order123", "Order Created");orderProducer.sendOrderMessage("order123", "Order Paid");orderProducer.sendOrderMessage("order123", "Order Shipped");// 发送另一个订单的消息orderProducer.sendOrderMessage("order456", "Order Created");orderProducer.sendOrderMessage("order456", "Order Paid");orderProducer.close();}
}
- 在生产者端,通过
ProducerRecord发送消息时,设置了消息的 key 为订单 ID(orderId)。Kafka 会使用该 key 来确定消息发送到哪个分区,从而确保同一订单的所有消息都会被发送到同一个分区,保证顺序。 producer.send()方法的回调函数用来处理消息发送的异步结果。
1.2 消费者:顺序消费消息
消费者使用 MessageListener 或 Consumer 来消费消息。Kafka 默认会根据分区消费顺序保证同一分区内消息的顺序。我们只需要保证同一个业务的消息被路由到同一个分区,消费者就能顺序消费这些消息。
消费者代码实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class OrderConsumer {private final KafkaConsumer<String, String> consumer;private final String topic;public OrderConsumer(String topic) {this.topic = topic;Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "order-consumer-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("auto.offset.reset", "earliest");this.consumer = new KafkaConsumer<>(properties);}public void consumeMessages() {consumer.subscribe(Collections.singletonList(topic));while (true) {consumer.poll(1000).forEach(record -> {// 处理顺序消息System.out.println("Consumed message: " + record.key() + " - " + record.value());});}}public void close() {consumer.close();}public static void main(String[] args) {OrderConsumer orderConsumer = new OrderConsumer("order-topic");// 消费消息,确保同一个订单的消息顺序消费orderConsumer.consumeMessages();}
}
-
消费者通过
KafkaConsumer从指定的 topic 中拉取消息。在这种实现中,消息会按照 Kafka 内部的消费机制被顺序消费。 -
consumer.poll()方法定期从 Kafka 中拉取消息,并根据key分配到相应的分区进行消费。 -
Kafka 的分区是顺序消费的,即每个分区内的消息按照生产者发送的顺序消费。因此,通过确保同一订单的消息使用相同的
key,就能保证同一分区内消息的消费顺序。
2. Kafka 顺序消息实现的局限性
- 局部顺序保证:Kafka 只能保证同一分区内的消息顺序,对于跨分区的消息并不保证顺序。因此,确保同一业务的消息发送到同一分区非常关键。
- 性能与吞吐量:为了提高系统的吞吐量和并发能力,Kafka 会对 topic 进行分区。分区数过多可能影响顺序性,但可以通过合理设计业务键来平衡性能和顺序性要求。
3. 小结
通过使用 Kafka 的分区和消息键机制,我们可以确保同一业务主键的消息在同一分区内顺序消费。这种方法适用于需要保证顺序性的场景,如订单处理等。生产者确保消息按照业务主键路由到同一分区,消费者则按分区顺序消费消息,从而避免消息乱序的问题。
RocketMQ
在使用 RocketMQ 作为消息队列时,确保消息的顺序消费可以通过 顺序消息(Ordered Message)的特性来实现。RocketMQ 支持两种类型的顺序消费:局部顺序(确保同一消息队列内的消息顺序)和 全局顺序(通过单一队列保证全局顺序,但在高并发情况下可能会影响性能)。
1. 使用 RocketMQ 实现顺序消费
1.1 生产者:发送顺序消息
生产者通过指定消息的 key 来确保具有相同 key 的消息被发送到同一个消息队列,从而保证顺序性。RocketMQ 支持发送顺序消息的 API,通过 MessageQueueSelector 来指定消息发送到哪个队列。
生产者代码实现
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class OrderProducer {private DefaultMQProducer producer;public OrderProducer(String groupName) throws Exception {// 创建生产者实例producer = new DefaultMQProducer(groupName);producer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址producer.start();}public void sendOrderMessage(String orderId, String orderMessage) throws Exception {// 创建消息实例Message message = new Message("OrderTopic", "OrderTag", orderMessage.getBytes(RemotingHelper.DEFAULT_CHARSET));// 使用订单ID作为消息的key,确保同一订单的消息发送到同一队列SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {String orderId = (String) arg;int queueIndex = orderId.hashCode() % mqs.size(); // 根据订单ID选择队列return mqs.get(queueIndex);}}, orderId);System.out.println("Message sent: " + sendResult);}public void close() {producer.shutdown();}public static void main(String[] args) throws Exception {OrderProducer producer = new OrderProducer("order-group");// 发送顺序消息,确保同一订单的消息被发送到同一队列producer.sendOrderMessage("order123", "Order Created");producer.sendOrderMessage("order123", "Order Paid");producer.sendOrderMessage("order123", "Order Shipped");producer.sendOrderMessage("order456", "Order Created");producer.sendOrderMessage("order456", "Order Paid");producer.close();}
}
- 生产者通过
MessageQueueSelector来确保相同 key 的消息被发送到相同的队列。这里我们使用orderId作为消息的 key,通过计算orderId.hashCode()来决定消息发送到哪个队列。确保同一个订单的消息发送到同一个队列,从而在消费时保持顺序性。 SendResult会返回发送结果,包括消息发送的状态。
1.2 消费者:顺序消费消息
在消费者端,RocketMQ 提供了 MessageListenerOrderly 接口来实现顺序消费。该接口保证在同一队列内,消息会按照发送的顺序被消费。
消费者代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeConcurrentlyContext;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;public class OrderConsumer {private DefaultMQPushConsumer consumer;public OrderConsumer(String groupName) throws Exception {// 创建消费者实例consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag}public void consumeMessages() throws Exception {// 设置顺序消费监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {// 消费顺序消息System.out.println("Consumed message: " + new String(msg.getBody()));}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumer consumer = new OrderConsumer("order-consumer-group");// 开始消费顺序消息consumer.consumeMessages();}
}
-
消费者使用
MessageListenerOrderly来实现顺序消费。该接口保证了消费者在同一消息队列内按顺序消费消息。 -
消费者在接收到消息后,会依次消费并输出消息内容。
-
RocketMQ 是基于消息队列的,每个队列内的消息是顺序消费的,即使有多个消费者,也只会有一个消费者消费某个队列的消息。通过将同一 key 的消息发送到同一个队列,可以确保这些消息按照顺序被消费。
-
需要注意的是,RocketMQ 保证的是 局部顺序,即同一队列内的消息按照发送顺序消费。对于多个队列和多个消费者,只有同一个队列内的消息顺序是保证的。
2. RocketMQ 顺序消息的局限性
- 局部顺序保证:RocketMQ 只能保证同一队列内的消息顺序,对于多个队列之间的消息没有顺序保证。
- 性能影响:如果需要保证全局顺序,可能需要将所有消息都发送到同一个队列,这会影响性能,导致吞吐量下降。通常需要在性能和顺序性之间进行权衡。
3. 小结
通过使用 RocketMQ 的 MessageQueueSelector 和 MessageListenerOrderly,我们可以保证同一业务的消息在同一队列内顺序消费。这种方式适用于需要保证顺序的场景,如订单处理、支付等高可靠性的业务系统。生产者通过业务主键选择队列,消费者则顺序消费消息,确保数据一致性和业务流程的正确执行。
方案二: 前置检测(Pre-check)
前置检测(Pre-check)在消息队列消费中,常用于确保消息消费的顺序性,防止因为消息乱序导致的数据不一致或业务错误。其核心思想是在消息消费之前进行验证,确保前置条件满足才继续消费当前消息。
在消费者处理消息之前,进行前置条件检查,确保上一条消息已成功消费。这可以通过消息辅助表来实现,或者在消息中附带序列号、时间戳等信息进行验证。
前置检测的方案
前置检测主要包括以下几种常见方法:
-
使用辅助表进行状态检查:通过创建一个辅助表(如状态表或消息表),记录消息的状态,消费者可以通过查询该表来验证上一个消息是否已经成功处理,确保消息按顺序消费。
-
使用序列号/时间戳进行顺序检查:在消息中包含序列号或时间戳,消费者根据这些信息判断当前消息是否按预期顺序到达。如果不符合顺序,则将当前消息暂时缓存,等待前一个消息处理完成。
-
利用死信队列处理无序消息:当消息的顺序不符合预期时,可以将这些消息暂时放入死信队列(DLQ)中,待前置消息消费成功后再重新消费。
方案1: 使用辅助表进行前置检测
假设在处理订单相关的消息时,我们希望确保订单的状态始终按照正确的顺序处理,比如,Order Created 应该在 Order Paid 前消费。
1.1 方案设计
- 设计一个
order_status表,记录订单的处理状态。 - 消费者在处理消息前,查询这个表,确保订单的前置状态已经处理完毕。
- 消费失败时,可以将消息放入死信队列或重试。
1.2 数据库表设计
CREATE TABLE order_status (order_id VARCHAR(255) PRIMARY KEY,status VARCHAR(255) NOT NULL,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- 状态示例
-- 订单创建:CREATED
-- 订单支付:PAID
-- 订单完成:COMPLETED
1.3 消费者前置检测代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;
import java.util.List;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class OrderConsumerWithPreCheck {private static final String DB_URL = "jdbc:mysql://localhost:3306/order_db";private static final String DB_USER = "root";private static final String DB_PASSWORD = "password";private DefaultMQPushConsumer consumer;public OrderConsumerWithPreCheck(String groupName) throws Exception {consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag}// 检查订单状态public boolean checkOrderStatus(String orderId, String expectedStatus) throws Exception {try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {String query = "SELECT status FROM order_status WHERE order_id = ?";try (PreparedStatement statement = connection.prepareStatement(query)) {statement.setString(1, orderId);ResultSet rs = statement.executeQuery();if (rs.next()) {String currentStatus = rs.getString("status");return expectedStatus.equals(currentStatus); // 比对期望状态}}}return false; // 订单未找到,默认返回 false}public void consumeMessages() throws Exception {consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String orderId = msg.getKeys(); // 假设订单ID存储在消息的keys字段String currentStatus = new String(msg.getBody());// 检查前置状态,确保当前状态是顺序的try {if ("OrderCreated".equals(currentStatus)) {if (!checkOrderStatus(orderId, "CREATED")) {System.out.println("Order not created yet, skipping message: " + orderId);continue; // 如果前置状态不符合,跳过该消息}} else if ("OrderPaid".equals(currentStatus)) {if (!checkOrderStatus(orderId, "PAID")) {System.out.println("Order not paid yet, skipping message: " + orderId);continue;}}// 消费消息逻辑System.out.println("Processing order message: " + orderId + " - " + currentStatus);// 更新状态或其他业务逻辑} catch (Exception e) {e.printStackTrace();}}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumerWithPreCheck consumer = new OrderConsumerWithPreCheck("order-consumer-group");consumer.consumeMessages();}
}
方案2: 使用序列号/时间戳进行顺序检查
在这种方法中,我们为每个消息分配一个 序列号 或 时间戳,并通过对比当前消息的序列号和前一条消息的序列号来确保消息按顺序消费。如果序列号不符合预期,消费者会将该消息缓存,等待前置消息的消费完成。
2.1 方案设计
- 消息中包含一个 sequenceId 或 timestamp 字段。
- 消费者检查当前消息的 sequenceId,如果当前消息的 sequenceId 小于等于上一个已消费消息的 sequenceId,则跳过当前消息。
2.2 消费者前置检测代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class OrderConsumerWithSequenceCheck {private DefaultMQPushConsumer consumer;private AtomicInteger lastSequenceId = new AtomicInteger(0); // 记录最后处理的序列号public OrderConsumerWithSequenceCheck(String groupName) throws Exception {consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag}// 检查消息的序列号,确保顺序性public boolean checkSequenceId(int currentSequenceId) {return currentSequenceId == lastSequenceId.incrementAndGet();}public void consumeMessages() throws Exception {consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {int sequenceId = Integer.parseInt(new String(msg.getBody())); // 消息中的序列号if (!checkSequenceId(sequenceId)) {System.out.println("Out of order message, skipping message with sequence: " + sequenceId);continue; // 如果消息序列号不符合顺序,则跳过}// 消费消息逻辑System.out.println("Processing message with sequence ID: " + sequenceId);// 进行相应的业务处理}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumerWithSequenceCheck consumer = new OrderConsumerWithSequenceCheck("order-consumer-group");consumer.consumeMessages();}
}
3. 小结
前置检测方案的核心是通过验证当前消息的处理条件(如订单的状态或消息的序列号),确保前置条件满足后再继续处理当前消息。此方案能有效防止由于消息乱序导致的数据不一致或业务错误,适用于需要严格保证处理顺序的场景。
- 数据库检查:通过查询数据库记录来验证消息的处理顺序。
- 序列号检查:通过消息中的序列号或时间戳验证消息是否按顺序到达。
方案三: 状态机
可以利用状态机来管理消息的消费顺序和状态。状态机的核心思想是定义系统的不同状态,以及触发状态变更的事件,从而确保消息在正确的状态下被处理。
通过引入状态机,我们能够:
- 通过状态转移机制保证消息按顺序消费。
- 在状态转移过程中,避免非法的状态变更和消息丢失。
1. 状态机的设计思路
在处理消息时,可以将消息消费的过程视为一系列状态的变换。每个消息会根据其当前状态决定是否可以进行处理。
-
定义状态:
- 定义消息消费的不同状态,例如
PENDING(待处理)、PROCESSING(处理中)、PROCESSED(已处理)。 - 每个消息在处理过程中会从一个状态转移到另一个状态。
- 定义消息消费的不同状态,例如
-
定义事件:
- 每个消息可能触发一个事件,事件可以是消息的到达或者某些外部条件的变化。
- 通过事件来决定状态的转移。
-
处理顺序:
- 确保某些消息必须在特定的顺序下处理。比如,某个状态的消息必须先处理完成,才能处理下一个状态的消息。
2. 状态机的实现步骤
- 状态定义:使用枚举类(
enum)定义消息的状态。 - 事件定义:根据消息到达的顺序或其他外部条件触发不同的事件。
- 状态机实现:根据当前状态和事件的触发来决定状态转移。
3. 设计与实现
假设我们有一个订单处理系统,订单的状态可能为以下几种:
ORDER_CREATED:订单已创建ORDER_PAID:订单已支付ORDER_SHIPPED:订单已发货ORDER_COMPLETED:订单已完成
我们希望确保消息的消费顺序是按顺序进行的,即订单创建 -> 支付 -> 发货 -> 完成。
3.1 状态机设计
3.1.1 定义状态
首先定义订单状态的枚举类型 OrderState:
public enum OrderState {ORDER_CREATED, // 订单已创建ORDER_PAID, // 订单已支付ORDER_SHIPPED, // 订单已发货ORDER_COMPLETED // 订单已完成
}
3.1.2 定义事件
根据业务需求,定义事件触发的条件。比如:
ORDER_CREATED_EVENT:订单创建事件ORDER_PAID_EVENT:订单支付事件ORDER_SHIPPED_EVENT:订单发货事件ORDER_COMPLETED_EVENT:订单完成事件
3.1.3 状态机逻辑
使用一个状态机类来管理状态的转换。状态机会根据当前状态和触发的事件来进行状态转换。
import java.util.HashMap;
import java.util.Map;public class OrderStateMachine {// 订单状态private OrderState currentState;// 状态转移规则,基于当前状态和事件决定下一个状态private final Map<OrderState, Map<String, OrderState>> transitionTable;public OrderStateMachine() {// 初始化状态为 ORDER_CREATEDthis.currentState = OrderState.ORDER_CREATED;// 初始化状态转移规则this.transitionTable = new HashMap<>();// 设置转移规则// 从 ORDER_CREATED 可以转到 ORDER_PAIDaddTransition(OrderState.ORDER_CREATED, "ORDER_CREATED_EVENT", OrderState.ORDER_PAID);// 从 ORDER_PAID 可以转到 ORDER_SHIPPEDaddTransition(OrderState.ORDER_PAID, "ORDER_PAID_EVENT", OrderState.ORDER_SHIPPED);// 从 ORDER_SHIPPED 可以转到 ORDER_COMPLETEDaddTransition(OrderState.ORDER_SHIPPED, "ORDER_SHIPPED_EVENT", OrderState.ORDER_COMPLETED);}// 添加状态转换规则private void addTransition(OrderState fromState, String event, OrderState toState) {transitionTable.putIfAbsent(fromState, new HashMap<>());transitionTable.get(fromState).put(event, toState);}// 处理事件并转换状态public boolean handleEvent(String event) {Map<String, OrderState> transitions = transitionTable.get(currentState);if (transitions != null && transitions.containsKey(event)) {OrderState nextState = transitions.get(event);System.out.println("State transition: " + currentState + " -> " + nextState);this.currentState = nextState; // 执行状态转移return true;} else {System.out.println("Invalid event for the current state: " + currentState);return false;}}// 获取当前状态public OrderState getCurrentState() {return currentState;}
}
3.1.4 使用状态机处理消息
假设我们在消息队列中有不同的订单消息,需要按顺序消费。我们将消费者与状态机结合使用,确保消息按照正确的顺序消费。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;public class OrderConsumerWithStateMachine {private static final String TOPIC = "OrderTopic";private static final String GROUP = "OrderConsumerGroup";private DefaultMQPushConsumer consumer;private OrderStateMachine stateMachine;public OrderConsumerWithStateMachine() {consumer = new DefaultMQPushConsumer(GROUP);stateMachine = new OrderStateMachine();try {consumer.setNamesrvAddr("localhost:9876");consumer.subscribe(TOPIC, "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String event = new String(msg.getBody());System.out.println("Received message: " + event);// 根据消息的内容触发状态机事件if ("ORDER_CREATED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_CREATED_EVENT");} else if ("ORDER_PAID_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_PAID_EVENT");} else if ("ORDER_SHIPPED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_SHIPPED_EVENT");} else if ("ORDER_COMPLETED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_COMPLETED_EVENT");}System.out.println("Current state: " + stateMachine.getCurrentState());}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();System.out.println("Order consumer started");} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {new OrderConsumerWithStateMachine();}
}
4. 运行流程
- 消费者会根据事件(如
ORDER_CREATED_EVENT,ORDER_PAID_EVENT等)处理消息。 - 消费者会触发状态机,状态机会根据当前状态和事件来进行状态转换。
- 如果消息的顺序不正确(例如
ORDER_PAID_EVENT在ORDER_CREATED_EVENT之前到达),状态机会拒绝处理,并打印Invalid event for the current state。
5. 小结
- 状态机可以帮助管理消息的消费顺序,确保在处理消息时遵循正确的流程和业务逻辑。
- 通过定义状态和事件,状态机提供了一个清晰的框架来管理复杂的消息处理过程。
- 结合消息队列,状态机可以有效地控制消息的顺序消费,避免乱序带来的问题。
监控与报警
建立系统的监控和报警机制,及时发现并处理消息错乱等异常情况。通过设定阈值或检测规则,监控系统的消息流转,确保及时响应并纠正问题。
- 定期监控消息队列的消费进度,若发现消费滞后或消息顺序异常,自动报警。
- 通过日志和统计信息,捕获异常并自动触发处理流程。
伪实现
public class MessageMonitor {private static final Logger logger = LoggerFactory.getLogger(MessageMonitor.class);public void monitorMessageQueue() {// 假设有一个队列监控机制boolean isOutOfOrder = checkMessageOrder();if (isOutOfOrder) {logger.error("Message order error detected, triggering alert!");// 触发报警或采取恢复措施}}private boolean checkMessageOrder() {// 检查消息顺序是否正常return false; // 假设没有乱序}
}
总结
MQ消息乱序是分布式系统中常见的挑战,直接影响到系统的稳定性和业务一致性。我们可以通过顺序消息、前置检测、状态机等解决方案, 保证消息的顺序性,提高系统的可靠性和用户体验。

相关文章:
Kafka - 消息乱序问题的常见解决方案和实现
文章目录 概述一、MQ消息乱序问题分析1.1 相同topic内的消息乱序1.2 不同topic的消息乱序 二、解决方案方案一: 顺序消息Kafka1. Kafka 顺序消息的实现1.1 生产者:确保同一业务主键的消息发送到同一个分区1.2 消费者:顺序消费消息 2. Kafka 顺…...
【golang】匿名内部协程,值传递与参数传递
代码例子 下面代码的区别是直接调用循环变量,这里使用的就是这个变量的引用,而不是将参数的副本传递给协程执行 for task : range taskChan {wg.Add(1)go func() {defer wg.Done()task.Do() // 使用外部循环变量}() }func DistributeTasks(taskChan &…...
Jenkins与SonarQube持续集成搭建及坑位详解
Jenkins和SonarQube都是软件开发过程中常用的工具,它们在代码管理、构建、测试和质量管理方面发挥着重要作用。以下是关于Jenkins与SonarQube的作用及整合步骤环境搭建的详细解释: 一、Jenkins与SonarQube的作用 Jenkins: Jenkins是一个开源的持续集成和交付工具,它可以帮…...
.NET6 WebAPI从基础到进阶--朝夕教育
1、环境准备 1. Visual Studio 2022 2. .NET6 平台支持 3. Internet Information Services 服务器( IIS ) 4. Linux 服务器 【 CentOS 系统】 ( 跨平台部署使用 ) 5. Linux 服务器下的 Docker 容器( Docker 部署使用) …...
购物车案例--分模块存储数据,发送请求数据渲染,底部总计数量和价格
shift鼠标右键,打开powershell,新建项目 自定义 只有一个页面,不涉及路由,勾选vuex,css,babel 无需保存预设 回车项目开始创建 项目用vscode打开 将src里的内容全部清空 将第七天的课程准备代码复制粘贴到src中 刷新页面&…...
PCIe学习笔记
PCIE高速串行数据总线 当拿到一块板子 比如你要用到PCIE 首先要看这块板子的原理图 一般原理图写的是 PCI express 表示PCIE 以下是Netfpga为例下的PCIE插口元件原理图 的特点主要体现在它如何结合传统面向对象编程(OOP)的理念与LabVIEW的图形化编程模式,提供灵活的抽象和模块化的功能。以下是LabVIEW面向对象编程的几个主要特点: 1. 类&#x…...
配置Nginx自签名SSL证书,支持HTTPS
配置Nginx自签名SSL证书的流程 生成一个SSL自签名证书客户端机器信任这个自签名证书修改RHEL服务器的Nginx配置在客户机用curl测试HTTPS 生成一个SSL自签名证书 在RHEL服务器上, 用openssl命令生成一个自签名证书 openssl genrsa -out server.key 2048 #生成一个2048位的RS…...
使用Spring Boot、VUE实现SSE长连接:跟踪文件上传和任务进度
使用Spring Boot实现SSE长连接:跟踪文件上传和任务进度 文章目录 使用Spring Boot实现SSE长连接:跟踪文件上传和任务进度什么是SSE?使用场景前端库选择安装event-source-polyfill1. 创建SSE连接2. 关闭SSE连接3. 结合Vue.js使用 使用Spring B…...
计算机网络技术基础:3.计算机网络的拓扑结构
网络拓扑结构是指用传输媒体互连各种设备的物理布局,即用什么方式把网络中的计算机等设备连接起来。将工作站、服务站等网络设备抽象为点,称为“节点”;将通信线路抽象为线,称为“链路”。由节点和链路构成的抽象结构就是网络拓扑…...
go-zero(十二)消息队列
go zero 消息队列 在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。 go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。 一、…...
会议通知:人工智能通识教育与实践发展暨和鲸科技AI通识课解决方案发布会
今年秋季学期起,全国多所高校面向本科生开设人工智能通识课。 当前人工智能通识课程的建设进展主要分为三种情况: 全市统筹,由某头部高校牵头建设市级人工智能通识课,以北京市、天津市为代表; 已于秋季学期按照课程…...
UDS自动化测试-Service 0x27(CAPL调用dll实现key计算)
文章目录 关联文章一、CANoe加载诊断数据库cdd、dll文件二、CAPLdiagGenerateKeyFromSeed关联文章 UDS - 深论Security Access Service 27服务-安全访问状态转换 CDD文件——CANdelaStudio Vector——CAPL语言设计 CANoe诊断测试 相信读者基于Diagnostic/ISO TP Confighratio…...
订单编号如何实现
背景 常见的订单编号是带有一些信息的,比如说创建日期例如:本案例中的订单日期 自增编号日期可以使用格式化字符串,自增则可以使用redis来实现 代码实现 redis就有自增的方法 每天的key都是不一样的,且过期时间设置为1天 // 生成…...
Vue3 大事件管理系统
Vue3 项目实战: 🆗好久没有更新blog,最近在找工作,还有准备考试,哎,😶🌫️爆炸的大环境🥲 内卷开始🌯🌯 本篇文章涉及的技术栈速通链接&#x…...
IOS通过WDA自动化中遇到的问题
IOS自动化遇到的问题 搭建WDA环境中遇到的问题1、XCode unsupport iphone xxx.2、创建Bundle Identifier出现问题:Communication with Apple failed3、创建Bundle Identifier出现问题:Automatic signing failed \Signing certificate is invalid4、创建B…...
单独测试 pyautogui 的鼠标点击功能,确保它能够在当前环境中正常工作,鼠标自动点击的录制回放功能
感谢您提供的详细日志信息。根据您的反馈,问题可能出在 pyautogui 没有正确获取鼠标焦点或无法在预期的位置执行点击操作。我们将采取以下步骤来进一步诊断和解决这个问题: 1. **确保 pyautogui 正确执行点击操作**: - 我们将添加更多的调…...
Llama-3.2V-11B-cot保姆级教学:Streamlit缓存机制加速推理响应
Llama-3.2V-11B-cot保姆级教学:Streamlit缓存机制加速推理响应 1. 项目概述 Llama-3.2V-11B-cot是基于Meta Llama-3.2V-11B-cot多模态大模型开发的高性能视觉推理工具,专为双卡4090环境深度优化。这个工具解决了视觉权重加载的关键Bug,支持…...
Java集成LibreOffice实现高效Office文档批量转PDF方案
1. 为什么选择LibreOffice进行文档转换 在企业日常办公中,我们经常需要处理大量的Office文档。想象一下这样的场景:财务部门每月要生成上百份报表,人力资源部门要处理大量简历,而市场部门则需要频繁修改和分享各种方案文档。这些文…...
OpenClaw浏览器自动化:Qwen3-14b_int4_awq驱动网页检索与数据抓取
OpenClaw浏览器自动化:Qwen3-14b_int4_awq驱动网页检索与数据抓取 1. 为什么需要浏览器自动化助手 作为一个经常需要收集行业动态的技术博主,我每天要花大量时间在不同网站间切换、搜索关键词、复制粘贴数据。这种重复劳动不仅效率低下,还容…...
Wireshark网络协议分析与故障排查实战指南
1. Wireshark网络分析入门指南作为一名网络工程师,我使用Wireshark进行网络故障排查已有8年时间。这款开源网络协议分析器确实改变了我的工作方式,让我能够直观地"看到"网络流量。记得第一次使用Wireshark分析一个棘手的TCP连接问题时…...
离谱又惊艳!C++隐藏宝藏库numeric_range深度探索,竟藏着JS彩蛋和隐零点
文章目录离谱又惊艳!C隐藏宝藏库numeric_range深度探索,竟藏着JS彩蛋和隐零点一、初遇:以为是青铜,实则是王者二、深挖:废弃方法的“马甲现场”,官方摆烂实锤三、惊现:一整个范围家族࿰…...
jEasyUI 显示海量数据
jEasyUI 显示海量数据 引言 随着互联网技术的飞速发展,大数据时代已经到来。在众多前端框架中,jEasyUI以其简洁、易用、功能强大等特点,受到了广大开发者的喜爱。本文将深入探讨jEasyUI在显示海量数据方面的应用,帮助开发者更好地应对大数据挑战。 jEasyUI简介 jEasyUI…...
3大终极方案解决Amlogic设备U盘启动难题:从故障诊断到系统优化的完整指南
3大终极方案解决Amlogic设备U盘启动难题:从故障诊断到系统优化的完整指南 【免费下载链接】amlogic-s9xxx-armbian Supports running Armbian on Amlogic, Allwinner, and Rockchip devices. Support a311d, s922x, s905x3, s905x2, s912, s905d, s905x, s905w, s90…...
AI 编程 Harness 框架深度拆解(非常详细),6 大框架从入门到精通,收藏这一篇就够了!
AI 会写,不等于 AI 能稳定交付。 前段时间我们都在说 Vibe Coding,大家都知道是氛围编程的意思,但是现在也有叫“直觉编程”。什么叫直觉编程,就是完全不用管其它的,想到什么就做什么,主打一个靠直觉写代码…...
率零测评:AI率83%的文章降完是什么效果
率零(www.0ailv.com)最大的特点是便宜——3.2元/千字,在主流工具里价格最低,还有1000字免费体验。这让很多AI率高的同学把它作为第一选择。 它的实际效果怎么样?这篇文章来说清楚。 测试基本情况 测试论文ÿ…...
计算机毕业设计:Python地铁运营数据多维分析与智能管理平台 Django框架 数据分析 可视化 大数据 机器学习 深度学习(建议收藏)✅
博主介绍:✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立软件开发工作室,专注于计算机相关专业项目实战6年之久,累计开发项目作品上万套。凭借丰富的经验与专业实力,已帮助成千上万的学生顺利毕业,…...
