当前位置: 首页 > news >正文

ActiveMQ、RabbitMQ 和 Kafka 在 Spring Boot 中的实战

在现代的微服务架构和分布式系统中,消息队列 是一种常见的异步通信工具。消息队列允许应用程序之间通过 生产者-消费者模型 进行松耦合、异步交互。在 Spring Boot 中,我们可以通过简单的配置来集成不同的消息队列系统,包括 ActiveMQRabbitMQKafka。本文将重点介绍它们的实战案例及使用时需要注意的地方。
在这里插入图片描述

一、Spring Boot 集成 ActiveMQ

1. ActiveMQ 概述

ActiveMQ 是一个开源、支持 JMS(Java Message Service)的消息中间件。它支持点对点(Queue)和发布/订阅(Topic)模式,是 Spring Boot 常用的消息队列之一。

2. ActiveMQ 实战:生产者和消费者

依赖配置

pom.xml 中添加 ActiveMQ 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置 ActiveMQ 连接

application.properties 中配置 ActiveMQ 的连接地址:

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
生产者代码示例
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;@Component
public class ActiveMQProducer {private final JmsTemplate jmsTemplate;public ActiveMQProducer(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}public void sendMessage(String queueName, String message) {jmsTemplate.convertAndSend(queueName, message);System.out.println("Message sent: " + message);}
}
消费者代码示例
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
public class ActiveMQConsumer {@JmsListener(destination = "testQueue")public void receiveMessage(String message) {System.out.println("Message received: " + message);}
}

3. 注意事项

  1. JMS 模式的选择:ActiveMQ 支持 点对点发布/订阅 两种模式。要根据场景选择合适的模式,比如订单处理适合点对点模式,而系统通知适合发布/订阅。
  2. 消息持久化:确保配置了持久化存储,尤其是当队列中消息量很大时,ActiveMQ 默认使用 KahaDB 存储,建议对其进行优化。

二、Spring Boot 集成 RabbitMQ

1. RabbitMQ 概述

RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)的开源消息代理,广泛应用于微服务系统。RabbitMQ 提供了更复杂的消息路由功能,例如 交换机(Exchange)和 绑定(Binding)。

2. RabbitMQ 实战:生产者和消费者

依赖配置

pom.xml 中添加 RabbitMQ 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置 RabbitMQ 连接

application.properties 中配置 RabbitMQ 的连接地址:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
生产者代码示例
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class RabbitMQProducer {private final RabbitTemplate rabbitTemplate;public RabbitMQProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);System.out.println("Message sent: " + message);}
}
消费者代码示例
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMQConsumer {@RabbitListener(queues = "testQueue")public void receiveMessage(String message) {System.out.println("Message received: " + message);}
}

3. 注意事项

  1. 交换机和队列的绑定:RabbitMQ 提供了丰富的交换机类型,如 Direct、FanoutTopic。选择合适的交换机类型非常关键,例如 Direct 适合路由到特定队列,而 Fanout 适合广播消息到所有绑定队列。
  2. 消息确认机制:RabbitMQ 支持消息的 手动确认,确保消费者已经正确处理了消息,避免消息丢失。

三、Spring Boot 集成 Kafka

1. Kafka 概述

Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,用于 实时数据流处理。与 ActiveMQ 和 RabbitMQ 不同,Kafka 主要用于处理 大规模的、持续的数据流,例如日志采集、消息传递等。

2. Kafka 实战:生产者和消费者

依赖配置

pom.xml 中添加 Kafka 的依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
配置 Kafka 连接

application.properties 中配置 Kafka 的连接地址:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
生产者代码示例
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Message sent to topic " + topic + ": " + message);}
}
消费者代码示例
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {@KafkaListener(topics = "testTopic", groupId = "my-group")public void receiveMessage(String message) {System.out.println("Message received: " + message);}
}

3. 注意事项

  1. 分区与副本机制:Kafka 的分区机制允许数据被并行处理,提升吞吐量。合理规划 分区数副本数,可以提高数据的可靠性和吞吐量。
  2. 消费偏移管理:Kafka 消费者需要管理消费偏移(offset),确保在重启或发生故障时,能够从上次的位置继续消费。Spring Boot 提供了自动和手动管理偏移的选项,建议根据需求选择合适的策略。

四、丢消息的处理方案

在使用消息队列时,丢消息是一个常见的问题,通常发生在以下场景:

  1. 生产者发送消息失败:消息未能成功送到队列。
  2. 消息未持久化:队列宕机导致消息丢失。
  3. 消费者处理消息失败:消费者在处理消息时出错,未能确认消息。

