当前位置: 首页 > news >正文

Springcloud-消息总线-Bus

1.消息总线在微服务中的应用

BUS- 消息总线-将消息变更发送给所有的服务节点。
在微服务架构的系统中,通常我们会使用消息代理来构建一个Topic,让所有
服务节点监听这个主题,当生产者向topic中发送变更时,这个主题产生的消息会被
所有实例消费,这就是消息总线的工作模式。也是我们熟悉的发布-订阅模型。
其实广义的消息总线不单指这种“发布-订阅”模型!也可以代指分布式服务间进行通信,
消息分发的单播模式,甚至有的公司既不使用HTTP也不用RPC来构建微服务。完全靠消息
总线来做服务调用。比如,银行老系统采用总线型架构,在不同服务结点之间做消息分发

SpringCloud中的Bus职责范围就相对小了很多,因为还有一个Stream组件代理了大部分的消息中间件通信服务,因此BUS在实际应用中大多是为了应对“消息广播”的场景。比如和config异同搭配使用推送配置信息。

总线式架构的完整流程
在这里插入图片描述
我们要关注一下白底红框那三个和BUS有关系的步骤
MQ/KAFKA:BUS是一个调用封装,它的背后还是需要依赖消息中间件来完成底层的消息
分发,实际项目中最常用的两个中间件分别是RabbitMQ和kafka。
BUS:作为对接上游应用和下游中间件系统的中间层,当接到刷新请求的时候,通知底层中间件向所有服务结点推送消息。
Refresh:类比config-center中可以通过actuator的Refresh请求刷新配置,那么对于总线式架构的ReFresh请求来说,有两个需要解决的问题:
谁来发起变更,服务结点还是由ConfigServer发起变更请求?
何时发起变更-时手工发起变更?还是每次Github改动完成后自动推送?

2.BUS简介

BUS实现:
加入我们所有的节点都订阅了topic(消息组件这个属性刷新这个topic)当你的属性发生变动的时候,只要发送一个广播消息,所有的节点都会消费消息,并且触发刷新动作。

BUS的标签:
BUS只是对消息进行了简单的封装,底层是依赖Stream(专业用来与消息中间件进行通信的组件)来广播消息。

在这里插入图片描述
BUS的两个场景:
配置变更通知;自定义消息广播;

3.BUS体系结构解析

BUS的三个角色:
消息的发布者,是一个中间件;
事件监听者,监听事件动态,各个监听消息的服务节点;
事件主体,配置变更就是事件

事件的架构:
在BUS配置刷新的事件类是RefreshRemoteApplicationEvent。在 BUS的规范下,所有事件都包含三个维度的信息:
**source:**这是一个必填信息,它可以是一个自定义并且能够被序列化反序列化的pojo对象,它包含了一个事件想要传达的信息;
Original Service 消息来源方,通常是事件发布方的机器ID,或者AppId等;
Destination Service 目标机器,Bus会根据Destination Service指定的过滤条件(比如服务名,端口等),只让指定的监听者响应事件;

消息发布者
我们所有的“事件”都是通过Bus来发布的,Bus默认提供了两个Endpoint作为消息发布者:
bus-env:在本地发布EnvironmentChangeRemoteApplicationEvent事件,表示一个远程环境变更事件。进一步查看这个事件的内容,我们发现其中包含了一个Map<String, String>属性,事件监听者接收到这个事件之后,会将事件中的Map添加到Spring环境变量中(由Spring Cloud的EnvironmentManager负责具体处理),从而达到修改环境变量的目的
bus-refresh:发布RefreshRemoteApplicationEvent事件,表示一个远程配置刷新事件,这个事件会触发@RefreshScope注解所修饰的Java类中属性的刷新(@RefreshScope修饰的类可以在运行期更改属性)
以上两个ENDpoint就是BUS通过、actuator服务对外提供出来的

消息监听者:
BUS中默认创建了两个消息监听器,分别对应上面两个消息发布的Endpoints。
在这里插入图片描述
在spring-cloud-context这个依赖中定义了大量的事件。

4.Bus的接入方式RabbitMQ & Kafka

