探索Java中的分布式消息队列与事件总线:架构、实现与最佳实践
引言
在现代分布式系统中,消息队列和事件总线已经成为实现松耦合、高扩展性和高可用性架构的关键组件。无论是微服务架构、事件驱动架构,还是实时数据处理,消息队列和事件总线都扮演着至关重要的角色。本文将深入探讨Java中的分布式消息队列与事件总线的概念、实现方法、技术选型以及实际应用中的最佳实践,附带代码示例以便读者更好地理解。
什么是分布式消息队列和事件总线?
分布式消息队列
分布式消息队列是一种为分布式系统提供异步通信机制的中间件。它允许系统中的不同组件通过发送和接收消息进行交流,从而实现高效的数据传输和任务调度。
常见用途:
- 任务调度和执行
- 数据流处理
- 系统解耦
- 事件驱动架构
事件总线
事件总线是一种发布-订阅模型的实现,允许不同组件订阅和发布事件。事件总线可以在同一进程中运行,也可以跨多个分布式系统运行。
常见用途:
- 事件通知
- 事件驱动编程
- 系统解耦
- 实时数据处理
常见技术选型
技术 | 类型 | 优点 | 缺点 |
---|---|---|---|
RabbitMQ | 消息队列 | 高性能、强大的路由功能、良好的社区支持 | 配置复杂,学习曲线陡峭 |
Apache Kafka | 消息队列 | 高吞吐量、持久化、分布式特点 | 配置和管理复杂,低延迟不适合实时应用 |
ActiveMQ | 消息队列 | 易于使用、功能齐全 | 性能和扩展性不如Kafka和RabbitMQ |
Apache Pulsar | 消息队列 | 多租户、支持Geo-replication | 较新的技术,社区和文档相对较少 |
Spring Cloud Bus | 事件总线 | 易于集成Spring生态系统 | 主要适用于Spring项目,通用性较差 |
Vert.x Event Bus | 事件总线 | 轻量级、高性能、灵活 | 对于大型分布式系统,可能需要自定义扩展 |
实现分布式消息队列
使用RabbitMQ实现消息队列
配置RabbitMQ
首先,确保RabbitMQ服务在本地或远程服务器上运行。可以通过Docker快速启动RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
添加依赖
在你的pom.xml
文件中添加RabbitMQ客户端的依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version>
</dependency>
生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
消费者代码
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
实现事件总线
使用Spring Cloud Bus实现事件总线
添加依赖
在你的pom.xml
文件中添加Spring Cloud Bus和RabbitMQ的依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置文件
在application.yml
中配置RabbitMQ连接信息:
spring:cloud:bus:enabled: truerabbitmq:host: localhostport: 5672
事件发布者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.SpringCloudBusClient;
import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class EventPublisherController {@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;@PostMapping("/publish-event")public String publishEvent() {applicationEventPublisher.publishEvent(new EnvironmentChangeRemoteApplicationEvent(this, "source", null));return "Event published";}
}
事件监听器
import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;@Component
public class EventListenerComponent {@EventListenerpublic void onEnvironmentChange(EnvironmentChangeRemoteApplicationEvent event) {System.out.println("Received event: " + event);}
}
总结
本文详细介绍了分布式消息队列和事件总线的概念、常见技术选型以及在Java中的实现方法。通过RabbitMQ和Spring Cloud Bus的代码示例,展示了如何在实际应用中使用这些技术来实现异步通信和事件驱动架构。
相关文章:
探索Java中的分布式消息队列与事件总线:架构、实现与最佳实践
引言 在现代分布式系统中,消息队列和事件总线已经成为实现松耦合、高扩展性和高可用性架构的关键组件。无论是微服务架构、事件驱动架构,还是实时数据处理,消息队列和事件总线都扮演着至关重要的角色。本文将深入探讨Java中的分布式消息队列…...

HTML零基础教程(超详细)
一、什么是HTML HTML,全称超文本标记语言(HyperText Markup Language),是一种用于创建网页的标准标记语言。它通过一系列标签来定义网页的结构、内容和格式。HTML文档是由HTML元素构成的文本文件,这些元素包括标题、段…...

