当前位置: 首页 > news >正文

Springcloud-消息总线-Bus

1.消息总线在微服务中的应用

BUS- 消息总线-将消息变更发送给所有的服务节点。
在微服务架构的系统中,通常我们会使用消息代理来构建一个Topic,让所有
服务节点监听这个主题,当生产者向topic中发送变更时,这个主题产生的消息会被
所有实例消费,这就是消息总线的工作模式。也是我们熟悉的发布-订阅模型。
其实广义的消息总线不单指这种“发布-订阅”模型!也可以代指分布式服务间进行通信,
消息分发的单播模式,甚至有的公司既不使用HTTP也不用RPC来构建微服务。完全靠消息
总线来做服务调用。比如,银行老系统采用总线型架构,在不同服务结点之间做消息分发

SpringCloud中的Bus职责范围就相对小了很多,因为还有一个Stream组件代理了大部分的消息中间件通信服务,因此BUS在实际应用中大多是为了应对“消息广播”的场景。比如和config异同搭配使用推送配置信息。

总线式架构的完整流程
在这里插入图片描述
我们要关注一下白底红框那三个和BUS有关系的步骤
MQ/KAFKA:BUS是一个调用封装,它的背后还是需要依赖消息中间件来完成底层的消息
分发,实际项目中最常用的两个中间件分别是RabbitMQ和kafka。
BUS:作为对接上游应用和下游中间件系统的中间层,当接到刷新请求的时候,通知底层中间件向所有服务结点推送消息。
Refresh:类比config-center中可以通过actuator的Refresh请求刷新配置,那么对于总线式架构的ReFresh请求来说,有两个需要解决的问题:
谁来发起变更,服务结点还是由ConfigServer发起变更请求?
何时发起变更-时手工发起变更?还是每次Github改动完成后自动推送?

2.BUS简介

BUS实现:
加入我们所有的节点都订阅了topic(消息组件这个属性刷新这个topic)当你的属性发生变动的时候,只要发送一个广播消息,所有的节点都会消费消息,并且触发刷新动作。

BUS的标签:
BUS只是对消息进行了简单的封装,底层是依赖Stream(专业用来与消息中间件进行通信的组件)来广播消息。

在这里插入图片描述
BUS的两个场景:
配置变更通知;自定义消息广播;

3.BUS体系结构解析

BUS的三个角色:
消息的发布者,是一个中间件;
事件监听者,监听事件动态,各个监听消息的服务节点;
事件主体,配置变更就是事件

事件的架构:
在BUS配置刷新的事件类是RefreshRemoteApplicationEvent。在 BUS的规范下,所有事件都包含三个维度的信息:
**source:**这是一个必填信息,它可以是一个自定义并且能够被序列化反序列化的pojo对象,它包含了一个事件想要传达的信息;
Original Service 消息来源方,通常是事件发布方的机器ID,或者AppId等;
Destination Service 目标机器,Bus会根据Destination Service指定的过滤条件(比如服务名,端口等),只让指定的监听者响应事件;

消息发布者
我们所有的“事件”都是通过Bus来发布的,Bus默认提供了两个Endpoint作为消息发布者:
bus-env:在本地发布EnvironmentChangeRemoteApplicationEvent事件,表示一个远程环境变更事件。进一步查看这个事件的内容,我们发现其中包含了一个Map<String, String>属性,事件监听者接收到这个事件之后,会将事件中的Map添加到Spring环境变量中(由Spring Cloud的EnvironmentManager负责具体处理),从而达到修改环境变量的目的
bus-refresh:发布RefreshRemoteApplicationEvent事件,表示一个远程配置刷新事件,这个事件会触发@RefreshScope注解所修饰的Java类中属性的刷新(@RefreshScope修饰的类可以在运行期更改属性)
以上两个ENDpoint就是BUS通过、actuator服务对外提供出来的

消息监听者:
BUS中默认创建了两个消息监听器,分别对应上面两个消息发布的Endpoints。
在这里插入图片描述
在spring-cloud-context这个依赖中定义了大量的事件。

4.Bus的接入方式RabbitMQ & Kafka

