当前位置: 首页 > news >正文

使用WebSocket实现log日志流的实时展示-从轮询到通知

场景介绍

最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。
在这里插入图片描述

WebSocket介绍

参考:https://liaoxuefeng.com/books/java/spring/web/websocket/

WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。

由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:

GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade

这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。

现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。

在这里插入图片描述

实践演示

Java后端

我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。

step1: 添加websocket依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
step2: 增加配置

这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类

@Configuration
@EnableWebSocket
public class WebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
step3: 创建一个ws endpoint
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {// 对话的标识private String bizKey;// 存储每个会话private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();// 将会话放入到线程池中,异步将数据返回给前端private static ThreadPoolExecutor wsThreadPoolExecutor;// 核心逻辑处理器private ChaosLogEventHandler handler;// 业务写和读logprivate static ChaosLogger chaosLogger;@Autowired@Qualifier("wsThreadPoolExecutor")public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {if (null != wsThreadPoolExecutor) {LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;}}@Autowiredpublic void setChaosLogger(ChaosLogger chaosLogger) {if (null != chaosLogger) {LogWsEndpoint.chaosLogger = chaosLogger;}}@OnOpenpublic void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {this.bizKey = String.format("%s-%s", bizType, bizId);log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);this.handler = new ChaosLogEventHandler(chaosLogger, session);wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));endpointMap.compute(bizKey, (key, value) -> {List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;ends.add(this);return ends;});log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);}public void flushMessage(String bizType, String bizId, boolean force) {this.handler.flushMessage(bizType, bizId, force);}@OnClosepublic void onClose() {log.info("websocket log server close");if (StringUtils.isBlank(bizKey)) {return;}endpointMap.compute(bizKey, (key, endpoints) -> {if (null != endpoints) {endpoints.remove(this);}return endpoints;});log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);}@OnMessagepublic void onMessage(String message, Session session) throws IOException {log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);}@OnErrorpublic void onError(Session session, Throwable error) {log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);}@Overridepublic void accept(ChaosLogEvent chaosLogEvent) {String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);if (CollectionUtils.isEmpty(logWsEndpoints)) {return;}logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));}
}

==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。

核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。

@Slf4j
public class ChaosLogEventHandler {private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5Mprivate final ChaosLogger chaosLogger;@Getterprivate final Session session;private final AtomicLong offset = new AtomicLong(-1L);private final AtomicBoolean hasTruncated = new AtomicBoolean(false);private final AtomicLong waitEventCnt = new AtomicLong(0L);private final Lock lock = new ReentrantLock();public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {this.chaosLogger = chaosLogger;this.session = session;}public void flushMessage(String bizType, String bizId, boolean force) {String bizKey = bizType + "-" + bizId;if (!lock.tryLock()) {waitEventCnt.incrementAndGet();log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);return;}try {if (!force && waitEventCnt.getAndSet(0L) < 1) {log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);// 没有待处理的事件return;}log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());if (offset.get() < 0) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);if (contentLength == 0) {return;}if (contentLength > READ_LOG_MOST_LEN) {offset.set(contentLength - READ_LOG_MOST_LEN);hasTruncated.set(true);log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());} else {offset.set(0L);}} else if (!force) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);if (contentLength <= offset.get()) {log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());return;}}// 读取日志内容BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {String line = null;while (null != (line = reader.readLine())) {if (hasTruncated.get()) {hasTruncated.set(false);log.info("[ws-chaos-log]hasTruncated changed to false");} else {log.info("[ws-chaos-log]send ws msg:{}", line);try {session.getBasicRemote().sendText(line + "\n");} catch (IllegalStateException e) {log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);}}// +1是因为每一行结尾会有一个回车offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);}} catch (IOException e) {log.error("", e);}} catch (NotFoundException e) {log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);} catch (RuntimeException e) {log.error("", e);} finally {lock.unlock();}log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);// 在处理过程中,可能又有新的事件,所以再次尝试刷新数据flushMessage(bizType, bizKey, false);}
}
stept5: 广播事件,全局监听

前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?

这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。

@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {@Autowired(required = false)private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();@Overridepublic void onMessage(String message) {log.info("[MQ]receive ChaosLogEvent message:{}", message);ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {try {consumer.accept(event);} catch (RuntimeException e) {log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);}}}
}

