05期:面向业务的消息服务落地实践
这里记录的是学习分享内容,文章维护在 Github:studeyang/leanrning-share。
我们在上次分享中聊到了领域驱动设计和微服务,在 DDD 中有一个术语叫做领域事件,例如订单模型中的订单已创建、商品已发货。领域事件会触发下一步的业务操作,如果领域事件发生在微服务内,可以通过观察者模式很容易实现消息监听并处理。
如果发生在微服务之间,则需引入事件总线或者消息中间件。
一、消息队列解决方案
经过技术选型后,我们决定使用 Kafka 作为消息中间件,此时微服务间的通信示意图如下:

不过,直接使用消息队列将面临以下问题:
- 开发成本大:开发团队成员都需要对消息队列如 Kafka 技术有一定的了解,并且还需要关注连接消息队列的一些配置;
- 管理难度大:各团队都使用一个消息队列,其中一个团队使用不当时,例如创建了很多个 topic,造成资源浪费;
- 监控难度大:当前只有对 Kafka 集群简单的监控功能;
- 运维困难:遇到线上消息没有消费时,很难排查问题,无从下手;
- 升级难度大:Kafka-Client 需要升级时,涉及到服务太多,导致升级成本高;
我们期望提供的是一种以业务为重心的,面向服务的解决方案。
也就是说,即使团队中没人了解消息队列技术,也能够收发消息。于是对 Kafka SDK 二次封装,主要就是为了简化消息的接入,无需关注配置。

封装后解决了开发成本大、管理难度大的问题,但是离面向服务的解决方案目标还有一定的差距。比如业务方监听到消息后,执行一系列的业务逻辑异常了,想要做业务补偿,我们的“基于 Kafka SDK 二次封装”的方案就没办法满足,只能要求消息发送方再发一次消息,但这又会影响其他消息监听者。
于是我们决定将消息列队封装成消息服务,对业务方提供切实的服务能力。

二、消息服务解决方案
我们熟知计算机中总线,在计算机系统中,不同的组件和设备需要相互通信以完成各种任务,此时,计算机总线就发挥了重要作用。类似的,微服务系统中,微服务就像是计算机系统中的各个组件和设备,而消息服务充当的就是计算机总线的角色。消息总线由此而来。
本文中出现的消息总线和消息服务指的是同一个东西。
2.1 架构设计
发送消息和接收消息是消息服务最基本的能力,这两项能力分别由消息生产服务、消息消费服务提供。

2.2 消息的流转过程

三、消息服务初体验
微服务架构采用的技术栈是:SpringBoot、Kubernetes。
我们将消息总线取名为 Courier,Courier 的意思是“快递员”,消息的传递类似于快递的收发,消息总线正是快递员的角色。下面开始消息服务的初体验。
3.1 零配置接入消息总线
由于我们的微服务使用的是 SpingBoot 来落地的,因此我们提供了一个接入消息总线的 spring-boot-starter。
<dependency><groupId>com.casstime.open</groupId><artifactId>courier-spring-boot-starter</artifactId>
</dependency>
接入消息总线,微服务只需要一个@EnableMessage注解即可加载所有相关配置:
@EnableMessage
@SpringBootApplication
public class WebApplication {public static void main(String[] args) {SpringApplication.run(WebApplication.class, args);}
}
3.2 消息结构定义
下面代码定义了一个消息的基本信息,也称为消息 Header,包括消息 id,分区键 primaryKey,来源服务 service,消息 topic,创建时间 timstamp。
public abstract class Message {private String id;private String primaryKey;private String service;private String topic;private Date timeStamp;
}
消息可以分为两类,一类是事件,另一类是广播。定义如下:
// 事件
public abstract class Event extends Message {
}
// 广播
public abstract class Event extends Message {
}
业务消息内容称为消息 Body,例如订单已创建这个消息体的定义:
@Topic(name = "order")
public class OrderCreated extends Event {private String orderId;private String orderName;private Date createdAt;
}
3.3 使消息收发变得简单
业务方可以在业务执行方法的任一处,只需要一行代码,即可完成消息的发送。
// 发送消息
EventPublisher.publish(new OrderCreated());
对于消息的监听,业务方只需关注业务逻辑的执行,屏蔽了 Offset 提交、重试等技术实现。
// 接收消息
@EventHandler(topic = "order", consumerGroup = "consumer-group1")
public class OrderMessageHandler {public void handle(OrderCreated orderCreated) {System.out.println("receive message: " + orderCreated);}
}
3.4 提供 5 种功能类型的消息
我们提供了 5 种不同功能类型的消息,满足各类业务场景。
1、事件消息
@Topic(name = "order")
public class OrderCreated extends Event {private String orderId;private String orderName;private Date createdAt;
}public void send() {EventPublisher.publish(new OrderCreated());
}
上面消息定义是事件,这是使用最多的一种消息。
2、广播消息
广播消息的消费示意图如下:

@Topic(name = "order")
public class CacheUpdate extends Broadcast {private String orderId;private String orderName;private Date createdAt;
}public void send() {EventPublisher.publish(new CacheUpdate());
}
上面消息定义时,继承了Broadcast,表示这是一个广播消息,消费服务的每个节点都将会收到这个广播。例如更新本地缓存事件,就需要用到广播消息。
3、顺序消息
@Topic(name = "order")
public class OrderCreated extends Event {@PrimaryKeyprivate String orderId;private String orderName;private Date createdAt;
}public void send() {EventPublisher.publish(new OrderCreated());
}
上面消息定义时,在orderId上加了@PrimaryKey注解,表示相同orderId的消息会有序的消费。
4、事务消息
@Topic(name = "order")
public class OrderCreated extends Event {private String orderId;private String orderName;private Date createdAt;
}@Transactional
public void send() {EventPublisher.publish(new OrderCreated());
}
上面消息发送时,在方法上添加了@Transactional注解,这是 Spring 的注解,表示这个方法里的逻辑执行是有事务性的。
5、延迟消息
@Topic(name = "order")
public class OrderCreated extends Event {private String orderId;private String orderName;private Date createdAt;
}@Transactional
public void send() {EventPublisher.publish(new OrderCreated(), 2, TimeUnit.SECONDS);
}
上面消息发送多了两个参数,表示延迟 2 秒接收。
3.5 消息追踪
只要是通过EventPublisher.publish()方法发送的消息,都可以追踪到这条消息记录。
消息定义了 5 种状态:
- 发送失败(SEND_FAIL):通常消息定义不规范,消息体过大;少数由于网络抖动。
- 已提交(COMMITED):消息总线已收到消息。
- 推送失败(PUSH_FAIL):例如服务已下线。
- 处理失败(HANDLE_FAIL):监听到了消息,但是执行业务逻辑抛出了异常。
- 已处理(HANDLED)

作为消息的发送方,关注的是消息是否发送成功,可通过下面页面查询。

作为消息的接收方,关注的是消息是否正常消费,可通过下面页面查询。

3.6 消息高可靠
对于 5 种状态的消息,处理策略如下:
- 发送失败(SEND_FAIL):自动重试+手动重试,可在消息管理中心手动再发送。
- 已提交(COMMITED):长期处理已提交状态的消息,可能消费方已接收,但状态流转异常,消息总线会定时重试。
- 推送失败(PUSH_FAIL):自动重试+延迟重试。
- 处理失败(HANDLE_FAIL):自动重试默认关闭,由消费方决定是否开启重试。
- 已处理(HANDLED):也可手动重试。