011.Python爬虫系列_bs4解析
我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈 入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈 虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈 PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)文章合集 👈👈 Oracle数…...

django摄影竞赛小程序论文源码调试讲解
2系统关键技术及工具简介 系统开发过程中设计的关键技术是系统的核心,而开发工具则会影响的项目开发的进程和效率。第二部分便描述了系统的设计与实现等相关开发工具。 2.1 Python简介 Python 属于一个高层次的脚本语言,以解释性,编译性&am…...
Unity-OpenCV-Imgproc函数概览
OpenCV-Imgproc函数概览 函数名功能描述createLineSegmentDetector创建一个智能指针到 LineSegmentDetector 对象并初始化它。此算法用于检测图像中的线段。getGaussianKernel返回高斯滤波器的系数。这些系数用于平滑图像或进行高斯模糊。getDerivKernels返回计算图像空间导数的…...

水晶连连看 - 无限版软件操作说明书
水晶连连看 – 无限版游戏软件使用说明书 文章目录 水晶连连看 – 无限版游戏软件使用说明书1 引言1.1 编写目的1.2 项目名称1.3 项目背景1.4 项目开发环境 2 概述2.1 目标2.2 功能2.3 性能 3 运行环境3.1 硬件3.2 软件 4 使用说明4.1 游戏开始界面4.2 游戏设定4.2.1 游戏帮助4…...
目标检测-YOLOv3
YOLOv3介绍 YOLOv3 (You Only Look Once, Version 3) 是 YOLO 系列目标检测模型的第三个版本,相较于 YOLOv2 有了显著的改进和增强,尤其在检测速度和精度上表现优异。YOLOv3 的设计目标是在保持高速的前提下提升检测的准确性和稳定性。下面是对 YOLOv3 …...
vscode好用的快捷键整理~
vscode好用的快捷键 将当前行复制并插入到上一行 shift alt ↑将当前行复制并插入到上一行 shift alt ↓将光标复制到上一行 ctrl alt ↑将光标复制到下一行 ctrl alt ↓删除当前行 ctrl x 本身是剪切当前行,也可以作为删除当前行来用选中下一个相同的片段…...
Docker in Docker 实践 on mac
在尝试tekton构建ci pipeline是,需要在k8 pod里build image,于是研究了如何docker in docker。 1. 编写自己的dind docker image FROM docker:20.10.16-dind ENV DOCKER_HOST unix:///var/run/docker.sock 2. docker build 自己的dind docker image并…...
Flask-Session扩展,使用Redis存储会话数据
深入理解Flask-session扩展Redis Flask 应用中使用 flask-session 扩展将 session 数据存储在 Redis 中是一种高效且可扩展的方法,特别是在需要处理大量用户或需要分布式部署的应用中。以下是如何在 Flask 应用中配置 flask-session 以使用 Redis 存储 session 的步…...

urdf ( xacro ) 的 collision碰撞参数设置
目录 写在前面的话整体流程1 URDF 文件结构2 查看原始碰撞形状描述3 加入简单碰撞形状描述方法一 Meshlab 自动测量方法二 人为测量 4 加入XACRO函数简化描述 最终结果展示侧视图正视图碰撞几何体中心点设置不对出现的结果 写在前面的话 本文使用的 URDF 文件是由 solidworks …...

iOS——方法交换Method Swizzing
什么是方法交换 Method Swizzing是发生在运行时的,主要用于在运行时将两个Method进行交换,我们可以将Method Swizzling代码写到任何地方,但是只有在这段Method Swilzzling代码执行完毕之后互换才起作用。 利用Objective-C Runtimee的动态绑定…...

【有啥问啥】大模型应用中的哈希链推理任务
大模型应用中的哈希链推理任务 随着人工智能技术的快速发展,尤其是大模型(如GPT、BERT、Vision Transformer等)的广泛应用,确保数据处理和模型推理的透明性与安全性变得愈发重要。哈希链推理任务作为一种技术手段,能够…...