前端代码实现

以react为例,仅供参考:

export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {if (!bizType || !bizId) {console.log('fetchLogs: logContextToken or node is null')return}setLogs([])if (logsRef.current[0]) {console.log('close ws')logsRef.current[0].close()}let host = wsHost ? wsHost : window.location.hostlet protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)logsRef.current = [client, []]// 报错的回调函数client.onerror = (e: any) => {console.log('Connection Error')console.log(e)}//链接打开的回调函数client.onopen = () => {console.log('WebSocket Client Connected')}//链接关闭的回调函数client.onclose = () => {console.log('echo-protocol Client Closed')}//收到消息的处理函数client.onmessage = (e: any) => {if (logsRef.current[0] === client) {if (typeof e.data === 'string') {let newLogs = [...logsRef.current[1], e.data]if (newLogs.length > 250) {newLogs = newLogs.slice(200)}setLogs(newLogs)logsRef.current = [client, newLogs]}} else {client.close()}}const sendPing = () => {if (logsRef.current[0] === client) {const data = { message: 'heartbeat' }client.send(JSON.stringify(data))setTimeout(sendPing, 10000)}}setTimeout(sendPing, 10000)
}

相关文章:

使用WebSocket实现log日志流的实时展示-从轮询到通知

场景介绍 最近开发一个系统&#xff0c;其中一个模块需要展示实时的执行过程&#xff0c;过程日志可能比较多。以前的方案都是前端定时轮询&#xff0c;比如每秒查一次后端接口&#xff0c;将拉取回来的日志重新展示。轮询方案简单容易实现&#xff0c;但是比较消耗资源&#…...

UE5 从零开始制作跟随的大鹅

文章目录 二、绑定骨骼三、创建 ControlRig四、创建动画五、创建动画蓝图六、自动寻路七、生成 goose八、碰撞 和 Physics Asset缺点 # 一、下载模型 首先我们需要下载一个静态网格体&#xff0c;这里我们可以从 Sketchfab 中下载&#xff1a;Goose Low Poly - Download Free …...

O’Reilly

--江上往来人&#xff0c;但爱鲈鱼美。 --君看一叶舟&#xff0c;出没风波里。 OReilly OReilly出版社出版的技术类图书 俗称动物系列 应该是每个技术人员的必备手册。 OReilly动物系列&#xff08;中译本&#xff09; 简介" 动物系列作为 OReilly 书籍的典型代表被普遍…...

优盘驱动器未格式化:数据拯救行动指南

优盘困境&#xff1a;驱动器未格式化的挑战 在日常的数据存储与传输中&#xff0c;优盘以其便携性和高容量成为了我们不可或缺的伙伴。然而&#xff0c;当您尝试访问优盘时&#xff0c;突然弹出的“驱动器未被格式化”提示却如同晴天霹雳&#xff0c;让人措手不及。这一状况不…...

4.Handler mappings

处理程序映射 简介 在早期版本的 Spring 中&#xff0c;用户需要在 Web 应用程序上下文中定义一个或多个 HandlerMapping bean 以将传入的 Web 请求映射到适当的处理程序。随着注解控制器的引入&#xff0c;通常不再需要这样做&#xff0c;因为 RequestMappingHandlerMapping…...

《学会 SpringMVC 系列 · 消息转换器 MessageConverters》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…...

深度学习项目 -7-使用 Python 的手写数字识别

一、前言 该文章仅作为个人学习使用 二、正文 项目源代码&#xff1a;深度学习项目 - 使用 Python 进行手写数字识别 - DataFlair (data-flair.training) 数据集&#xff1a;​​​​​​​https://drive.google.com/open?id1hJiOlxctFH3uL2yTqXU_1f6c0zLr8V_K Python 深…...

MySQL —— 库,数据类型 与 表

库与基础操作 1.1 查看数据库 使用 show databases; 可以查看当前 MySQL 目前有多少个数据库 5 rows 表示有 5 行&#xff0c;这里是表示的是有效的数据&#xff0c;不包括 第一行的指引 set 表示结果集合 0.01 sec 表示这个 sql 语句一共运行了0.01 秒&#xff0c;一般情况…...

Java重修笔记 第二十七天 匿名内部类