封面
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W6OFOpMz-1680000301507)(https://technotes.oss-cn-shenzhen.aliyuncs.com/2023/%E9%9D%A2%E5%90%91%E4%B8%9A%E5%8A%A1%E7%9A%84%E6%B6%88%E6%81%AF%E6%9C%8D%E5%8A%A1%E8%90%BD%E5%9C%B0%E5%AE%9E%E8%B7%B5.png)]
相关文章
也许你对下面文章也感兴趣。
- 04期:领域驱动设计与微服务
- 学习分享(第3期):你所理解的架构是什么?
相关文章:
05期:面向业务的消息服务落地实践
这里记录的是学习分享内容,文章维护在 Github:studeyang/leanrning-share。 我们在上次分享中聊到了领域驱动设计和微服务,在 DDD 中有一个术语叫做领域事件,例如订单模型中的订单已创建、商品已发货。领域事件会触发下一步的业务…...
代码随想录|day26|回溯算法part03● 39. 组合总和● 40.组合总和II● 131.分割回文串
今天的练习基本就是回溯法组合问题,这一节只要看labuladong即可。 组合问题: 39. 组合总和---------------------形式三,元素无重可复选 链接:代码随想录 一次对,同样在进入下次循环时,注意startindex是从j…...
linux-文件切割-splitcsplit
目录 按大小切割-split 按行数切割-split 按内容切割-csplit 按大小切割-split split -b 10k example.conf -d -a 3 output.file example.conf 被切割的文件 -b 指定切割大小 -d 数字后缀 -a 后缀长度,默认2 output.file …...
USB键盘实现——设备限定描述符(五)
文章目录设备限定描述符仓库地址设备限定描述符介绍设备限定描述符结构体定义获取设备限定描述符的请求标准设备请求USB 控制端点收到的数据设备限定描述符返回附 STM32 枚举日志设备限定描述符 设备限定描述符内容解析和 HID鼠标 一致。 仓库地址 仓库地址 设备限定描述符…...
【C++】map和set(一文拿捏,包教包会)
目录 1.关联式容器和序列式容器 2.键值对 3.树型结构的关联式容器 4.set 5.multiset 6.map 7.multimap 1.关联式容器和序列式容器 set:关联式容器——数据之间关联紧密 线性表(vector,list,deque):序…...
爬虫Day2 正则表达式
爬虫Day2 正则表达式 一、正则表达式 1. 正则的作用 正则表达式是一种可以让复杂的字符串变得简单的工具。 写正则表达式就是用正则符号来描述字符串规则 # 案例1:判断一个字符串是否是一个合法的手机号码 tel 23297293329# 方法1:不用正则 if len…...
LeetCode-0324~28
leetCode1032 思路:想的是维护一个后缀数组,然后用Set去判断一下,结果超时了,去看题解,好家伙AC自动机,没办法,开始学。 正确题解: class ACNode{public ACNode[] children;publi…...
Vue2自己封装的基础组件库或基于Element-ui再次封装的基础组件库,如何发布到npm并使用(支持全局或按需引入使用),超详细
最终效果如下 一、先创建vue2项目 1、 可以用vue-cli自己来创建;也可以直接使用我开源常规的vue2后台管理系统模板 以下我以 wocwin-admin-vue2 项目为例 修改目录结构,最终如下 2、修改vue.config.js文件 module.exports { // 修改 src 目录 为 exam…...
【开发】中间件——MongoDB
MongoDB是一个基于分布式(海量数据存储)文件存储的数据库。 MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的,它支持的数据结构非常松散,是类似json…...
C++进阶 — 【C++11】
目录 一、 C11简介 二、 统一的列表初始化 1.{}初始化 2. initializer_list 三、声明 1. auto 2. decltype 3. nullptr 四、范围for循环 五、STL中一些变化 1. 提供了一些新容器 2.容器中增加了一些新方法 六、右值引用和移动语义 1. 左值引用和右…...
Mac安装Homebrew
1.前往Homebrew官网,复制官网的安装命令 https://brew.sh/ /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"安装结束后,记得仔细看脚本执行最后的提示,需要我们复制两行命令执…...
【详细】利用VS2019创建Web项目,并发送到IIS,以及IIS与ASP.NET配置
一、打开VS2019选择创建新项目【最好以管理员身份运行VS2019,后面发布网站时需要以管理员身份,避免后面还要重启,可以一开始就以管理员身份运行】 二、选择语言为C#,然后选择“ASP.NET Web应用程序(.NET Framework&…...
FasterRcnn,Yolov2,Yolov3中的Label Assignment机制 和 ATSS
一般把anchor到gt之间如何匹配的方法称为label assignment,也就是给预设的anchor打上正负样本等标签,方便我们后续进一步回归。 其实RPN和Yolo有各自的label assignment方法, 在Faster rcnn,yolo,RetinaNet中…...
使用Java技术WebSocket创建聊天、群聊,实现好友列表,添加好友,好友分组,聊天记录查询功能。
文章目录 引入依赖主要代码配置WebSocket创建通讯完整后台项目代码下载WebSocket的由来: 之前只有一个http协议,http协议是请求响应,存在缺陷,就是请求只能由客户端发起,然后请求到服务器,服务器做响应,但是如果服务器状态做了改变,客户端并不能即使的更新,之前的是按照…...
【Redis07】Redis基础:Bitmap 与 HyperLogLog 相关操作
Redis基础学习:Bitmap 与 HyperLogLog 相关操作继续进行 Redis 基础部分的学习,今天我们学习的是两种另外的数据类型。说是数据类型,但其实它们实际上使用的都是 String 类型做为底层基础,只不过是在存储的时候进行了一些特殊的操…...
华为路由器 VRRP主备配置
组网需求 如下图所示,PC1通过SW1双归属到R1和R2。为保证用户的各种业务在网络传输中不中断,需在R1和R2上配置VRRP主备备份功能。 正常情况下,主机以R1为默认网关接入Internet,当R1故障时,R2接替R1作为网关继续进行工作…...
docker容器安装ES
1.拉取镜像 docker pull elasticsearch:6.5.42.修改别名 docker tag [容器ID] es65:6.5.42.启动应用 docker run -it -d -p 9200:9200 -p 9300:9300 --name es -e ES_JAVA_OPTS"-Xms128m -Xmx128m" es65:6.5.43.拷贝配置文件到宿主机 docker cp es:/usr/share/ela…...
Python Module — prompt_toolkit CLI 库
目录 文章目录目录prompt_toolkit示例化历史记录热键自动补全多行输入Python 代码高亮自定义样式prompt_toolkit prompt_toolkit 是一个用于构建 CLI 应用程序的 Python 库,可以让我们轻松地构建强大的交互式命令行应用程序。 自动补全:当用户输入命令…...
springboot mybatis-plus 调用 sqlserver 的 存储过程 返回值问题
问题: 在使用 mybatis-plus 调用sqlserver 存储过程 没有返回值 经过资料查找 注意点 此处使用Map传参,原因在于存储过程的返回值,通常在参数定义中实现,如In 入参、out 出参。 这样当执行后有结果返回时,则可以将结…...
【0180】PG内核读取pg_hba.conf并创建HbaLine记录(1)
文章目录 1. pg_hba.conf文件是什么?2. postmaster何时读取pg_hba.conf?2.1 pg内核使用pg_hba.conf完成客户端认证的原理2.2 读取pg_hba.conf的几个模块3. pg内核读取pg_hba.conf过程3.1 VFD机制获取文件描述符3.2 根据fd读取文件内容相关阅读: 【0178】DBeaver、pgAdmin I…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