DevExpress WinForms v24.1新版亮点:功能区、数据编辑器全新升级
DevExpress WinForms拥有180组件和UI库,能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序,无论是Office风格的界面,还是分析处理大批量的业务数据,它都能轻松胜…...

FreeRTOS内部机制学习01(任务创建的细节以及任务调度的内部机制)
文章目录 前言:首先要谢谢韦东山老师的无私奉献,让我学到了很多东西,我做这个笔记是害怕我会忘记,所以就记录了下来,希望对大家有帮助!关于寄存器CPU内部的寄存器这些寄存器到底要保存一些什么?…...

CANoe突然出现Trace窗口筛选项无法显示的问题
原因:和最近window的推送的补丁包有关 同事通过网上的操作,一顿操作猛如虎,卸载掉了这个插件,结果电脑文件夹无法打开和闪退。 IT的同事通过cmd命令也无法恢复。 dism /online /cleanup-image /scanhealth dism /online /cleanu…...

Linux日志-sar日志
作者介绍:简历上没有一个精通的运维工程师。希望大家多多关注作者,下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 Linux 系统中的日志是记录系统活动和事件的重要工具,它们可以帮助管理员监视系统状态、调查问题以及了解系统运行…...

全国计算机二级考试C语言篇3——选择题
C语言部分——C语言概述 1.程序模块化的优点 程序模块化的优点在于它可以使程序的开发、维护和复用变得更简单。下面是一些主要的优点: 降低复杂度:模块化可以将复杂的问题分解成更小的、更易管理的部分。 可维护性:模块化使得代码更易于维护…...
Python实现混合蛙跳算法
博客目录 引言 什么是混合蛙跳算法(Shuffled Frog Leaping Algorithm, SFLA)?混合蛙跳算法的应用场景为什么使用混合蛙跳算法? 混合蛙跳算法的原理 混合蛙跳算法的基本概念蛙群分组与局部搜索全局混洗与更新混合蛙跳算法的流程 …...

印度再现超级大片,豪华阵容加顶级特效
最近,印度影坛再次掀起了风潮,一部名为《毗湿奴降临》的神话大片强势登陆各大影院,上映首周票房就飙升至105亿卢比,成功占据了票房榜首的位置。之后,这部电影也在北美上映,海外市场的表现同样不俗ÿ…...

wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...

计算机基础知识解析:从应用到架构的全面拆解
目录 前言 1、 计算机的应用领域:无处不在的数字助手 2、 计算机的进化史:从算盘到量子计算 3、计算机的分类:不止 “台式机和笔记本” 4、计算机的组件:硬件与软件的协同 4.1 硬件:五大核心部件 4.2 软件&#…...

ubuntu22.04有线网络无法连接,图标也没了
今天突然无法有线网络无法连接任何设备,并且图标都没了 错误案例 往上一顿搜索,试了很多博客都不行,比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动,重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...

Python环境安装与虚拟环境配置详解
本文档旨在为Python开发者提供一站式的环境安装与虚拟环境配置指南,适用于Windows、macOS和Linux系统。无论你是初学者还是有经验的开发者,都能在此找到适合自己的环境搭建方法和常见问题的解决方案。 快速开始 一分钟快速安装与虚拟环境配置 # macOS/…...
React父子组件通信:Props怎么用?如何从父组件向子组件传递数据?
系列回顾: 在上一篇《React核心概念:State是什么?》中,我们学习了如何使用useState让一个组件拥有自己的内部数据(State),并通过一个计数器案例,实现了组件的自我更新。这很棒&#…...
AWS vs 阿里云:功能、服务与性能对比指南
在云计算领域,Amazon Web Services (AWS) 和阿里云 (Alibaba Cloud) 是全球领先的提供商,各自在功能范围、服务生态系统、性能表现和适用场景上具有独特优势。基于提供的引用[1]-[5],我将从功能、服务和性能三个方面进行结构化对比分析&#…...