匿名内部类 1. 定义&#xff1a;无类名&#xff08;底层自动分配类名“外部类名$1”&#xff09;&#xff0c;既是类也是对象&#xff0c;定义在外部类的局部位置&#xff0c;例如方法体和代码块中&#xff0c;通过new类或接口并在大括号里重写方法来实现。 2. 使用场景&…...

Nero Lens 智图 - 适用于 iOS 和 iPadOS 的专业图片处理 App

首先是手机端的无损放大 App&#xff1a;Nero Lens 智图&#xff0c;适用于 iOS 和 iPadOS&#xff0c;不仅可以放大&#xff0c;还有多种 AI 图片增强功能。 使用这款 App 可以通过 AI 模型智能放大可达 400%&#xff0c;还有老照片去划痕、上色&#xff0c;抠图移除背景、照…...

Nginx代理路径被吃

Nginx代理路径被吃的情况 日常工作中经常使用nginx反向代理一些资源&#xff0c;有时正常代理&#xff0c;发现代理不过去。 验证被吃调location情况 通过浏览器访问&#xff1a; https://zhao138969.com/LinuxPackage/Python/SelectDocker location /LinuxPackage { proxy…...

pytest-html报告修改与汉化

前言 Pytest框架可以使用两种测试报告&#xff0c;其中一种就是使用pytest-html插件生成的测试报告&#xff0c;但是报告中有一些信息没有什么用途或者显示的不太好看&#xff0c;还有一些我们想要在报告中展示的信息却没有&#xff0c;最近又有人问我pytest-html生成的报告&a…...

react-native从入门到实战系列教程一Swiper组件的使用及bug修复

轮播图&#xff0c;在app中随处可见&#xff0c;这么重要的功能我们怎么可能不学习下在react-native中的实现方式。 依然是第三方组件react-native-swiper 官网地址 https://www.npmjs.com/package/react-native-swiper 组件使用的组件及事件参考官方即可。 实现效果 官网…...

springboot开发的常用注解总结-配置组件类注解

Spring Boot 提供了许多注解&#xff0c;这些注解大大简化了 Spring 应用的配置和开发过程。以下是一些常见的 Spring Boot注解及其作用。 目录 配置组件类 &#xff08;Configure Component &#xff09;Configuration解释&#xff1a;Demo Code&#xff1a;更深度使用&#x…...

DataX 最新版本安装部署

1、下载 git clone gitgithub.com:alibaba/DataX.git 2、打包 mvn -U clean package assembly:assembly -Dmaven.test.skiptrue...

【架构】应用保护

这篇文章总结一下应用保护的手段。如今说到应用保护&#xff0c;更多的会想到阿里的sentinel&#xff0c;手段丰富&#xff0c;应用简单。sentinel的限流、降级、熔断&#xff0c;可以自己去试一下&#xff0c;sentinel主要通过配置实现功能&#xff0c;不难。sentinel的简介放…...

从核心到边界:六边形、洋葱与COLA架构的深度解析

文章目录 1 引言2 软件架构3 架构分类4 典型的应用架构4.1 分层架构4.2 CQRS4.3 六边形架构4.4 洋葱架构4.5 DDD 5 COLA架构设计5.1 分层设计5.2 扩展设计5.3 规范设计5.3.1 组件规范5.3.2 包规范5.3.3 命名规范 6 COLA架构总览7 小结 1 引言 软件的首要技术使命&#xff1a;管…...

04-Fastjson反序列化漏洞

免责声明 本文仅限于学习讨论与技术知识的分享&#xff0c;不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;本文作者不为此承担任何责任&#xff0c;一旦造成后果请自行承担&…...

ABC365(A-D)未补

A - Leap Year&#xff08;模拟&#xff09; 题意&#xff1a;给定一个数字n&#xff0c;如果n不是4的倍数&#xff0c;输出365&#xff1b;如果n是4的倍数但不是100的倍数&#xff0c;输出366&#xff1b;如果n是100的倍数但不是400的倍数&#xff0c;输出365&#xff1b;如果…...

Python用png生成不同尺寸的图标

Kimi生成 from PIL import Imagedef generate_icon(source_image_path, output_image_path, size):with Image.open(source_image_path) as img:# 转换图片为RGBA模式&#xff0c;确保有透明通道if img.mode ! RGBA:img img.convert(RGBA)# 调整图片大小到指定尺寸img img.r…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...