Spring的组件一向是以一种插件式的方式提供功能,将组件自身和我们项目中的业务代码隔离,使得我们更换组件的成本可以降到最低。Spring Cloud Bus也不例外,它的底层消息组件非常容易替换,替换过程不需要对业务代码引入任何变更。Bus就像一道隔离了底层消息组件和业务应用的中间层,比如我们从RabbitMQ切换为Kafka的时候,只需要做两件事就好了:
在项目pom中替换依赖组件;
更改配置文件里的连接信息。

RabbitMQ和Kafka两种消息组件如何接入Bus
接入RabbitMQ
RabbitMQ是实现了AMQP(Advanced Message Queue Protocal)的开源消息代理软件,也是平时项目中应用最广泛的消息分发组件之一。
接入RabbitMQ的方式很简单,我们只要在项目中引入以下依赖:

org.springframework.cloud
spring-cloud-starter-bus-amqp

点进去发现,它还依赖于spring-cloud-starter-stream-rabbit。
也就是说stream组件是被真正用来发送广播消息到RabbitMQ,
BUS只是帮我们封装了整个消息的发布和监听动作!
项目所需要的具体的配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

接入Kafka;
要使用kafka来实现消息代理,只需要把上一步中引入spring-cloud-starter-bus-amqp
依赖替换成spring-cloud-starter-bus-kafka依赖

org.springframework.cloud
spring-cloud-starter-bus-kafka

如果大家的Kafka和ZooKeeper都运行在本地,并且采用了默认配置,那么不需要做任何额外的配置,就可以直接使用。但是在生产环境中往往Kafka和ZooKeeper会部署在不同的环境,所以就需要做一些额外配置:
spring.cloud.stream.kafka.binder.brokers Kafka服务节点(默认localhost)
spring.cloud.stream.kafka.binder.defaultBrokerPort Kafka端口(默认9092)
spring.cloud.stream.kafka.binder.zkNodes ZooKeeper服务节点(默认localhost)
zspring.cloud.stream.kafka.binder.defaultZkPort ZooKeeper端口(默认2181)

5.部分关键源码:

内置事件的架构RefreshRemoteApplicationEvent
刷新事件的发送端-RefreshBusEndpoint

开端:RefreshRemoteApplicationEvent

