Springboot项目使用原生Websocket
目录
- 1.启用Websocket功能
- 2.封装操作websocket session的工具
- 3.保存websocket session的接口
- 4.保存websocket session的类
- 5.定义websocket 端点
- 6.创建定时任务 ping websocket 客户端
1.启用Websocket功能
package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
@EnableWebSocket
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}}
2.封装操作websocket session的工具
package com.xxx.robot.websocket.util;import java.util.Map;import javax.websocket.Session;import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;public final class WebSocketSessionUtils {private WebSocketSessionUtils() {}public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;/*** websocket block 发送超时 毫秒*/public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;/*** 从 websocket session 中找到登录用户* 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User* LoginUser、User 从业务层自定义的类* 项目中使用了spring security框架*/public static User findUser (Session session) {UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();LoginUser loginUser = (LoginUser) userDetails.getUserData();return (User) loginUser.getAdditionalInfo();}/*** 给 websocket session 设置参数*/public static void setProperties(Session session) {//设置websocket文本消息的长度为8M,默认为8ksession.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);//设置websocket二进制消息的长度为8M,默认为8ksession.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);Map<String, Object> userProperties = session.getUserProperties();//设置websocket发送消息的超时时长为10秒,默认为20秒userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);}
}
3.保存websocket session的接口
package com.xxx.robot.websocket;import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;import javax.websocket.Session;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public interface WebSocketSessionManager {Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);String PING = "ping";String PONG = "pong";Session get (String key);List<String> keys();void add (String key, Session session);Session remove (String key);/*** ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法*/default void pingBatch () {List<String> keyList = keys();log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));try {Thread.sleep(10);} catch (InterruptedException e1) {}} catch (Exception e) {log.error("WebSocket-ping异常", e);}}}}}/*** 消除所有websocket客户端*/default void clearAllSession () {List<String> keyList = keys();int i = 0;for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {remove(key);i++;session.close();} catch (IOException e1) {log.error("WebSocket-移除并关闭session异常", e1);}if (i % 10 == 0) {try {Thread.sleep(0);} catch (InterruptedException e1) {}}}}}log.info("WebSocket-移除并关闭session数量为:{}", i);}
}
4.保存websocket session的类
package com.xxx.robot.websocket.robot.manager;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;import javax.websocket.Session;import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;import com.xxx.robot.websocket.WebSocketSessionManager;/*** 机器人模块WebSocket Session管理器*/
@Component
public class RobotSessionManager implements WebSocketSessionManager {/*** key = userId + '-' + managerId* userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端* 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap*/private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();public static final String joinKey (String userId, String managerId) {return userId + '-' + managerId;}public static final String joinKey (Long userId, String managerId) {return userId.toString() + '-' + managerId;}public static final String[] splitKey (String key) {return StringUtils.split(key, '-');}@Overridepublic Session get(String key) {return SESSION_POOL.get(key);}/*** 根据用户ID查询所有websocket session的key* @param userId* @param excludeManagerId 排除的key, 可为空* @return*/public List<String> keysByUserId(String userId, String excludeManagerId) {//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');NavigableSet<String> keySet = subMap.navigableKeySet();List<String> list = new ArrayList<>();if (StringUtils.isBlank(excludeManagerId)) {for (String key : keySet) {if (key != null) {list.add(key);}}} else {for (String key : keySet) {if (key != null && !key.equals(excludeManagerId)) {list.add(key);}}}return list;}@Overridepublic List<String> keys() {NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();List<String> list = new ArrayList<>();for (String key : keySet) {if (key != null) {list.add(key);}}return list;}@Overridepublic synchronized void add(String key, Session session) {removeAndClose(key);SESSION_POOL.put(key, session);}@Overridepublic synchronized Session remove(String key) {return SESSION_POOL.remove(key);}/*** 必须key和value都匹配才能删除*/public synchronized void remove(String key, Session session) {SESSION_POOL.remove(key, session);}private void removeAndClose (String key) {Session session = remove(key);if (session != null) {try {session.close();} catch (IOException e) {}}}}
5.定义websocket 端点
package com.xxx.robot.websocket.robot.endpoint;import java.util.Map;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;import lombok.extern.slf4j.Slf4j;/*** 机器人模块WebSocket接口* 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的* 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean*/
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {private volatile User user;private volatile String id;private volatile Session session;private volatile Map<String, RobotCoreService> robotCoreServiceMap;/*** 所有初始化操作都写在@OnOpen注释的方法中* 连接成功* @param session*/@OnOpenpublic void onOpen(@PathParam("id") String id, Session session) {WebSocketSessionUtils.setProperties(session);this.user = WebSocketSessionUtils.findUser(session);this.id = id;this.session = session;log.info("连接成功:{}, {}", id, this.user.getUserCode());//使用BeanUtils代替@Autowired获取bean, //RobotCoreService为业务类,不必关心robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//保存websocket sessionrobotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);}/*** 连接关闭* @param session*/@OnClosepublic void onClose() {log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//连接关闭时,使用两个参数的remove方法,多线程下安全删除robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}@OnErrorpublic void onError(Throwable error) {log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//websocket异常时,使用两个参数的remove方法,多线程下安全删除//比如ping客户端超时,触发此方法,删除该客户端robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}/*** 接收到消息* @param message*/@OnMessagepublic void onMessage(String message) {log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);if (WebSocketSessionManager.PING.equals(message)) {//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);return;}//用 try...catch 包裹防止抛出异常导致websocket关闭try {//业务层,使用jackson反序列化json,不必关心具体的业务JsonNode root = BaseJsonUtils.readTree(message);String apiType = root.at("/apiType").asText();//业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);} catch (Exception e) {log.error("处理消息错误", e);}}}

6.创建定时任务 ping websocket 客户端
package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;/*** 启用定时任务功能* 因为websocket session是有状态的,只能保存在各自的服务端,* 所以只能使用单机式的定时任务,而不能使用分布式定时任务,* 因此 springboot自带的定时任务功能成为了首选* springboot定时任务线程池*/
@Configuration
@EnableScheduling
public class TaskExecutorConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(5);executor.setQueueCapacity(10);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("scheduler-executor-");return executor;}}
package com.xxx.robot.websocket;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;/*** @author Sunzhihua*/
@Slf4j
@Component
public class WebSocketSchedulerTask {/*** 注入所有的 websocket session 管理器*/@Autowiredprivate List<WebSocketSessionManager> webSocketSessionManagers;/*** initialDelay 表示 延迟60秒初始化* fixedDelay 表示 上一次任务结束后,再延迟30秒执行*/@Scheduled(initialDelay = 60000, fixedDelay = 30000)public void clearInvalidSession() {try {log.info("pingBatch 开始。。。");for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {webSocketSessionManager.pingBatch();}log.info("pingBatch 完成。。。");} catch (Exception e) {log.error("pingBatch异常", e);}}
}
相关文章:
Springboot项目使用原生Websocket
目录 1.启用Websocket功能2.封装操作websocket session的工具3.保存websocket session的接口4.保存websocket session的类5.定义websocket 端点6.创建定时任务 ping websocket 客户端 1.启用Websocket功能 package com.xxx.robot.config;import org.springframework.context.a…...
在Vue 3中如何实现服务端渲染(SSR)
今天我要给你们介绍一个很酷的功能——在Vue 3中实现服务端渲染(SSR) 首先,我们来聊聊SSR是什么。它就像是一个魔术师,能让你的网页在服务器上就预先渲染好,然后发送到客户端。想象一下,你在浏览一个网页&…...
【计算机组成原理期末课设作业】16位微型计算机实现——MOVS串传送扩展指令设计
16位微型计算机实现——MOVS串传送扩展指令设计😎 前言🙌教学目的:1、数据**加粗样式**通路分析2、微程序控制器分析3、指令系统分析4、微程序控制器指令周期流程图5、微指令编码6、测试程序和运行结果(1)首先先在内存…...
CodeMirror 对 XML 文档熟悉及元素控制自定义
CodeMirror 是一个网络代码编辑器组件。它可以在网站中用于实现支持多种编辑功能的文本输入字段,并具有丰富的编程接口以允许进一步扩展。 本文为 xml 格式的代码提示约束格式规范的自定义示例内容。 先看效果,如下: 官方 Demo 的完整代码如…...
Jetpack - ViewModel
一、概念 二、使用 2.1 创建ViewModel //无参 class MainViewModel : ViewModel() {} //有参 class MainViewModel(mainRepository: MainRepository) : ViewModel() {} class MainViewModelFactory(private val mainRepository: MainRepository) : ViewModelProvider.Factor…...
【新版系统架构】第十三章-层次式架构设计理论与实践
软考-系统架构设计师知识点提炼-系统架构设计师教程(第2版) 第一章-绪论第二章-计算机系统基础知识(一)第二章-计算机系统基础知识(二)第三章-信息系统基础知识第四章-信息安全技术基础知识第五章-软件工程…...
剖析Linux文件系统
Linux 文件系统体系结构是一个对复杂系统进行抽象化的有趣例子。通过使用一组通用的 API 函数,Linux 可以在许多种存储设备上支持许多种文件系统。例如,read 函数调用可以从指定的文件描述符读取一定数量的字节。read 函数不了解文件系统的类型ÿ…...
简介Maven结构与配置方法
一、Maven是什么 Maven是apache旗下的一个开源项目,是一款用于管理和构建java项目的工具。 它有什么用呢? 比如我以前要IOUtils这个包,那要到网站下去下载下来,再导入。 当jar包多的时候,一个一个导出,…...
好用的网址6
PPT课件网:http://www.pptkj.net/ ImgUpscaler:AI Image Upscaler - Upscale Photo, Cartoons in Batch Free 加强图片 AI Draw:AI Draw | Convert Images to One-Line Drawings with AI ZToDoList:https://www.ztodolis…...
MySQL数据库---笔记5
MySQL数据库---笔记5 一、锁1.1、介绍1.2、全局锁1.2.1、全局锁介绍1.2.2、一致性数据备份 1.3、表级锁1.3.1、表锁1.3.2、元数据锁(meta data lock , MDL)1.3.3、意向锁 1.4、行级锁1.4.1、介绍1.4.2、行锁1.4.3、间隙锁/临建锁 二、InnoDB引擎2.1、逻辑…...
Yocto:初始
1.构建Yocto项目前,需要先安装其所依赖的一些组件及工具 1 System Requirements — The Yocto Project 4.2.999 documentation 需要依次安装: $ sudo apt install gawk wget git diffstat unzip texinfo gcc build-essential chrpath socat cpio python3 python3-pip python…...
autodl算力租用平台应用于pycharm
一、GPU租用选择 1、创建实例 首先进入算力市场 博客以2080为例,选择计费方式,选择合适的主机,选择要创建实例中的GPU数量,选择镜像(内置了不同的深度学习框架),最后创建即可 2、SSH远程连…...
高德地图的使用
JS API 结合 Vue 使用 高德地图 jsapi 下载、引入 npm add amap/amap-jsapi-loaderimport AMapLoader from amap/amap-jsapi-loader 使用2.0版本的loader需要在window对象下先配置 securityJsCode JS API 安全密钥使用 JS API 使用 script 标签同步加载增加代理服务器设置…...
<List<Map<String,String>>> 删除元素常见的误区以及删除方法
看到这么标题可能觉得这个真是太easy了,不就remove吗,分分钟搞定。 但结果却出乎意料,下面我们来j简单说说list删除数据可能遇到的坑: 先说明我们可能会遇到的两个问题: 1.java.lang.IndexOutOfBoundsException(索引越…...
Linux下的编辑器——vim的简单上手指南
文章目录 一.概念1. 什么是 vim2. Vim 的模式①命令模式② 插入模式③底线命令模式 二.vim的基本操作1.如何启动vim?2. [命令模式」切换至 「插入模式」3.「插入模式」 切换至 「命令模式」4.「命令模式」切换至 「底行模式」5. 如何退出 vim? 三.vim指令…...
C++多线程学习(二、多线程的几种创造方式【有返回值的之后讲】)
目录 创建多线程 1.普通函数充当线程处理函数创造线程 2.Lambda表达式充当线程处理函数 3.带参函数创建线程 3.1普通参数 3.2传入引用 3.3智能指针充当函数参数 4.通过类中的成员函数创建 4.1仿函数方式创建:类名的方式调用 4.2普通类中的成员函数 创建多…...
前端开发框架生命周期详解:Vue、React和Angular
引言 作为前端开发者,掌握前端开发框架的生命周期是非常重要的。在现代Web应用开发中,Vue.js、React和Angular是三个最流行的前端开发框架。本篇博客将详细解读这三个框架的生命周期,包括每个阶段的含义、用途以及如何最大限度地利用它们。通…...
【Java从入门到大牛】程序流程控制
🔥 本文由 程序喵正在路上 原创,CSDN首发! 💖 系列专栏:Java从入门到大牛 🌠 首发时间:2023年7月7日 🦋 欢迎关注🖱点赞👍收藏🌟留言🐾…...
UML学习统一建模语言
unified modeling language 统一建模语言 面向对象软件分析与设计建模的事实标准 类命名:帕斯卡特命名 类之间的关系 关联关系:班级和学生,一个类的对象作为另一个类的成员变量; 通过非构造和setter注入的方式建立联系…...
【C++学习笔记】RAII思想——智能指针
智能指针 1 内存泄漏问题2 RAII(Resource Acquisition Is Initialization)2.1 使用RAII思想设计的SmartPtr类2.2 智能指针的原理2.3 小总结智能指针原理 3 智能指针的拷贝问题3.1 std::auto_ptr3.2 std::unique_ptr3.3 std::shared_ptr3.3.1 拷贝构造函数…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
站群服务器的应用场景都有哪些?
站群服务器主要是为了多个网站的托管和管理所设计的,可以通过集中管理和高效资源的分配,来支持多个独立的网站同时运行,让每一个网站都可以分配到独立的IP地址,避免出现IP关联的风险,用户还可以通过控制面板进行管理功…...
微服务通信安全:深入解析mTLS的原理与实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言:微服务时代的通信安全挑战 随着云原生和微服务架构的普及,服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...