Spring的组件一向是以一种插件式的方式提供功能,将组件自身和我们项目中的业务代码隔离,使得我们更换组件的成本可以降到最低。Spring Cloud Bus也不例外,它的底层消息组件非常容易替换,替换过程不需要对业务代码引入任何变更。Bus就像一道隔离了底层消息组件和业务应用的中间层,比如我们从RabbitMQ切换为Kafka的时候,只需要做两件事就好了:
在项目pom中替换依赖组件;
更改配置文件里的连接信息。

RabbitMQ和Kafka两种消息组件如何接入Bus
接入RabbitMQ
RabbitMQ是实现了AMQP(Advanced Message Queue Protocal)的开源消息代理软件,也是平时项目中应用最广泛的消息分发组件之一。
接入RabbitMQ的方式很简单,我们只要在项目中引入以下依赖:

org.springframework.cloud
spring-cloud-starter-bus-amqp

点进去发现,它还依赖于spring-cloud-starter-stream-rabbit。
也就是说stream组件是被真正用来发送广播消息到RabbitMQ,
BUS只是帮我们封装了整个消息的发布和监听动作!
项目所需要的具体的配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

接入Kafka;
要使用kafka来实现消息代理,只需要把上一步中引入spring-cloud-starter-bus-amqp
依赖替换成spring-cloud-starter-bus-kafka依赖

org.springframework.cloud
spring-cloud-starter-bus-kafka

如果大家的Kafka和ZooKeeper都运行在本地,并且采用了默认配置,那么不需要做任何额外的配置,就可以直接使用。但是在生产环境中往往Kafka和ZooKeeper会部署在不同的环境,所以就需要做一些额外配置:
spring.cloud.stream.kafka.binder.brokers Kafka服务节点(默认localhost)
spring.cloud.stream.kafka.binder.defaultBrokerPort Kafka端口(默认9092)
spring.cloud.stream.kafka.binder.zkNodes ZooKeeper服务节点(默认localhost)
zspring.cloud.stream.kafka.binder.defaultZkPort ZooKeeper端口(默认2181)

5.部分关键源码:

内置事件的架构RefreshRemoteApplicationEvent
刷新事件的发送端-RefreshBusEndpoint

开端:RefreshRemoteApplicationEvent

