当前位置: 首页 > article >正文

SpringCloud-Stream + RocketMQ/Kafka

一、核心认知Spring Cloud Stream 是什么解决什么问题1.1 基本定义Spring Cloud Stream 是 Spring 生态下的「消息驱动微服务框架」基于 Spring Boot 构建核心定位是「统一消息中间件接口简化消息驱动开发」。它通过抽象层Binder、Binding封装了不同消息中间件的底层细节提供统一的编程模型让开发者无需修改业务代码就能切换 RocketMQ、Kafka、RabbitMQ 等中间件。官方定位为构建消息驱动的微服务应用提供统一的框架本质是「消息中间件的胶水层」连接微服务与消息中间件降低开发与维护成本。1.2 核心解决的微服务痛点服务强耦合同步调用OpenFeign中上游服务依赖下游服务的可用性一旦下游服务宕机上游服务会受影响异步消息可实现服务解耦上游只负责发消息不关心下游是否消费。中间件切换成本高不同消息中间件RocketMQ、Kafka的 API、配置差异大切换中间件需修改大量代码如将 KafkaTemplate 替换为 RocketMQTemplate。消息处理繁琐手动处理消息的序列化、反序列化、重试、死信队列等代码冗余易出错。流量削峰与最终一致性秒杀、大促等场景下突发流量会压垮核心服务消息队列可缓冲流量分布式事务场景下通过消息回调实现最终一致性。1.3 核心架构3大核心组件必懂Spring Cloud Stream 的架构极简核心是「3层抽象」自上而下分别为业务层、抽象层、中间件层各层职责清晰协同工作业务层开发者编写的业务逻辑消息生产者、消费者通过 Stream 提供的注解EnableBinding、StreamListener 等接入消息通道。抽象层核心层包含 Binder绑定器、Binding绑定、Message消息3个核心组件是 Stream 实现「统一抽象」的关键Binder绑定器最核心组件负责与底层消息中间件RocketMQ、Kafka对接封装了中间件的连接、消息收发逻辑。每个中间件对应一个 Binder如 RocketMQ 对应 RocketMQBinderKafka 对应 KafkaBinder开发者只需引入对应 Binder 依赖即可适配不同中间件。Binding绑定连接业务层与 Binder 的桥梁分为「输入绑定Input Binding」和「输出绑定Output Binding」Output Binding将业务层的消息发送到 Binder再由 Binder 转发到底层消息中间件生产者Input Binding将 Binder 从中间件接收的消息转发到业务层的消费者。Message消息Stream 统一的消息格式包含「消息体Payload」和「消息头Headers」屏蔽了不同中间件的消息格式差异支持 JSON、XML 等多种序列化方式。中间件层底层消息中间件RocketMQ、Kafka负责消息的存储、转发、持久化Stream 不直接操作中间件全部通过 Binder 间接交互。简单类比Spring Cloud Stream 就像「USB 转接头」业务层是手机中间件层是充电器/U盘Binder 是转接头——不管底层是哪种“设备”中间件手机业务层只需通过转接头Binder就能实现统一交互无需关心“设备”的具体型号。1.4 核心特性为什么选择 Spring Cloud Stream中间件无关性一次编码适配多种中间件RocketMQ、Kafka、RabbitMQ切换中间件只需修改依赖和配置无需修改业务代码。简化开发封装了消息的序列化、反序列化、重试、分区、死信队列等常用功能开发者无需手动实现注解即可完成配置。Spring 生态无缝集成完美适配 Spring Boot、Spring Cloud Alibaba、Sentinel、Nacos 等组件支持配置中心动态配置消息规则。可扩展性强支持自定义 Binder、自定义消息转换器、自定义拦截器满足复杂业务场景需求。支持消息分组与分区实现消息的负载均衡、顺序消费适配高并发场景。二、Spring Cloud Stream 核心概念2.1 消息通道MessageChannel消息通道是消息流转的“管道”分为「输入通道InputChannel」和「输出通道OutputChannel」OutputChannel生产者发送消息的通道消息通过该通道传递给 BinderInputChannel消费者接收消息的通道Binder 从中间件接收的消息通过该通道传递给消费者。Stream 提供了默认的通道Sink、Source、Processor也支持自定义通道Sink默认输入通道对应消费者用于接收消息Source默认输出通道对应生产者用于发送消息Processor组合通道同时包含输入和输出通道用于“接收消息→处理消息→转发消息”的场景如消息中转。2.2 绑定器Binder详解Binder 是 Stream 与底层中间件的“连接器”不同中间件对应不同的 Binder 实现2026年企业主流的 Binder 有两种RocketMQ Binder阿里开源适配 RocketMQ适合国内企业、电商、金融等场景支持事务消息、延迟消息等高级特性Kafka BinderSpring 官方实现适配 Kafka适合高吞吐、大数据场景如日志采集、实时计算。Binder 的核心作用连接中间件管理与中间件的连接如 RocketMQ 的 NameServer、Kafka 的 Broker消息转换将 Stream 的统一 Message 格式转换为中间件支持的消息格式如 RocketMQ 的 Message、Kafka 的 ProducerRecord规则执行执行消息的分区、重试、死信等规则无需开发者手动处理。2.3 消息分组Consumer Group消息分组是 Stream 实现「负载均衡」和「消息幂等」的关键核心规则同一个主题Topic下多个消费者属于同一个分组时消息会被均匀分配给组内的消费者负载均衡避免重复消费同一个主题下多个消费者属于不同分组时每个分组都会接收该主题的所有消息广播消费必须为消费者指定分组否则服务重启后会重新消费所有历史消息默认分组为 anonymous不推荐使用。示例订单主题topic-order有3个消费者都属于 group-order 分组此时订单消息会被均匀分配给3个消费者每个消费者处理一部分消息实现负载均衡。2.4 消息分区Partition消息分区用于实现「顺序消费」和「高吞吐」核心逻辑生产者发送消息时根据指定的分区规则如按消息ID哈希、按参数分区将消息发送到主题的不同分区消费者分组内的每个消费者对应主题的一个分区确保同一个分区的消息被同一个消费者顺序消费分区数量越多吞吐能力越强适合高并发场景如秒杀订单消息。注意顺序消费的前提是「消息按分区顺序发送且分区被单个消费者消费」否则无法保证全局顺序。2.5 消息序列化与反序列化Stream 默认使用 JSON 格式进行消息序列化/反序列化也支持自定义序列化方式如 Protobuf、XML。核心配置通过content-type指定消息格式如application/json、application/x-protobuf。示例生产者发送 Java 对象如 OrderDTOStream 自动将其序列化为 JSON 字符串消费者接收 JSON 字符串自动反序列化为 OrderDTO 对象无需手动转换。三、实战一Spring Cloud Stream RocketMQ 整合国内主流RocketMQ 是阿里开源的分布式消息中间件支持事务消息、延迟消息、死信队列等高级特性适配国内企业的业务场景如电商订单、支付回调结合 Spring Cloud Stream 整合开发效率极高。以下是完整实操步骤基于 Spring Cloud Alibaba 2025.1.0 RocketMQ 2.x。3.1 环境准备部署 RocketMQ 服务下载 RocketMQ 最新二进制文件并解压解压后目录包含 bin、conf、lib 等文件夹启动 NameServerLinux/Mac 命令nohup sh bin/mqnamesrv tail -f ~/logs/rocketmqlogs/namesrv.log启动 BrokerLinux/Mac 命令nohup sh bin/mqbroker -n localhost:9876 tail -f ~/logs/rocketmqlogs/broker.logWindows 系统可执行对应 cmd 命令.\bin\mqnamesrv.cmd启动 NameServer .\bin\mqbroker.cmd -n localhost:9876启动 Broker默认端口NameServer 9876Broker 10911。引入依赖Spring Cloud Alibaba 环境!-- Spring Cloud Stream 核心依赖 -- dependency groupIdorg.springframework.cloud/groupId artifactIdspring-cloud-stream/artifactId /dependency !-- Spring Cloud Stream RocketMQ Binder 依赖 -- dependency groupIdcom.alibaba.cloud/groupId artifactIdspring-cloud-starter-stream-rocketmq/artifactId /dependency !-- 消息序列化依赖JSON -- dependency groupIdcom.alibaba/groupId artifactIdfastjson2/artifactId version2.0.32/version /dependency核心配置application.ymlserver: port: 8081 spring: application: name: stream-rocketmq-demo cloud: stream: # 绑定器配置RocketMQ rocketmq: binder: name-server: localhost:9876 # RocketMQ NameServer 地址集群用逗号分隔 # 生产者全局配置 producer: group: stream-rocketmq-producer # 生产者组 retry-times-when-send-failed: 3 # 发送失败重试次数 send-message-timeout: 3000 # 发送超时时间ms # 消费者全局配置 consumer: group: stream-rocketmq-consumer # 消费者组默认分组可在binding中覆盖 broadcast: false # 是否广播消费默认集群消费 # 消息通道绑定Input消费者Output生产者 bindings: # 输出通道生产者名称自定义如 output-channel output-channel: destination: stream-rocketmq-topic # 绑定的 RocketMQ 主题自动创建 content-type: application/json # 消息格式JSON序列化 producer: sync: true # 同步发送默认异步 # 输入通道消费者名称自定义如 input-channel input-channel: destination: stream-rocketmq-topic # 与生产者主题一致才能接收消息 content-type: application/json # 消息格式JSON反序列化 group: stream-rocketmq-consumer # 消费者组必须指定 consumer: max-attempts: 5 # 消费重试次数 concurrency: 3 # 消费线程数并发消费 # 死信队列配置可选 dead-letter-queue: true # 开启死信队列 dead-letter-topic: stream-rocketmq-dlq # 死信主题 dead-letter-group: stream-rocketmq-dlq-group # 死信消费者组3.2 自定义消息通道可选推荐Stream 提供默认的 Sink/Source 通道但实际开发中推荐自定义通道更贴合业务场景如按业务模块划分通道import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义 RocketMQ 消息通道 */ public interface RocketMQStreamChannel { // 输出通道生产者名称与配置文件中 bindings.output-channel 一致 String OUTPUT_CHANNEL output-channel; // 输入通道消费者名称与配置文件中 bindings.input-channel 一致 String INPUT_CHANNEL input-channel; // 输出通道发送消息 Output(OUTPUT_CHANNEL) MessageChannel outputChannel(); // 输入通道接收消息 Input(INPUT_CHANNEL) SubscribableChannel inputChannel(); }3.3 消息生产者实现通过EnableBinding绑定自定义通道注入MessageChannel或StreamBridge发送消息StreamBridge 是 Spring Cloud Stream 3.0 推荐方式更灵活import com.alibaba.fastjson2.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; // 绑定自定义通道若用默认通道可绑定 Source.class EnableBinding(RocketMQStreamChannel.class) Service public class RocketMQProducerService { // 方式1注入 StreamBridge推荐灵活无需绑定固定通道 Autowired private StreamBridge streamBridge; // 方式2注入自定义输出通道 Autowired private RocketMQStreamChannel rocketMQStreamChannel; /** * 发送消息StreamBridge 方式 * param message 消息内容Java对象 */ public void sendMessageByStreamBridge(Object message) { // 构建消息可添加消息头如消息ID、时间戳 MessageString streamMessage MessageBuilder .withPayload(JSON.toJSONString(message)) .setHeader(messageId, System.currentTimeMillis()) .setHeader(timestamp, System.currentTimeMillis()) .build(); // 发送消息参数1通道名称参数2消息 streamBridge.send(RocketMQStreamChannel.OUTPUT_CHANNEL, streamMessage); } /** * 发送消息MessageChannel 方式 * param message 消息内容Java对象 */ public void sendMessageByChannel(Object message) { MessageString streamMessage MessageBuilder .withPayload(JSON.toJSONString(message)) .build(); // 通过自定义通道发送消息 rocketMQStreamChannel.outputChannel().send(streamMessage); } }3.4 消息消费者实现通过StreamListener注解监听输入通道接收消息并处理支持消息重试、死信队列等特性import com.alibaba.fastjson2.JSON; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; // 绑定自定义通道若用默认通道可绑定 Sink.class EnableBinding(RocketMQStreamChannel.class) Service public class RocketMQConsumerService { /** * 监听消息输入通道 * param message 接收的消息Message类型包含消息体和消息头 */ StreamListener(RocketMQStreamChannel.INPUT_CHANNEL) public void receiveMessage(MessageString message) { try { // 获取消息头 String messageId message.getHeaders().get(messageId).toString(); // 获取消息体JSON字符串反序列化为Java对象 String payload message.getPayload(); OrderDTO orderDTO JSON.parseObject(payload, OrderDTO.class); // 业务逻辑处理如订单入库、库存扣减 System.out.println(接收消息messageId messageId 订单信息 orderDTO); } catch (Exception e) { // 消费失败会自动重试重试次数由 max-attempts 配置 // 重试次数耗尽后消息会进入死信队列若开启 throw new RuntimeException(消息消费失败, e); } } }3.5 高级特性实操RocketMQ 专属1事务消息保障分布式事务最终一致性RocketMQ 支持事务消息Stream 整合后可通过Transactional注解实现事务消息发送核心场景订单创建后发送消息扣减库存确保订单创建与库存扣减的一致性。// 生产者添加事务消息发送方法 Transactional public void sendTransactionalMessage(OrderDTO orderDTO) { // 1. 执行本地事务如订单入库 orderMapper.insert(orderDTO); // 2. 发送事务消息 MessageString message MessageBuilder .withPayload(JSON.toJSONString(orderDTO)) .build(); // 事务消息发送参数1通道名称参数2消息参数3本地事务状态回调 streamBridge.send(RocketMQStreamChannel.OUTPUT_CHANNEL, message, Collections.singletonMap(rocketmq.transactional, true)); }2延迟消息定时消息RocketMQ 支持延迟消息可设置消息延迟时间如10秒、1分钟核心场景订单超时未支付自动取消。public void sendDelayMessage(OrderDTO orderDTO, int delayLevel) { // delayLevel延迟级别11秒25秒310秒430秒51分钟... MessageString message MessageBuilder .withPayload(JSON.toJSONString(orderDTO)) .setHeader(rocketmq.delay.level, delayLevel) .build(); streamBridge.send(RocketMQStreamChannel.OUTPUT_CHANNEL, message); }3死信队列DLQ消费者消费消息失败重试次数耗尽后消息会进入死信队列可单独监听死信队列进行异常处理如人工介入配置已在 application.yml 中添加消费者监听死信队列// 死信队列消费者绑定死信主题 StreamListener(dlq-input-channel) // 死信通道需在配置中绑定死信主题 public void receiveDlqMessage(MessageString message) { String payload message.getPayload(); System.out.println(死信消息 payload); // 异常处理逻辑如记录日志、人工重试 }四、实战二Spring Cloud Stream Kafka 整合高吞吐场景Kafka 是 Apache 开源的高吞吐分布式消息中间件适合大数据、日志采集、实时计算等场景结合 Spring Cloud Stream 整合可充分发挥其高吞吐优势。以下是完整实操步骤基于 Spring Cloud 2025.1.x Kafka 3.x。4.1 环境准备部署 Kafka 服务Kafka 依赖 ZooKeeper或 KRaft先启动 ZooKeeper默认端口2181启动 Kafka Broker默认端口9092命令bin/kafka-server-start.sh config/server.propertiesLinux/Mac创建主题可选Stream 会自动创建bin/kafka-topics.sh --create --topic stream-kafka-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1。引入依赖!-- Spring Cloud Stream 核心依赖 -- dependency groupIdorg.springframework.cloud/groupId artifactIdspring-cloud-stream/artifactId /dependency !-- Spring Cloud Stream Kafka Binder 依赖 -- dependency groupIdorg.springframework.cloud/groupId artifactIdspring-cloud-starter-stream-kafka/artifactId /dependency !-- JSON 序列化依赖 -- dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId /dependency核心配置application.ymlserver: port: 8082 spring: application: name: stream-kafka-demo cloud: stream: # 绑定器配置Kafka kafka: binder: brokers: localhost:9092 # Kafka Broker 地址集群用逗号分隔 default-broker-port: 9092 # 默认端口 # 消费者全局配置 consumer-properties: auto-offset-reset: earliest # 消费偏移量重置earliest从最早消息开始消费 # 生产者全局配置 producer-properties: acks: 1 # 消息确认机制1Broker 确认接收即可 retries: 3 # 发送失败重试次数 # 消息通道绑定 bindings: # 输出通道生产者 kafka-output-channel: destination: stream-kafka-topic # 绑定的 Kafka 主题 content-type: application/json # 消息格式 producer: partition-count: 3 # 主题分区数 partition-key-expression: headers[partitionKey] # 分区规则按消息头的partitionKey分区 # 输入通道消费者 kafka-input-channel: destination: stream-kafka-topic # 与生产者主题一致 content-type: application/json group: stream-kafka-consumer # 消费者组 consumer: concurrency: 3 # 消费线程数与分区数一致实现顺序消费 max-attempts: 5 # 消费重试次数 # 死信队列配置Kafka 死信基于主题后缀默认是 .DLT enable-dlq: true # 开启死信队列 dlq-name: stream-kafka-topic.DLT # 死信主题默认主题名DLT4.2 自定义消息通道import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义 Kafka 消息通道 */ public interface KafkaStreamChannel { String KAFKA_OUTPUT_CHANNEL kafka-output-channel; String KAFKA_INPUT_CHANNEL kafka-input-channel; Output(KAFKA_OUTPUT_CHANNEL) MessageChannel kafkaOutputChannel(); Input(KAFKA_INPUT_CHANNEL) SubscribableChannel kafkaInputChannel(); }4.3 消息生产者实现import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; EnableBinding(KafkaStreamChannel.class) Service public class KafkaProducerService { Autowired private StreamBridge streamBridge; Autowired private ObjectMapper objectMapper; /** * 发送消息支持分区 * param message 消息内容 * param partitionKey 分区键用于指定消息发送到哪个分区 */ public void sendMessage(Object message, String partitionKey) throws Exception { String payload objectMapper.writeValueAsString(message); MessageString streamMessage MessageBuilder .withPayload(payload) .setHeader(partitionKey, partitionKey) // 分区键与配置中的partition-key-expression对应 .setHeader(messageId, System.currentTimeMillis()) .build(); // 发送消息 streamBridge.send(KafkaStreamChannel.KAFKA_OUTPUT_CHANNEL, streamMessage); } }4.4 消息消费者实现import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; EnableBinding(KafkaStreamChannel.class) Service public class KafkaConsumerService { Autowired private ObjectMapper objectMapper; /** * 监听 Kafka 消息 */ StreamListener(KafkaStreamChannel.KAFKA_INPUT_CHANNEL) public void receiveMessage(MessageString message) { try { String messageId message.getHeaders().get(messageId).toString(); String payload message.getPayload(); OrderDTO orderDTO objectMapper.readValue(payload, OrderDTO.class); // 业务逻辑处理 System.out.println(Kafka 接收消息messageId messageId 订单信息 orderDTO); } catch (Exception e) { throw new RuntimeException(Kafka 消息消费失败, e); } } /** * 监听死信队列消息 */ StreamListener(kafka-input-channel.DLT) // 死信通道与死信主题对应 public void receiveDlqMessage(MessageString message) { String payload message.getPayload(); System.out.println(Kafka 死信消息 payload); } }4.5 高级特性实操Kafka 专属1消息分区高吞吐核心Kafka 的高吞吐依赖分区Stream 中可通过partition-key-expression配置分区规则如按用户ID、订单ID哈希确保同一个用户的消息进入同一个分区实现顺序消费。2消费偏移量管理Kafka 消费者会记录消费偏移量offset用于下次消费时从指定位置开始Stream 支持三种偏移量重置策略earliest若没有偏移量记录从最早消息开始消费latest若没有偏移量记录从最新消息开始消费none若没有偏移量记录抛出异常。3批量消费提升吞吐Kafka 支持批量消费可通过配置开启减少消费次数提升吞吐spring: cloud: stream: kafka: bindings: kafka-input-channel: consumer: batch-mode: true # 开启批量消费 batch-size: 100 # 批量消费大小一次消费100条消息五、Spring Cloud Stream RocketMQ/Kafka 对比与选型RocketMQ 和 Kafka 都是企业主流的消息中间件结合 Stream 整合后各有优势需根据业务场景选型以下是详细对比5.1 核心对比表对比维度Spring Cloud Stream RocketMQSpring Cloud Stream Kafka核心优势1. 支持事务消息、延迟消息、死信队列等高级特性2. 阿里开源国内生态完善适配电商、金融等业务3. 部署简单运维成本低4. 支持广播消费、集群消费1. 高吞吐、高并发适合大数据、日志采集场景2. 分区机制完善吞吐能力远超 RocketMQ3. 开源社区活跃生态成熟4. 支持批量消费、流处理性能表现中高吞吐单 Broker 可支撑 10万 QPS延迟低毫秒级高吞吐单 Broker 可支撑 100万 QPS延迟低毫秒级适合海量数据场景适用场景国内企业、电商订单、支付回调、分布式事务、延迟任务如订单取消大数据、日志采集、实时计算、高吞吐场景如秒杀消息、用户行为日志生态适配完美适配 Spring Cloud Alibaba、Nacos、Sentinel 等国内生态组件适配 Spring 官方生态适合与 Spark、Flink 等大数据组件集成学习成本低API 简洁中文文档丰富国内案例多中需理解分区、偏移量、副本等概念适合有大数据基础的开发者运维成本低部署简单无需依赖 ZooKeeper新版本支持 KRaft中需部署 ZooKeeper或 KRaft集群运维复杂5.2 选型建议优先选 RocketMQ国内企业、电商、金融等业务场景需要事务消息、延迟消息、死信队列且希望部署运维简单、生态适配国内组件如 Nacos、Sentinel。优先选 Kafka大数据、日志采集、实时计算场景需要高吞吐、海量消息处理且需与 Spark、Flink 等大数据组件集成。灵活切换若不确定未来是否切换中间件建议基于 Spring Cloud Stream 开发后续切换时只需修改依赖和配置无需修改业务代码。六、实战避坑指南6.1 常见坑点及解决方案坑点1消费者无法接收消息解决方案检查生产者和消费者的destination主题是否一致检查消费者是否指定了group未指定会使用匿名分组易导致消息丢失检查中间件服务是否正常RocketMQ NameServer/Broker、Kafka Broker检查消息序列化/反序列化格式是否一致如生产者用 fastjson消费者用 jackson。坑点2消息重复消费解决方案实现消息幂等性如基于消息ID去重、数据库唯一约束确保消费者分组配置正确同一个分组的消费者不会重复消费Kafka 消费者需确保auto-offset-reset配置合理避免重复消费历史消息。坑点3事务消息发送失败解决方案确保 RocketMQ 版本支持事务消息2.x 及以上生产者方法需添加Transactional注解确保本地事务与消息发送的一致性检查事务消息回调逻辑是否正确避免本地事务执行成功但消息发送失败。坑点4高并发场景下消息堆积解决方案增加主题分区数Kafka/RocketMQ提高并发处理能力增加消费者线程数concurrency与分区数保持一致开启 Kafka 批量消费减少消费次数优化消费者业务逻辑减少处理耗时。坑点5切换中间件时代码报错解决方案确保移除原中间件的 Binder 依赖引入新中间件的 Binder 依赖修改配置文件中的 Binder 相关配置如 RocketMQ 的 name-server 改为 Kafka 的 brokers避免使用中间件专属的 API如 RocketMQ 的延迟消息头尽量使用 Stream 统一 API。6.2 生产环境最佳实践消息幂等性所有消费者必须实现幂等性避免重复消费导致业务异常如重复扣减库存死信队列必须开启死信队列对消费失败的消息进行单独处理避免消息丢失分区配置高并发场景下合理设置分区数建议与消费者线程数一致实现负载均衡和顺序消费监控告警集成 Prometheus Grafana监控消息生产/消费速率、消息堆积量、消费失败率及时告警配置持久化将 Stream 相关配置如主题、分组、重试次数存入 Nacos 配置中心支持动态调整无需重启服务版本选择使用稳定版本如 Spring Cloud Stream 4.x、RocketMQ 2.x、Kafka 3.x避免使用测试版本确保兼容性。核心总结Spring Cloud Stream 不是消息中间件而是「消息中间件的统一抽象层」它让开发者从复杂的中间件 API 中解放出来专注业务逻辑RocketMQ 适合国内业务场景提供丰富的高级特性Kafka 适合高吞吐、大数据场景性能卓越。在微服务架构中消息驱动是实现服务解耦、流量削峰、最终一致性的关键而 Spring Cloud Stream RocketMQ/Kafka 的组合是 2026 年企业实战的首选方案。建议结合实际业务场景选择合适的中间件通过实战不断积累经验才能真正发挥消息驱动的价值。