public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {@SuppressWarnings("unused")private RefreshRemoteApplicationEvent() {// for serializers}public RefreshRemoteApplicationEvent(Object source, String originService,String destinationService) {super(source, originService, destinationService);}

查看find usage:有两个大类:RefreshBusEndpoint以及RefreshListener类。
一个是起点RefreshBusEndpoint,一个是终点RefreshListener。
关注起点:RefreshBusEndpoint

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {super(context, id);}@WriteOperationpublic void busRefreshWithDestination(@Selector String destination) { // TODO:// document// destinationpublish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));}@WriteOperationpublic void busRefresh() {publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));}}

关注到主类的super方法,就是到了RemoteApplicationEvent类

protected RemoteApplicationEvent(Object source, String originService,String destinationService) {super(source);this.originService = originService;if (destinationService == null) {destinationService = "**";}// If the destinationService is not already a wildcard, match everything that// follows// if there at most two path elements, and last element is not a global wildcard// alreadyif (!"**".equals(destinationService)) {if (StringUtils.countOccurrencesOf(destinationService, ":") <= 1&& !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {// All instances of the destination unless specifically requesteddestinationService = destinationService + ":**";}}this.destinationService = destinationService;this.id = UUID.randomUUID().toString();
}

本人进行测试的接口是:
测试的接口是:localhost:60002/actuator/bus-refresh
在这里插入图片描述

研究了发现对于RemoteApplicationEvent就是确定destination!

在RefreshBusEndpoint中,将contex存放在ApplicationEventPublisher里。
这就是ApplicationEventPublisher,用来发布上下文消息的!

接下来到了AbstractApplicationContext中

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {Assert.notNull(event, "Event must not be null");// Decorate event as an ApplicationEvent if necessaryApplicationEvent applicationEvent;if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent) event;}else {applicationEvent = new PayloadApplicationEvent<>(this, event);if (eventType == null) {eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();}}// Multicast right now if possible - or lazily once the multicaster is initializedif (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);}else {getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);}// Publish event via parent context as well...if (this.parent != null) {if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext) this.parent).publishEvent(event, eventType);}else {this.parent.publishEvent(event);}}
}

整个过程是事件驱动,编程解耦!

6.如何实现自动推送?Git WebHook

问题:由谁来发起状态的变更请求?
如何通过GitHub的Webhook机制实现自动推送!

Webhook?Git的一种机制,可以用于自动化的构建。
当每次提交代码到Git以后,会触发Webhook执行一段程序,来完成预定义的操作。比如说让钩子通知CI/CD系统从Github拉取最新代码开始执行构建过程或者执行其他操作!

Webhook三步走:
设置encrypt.key;
将上一步中的key添加到Github仓库设置中;
设置Webhook url;
设置encrypt.key,类似属性加解密方式,只需要在application.yml中设置一个key就好!
encrypt:
key: yourKey

自动推送需要注意的问题
无法测试:改动只要一提交就被推送到所有机器,假如不小心修改错了属性,那所有服务器就要团灭了
定点推送:尽管Bus支持在URL中添加目标范围,定向推送到指定机器,但毕竟URL在Webhook里面是写死的,不方便我们根据实际情况做定点推送

相关文章:

Springcloud-消息总线-Bus

1.消息总线在微服务中的应用 BUS- 消息总线-将消息变更发送给所有的服务节点。 在微服务架构的系统中&#xff0c;通常我们会使用消息代理来构建一个Topic&#xff0c;让所有 服务节点监听这个主题&#xff0c;当生产者向topic中发送变更时&#xff0c;这个主题产生的消息会被…...

js 接收回调函数 转换为promise

下面是一个示例代码&#xff0c;展示如何编写一个接收回调函数并将其转换为 Promise 的 JavaScript 函数&#xff1a; // 定义一个接收回调函数并转换为 Promise 的函数 function convertCallbackToPromise(callbackFunction) {// 返回一个新的 Promise 对象return new Promis…...

Python 面试【★★★】

欢迎莅临我的博客 &#x1f49d;&#x1f49d;&#x1f49d;&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…...

计算机网络(物理层)

物理层 物理层最核心的工作内容就是解决比特流在线路上传输的问题 基本概念 何为物理层&#xff1f;笼统的讲&#xff0c;就是传输比特流的。 可以着重看一下物理层主要任务的特性 传输媒体 传输媒体举例&#xff1a; 引导型传输媒体 引导型传输媒体指的是信号通过某种…...

OpenGL-ES 学习(6)---- 立方体绘制

目录 立方体绘制基本原理立方体的顶点坐标和绘制顺序立方体颜色和着色器实现效果和参考代码 立方体绘制基本原理 一个立方体是由8个顶点组成&#xff0c;共6个面&#xff0c;所以绘制立方体本质上就是绘制这6个面共12个三角形 顶点的坐标体系如下图所示&#xff0c;三维坐标…...

《数据结构与算法基础 by王卓老师》学习笔记——类C语言有关操作补充

1.元素类型说明 2.数组定义 3.C语言的内存动态分配 4..C中的参数传递 5.传值方式 6.传地址方式 例子...

高频面试题基本总结回顾2(含笔试高频算法整理)

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…...

《深入浅出MySQL:数据库开发、优化与管理维护(第3版)》

深入浅出MySQL sql执行流程第一步&#xff1a;通过连接器进行连接第二步&#xff1a;解析器解析 SQL第三步&#xff1a;执行SQL 行记录存储格式行溢出日志数据库三大范式第一范式第二范式第三范式 索引索引分类B树索引BTree vs Hash需要索引1、字段需要频繁的查询操作2、字段用…...

VBA技术资料MF171:创建指定工作表数的工作簿

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…...

【效率提升】新一代效率工具平台utools

下载地址&#xff1a;utools uTools这款软件&#xff0c;是一款功能强大且高度可定制的效率神器&#xff0c;使用快捷键alt space(空格) 随时调用&#xff0c;支持调用系统应用、用户安装应用和市场插件等。 utools可以调用系统设置和内置应用&#xff0c;这样可以方便快捷的…...

Jmeter插件管理器,websocket协议,Jmeter连接数据库,测试报告的查看

目录 1、Jmeter插件管理器 1、Jmeter插件管理器用处&#xff1a;Jmeter发展并产生大量优秀的插件&#xff0c;比如取样器、性能监控的插件工具等。但要安装这些优秀的插件&#xff0c;需要先安装插件管理器。 2、插件的下载&#xff0c;从Availabale Plugins中选择&#xff…...

Android中ViewModel+LiveData+DataBinding的配合使用(kotlin)

Android 中 ViewModel、LiveData 和 Data Binding 的配合使用&#xff08;Kotlin&#xff09; 摘要 本文将介绍如何在 Android 开发中结合使用 ViewModel、LiveData 和 Data Binding 进行数据绑定和状态更新。我们将详细探讨这三者之间的关系&#xff0c;并展示如何在 Kotlin…...

Elasticsearch 避免常见查询错误和陷阱

Elasticsearch 作为一款强大的搜索引擎和分析工具&#xff0c;已经被广泛应用于各种场景中。然而&#xff0c;在使用 Elasticsearch 进行查询时&#xff0c;如果不注意一些常见的错误和陷阱&#xff0c;可能会导致查询效率低下、结果不准确甚至系统性能下降。本文旨在总结一些常…...

【PyQt】20-QTimer(动态显示时间、定时关闭)

QTimer 前言一、QTimer介绍二、动态时间展示2.1 代码2.2 运行结果 三、定时关闭3.1 介绍他的两种用法1、使用函数或Lambda表达式2、带有定时器类型&#xff08;高级&#xff09; 3.2 代码3.3 运行结果 总结 前言 好久没学习了。 一、QTimer介绍 pyqt里面的多线程可以有两种实…...

[深度学习] 自编码器Autoencoder

自编码器&#xff08;Autoencoder&#xff09;是一种无监督学习算法&#xff0c;主要用于数据的降维、特征提取和数据重建。自编码器由两个主要部分组成&#xff1a;编码器&#xff08;Encoder&#xff09;和解码器&#xff08;Decoder&#xff09;。其基本思想是将输入数据映射…...

模型微调、智能体、知识库之间的区别

使用开源模型微调和使用知识库与智能体&#xff08;agent&#xff09;的区别主要体现在工作原理、应用场景和实现目标上。以下是对这三者的详细对比&#xff1a; 开源模型微调 定义&#xff1a; 微调是对预训练模型&#xff08;例如BERT、GPT等&#xff09;进行额外训练&…...

七日世界Once Human跳ping、延迟高、丢包怎么办?

七日世界是一款开放世界为轴点的生存射击游戏&#xff0c;玩家将进入一个荒诞、荒芜的末日世界&#xff0c;在这里与好友一起对抗可怖的怪物和神秘物质星尘的入侵&#xff0c;给这个星球留下最后的希望&#xff0c;共筑一片安全的领地。不过有部分玩家在游玩七日世界的时候&…...

机器人控制系列教程之关节空间运动控制器搭建(1)

机器人位置控制类型 机器人位置控制分为两种类型&#xff1a; 关节空间运动控制—在这种情况下&#xff0c;机器人的位置输入被指定为一组关节角度或位置的向量&#xff0c;这被称为机器人的关节配置&#xff0c;记作q。控制器跟踪一个参考配置&#xff0c;记作 q r e f q_{re…...

[linux]sed命令基础入门详解

sed是一种流编辑器&#xff0c;它一次处理一行内容。处理时&#xff0c;把当前处理的行存储在临时缓冲区中&#xff0c;称为“模式空间”&#xff0c;接着用sed命令处理缓冲区中的内容&#xff0c;处理完成后&#xff0c;把缓冲区的内容送往屏幕。接着处理下一行&#xff0c;这…...

Charles抓包工具系列文章(一)-- Compose 拼接http请求

一、背景 众所周知&#xff0c;Charles是一款抓包工具&#xff0c;当然是http协议&#xff0c;不支持tcp。&#xff08;如果你想要抓tcp包&#xff0c;请转而使用wireshark&#xff0c;在讲述websocket的相关技术有梳理过wireshark抓包&#xff09; 话说回来&#xff0c;char…...

OLMo:真正完全开源的大模型

最近&#xff0c;又有一家机构AI2&#xff08;Allen Institute for AI&#xff09;开源了一个LLM&#xff1a;OLMo&#xff0c;它的英文全称就叫Open Language Model。相比之前开源的大模型&#xff0c;OLMo的独特之处是完全开源&#xff0c;除了训练的模型&#xff0c;OLMo还开…...

51单片机STC89C52RC——12.1 数据存储芯片AT24C02

目的/效果 利用存储芯片AT24C02存储数据&#xff0c;LCD1602显示存储的数据。 一&#xff0c;STC单片机模块 二&#xff0c;AT24C02存储芯片 2.1 介绍 AT24C02是一个2K位串行CMOS E2PROM&#xff0c;内部含有256个8位字节&#xff0c;采用先进CMOS技术实质上减少了器件的功…...

融入云端的心跳:在Spring Cloud应用中集成Eureka Client

融入云端的心跳&#xff1a;在Spring Cloud应用中集成Eureka Client 引言 在微服务架构中&#xff0c;服务发现是一个关键组件&#xff0c;它允许服务实例之间相互发现并通信。Netflix Eureka是Spring Cloud体系中广泛使用的服务发现框架。Eureka提供了一个服务注册中心&…...

CocosCreator构建IOS的wwise教程

CocosCreator构建IOS教程 添加wwise教程: 1.添加include 2.添加SoundEngine 3.添加Profile-iphoneos下面lib下面的.a 4.导入js调用C++的文件 5.导入这些文件 6.初始化ios绝对路径和TTS语音合成对象 6.获得根目录绝对路径,加载pck需要找到绝对路径。怎么找绝对路径? #impor…...

掌握 SQL Server 中的 FLOOR 函数:数据舍入的艺术

掌握 SQL Server 中的 FLOOR 函数&#xff1a;数据舍入的艺术 引言 在数据分析和处理中&#xff0c;我们经常需要对数值进行精确控制&#xff0c;比如将数值舍入到特定的精度。SQL Server 提供了多种数学函数&#xff0c;其中 FLOOR 函数就是用来执行向上舍入操作的强大工具。…...

【折腾笔记】兰空图床使用Redis做缓存

前言 最近发现我部署在群晖NAS上的兰空图床程序在高并发的情况下会导致图片加载缓慢或出现图片加载失败的情况&#xff0c;于是我查阅了官方文档资料并进行了一系列的测试&#xff0c;发现兰空图床如果开启了原图保护功能&#xff0c;会非常的吃CPU的性能&#xff0c;尤其是在…...

【Ubuntu】如何用指令设置静态IP

这里介绍的是利用netplan 的配置文件一般在 /etc/netplan/ 目录下&#xff0c;文件名类似 01-network-manager-all.yaml。 用 nano/ vim 编辑器打开配置文件&#xff1a; sudo nano /etc/netplan/01-network-manager-all.yaml # 替换成你的文件名修改配置文件 network:versi…...

mechanize - 自动化与HTTP web服务器的交互操作

1、前言 随着自动化测试的普及与落地推广&#xff0c;出现了众多知名的自动化测试工具&#xff0c;如Selenium 、Robot Framework、Playwright等。本文将介绍一款在Python环境下的mechanize库&#xff0c;这个库能够模拟浏览器行为&#xff0c;支持发送HTTP请求、解析HTML页面和…...

【Android】保留elevation层级效果,舍弃阴影效果

关于elevation属性 elevation是高度&#xff0c;海拔的意思 该属性可以设置View在父容器中的层级&#xff0c;即z属性 当view的elevation高于其它view时&#xff0c;它将显示在最上方&#xff0c;并产生阴影效果 关闭阴影效果 view的高度阴影&#xff0c;通过outlineProvi…...

Java新手启航:Windows下JDK安装,开启编程之旅

你是不是对编程充满好奇&#xff0c;想要迈入Java的世界&#xff0c;却不知道从何开始&#xff1f;别担心&#xff0c;每一个Java大师都是从安装JDK开始的&#xff0c;而今天&#xff0c;我将手把手教你如何轻松完成JDK的安装&#xff0c;让你迈出编程之旅的第一步! 接下来&am…...