public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {@SuppressWarnings("unused")private RefreshRemoteApplicationEvent() {// for serializers}public RefreshRemoteApplicationEvent(Object source, String originService,String destinationService) {super(source, originService, destinationService);}

查看find usage:有两个大类:RefreshBusEndpoint以及RefreshListener类。
一个是起点RefreshBusEndpoint,一个是终点RefreshListener。
关注起点:RefreshBusEndpoint

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {super(context, id);}@WriteOperationpublic void busRefreshWithDestination(@Selector String destination) { // TODO:// document// destinationpublish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));}@WriteOperationpublic void busRefresh() {publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));}}

关注到主类的super方法,就是到了RemoteApplicationEvent类

protected RemoteApplicationEvent(Object source, String originService,String destinationService) {super(source);this.originService = originService;if (destinationService == null) {destinationService = "**";}// If the destinationService is not already a wildcard, match everything that// follows// if there at most two path elements, and last element is not a global wildcard// alreadyif (!"**".equals(destinationService)) {if (StringUtils.countOccurrencesOf(destinationService, ":") <= 1&& !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {// All instances of the destination unless specifically requesteddestinationService = destinationService + ":**";}}this.destinationService = destinationService;this.id = UUID.randomUUID().toString();
}

本人进行测试的接口是:
测试的接口是:localhost:60002/actuator/bus-refresh
在这里插入图片描述

研究了发现对于RemoteApplicationEvent就是确定destination!

在RefreshBusEndpoint中,将contex存放在ApplicationEventPublisher里。
这就是ApplicationEventPublisher,用来发布上下文消息的!

接下来到了AbstractApplicationContext中

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {Assert.notNull(event, "Event must not be null");// Decorate event as an ApplicationEvent if necessaryApplicationEvent applicationEvent;if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent) event;}else {applicationEvent = new PayloadApplicationEvent<>(this, event);if (eventType == null) {eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();}}// Multicast right now if possible - or lazily once the multicaster is initializedif (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);}else {getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);}// Publish event via parent context as well...if (this.parent != null) {if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext) this.parent).publishEvent(event, eventType);}else {this.parent.publishEvent(event);}}
}

整个过程是事件驱动,编程解耦!

6.如何实现自动推送?Git WebHook

问题:由谁来发起状态的变更请求?
如何通过GitHub的Webhook机制实现自动推送!

Webhook?Git的一种机制,可以用于自动化的构建。
当每次提交代码到Git以后,会触发Webhook执行一段程序,来完成预定义的操作。比如说让钩子通知CI/CD系统从Github拉取最新代码开始执行构建过程或者执行其他操作!

Webhook三步走:
设置encrypt.key;
将上一步中的key添加到Github仓库设置中;
设置Webhook url;
设置encrypt.key,类似属性加解密方式,只需要在application.yml中设置一个key就好!
encrypt:
key: yourKey

自动推送需要注意的问题
无法测试:改动只要一提交就被推送到所有机器,假如不小心修改错了属性,那所有服务器就要团灭了
定点推送:尽管Bus支持在URL中添加目标范围,定向推送到指定机器,但毕竟URL在Webhook里面是写死的,不方便我们根据实际情况做定点推送

相关文章:

Springcloud-消息总线-Bus

1.消息总线在微服务中的应用 BUS- 消息总线-将消息变更发送给所有的服务节点。 在微服务架构的系统中&#xff0c;通常我们会使用消息代理来构建一个Topic&#xff0c;让所有 服务节点监听这个主题&#xff0c;当生产者向topic中发送变更时&#xff0c;这个主题产生的消息会被…...

js 接收回调函数 转换为promise

下面是一个示例代码&#xff0c;展示如何编写一个接收回调函数并将其转换为 Promise 的 JavaScript 函数&#xff1a; // 定义一个接收回调函数并转换为 Promise 的函数 function convertCallbackToPromise(callbackFunction) {// 返回一个新的 Promise 对象return new Promis…...

Python 面试【★★★】

欢迎莅临我的博客 &#x1f49d;&#x1f49d;&#x1f49d;&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…...

计算机网络(物理层)

物理层 物理层最核心的工作内容就是解决比特流在线路上传输的问题 基本概念 何为物理层&#xff1f;笼统的讲&#xff0c;就是传输比特流的。 可以着重看一下物理层主要任务的特性 传输媒体 传输媒体举例&#xff1a; 引导型传输媒体 引导型传输媒体指的是信号通过某种…...

OpenGL-ES 学习(6)---- 立方体绘制

目录 立方体绘制基本原理立方体的顶点坐标和绘制顺序立方体颜色和着色器实现效果和参考代码 立方体绘制基本原理 一个立方体是由8个顶点组成&#xff0c;共6个面&#xff0c;所以绘制立方体本质上就是绘制这6个面共12个三角形 顶点的坐标体系如下图所示&#xff0c;三维坐标…...

《数据结构与算法基础 by王卓老师》学习笔记——类C语言有关操作补充

1.元素类型说明 2.数组定义 3.C语言的内存动态分配 4..C中的参数传递 5.传值方式 6.传地址方式 例子...

高频面试题基本总结回顾2(含笔试高频算法整理)

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…...

《深入浅出MySQL:数据库开发、优化与管理维护(第3版)》

深入浅出MySQL sql执行流程第一步&#xff1a;通过连接器进行连接第二步&#xff1a;解析器解析 SQL第三步&#xff1a;执行SQL 行记录存储格式行溢出日志数据库三大范式第一范式第二范式第三范式 索引索引分类B树索引BTree vs Hash需要索引1、字段需要频繁的查询操作2、字段用…...

VBA技术资料MF171:创建指定工作表数的工作簿

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…...

【效率提升】新一代效率工具平台utools

下载地址&#xff1a;utools uTools这款软件&#xff0c;是一款功能强大且高度可定制的效率神器&#xff0c;使用快捷键alt space(空格) 随时调用&#xff0c;支持调用系统应用、用户安装应用和市场插件等。 utools可以调用系统设置和内置应用&#xff0c;这样可以方便快捷的…...

Jmeter插件管理器,websocket协议,Jmeter连接数据库,测试报告的查看

目录 1、Jmeter插件管理器 1、Jmeter插件管理器用处&#xff1a;Jmeter发展并产生大量优秀的插件&#xff0c;比如取样器、性能监控的插件工具等。但要安装这些优秀的插件&#xff0c;需要先安装插件管理器。 2、插件的下载&#xff0c;从Availabale Plugins中选择&#xff…...

Android中ViewModel+LiveData+DataBinding的配合使用(kotlin)

Android 中 ViewModel、LiveData 和 Data Binding 的配合使用&#xff08;Kotlin&#xff09; 摘要 本文将介绍如何在 Android 开发中结合使用 ViewModel、LiveData 和 Data Binding 进行数据绑定和状态更新。我们将详细探讨这三者之间的关系&#xff0c;并展示如何在 Kotlin…...

Elasticsearch 避免常见查询错误和陷阱

Elasticsearch 作为一款强大的搜索引擎和分析工具&#xff0c;已经被广泛应用于各种场景中。然而&#xff0c;在使用 Elasticsearch 进行查询时&#xff0c;如果不注意一些常见的错误和陷阱&#xff0c;可能会导致查询效率低下、结果不准确甚至系统性能下降。本文旨在总结一些常…...

【PyQt】20-QTimer(动态显示时间、定时关闭)

QTimer 前言一、QTimer介绍二、动态时间展示2.1 代码2.2 运行结果 三、定时关闭3.1 介绍他的两种用法1、使用函数或Lambda表达式2、带有定时器类型&#xff08;高级&#xff09; 3.2 代码3.3 运行结果 总结 前言 好久没学习了。 一、QTimer介绍 pyqt里面的多线程可以有两种实…...

[深度学习] 自编码器Autoencoder

自编码器&#xff08;Autoencoder&#xff09;是一种无监督学习算法&#xff0c;主要用于数据的降维、特征提取和数据重建。自编码器由两个主要部分组成&#xff1a;编码器&#xff08;Encoder&#xff09;和解码器&#xff08;Decoder&#xff09;。其基本思想是将输入数据映射…...

模型微调、智能体、知识库之间的区别

使用开源模型微调和使用知识库与智能体&#xff08;agent&#xff09;的区别主要体现在工作原理、应用场景和实现目标上。以下是对这三者的详细对比&#xff1a; 开源模型微调 定义&#xff1a; 微调是对预训练模型&#xff08;例如BERT、GPT等&#xff09;进行额外训练&…...

七日世界Once Human跳ping、延迟高、丢包怎么办?

七日世界是一款开放世界为轴点的生存射击游戏&#xff0c;玩家将进入一个荒诞、荒芜的末日世界&#xff0c;在这里与好友一起对抗可怖的怪物和神秘物质星尘的入侵&#xff0c;给这个星球留下最后的希望&#xff0c;共筑一片安全的领地。不过有部分玩家在游玩七日世界的时候&…...

机器人控制系列教程之关节空间运动控制器搭建(1)

机器人位置控制类型 机器人位置控制分为两种类型&#xff1a; 关节空间运动控制—在这种情况下&#xff0c;机器人的位置输入被指定为一组关节角度或位置的向量&#xff0c;这被称为机器人的关节配置&#xff0c;记作q。控制器跟踪一个参考配置&#xff0c;记作 q r e f q_{re…...

[linux]sed命令基础入门详解

sed是一种流编辑器&#xff0c;它一次处理一行内容。处理时&#xff0c;把当前处理的行存储在临时缓冲区中&#xff0c;称为“模式空间”&#xff0c;接着用sed命令处理缓冲区中的内容&#xff0c;处理完成后&#xff0c;把缓冲区的内容送往屏幕。接着处理下一行&#xff0c;这…...

Charles抓包工具系列文章(一)-- Compose 拼接http请求

一、背景 众所周知&#xff0c;Charles是一款抓包工具&#xff0c;当然是http协议&#xff0c;不支持tcp。&#xff08;如果你想要抓tcp包&#xff0c;请转而使用wireshark&#xff0c;在讲述websocket的相关技术有梳理过wireshark抓包&#xff09; 话说回来&#xff0c;char…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型

CVPR 2025 | MIMO&#xff1a;支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题&#xff1a;MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者&#xff1a;Yanyuan Chen, Dexuan Xu, Yu Hu…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...