相关文章:

SpringCloud-Stream + RocketMQ/Kafka

一、核心认知:Spring Cloud Stream 是什么?解决什么问题?1.1 基本定义Spring Cloud Stream 是 Spring 生态下的「消息驱动微服务框架」,基于 Spring Boot 构建,核心定位是「统一消息中间件接口,简化消息驱动…...

绵阳高新区小学晚托自习

在绵阳石桥铺,孩子在家写作业拖拉磨蹭、坐不住,手机干扰不断等问题让家长们头疼不已。而分小全AI智能学习体验中心旗下的分小全智习室,正是解决这些问题的专业之选。督学老师资质分小全智习室的督学老师均具备师范类或教育学专业背景&#xf…...

别再踩坑了!SQL Server数据类型那点事儿,看懂这篇少背三个锅竟

从0构建WAV文件:读懂计算机文件的本质 虽然接触计算机有一段时间了,但是我的视野一直局限于一个较小的范围之内,往往只能看到于算法竞赛相关的内容,计算机各种文件在我看来十分复杂,认为构建他们并能达到目的是一件困难…...

P4561 [JXOI2018] 排序问题

题意 有一个序列,现在要在结尾加上 mmm 个 [l,r][l,r][l,r] 之间的数,求在所有方案中,猴子排序(每次随机一个排列,检查是否有序)的次数期望最大次数。 思路 假设最终的序列中数 iii 出现的次数是 cic_ici​…...

