探索Java中的分布式消息队列与事件总线:架构、实现与最佳实践
引言
在现代分布式系统中,消息队列和事件总线已经成为实现松耦合、高扩展性和高可用性架构的关键组件。无论是微服务架构、事件驱动架构,还是实时数据处理,消息队列和事件总线都扮演着至关重要的角色。本文将深入探讨Java中的分布式消息队列与事件总线的概念、实现方法、技术选型以及实际应用中的最佳实践,附带代码示例以便读者更好地理解。
什么是分布式消息队列和事件总线?
分布式消息队列
分布式消息队列是一种为分布式系统提供异步通信机制的中间件。它允许系统中的不同组件通过发送和接收消息进行交流,从而实现高效的数据传输和任务调度。
常见用途:
- 任务调度和执行
- 数据流处理
- 系统解耦
- 事件驱动架构
事件总线
事件总线是一种发布-订阅模型的实现,允许不同组件订阅和发布事件。事件总线可以在同一进程中运行,也可以跨多个分布式系统运行。
常见用途:
- 事件通知
- 事件驱动编程
- 系统解耦
- 实时数据处理
常见技术选型
| 技术 | 类型 | 优点 | 缺点 |
|---|---|---|---|
| RabbitMQ | 消息队列 | 高性能、强大的路由功能、良好的社区支持 | 配置复杂,学习曲线陡峭 |
| Apache Kafka | 消息队列 | 高吞吐量、持久化、分布式特点 | 配置和管理复杂,低延迟不适合实时应用 |
| ActiveMQ | 消息队列 | 易于使用、功能齐全 | 性能和扩展性不如Kafka和RabbitMQ |
| Apache Pulsar | 消息队列 | 多租户、支持Geo-replication | 较新的技术,社区和文档相对较少 |
| Spring Cloud Bus | 事件总线 | 易于集成Spring生态系统 | 主要适用于Spring项目,通用性较差 |
| Vert.x Event Bus | 事件总线 | 轻量级、高性能、灵活 | 对于大型分布式系统,可能需要自定义扩展 |
实现分布式消息队列
使用RabbitMQ实现消息队列
配置RabbitMQ
首先,确保RabbitMQ服务在本地或远程服务器上运行。可以通过Docker快速启动RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
添加依赖
在你的pom.xml文件中添加RabbitMQ客户端的依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version>
</dependency>
生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
消费者代码
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
实现事件总线
使用Spring Cloud Bus实现事件总线
添加依赖
在你的pom.xml文件中添加Spring Cloud Bus和RabbitMQ的依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置文件
在application.yml中配置RabbitMQ连接信息:
spring:cloud:bus:enabled: truerabbitmq:host: localhostport: 5672
事件发布者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.SpringCloudBusClient;
import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class EventPublisherController {@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;@PostMapping("/publish-event")public String publishEvent() {applicationEventPublisher.publishEvent(new EnvironmentChangeRemoteApplicationEvent(this, "source", null));return "Event published";}
}
事件监听器
import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;@Component
public class EventListenerComponent {@EventListenerpublic void onEnvironmentChange(EnvironmentChangeRemoteApplicationEvent event) {System.out.println("Received event: " + event);}
}
总结
本文详细介绍了分布式消息队列和事件总线的概念、常见技术选型以及在Java中的实现方法。通过RabbitMQ和Spring Cloud Bus的代码示例,展示了如何在实际应用中使用这些技术来实现异步通信和事件驱动架构。
相关文章:
探索Java中的分布式消息队列与事件总线:架构、实现与最佳实践
引言 在现代分布式系统中,消息队列和事件总线已经成为实现松耦合、高扩展性和高可用性架构的关键组件。无论是微服务架构、事件驱动架构,还是实时数据处理,消息队列和事件总线都扮演着至关重要的角色。本文将深入探讨Java中的分布式消息队列…...
HTML零基础教程(超详细)
一、什么是HTML HTML,全称超文本标记语言(HyperText Markup Language),是一种用于创建网页的标准标记语言。它通过一系列标签来定义网页的结构、内容和格式。HTML文档是由HTML元素构成的文本文件,这些元素包括标题、段…...
011.Python爬虫系列_bs4解析
我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈 入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈 虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈 PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)文章合集 👈👈 Oracle数…...
django摄影竞赛小程序论文源码调试讲解
2系统关键技术及工具简介 系统开发过程中设计的关键技术是系统的核心,而开发工具则会影响的项目开发的进程和效率。第二部分便描述了系统的设计与实现等相关开发工具。 2.1 Python简介 Python 属于一个高层次的脚本语言,以解释性,编译性&am…...
Unity-OpenCV-Imgproc函数概览
OpenCV-Imgproc函数概览 函数名功能描述createLineSegmentDetector创建一个智能指针到 LineSegmentDetector 对象并初始化它。此算法用于检测图像中的线段。getGaussianKernel返回高斯滤波器的系数。这些系数用于平滑图像或进行高斯模糊。getDerivKernels返回计算图像空间导数的…...
水晶连连看 - 无限版软件操作说明书
水晶连连看 – 无限版游戏软件使用说明书 文章目录 水晶连连看 – 无限版游戏软件使用说明书1 引言1.1 编写目的1.2 项目名称1.3 项目背景1.4 项目开发环境 2 概述2.1 目标2.2 功能2.3 性能 3 运行环境3.1 硬件3.2 软件 4 使用说明4.1 游戏开始界面4.2 游戏设定4.2.1 游戏帮助4…...
目标检测-YOLOv3
YOLOv3介绍 YOLOv3 (You Only Look Once, Version 3) 是 YOLO 系列目标检测模型的第三个版本,相较于 YOLOv2 有了显著的改进和增强,尤其在检测速度和精度上表现优异。YOLOv3 的设计目标是在保持高速的前提下提升检测的准确性和稳定性。下面是对 YOLOv3 …...
vscode好用的快捷键整理~
vscode好用的快捷键 将当前行复制并插入到上一行 shift alt ↑将当前行复制并插入到上一行 shift alt ↓将光标复制到上一行 ctrl alt ↑将光标复制到下一行 ctrl alt ↓删除当前行 ctrl x 本身是剪切当前行,也可以作为删除当前行来用选中下一个相同的片段…...
Docker in Docker 实践 on mac
在尝试tekton构建ci pipeline是,需要在k8 pod里build image,于是研究了如何docker in docker。 1. 编写自己的dind docker image FROM docker:20.10.16-dind ENV DOCKER_HOST unix:///var/run/docker.sock 2. docker build 自己的dind docker image并…...
Flask-Session扩展,使用Redis存储会话数据
深入理解Flask-session扩展Redis Flask 应用中使用 flask-session 扩展将 session 数据存储在 Redis 中是一种高效且可扩展的方法,特别是在需要处理大量用户或需要分布式部署的应用中。以下是如何在 Flask 应用中配置 flask-session 以使用 Redis 存储 session 的步…...
urdf ( xacro ) 的 collision碰撞参数设置
目录 写在前面的话整体流程1 URDF 文件结构2 查看原始碰撞形状描述3 加入简单碰撞形状描述方法一 Meshlab 自动测量方法二 人为测量 4 加入XACRO函数简化描述 最终结果展示侧视图正视图碰撞几何体中心点设置不对出现的结果 写在前面的话 本文使用的 URDF 文件是由 solidworks …...
iOS——方法交换Method Swizzing
什么是方法交换 Method Swizzing是发生在运行时的,主要用于在运行时将两个Method进行交换,我们可以将Method Swizzling代码写到任何地方,但是只有在这段Method Swilzzling代码执行完毕之后互换才起作用。 利用Objective-C Runtimee的动态绑定…...
【有啥问啥】大模型应用中的哈希链推理任务
大模型应用中的哈希链推理任务 随着人工智能技术的快速发展,尤其是大模型(如GPT、BERT、Vision Transformer等)的广泛应用,确保数据处理和模型推理的透明性与安全性变得愈发重要。哈希链推理任务作为一种技术手段,能够…...
DevExpress WinForms v24.1新版亮点:功能区、数据编辑器全新升级
DevExpress WinForms拥有180组件和UI库,能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序,无论是Office风格的界面,还是分析处理大批量的业务数据,它都能轻松胜…...
FreeRTOS内部机制学习01(任务创建的细节以及任务调度的内部机制)
文章目录 前言:首先要谢谢韦东山老师的无私奉献,让我学到了很多东西,我做这个笔记是害怕我会忘记,所以就记录了下来,希望对大家有帮助!关于寄存器CPU内部的寄存器这些寄存器到底要保存一些什么?…...
CANoe突然出现Trace窗口筛选项无法显示的问题
原因:和最近window的推送的补丁包有关 同事通过网上的操作,一顿操作猛如虎,卸载掉了这个插件,结果电脑文件夹无法打开和闪退。 IT的同事通过cmd命令也无法恢复。 dism /online /cleanup-image /scanhealth dism /online /cleanu…...
Linux日志-sar日志
作者介绍:简历上没有一个精通的运维工程师。希望大家多多关注作者,下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 Linux 系统中的日志是记录系统活动和事件的重要工具,它们可以帮助管理员监视系统状态、调查问题以及了解系统运行…...
全国计算机二级考试C语言篇3——选择题
C语言部分——C语言概述 1.程序模块化的优点 程序模块化的优点在于它可以使程序的开发、维护和复用变得更简单。下面是一些主要的优点: 降低复杂度:模块化可以将复杂的问题分解成更小的、更易管理的部分。 可维护性:模块化使得代码更易于维护…...
Python实现混合蛙跳算法
博客目录 引言 什么是混合蛙跳算法(Shuffled Frog Leaping Algorithm, SFLA)?混合蛙跳算法的应用场景为什么使用混合蛙跳算法? 混合蛙跳算法的原理 混合蛙跳算法的基本概念蛙群分组与局部搜索全局混洗与更新混合蛙跳算法的流程 …...
印度再现超级大片,豪华阵容加顶级特效
最近,印度影坛再次掀起了风潮,一部名为《毗湿奴降临》的神话大片强势登陆各大影院,上映首周票房就飙升至105亿卢比,成功占据了票房榜首的位置。之后,这部电影也在北美上映,海外市场的表现同样不俗ÿ…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...
mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
git: early EOF
macOS报错: Initialized empty Git repository in /usr/local/Homebrew/Library/Taps/homebrew/homebrew-core/.git/ remote: Enumerating objects: 2691797, done. remote: Counting objects: 100% (1760/1760), done. remote: Compressing objects: 100% (636/636…...
node.js的初步学习
那什么是node.js呢? 和JavaScript又是什么关系呢? node.js 提供了 JavaScript的运行环境。当JavaScript作为后端开发语言来说, 需要在node.js的环境上进行当JavaScript作为前端开发语言来说,需要在浏览器的环境上进行 Node.js 可…...
何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡
何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡 背景 我们以建设星云智控官网来做AI编程实践,很多人以为AI已经强大到不需要程序员了,其实不是,AI更加需要程序员,普通人…...
