当前位置: 首页 > news >正文

EDA - Spring Boot构建基于事件驱动的消息系统

文章目录

  • 概述
  • 事件驱动架构的基本概念
  • 工程结构
  • Code
    • 创建事件和事件处理器
    • 创建事件总线
    • 创建消息通道和发送逻辑
    • 创建事件处理器
    • 消息持久化
    • 创建消息发送事件
    • 配置 Spring Boot 启动类
    • 测试
    • 运行项目

在这里插入图片描述


概述

在微服务架构和大规模分布式系统中,事件驱动架构(EDA)成为了非常重要的设计模式。通过事件驱动,我们可以解耦系统的各个组件,提高系统的可扩展性、可维护性和响应能力。

接下来,我们将演示一下如何在 Spring Boot 中实现一个基于事件驱动的消息发送和接收流程,从消息的发送、事件的发布到事件的监听。


事件驱动架构的基本概念

在事件驱动架构中,系统的各个组件通过事件进行通信。每个事件代表一个特定的行为或状态变化,当事件发布时,系统的其他部分可以响应这些事件并做出相应的处理。消息发送和接收的流程正是通过发布和监听事件来实现的。

接下来我们使用 Spring Boot 来实现一个基于事件驱动的消息系统。、

系统包含以下几个部分:

  • 消息发送: 消息将通过一个 MessageEventProcessor 进行处理,并且在处理完成后会发布一个事件。
  • 事件发布: 消息成功发送后,通过 ApplicationEventPublisher 发布一个 MessageSentEvent
  • 事件监听: 一个监听器会接收到发布的事件并进行相应的处理(比如记录日志、通知其他组件等)

工程结构

在这里插入图片描述

  • EventBus:事件总线,负责发布事件。
  • MessageEventProcessor:处理消息事件的处理器。
  • EventMessageEventMessageSentEvent:事件类,MessageEventMessageSentEvent继承自Event
  • MessageChannel:消息通道接口,EmailMessageChannel是其具体实现。
  • MessageRepository:消息存储库,用于保存消息事件。
  • MessageChannelConfig:消息通道配置,配置了消息通道的Bean。
  • MessageController:消息控制器,处理发送消息的请求。
  • MessageSentEventListener:监听消息发送事件的监听器。

Code

创建事件和事件处理器

Event.java - 定义基础事件

package com.artisan.booteventbus.domain;public abstract class Event {// 事件的基本字段
}

MessageEvent.java - 定义具体的消息事件

package com.artisan.booteventbus.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;import java.util.Map;@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class MessageEvent extends Event {private String message;private String channel;private Map<String, Object> metadata;}

EventHandler.java - 定义事件处理器接口

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.domain.Event;public interface EventHandler<T extends Event> {void handle(T event);
}

创建事件总线

EventBus.java - 用于发布事件

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.domain.Event;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;@Component
public class EventBus {private final ApplicationEventPublisher publisher;public EventBus(ApplicationEventPublisher publisher) {this.publisher = publisher;}public void publish(Event event) {publisher.publishEvent(event);}
}

创建消息通道和发送逻辑

MessageChannel.java - 定义消息通道接口

package com.artisan.booteventbus.service;import com.artisan.booteventbus.domain.MessageEvent;import java.util.concurrent.CompletableFuture;public interface MessageChannel {boolean supports(MessageEvent event);CompletableFuture<Void> sendAsync(MessageEvent event);
}

MessageChannelConfig.java - 初始化channel