免疫治疗新视角:CD47 (分化簇47) 信号通路机制与药物研发技术综述

在生物制药与免疫学领域,CD47 (分化簇47) 作为连接先天免疫与适应性免疫的关键节点,近年来备受关注。作为一种广泛表达的跨膜糖蛋白,它通过复杂的信号轴调控免疫细胞的吞噬行为。本文将深入剖析CD47的作用机制、当前药物研发的临床进展以及未…...

linux文件,IO,缓存,动\静函数库

1.文件IO与标准IO的区别文件IO:直接调用内核提供的系统调用函数,头文件是unistd.h标准IO:间接调用系统调用函数,头文件是stdio.h缓存的概念1.程序的缓存就是用户空间的缓存。2.每打开一个文件,在内核中开辟一个缓存即为…...

【Java】通过Mybatis Plus自带的方式,实现公共字段自动填充。

通过Mybatis Plus自带的方式,实现公共字段自动填充。 第一步,创建一个公共字段类,加上对应注解。 Data public class BaseEntity implements Serializable {Serialprivate static final long serialVersionUID 1L;TableField(value "c…...

《道德经》第九章

本章以持而盈之功成身退为核心,是道家保身、持满、长久的警示章。老子用“持盈、揣锐、富贵而骄”三组世俗常见困境,指出过刚则折、过满则溢、过骄则亡的天道规律,最终以“功成身退,天之道”点破最高处世心法,是全书最…...

设置echarts 图例为长方形

在 ECharts 中,要将图例(legend)的 标记(icon) 设置为 长方形(矩形),可以通过 legend 配置项中的 icon 属性来实现。✅ 方法:使用 icon: rect ECharts 内置了多种图例标记…...

系统设计面试通关秘籍:从场景分析到微服务拆分的核心思路

系统设计面试通关秘籍:从场景分析到微服务拆分的核心思路一、Scenario场景分析:打好系统设计的基础牌🔍 先定功能:抓核心,舍冗余📊 再估流量:从MAU到QPS,做有依据的推算⚙️ 流量决定…...

OpenClaw自动化测试实践:gemma-3-12b-it驱动Python脚本批量执行

OpenClaw自动化测试实践:gemma-3-12b-it驱动Python脚本批量执行 1. 为什么选择OpenClawgemma做测试自动化? 上个月重构一个爬虫项目时,我遇到了测试脚本管理的噩梦——每次修改核心逻辑后,都需要手动执行十几个测试用例&#xf…...

【51 单片机入门到进阶】08 入门:51单片机定时器0/1使用详解

1,定时器中断核心基础总览 定时器中断:定时器计数溢出时,硬件自动触发中断,CPU 暂停主程序执行中断服务函数,是单片机定时控制、延时、周期任务的核心方式。中断名称中断号入口地址核心控制寄存器中断标志定时器 0 中断…...

stock-sdk-mcp 的实践整理绰

一、什么是urllib3? urllib3 是一个用于处理 HTTP 请求和连接池的强大、用户友好的 Python 库。 它可以帮助你: 发送各种 HTTP 请求(GET, POST, PUT, DELETE等)。 管理连接池,提高网络请求效率。 处理重试和重定向。 支…...

Programmable-Air开源气动控制库底层驱动解析

1. Programmable-Air 开源控制库深度解析:面向嵌入式工程师的底层驱动实践指南Programmable-Air 是一款基于 Crowdfunding 平台 CrowdSupply 成功孵化的开源气动控制硬件平台,其核心价值在于将传统工业级气动执行器(泵、阀、压力传感器&#…...

千问3.5-9B+OpenClaw成本对比:自建模型VS商业API

千问3.5-9BOpenClaw成本对比:自建模型VS商业API 1. 为什么需要关注OpenClaw的token消耗 去年冬天,当我第一次用OpenClaw自动整理全年会议纪要时,看着控制台不断刷新的token消耗记录,手指不自觉地敲起了桌子——这个看似简单的任…...

FreakStudio泄

环境安装 pip install keystone-engine capstone unicorn 这3个工具用法极其简单,下面通过示例来演示其用法。 Keystone 示例 from keystone import * CODE b"INC ECX; ADD EDX, ECX" try:ks Ks(KS_ARCH_X86, KS_MODE_64)encoding, count ks.asm(CODE)…...

JavaScript中BigInt与Number类型混用的报错机制

JavaScript中BigInt与Number不能直接混合运算&#xff0c;会立即抛出TypeError&#xff1b;所有涉及两者混合的算术和关系操作&#xff08;如1n1、10n<5&#xff09;均报错&#xff0c;仅和不报错但返回false。JavaScript中BigInt与Number不能直接混合运算&#xff0c;会立即…...

ESP居然能当 DNS 服务器用?内含NCSI欺骗和DNS劫持实现罢

前言 Kubernetes 本身并不复杂&#xff0c;是我们把它搞复杂的。无论是刻意为之还是那种虽然出于好意却将优雅的原语堆砌成 鲁布戈德堡机械 的狂热。平台最初提供的 ReplicaSets、Services、ConfigMaps&#xff0c;这些基础组件简单直接&#xff0c;甚至显得有些枯燥。但后来我…...

告别格式烦恼:如何用Chrome扩展一键转换网页图片格式?

告别格式烦恼&#xff1a;如何用Chrome扩展一键转换网页图片格式&#xff1f; 【免费下载链接】Save-Image-as-Type Save Image as Type is an chrome extension which add Save as PNG / JPG / WebP to the context menu of image. 项目地址: https://gitcode.com/gh_mirror…...

毕业设计实战:基于Java+MySQL的C2C商务网站设计与实现指南

毕业设计实战&#xff1a;基于JavaMySQL的C2C商务网站设计与实现指南 在开发“基于JavaMySQL的C2C商务网站”毕业设计时&#xff0c;我曾因商品订单表未通过用户ID、商品ID与收货地址ID三外键关联踩过关键坑——初期设计订单表时&#xff0c;仅记录了订单号、总价、下单时间等基…...

Python编程第09课:Python列表(List)操作完全手册

前言&#xff1a;Python最常用的数据结构 列表是Python中最常用、最灵活的数据结构。它就像一个容器&#xff0c;可以存储任意类型的元素&#xff0c;并且可以随时添加、删除或修改元素。无论是处理数据、实现算法还是构建应用程序&#xff0c;列表都是你离不开的工具。 本课程…...

OpenClaw模型量化指南:压缩Qwen2.5-VL-7B提升本地运行效率

OpenClaw模型量化指南&#xff1a;压缩Qwen2.5-VL-7B提升本地运行效率 1. 为什么需要量化多模态大模型 当我第一次在本地MacBook Pro上尝试运行Qwen2.5-VL-7B时&#xff0c;风扇立刻开始狂转&#xff0c;16GB内存几乎被吃满&#xff0c;模型加载就花了近3分钟。这种体验让我意…...

OpenClaw调试技巧大全:Qwen3-14b_int4_awq任务失败排查指南

OpenClaw调试技巧大全&#xff1a;Qwen3-14b_int4_awq任务失败排查指南 1. 为什么我们需要系统化的调试方法 上周我在尝试用OpenClaw自动整理项目文档时&#xff0c;遇到了一个诡异的问题&#xff1a;任务执行到一半突然卡住&#xff0c;既没有报错也没有继续执行。花了整整三…...

一款基于 .NET 开源、跨平台应用程序自动升级组件阅

基础示例&#xff1a;单工作表 Excel 转 TXT 以下是将一个 Excel 文件中的第一个工作表转换为 TXT 的完整步骤&#xff1a; 1. 加载并读取Excel文件 from spire.xls import * from spire.xls.common import * workbook Workbook() workbook.LoadFromFile("示例.xlsx"…...

Docker 容器中运行 AI CLI 工具:用户隔离与持久化卷实战指南撂

环境安装 pip install keystone-engine capstone unicorn 这3个工具用法极其简单&#xff0c;下面通过示例来演示其用法。 Keystone 示例 from keystone import * CODE b"INC ECX; ADD EDX, ECX" try:ks Ks(KS_ARCH_X86, KS_MODE_64)encoding, count ks.asm(CODE)…...

软件人员可以关注的 Skill,亲测确实不错,值得试一下

Agent Skill 是一套标准化的能力封装&#xff0c;它将复杂的、需要多步处理和工具调用的任务&#xff0c;打包成一个可以直接使用的“技能包”。推荐一些在实际工作中表现不错的 Skill&#xff1a; 1. frontend-design 介绍&#xff1a;打造具有高设计质量的独特生产级前端界面…...

和AI一起搞事情#:边剥龙虾边做个中医技能来起号冠

1. 核心概念 在 Antigravity 中&#xff0c;技能系统分为两层&#xff1a; Skills (全局库)&#xff1a;实际的代码、脚本和指南&#xff0c;存储在系统级目录&#xff08;如 ~/.gemini/antigravity/skills&#xff09;。它们是“能力”的本体。 Workflows (项目级)&#xff1a…...

Blazor组件化演进终极指南:2026年必须掌握的5大架构范式与3种反模式规避清单

第一章&#xff1a;Blazor组件化演进的底层动因与2026技术坐标系Blazor 的组件化并非单纯语法糖的迭代&#xff0c;而是对 Web 前端架构范式、.NET 生态边界以及现代云原生交付链路三重压力下的系统性响应。其底层动因根植于三个不可逆趋势&#xff1a;WebAssembly 运行时成熟度…...

2026年AI热点:阿里新模型领跑行业

今日AI热点汇总&#xff08;2026年4月9日&#xff09; 一、阿里发布新模型&#xff0c;性能大幅提升 今天&#xff0c;阿里巴巴重磅推出了全新的通义千问模型&#xff0c;这个新模型在语言理解、逻辑推理和代码生成等方面都有了显著提升。 更强的理解能力&#xff1a;能更准确地…...

普通人也能轻松掌握!5个技巧让你玩转AI大模型,从入门到精通的实用指南!

随着ChatGPT、文心一言、通义千问等AI大模型的普及&#xff0c;很多人都在使用AI大模型&#xff0c;普通人学习时往往陷入“指令说不清楚、输出不符合预期”的困境。其实&#xff0c;学会AI大模型的核心&#xff0c;不在于掌握技术原理&#xff0c;而在于掌握“与大模型高效沟通…...