1. 生产者发送失败的处理

在生产者发送消息时,可能会由于网络问题或队列不可用,导致消息未能成功发送。这时需要确保生产者具备 重试机制失败回调,保证消息最终能到达队列。

重试机制示例 (以 Kafka 为例):
kafkaTemplate.send(topic, message).addCallback(success -> System.out.println("Message sent successfully: " + message),failure -> {System.err.println("Message failed to send: " + message);// 可以在此进行重试逻辑或存储消息到数据库,后续处理}
);

注意事项

  • 重试机制:生产者可以通过配置重试策略,例如在 Kafka 中通过 retries 属性配置发送失败后的重试次数。
  • 备份存储:对于无法发送的消息,可以选择将其保存到数据库或日志文件中,以便后续重新发送。

2. 消息未持久化的处理

大多数消息队列(如 ActiveMQ、RabbitMQ、Kafka)都提供了 消息持久化 的功能。在配置消息队列时,必须确保消息被持久化存储在磁盘上,防止消息在队列宕机时丢失。

ActiveMQ 持久化配置示例:
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=false
RabbitMQ 持久化配置示例:
@Bean
public Queue durableQueue() {return new Queue("testQueue", true); // 设置队列为持久化
}

注意事项

  • 消息的持久化:确保生产者发送的消息和队列都是持久化的,尤其是在高可靠性系统中。
  • 集群化部署:对于 RabbitMQ 和 Kafka 等分布式消息系统,建议使用集群部署来提高可用性,防止单点故障。

3. 消费者处理失败的处理

在消费者从队列接收到消息后,如果发生处理失败,需要有相应的机制确保消息不会丢失。最常用的策略是 手动确认 消息和 消息重试

RabbitMQ 消费者手动确认:
@RabbitListener(queues = "testQueue")
public void receiveMessage(Message message, Channel channel) throws IOException {try {// 处理消息逻辑processMessage(message);// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息,消息可以重新入队或丢弃channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}

注意事项

  • 手动确认机制:确保消费者在处理完消息后才确认消费成功。如果处理失败,可以拒绝消息并重新入队,防止消息丢失。
  • 死信队列(DLQ):如果消息经过多次重试仍然无法成功处理,可以将其发送到死信队列,进行人工检查或报警。

五、分布式环境下的消息处理

在分布式环境中,消息队列扮演着关键的角色。消息的 可靠投递顺序保证幂等性处理 是分布式系统中消息处理的核心问题。

1. 消息的可靠投递

在分布式系统中,网络延迟、节点宕机等问题会影响消息的可靠投递,常见的解决方案有以下几点:

  • 消息确认机制:如 Kafka 中的 acks=all 确保消息被所有副本写入成功后,生产者才会认为消息发送成功。

    spring.kafka.producer.acks=all
    
  • 消息重试和补偿机制:当网络分区或队列不可用时,生产者和消费者都应具备 重试机制。此外,当消息经过多次重试后仍然失败,通常会选择通过 补偿机制(如重新发送、人工干预)来处理。

2. 顺序保证

在某些业务场景下,消息的处理顺序非常关键。例如,订单的创建、支付和发货步骤必须按照顺序进行处理。在分布式环境中保证消息的顺序处理可以通过以下方法:

  • 单分区队列:确保消息按顺序发送到同一个分区,这样可以保证消息的顺序性。例如在 Kafka 中,可以通过配置相同的 partition key 来保证顺序。

    kafkaTemplate.send(topic, key, message);
    
  • 消息的排序机制:如果不能使用单分区,可以通过在消息中附加时间戳或序列号,在消费者侧进行排序处理。

3. 消息的幂等性

在分布式系统中,由于网络抖动或超时,消息可能会被 重复消费。为了避免重复处理消息,消费者需要实现 幂等性,即对相同消息的多次处理只产生一次效果。

  • 消息 ID 去重:使用消息的唯一 ID 或业务主键来判断消息是否已经处理过。例如,可以使用数据库或缓存(如 Redis)存储已经处理过的消息 ID。

    if (!redisTemplate.hasKey(messageId)) {// 处理消息processMessage(message);// 将消息ID存入Redis,标记为已处理redisTemplate.opsForValue().set(messageId, "processed");
    }
    
  • 分布式事务:对于某些场景,可能需要使用 分布式事务 保证消息处理的一致性,确保生产者和消费者的操作要么全部成功,要么全部失败。可以使用 Kafka 的事务 APIRabbitMQ 的 Confirm 模式 实现。

4. 分布式消息队列架构中的常见问题

  1. 网络分区:在分布式系统中,网络分区是不可避免的。消息队列的设计要考虑如何处理网络分区导致的消息延迟或丢失。Kafka 提供了 副本机制 来处理这种情况,而 RabbitMQ 通过 集群模式 提高可靠性。

  2. 消息堆积:在高并发情况下,生产者可能会产生大量的消息,如果消费者处理能力不足,会导致消息堆积。解决这个问题的关键在于 合理的扩展 消费者数量,同时可以使用 流控机制 限制消息的生产速度。


总结

在 Spring Boot 框架下使用 ActiveMQ、RabbitMQ 和 Kafka 进行消息处理时,开发者需要重点关注 丢消息的处理、顺序保证、幂等性分布式环境中的可靠性问题。通过合理配置消息的持久化、确认机制和集群部署,我们可以大大提高系统的稳定性和可靠性。

  1. 丢消息的处理 依赖于生产者和消费者的 重试机制手动确认 以及 持久化配置
  2. 分布式环境下的消息处理 需要考虑 顺序性幂等性,同时应对网络分区和系统扩展等问题。

通过这些策略,消息队列在分布式架构中可以更加高效可靠地运作。

相关文章:

ActiveMQ、RabbitMQ 和 Kafka 在 Spring Boot 中的实战

在现代的微服务架构和分布式系统中&#xff0c;消息队列 是一种常见的异步通信工具。消息队列允许应用程序之间通过 生产者-消费者模型 进行松耦合、异步交互。在 Spring Boot 中&#xff0c;我们可以通过简单的配置来集成不同的消息队列系统&#xff0c;包括 ActiveMQ、Rabbit…...

火语言RPA流程组件介绍--获取关联元素

&#x1f6a9;【组件功能】&#xff1a;获取指定元素的父元素、子元素、相邻元素等关联信息 配置预览 配置说明 目标元素 支持T或# 默认FLOW输入项 通过自动捕获工具捕获(选择元素工具使用方法)或手动填写网页元素的css,xpath&#xff0c;指定对应网页元素作为操作目标 关联…...

【2024研赛】【华为杯E题】2024 年研究生数学建模比赛思路、代码、论文助攻

思路将在名片下群聊分享 高速公路应急车道紧急启用模型 高速公路拥堵现象的原因众多&#xff0c;除了交通事故外&#xff0c;最典型的就是部分路段出现瓶颈现象&#xff0c;主要原因是车辆汇聚&#xff0c;而拥堵后又容易蔓延。高速公路一些特定的路段容易形成堵点&#xff0…...

Linux——K8s集群部署过程

&#xff11;、环境准备 &#xff08;1&#xff09;配置好网络ip和主机名 control: node1: node2: 配置ip 主机名的过程省略 配置一个简单的基于hosts文件的名称解析 [rootnode1 ~]# vim /etc/hosts // 文件中新增以下三行 192.168.110.10 control 192.168.110.11 node1 1…...

二.Unity中使用虚拟摇杆来控制角色移动

上一篇中我们完成了不借助第三方插件实现手游的虚拟摇杆&#xff0c;现在借助这个虚拟摇杆来实现控制角色的移动。 虚拟摇杆实际上就给角色输出方向&#xff0c;类似于键盘的WSAD&#xff0c;也是一个二维坐标&#xff0c;也就是(-1,1)的范围&#xff0c;将摇杆的方向进行归一化…...

基于SpringBoot的旅游管理系统

系统展示 用户前台界面 管理员后台界面 系统背景 近年来&#xff0c;随着社会经济的快速发展和人民生活水平的显著提高&#xff0c;旅游已成为人们休闲娱乐、增长见识的重要方式。国家积极倡导“全民旅游”&#xff0c;鼓励民众利用节假日外出旅行&#xff0c;探索各地自然与人…...

Linux套接字

目录标题 套接字套接字的基本概念套接字的功能与分类套接字的使用流程套接字的应用场景总结套接字在不同操作系统中的实现差异有哪些&#xff1f;如何优化套接字编程以提高网络通信的效率和安全性&#xff1f;原始套接字&#xff08;SOCK_RAW&#xff09;的具体应用场景和使用示…...

软件测试面试题(5)——二面(游戏测试)

没想到测试题做完等了会儿就安排面试了&#xff0c;还以为自己会直接挂在测试题&#xff0c;这次面试很刺激。测试题总体来说不算太难&#xff0c;主要是实操写Bug那里真没经历过&#xff0c;所以写的很混乱。 我复盘一下这次面试的问题&#xff0c;这次面试是有两个面试官&…...

C#基于SkiaSharp实现印章管理(8)

上一章虽然增加了按路径绘制文本&#xff0c;支持按矩形、圆形、椭圆等路径&#xff0c;但测试时发现通过调整尺寸、偏移量等方式不是很好控制文本的位置。相对而言&#xff0c;使用弧线路径&#xff0c;通过弧线起始角度及弧线角度控制文本位置更简单。同时基于路径绘制文本时…...

信通院发布首个《大模型媒体生产与处理》标准,阿里云智能媒体服务作为业界首家“卓越级”通过

中国信通院近期正式发布《大模型驱动的媒体生产与处理》标准&#xff0c;阿里云智能媒体服务&#xff0c;以“首批首家”通过卓越级评估&#xff0c;并在9大模块50余项测评中表现为“满分”。 当下&#xff0c;AI大模型的快速发展带动了爆发式的海量AI运用&#xff0c;这其中&a…...

AI学习指南深度学习篇-Adam的Python实践

AI学习指南深度学习篇-Adam的Python实践 在深度学习领域&#xff0c;优化算法是影响模型性能的关键因素之一。Adam&#xff08;Adaptive Moment Estimation&#xff09;是一种广泛使用的优化算法&#xff0c;因其在多种问题上均表现优异而被广泛使用。本文将深入探讨Adam优化器…...

08_React redux

React redux 一、理解1、学习文档2、redux 是什么吗3、什么情况下需要使用 redux4、redux 工作流程5、react-redux 模型图 二、redux 的三个核心概念1、action2、reducer3、store 三、redux 的核心 API1、getState()2、dispatch() 四、使用 redux 编写应用1、求和案例\_redux 精…...

2024华为杯研究生数学建模竞赛(研赛)选题建议+初步分析

难度&#xff1a;DE<C<F&#xff0c;开放度&#xff1a;CDE>F。 华为专项的题目&#xff08;A、B题&#xff09;暂不进行选题分析&#xff0c;不太建议大多数同学选择&#xff0c;对自己专业技能有很大自信的可以选择华为专项的题目。后续会直接更新A、B题思路&#…...

001.从0开始实现线性回归(pytorch)

000动手从0实现线性回归 0. 背景介绍 我们构造一个简单的人工训练数据集&#xff0c;它可以使我们能够直观比较学到的参数和真实的模型参数的区别。 设训练数据集样本数为1000&#xff0c;输入个数&#xff08;特征数&#xff09;为2。给定随机生成的批量样本特征 X∈R10002 …...

Relations Prediction for Knowledge Graph Completion using Large Language Models

文章目录 题目摘要简介相关工作方法论实验结论局限性未来工作 题目 使用大型语言模型进行知识图谱补全的关系预测 论文地址&#xff1a;https://arxiv.org/pdf/2405.02738 项目地址&#xff1a; https://github.com/yao8839836/kg-llm 摘要 知识图谱已被广泛用于以结构化格式表…...

2024年中国研究生数学建模竞赛D题思路代码分析——大数据驱动的地理综合问题

地理系统是自然、人文多要素综合作用的复杂巨系统[1-2]&#xff0c;地理学家常用地理综合的方式对地理系统进行主导特征的表达[3]。如以三大阶梯概括中国的地形特征&#xff0c;以秦岭—淮河一线和其它地理区划的方式揭示中国气温、降水、植被、土壤及生态环境在水平和垂直方向…...

全国31省对外开放程度、经济发展水平、政府干预程度指标数据(2000-2022年)

旨在分析2000-2022年间中国31个省份的对外开放程度、经济发展水平和政府干预程度&#xff0c;探讨其背后的动因与影响。 2000年-2022年 全国31省对外开放程度、经济发展水平、政府干预程度指标数据https://download.csdn.net/download/2401_84585615/89478612 数据概览 对外…...

计算机网络传输层---课后综合题

线路&#xff1a;TCP报文下放到物理层传输。 TCP报文段中&#xff0c;“序号”长度为32bit&#xff0c;为了让序列号不会循环&#xff0c;则最多能传输2^32B的数据&#xff0c;则最多能传输&#xff1a;2^32/1500B个报文 结果&#xff1a; 吞吐率一个周期内传输的数据/周期时间…...

【homebrew安装】踩坑爬坑教程

homebrew官网&#xff0c;有安装教程提示&#xff0c;但是在实际安装时&#xff0c;由于待下载的包的尺寸过大&#xff0c;本地git缓存尺寸、超时时间的限制&#xff0c;会报如下错误&#xff1a; error: RPC failed; curl 92 HTTP/2 stream 5 was not closed cleanly&#xf…...

反游戏学(Reludology):概念、历史、现状与展望?(豆包AI版)

李升伟 以下是关于“反游戏学&#xff08;Reludology&#xff09;&#xff1a;概念、历史、现状与展望”的综述&#xff1a; 一、概念 反游戏学&#xff08;Reludology&#xff09;是一个相对较新且不太常见的概念&#xff0c;目前尚未有统一明确的定义。一般来说&#xf…...

【C/C++语言系列】实现单例模式

1.单例模式概念 定义&#xff1a;单例模式是一种常见的设计模式&#xff0c;它可以保证系统中一个类只有一个实例&#xff0c;而且该实例易于外界访问&#xff08;一个类一个对象&#xff0c;共享这个对象&#xff09;。 条件&#xff1a; 只有1个对象易于外界访问共享这个对…...

A. Make All Equal

time limit per test 1 second memory limit per test 256 megabytes You are given a cyclic array a1,a2,…,ana1,a2,…,an. You can perform the following operation on aa at most n−1n−1 times: Let mm be the current size of aa, you can choose any two adjac…...

业务安全治理

业务安全治理 1.账号安全撞库账户盗用 2.爬虫与反爬虫3.API网关防护4.钓鱼与反制钓鱼发现钓鱼处置 5.大数据风控风控介绍 1.账号安全 撞库 撞库分为垂直撞库和水平撞库两种&#xff0c;垂直撞库是对一个账号使用多个不同的密码进行尝试&#xff0c;可以理解为暴力破解&#x…...

HelpLook VS GitBook,在线文档管理工具对比

在线文档管理工具在当今时代非常重要。随着数字化时代的到来&#xff0c;人们越来越依赖于电子文档来存储、共享和管理信息。无论是与团队合作还是与客户分享&#xff0c;人们都可以轻松地共享文档链接或通过设置权限来控制访问。在线文档管理工具的出现大大提高了工作效率和协…...

docker面经

docker面经在线链接 docker面经在线链接&#x1f517;&#xff1a; (https://h03yz7idw7.feishu.cn/wiki/N3CVwO3kMifLypkJqnic9wNynKh)...

Python 中的 Kombu 类库

Kombu 是一个用于 Python 的消息队列库&#xff0c;提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一&#xff0c;但也可以单独使用。Kombu 支持多种消息代理&#xff08;如 RabbitMQ、Redis、Amazon SQS 等&#xff09;&#xff0c;并提供了消息生产者和消费者的功…...

safepoint是什么?有什么用?

在JVM中&#xff0c;safepoint&#xff08;安全点&#xff09;是一个非常重要的概念&#xff0c;特别是在垃圾回收&#xff08;GC&#xff09;和其他需要暂停所有应用线程的操作中。 什么是safepoint Safepoint是JVM执行过程中一个特定的位置&#xff0c;在这个位置上&#x…...

axios相关知识点

一、基本概念 1、基于Promise:Axios通过Promise实现异步请求&#xff0c;避免了传统回调函数导致的“回调地狱”问题&#xff0c;使得代码更加清晰和易于维护。 2、跨平台&#xff1a;Axios既可以在浏览器中运行&#xff0c;也可以在Node.js环境中使用&#xff0c;为前后端开…...

LeetCode 面试经典150题 67.二进制求和

415.字符串相加 思路一模一样 题目&#xff1a;给你两个二进制字符串 a 和 b &#xff0c;以二进制字符串的形式返回它们的和。 eg&#xff1a; 输入a“1010” b“1011” 输出“10101” 思路&#xff1a;从右开始遍历两个字符串&#xff0c;因为右边是低位先运算。如果…...

Dell PowerEdge 网络恢复笔记

我有一台Dell的PowerEdge服务器&#xff0c;之前安装了Ubuntu 20 桌面版。突然有一天不能开机了。 故障排查 Disk Error 首先是看一下机器的正面&#xff0c;有一个非常小的液晶显示器&#xff0c;只能显示一排字。 上面显示Disk Error&#xff0c;然后看挂载的硬盘仓&#…...