package com.artisan.booteventbus.config;import com.artisan.booteventbus.service.MessageChannel;
import com.artisan.booteventbus.service.impl.EmailMessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.ArrayList;
import java.util.List;@Configuration
public class MessageChannelConfig {@Beanpublic List<MessageChannel> messageChannels() {List<MessageChannel> channels = new ArrayList<>();channels.add(new EmailMessageChannel());// 可以继续添加其他类型的通道return channels;}
}

EmailMessageChannel.java - 实现邮件发送通道

package com.artisan.booteventbus.service.impl;import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.service.MessageChannel;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;@Slf4j
public class EmailMessageChannel implements MessageChannel {@Overridepublic boolean supports(MessageEvent event) {return "email".equals(event.getChannel());}@Overridepublic CompletableFuture<Void> sendAsync(MessageEvent event) {return CompletableFuture.runAsync(() -> {// 模拟邮件发送System.out.println(Thread.currentThread().getName() + "- Sending email: " + event.getMessage());log.info("Sending email: {}", event.getMessage());});}
}

创建事件处理器

MessageEventProcessor.java - 处理消息事件,保存事件并发送

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.dao.MessageRepository;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.domain.MessageSentEvent;import com.artisan.booteventbus.service.MessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Async;import java.util.List;@Component
public class MessageEventProcessor implements EventHandler<MessageEvent> {private final EventBus eventBus;private final MessageRepository messageRepository;private final List<MessageChannel> channels;@Autowiredpublic MessageEventProcessor(EventBus eventBus, MessageRepository messageRepository, List<MessageChannel> channels) {this.eventBus = eventBus;this.messageRepository = messageRepository;this.channels = channels;}/*** @param event* Asyn 请使用自定义线程池,这里仅仅是 为了演示异步*/@Async@Overridepublic void handle(MessageEvent event) {// 1. 消息持久化messageRepository.save(event);// 2. 通道路由MessageChannel channel = channels.stream().filter(ch -> ch.supports(event)).findFirst().orElseThrow();// 3. 异步发送channel.sendAsync(event).thenRun(() -> eventBus.publish(new MessageSentEvent(event)));}}

消息持久化

MessageRepository.java - 用于消息的持久化(可以使用内存或数据库)

package com.artisan.booteventbus.dao;import com.artisan.booteventbus.domain.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;import java.util.ArrayList;
import java.util.List;@Slf4j
@Repository
public class MessageRepository {private final List<MessageEvent> messageStore = new ArrayList<>();public void save(MessageEvent event) {// 模拟存储messageStore.add(event);System.out.println(Thread.currentThread().getName() + " - Message saved: " + event.getMessage());log.info("Message saved {}", event.getMessage());}
}

创建消息发送事件

MessageSentEvent.java - 定义发送后的事件

package com.artisan.booteventbus.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageSentEvent extends Event {private MessageEvent originalEvent;}

配置 Spring Boot 启动类

package com.artisan;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@EnableAsync(proxyTargetClass=true)
@SpringBootApplication
public class BootEventBusApplication {public static void main(String[] args) {SpringApplication.run(BootEventBusApplication.class, args);}}

测试

为了测试整个架构,创建一个控制器来模拟发送消息。

package com.artisan.booteventbus.controller;import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.bus.EventBus;
import com.artisan.booteventbus.bus.MessageEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.HashMap;@RestController
@RequestMapping("/messages")
public class MessageController {private final EventBus eventBus;private final MessageEventProcessor eventProcessor;@Autowiredpublic MessageController(EventBus eventBus, MessageEventProcessor eventProcessor) {this.eventBus = eventBus;this.eventProcessor = eventProcessor;}@RequestMapping("/send")public String sendMessage(@RequestParam String message, @RequestParam String channel) {MessageEvent event = new MessageEvent(message, channel, new HashMap<>());eventProcessor.handle(event); // 异步处理消息return "Message is being processed";}
}

运行项目

http://localhost:8080/messages/send?message=artisan&channel=email

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

相关文章:

EDA - Spring Boot构建基于事件驱动的消息系统

文章目录 概述事件驱动架构的基本概念工程结构Code创建事件和事件处理器创建事件总线创建消息通道和发送逻辑创建事件处理器消息持久化创建消息发送事件配置 Spring Boot 启动类测试运行项目 概述 在微服务架构和大规模分布式系统中&#xff0c;事件驱动架构&#xff08;EDA&a…...

使用vue-seamless-scroll实现echarts图表大屏滚动,出现空白间隔的解决方案

一、背景介绍 最近的业务开发需求&#xff0c;想要实现echarts图表大屏滚动&#xff0c;小编首先采用vue-seamless-scroll进行实现&#xff0c;结果发现第二屏出现空白间隔&#xff0c;尝试了多种解决方案均不生效&#xff0c;最终选择换一个方案。 二、封装的ScrollList组件…...

ios使用UIScrollView和PageControl创建图片轮播

1.创建cocoa touch class 2.同时创建xib页面 3.SceneDelegate设置根视图控制器 // // SceneDelegate.m // iosstudy2024 // // Created by figo on 2024/8/5. //#import "SceneDelegate.h" #import "WidgetViewController.h"interface SceneDelegate …...

3D 生成重建024-LGM第一个开源的3D生成大模型!

3D 生成重建024-LGM第一个开源的3D生成大模型 文章目录 0 论文工作1 论文方法2 实验效果 0 论文工作 这篇论文介绍了一种名为LGM&#xff08;大型多视角高斯模型&#xff09;的新方法&#xff0c;用于从单视角图像或文本提示生成高分辨率的三维内容。该方法的核心思想是双重的…...

linux目录权限

一、目录权限的基本概念 Linux中的每个文件和目录都有与之关联的权限&#xff0c;这些权限决定了谁可以读取、写入或执行它们。权限分为三组&#xff1a; 所有者&#xff08;Owner&#xff09;权限&#xff1a;目录所有者的权限群组&#xff08;Group&#xff09;权限&#x…...

语言模型使用心得

使用像文心一言这样的语言模型&#xff0c;在撰写文章时确实能提供极大的帮助。然而&#xff0c;重要的是我们要明确主次关系&#xff1a;自己的创意和内容应当是文章的核心&#xff0c;而语言模型则扮演着一个辅助角色&#xff0c;帮助我们梳理思路&#xff0c;使文章条理更加…...

ChatGPT客户端安装教程(附下载链接)

用惯了各类AI的我们发现每天打开网页还挺不习惯和麻烦&#xff0c;突然发现客户端上架了&#xff0c;懂摸鱼的人都知道这里面的道行有多深&#xff0c;话不多说&#xff0c;开整&#xff01; 以下是ChatGPT客户端的详细安装教程&#xff0c;适用于Windows和Mac系统&#xff1a…...

Electron 基础+传值+引用+安全

文章目录 概要elctron 生命周期及窗口应用主进程与渲染进程交互技术细节electron 中需要注意的安全问题 概要 一、Electron简介 Electron是一个开源框架&#xff0c;它允许开发者使用JavaScript、HTML和CSS构建跨平台的桌面应用程序。它基于Chromium&#xff08;谷歌浏览器的…...

手机租赁系统全面解析与开发指南

内容概要 手机租赁系统已经成为现代商业中不可或缺的一部分&#xff0c;尤其是在智能手机普及的时代。随着消费者对新机型兴趣的不断增加&#xff0c;大家纷纷走上了“试一试再买”的道路&#xff0c;手机租赁这条路因此越走越宽。这部分的市场需求让创业者们看到了机会。不仅…...

mongoDb的读session和写session权限报错问题

go在使用mongoDb时用到了全局会话&#xff0c;发现在创建的session的逻辑相同&#xff0c;首先会进行数据的查询&#xff0c;此时获取了全局session执行读操作&#xff0c;查询所有文档&#xff0c;则当前会话为读会话&#xff0c;当再去插入时发现会报错&#xff0c;此时sessi…...

Centos在2024年6月30日停止维护后如何换yum源安装组件

现象&#xff1a; 在centos7里使用yum安装报错&#xff1a; Loading mirror speeds from cached hostfile Could not retrieve mirrorlist http://mirrorlist.centos.org/?release7&archx86_64&repoos&infrastock error was 14: curl#6 - “Could not resolve…...

阿里云ACP云计算模拟试题(附答案解析)

1、将基础设施作为服务的云计算服务类型是_____服务。 A.laas B.Paas C.SaaS D.Daas 答案&#xff1a;A 解析&#xff1a;基础设施即服务有时缩写为 IaaS&#xff0c;包含云 IT 的基本构建块&#xff0c;通常提供对联网功能、计算机&#xff08;虚拟或专用硬件&#x…...

简单的爬虫脚本编写

一、数据来源分析 想爬取一个网站的数据&#xff0c;我们首先要进行数据分析。通过浏览器F12开发者工具栏进行抓包&#xff0c;可以分析我们想要的数据来源。 通过关键字搜索&#xff0c;可以找到相对应的数据包 二、爬虫实现 需要用到的模块为&#xff1a;request&#xf…...

[MySQL基础](三)SQL--图形化界面+DML

本专栏内容为&#xff1a;MySQL学习专栏 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;MySql &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&#x1f69a; &#x1f339;&#x1f339;&#x1f339;关注我带你学习编程知识 目录 图…...

11.23[大数据]

PRO1:LSTM模型预测输出都是同一个值&#xff1f; 画出来的图像就是一条横线 这个搜了搜&#xff0c;原因可能有很多&#xff0c;但感觉最主要的原因极可能是激活函数选择不当&#xff0c;以及层的搭建不合适 原模型是 REF https://zhuanlan.zhihu.com/p/654325094 https:/…...

C++ 游戏开发进阶:打造更精彩的游戏世界

在之前的 C 游戏开发入门教程中&#xff0c;我们已经了解了游戏开发的基本概念和一些简单的实现方法。现在&#xff0c;让我们进一步深入探讨 C 游戏开发中的进阶技术&#xff0c;为玩家打造更精彩、更具沉浸感的游戏体验。 一、游戏物理引擎的集成 物理引擎是现代游戏开发中…...

想在iPad上远程操作安卓手机的APP,怎样实现iPad远程控制安卓?

学生党或互联网行业的打工人&#xff0c;人手连三台电子设备也很常见&#xff0c;手机、平板还有笔记本电脑一大堆&#xff0c;如果出门要全带上&#xff0c;背包压力也变大。 有没有想过用远程控制功能&#xff0c;让iPad远程控制安卓手机&#xff1f;这样做&#xff0c;出门就…...

GPS北斗卫星授时服务器功能是什么?应用是什么?

GPS北斗卫星授时服务器功能是什么&#xff1f;应用是什么&#xff1f; GPS北斗卫星授时服务器功能是什么&#xff1f;应用是什么&#xff1f; 摘 要:首先对计算机网络时间同步相关技术进行了介绍,然后阐述了时间同步技术在现代计算机网络中的应用与发展,最后指出时间同步网络…...

利用Java爬虫获取商品数据的完整指南

在当今数字化时代&#xff0c;数据已成为企业和个人决策的关键资源。特别是在电商领域&#xff0c;获取商品数据对于市场分析、价格监控和竞争对手分析至关重要。Java作为一种强大且广泛使用的编程语言&#xff0c;非常适合开发复杂的爬虫系统。本文将详细介绍如何利用Java编写…...

mysql 迁移达梦数据库出现的 sql 语法问题 以及迁移方案

迁移方案&#xff1a; 1.下载官方DM8开发版 产品下载-达梦数据 2.会下载到win系统下的左下角的开始 1.1.2 创建工程 右击空白处&#xff0c;新建 1.1.3 新建迁移 1.1.3.1 选择迁移方式 MySql迁移DM 1.1.3.2 配置数据源 输入你的mysql配置后&#xff0c;刷新&#xff0c;选择…...

AMD GPU大模型部署与优化指南:基于ollama-for-amd的本地AI解决方案

AMD GPU大模型部署与优化指南&#xff1a;基于ollama-for-amd的本地AI解决方案 【免费下载链接】ollama-for-amd Get up and running with Llama 3, Mistral, Gemma, and other large language models.by adding more amd gpu support. 项目地址: https://gitcode.com/gh_mir…...

解锁Audacity:5个零成本音频处理功能彻底改变你的创作流程

解锁Audacity&#xff1a;5个零成本音频处理功能彻底改变你的创作流程 【免费下载链接】audacity Audio Editor 项目地址: https://gitcode.com/GitHub_Trending/au/audacity 价值定位&#xff1a;为什么Audacity是音频创作者的必备工具 在音频编辑领域&#xff0c;专…...

雪女-斗罗大陆-造相Z-Turbo助力AI编程:自动生成代码片段与函数注释

雪女-斗罗大陆-造相Z-Turbo助力AI编程&#xff1a;自动生成代码片段与函数注释 作为一名写了十几年代码的老兵&#xff0c;我经历过从记事本写代码到现代IDE的整个进化史。这些年&#xff0c;各种提升效率的工具层出不穷&#xff0c;但“写代码”这件事的核心——将想法转化为…...

Agent-S:重新定义人机协作的智能体框架技术解析

Agent-S&#xff1a;重新定义人机协作的智能体框架技术解析 【免费下载链接】Agent-S Agent S: an open agentic framework that uses computers like a human 项目地址: https://gitcode.com/GitHub_Trending/ag/Agent-S 在数字化转型加速的今天&#xff0c;人机协作的…...

别再让C盘爆红了!Windows 11上Ollama安装与模型存储路径修改保姆级教程

Windows 11上Ollama安装避坑指南&#xff1a;彻底解决C盘空间焦虑 每次看到C盘飘红&#xff0c;就像看到手机电量只剩5%一样让人焦虑。特别是当你兴冲冲地安装Ollama准备体验本地大模型时&#xff0c;却发现默认安装路径无情地吞噬着宝贵的C盘空间。本文将带你从零开始&#xf…...

STorM BGC V1.31硬件 + SimpleBGC源码:从零搭建三轴云台开发环境(含.Net框架避坑)

STorM BGC V1.31硬件 SimpleBGC源码&#xff1a;从零搭建三轴云台开发环境&#xff08;含.Net框架避坑&#xff09; 三轴云台作为稳定控制领域的经典应用&#xff0c;近年来在无人机、摄影设备、工业检测等领域展现出巨大潜力。STorM BGC V1.31硬件平台配合SimpleBGC开源架构&…...

【PVE实战】低成本2.5G网卡升级与iperf3性能验证全记录

1. 为什么需要升级到2.5G网络环境 最近几年&#xff0c;随着NAS、视频剪辑、虚拟机等应用场景的普及&#xff0c;传统的千兆网络&#xff08;1Gbps&#xff09;越来越显得力不从心。我自己就经常遇到这样的情况&#xff1a;在局域网内传输大文件时&#xff0c;千兆网络的极限速…...

FreeRTOS内核探秘:双向链表如何玩转任务调度?从xListEnd到pxIndex全解析

FreeRTOS内核探秘&#xff1a;双向链表如何玩转任务调度&#xff1f;从xListEnd到pxIndex全解析 在嵌入式实时操作系统领域&#xff0c;任务调度效率直接决定了系统响应能力。FreeRTOS作为市场占有率最高的RTOS之一&#xff0c;其精巧的内核设计一直是开发者研究的焦点。想象一…...

Java面试高频:阿里真实面试题——Redis分布式锁实现(3分钟速通,不会直接挂)

一、真实面试场景&#xff08;代入感拉满&#xff09; 上周&#xff0c;一个候选人来面试阿里P6。 技术面已经过了两轮&#xff0c;表现都不错。 最后一轮&#xff0c;面试官只问了一个问题&#xff1a; “你们项目里用过Redis分布式锁吗&#xff1f;怎么实现的&#xff1f;…...

终极zsh语法高亮插件版本兼容性测试:Zsh 5.0到5.9全面支持指南

终极zsh语法高亮插件版本兼容性测试&#xff1a;Zsh 5.0到5.9全面支持指南 【免费下载链接】zsh-syntax-highlighting Fish shell like syntax highlighting for Zsh. 项目地址: https://gitcode.com/gh_mirrors/zs/zsh-syntax-highlighting zsh-syntax-highlighting是Z…...