Springboot项目使用原生Websocket
目录
- 1.启用Websocket功能
- 2.封装操作websocket session的工具
- 3.保存websocket session的接口
- 4.保存websocket session的类
- 5.定义websocket 端点
- 6.创建定时任务 ping websocket 客户端
1.启用Websocket功能
package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
@EnableWebSocket
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}}
2.封装操作websocket session的工具
package com.xxx.robot.websocket.util;import java.util.Map;import javax.websocket.Session;import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;public final class WebSocketSessionUtils {private WebSocketSessionUtils() {}public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;/*** websocket block 发送超时 毫秒*/public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;/*** 从 websocket session 中找到登录用户* 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User* LoginUser、User 从业务层自定义的类* 项目中使用了spring security框架*/public static User findUser (Session session) {UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();LoginUser loginUser = (LoginUser) userDetails.getUserData();return (User) loginUser.getAdditionalInfo();}/*** 给 websocket session 设置参数*/public static void setProperties(Session session) {//设置websocket文本消息的长度为8M,默认为8ksession.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);//设置websocket二进制消息的长度为8M,默认为8ksession.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);Map<String, Object> userProperties = session.getUserProperties();//设置websocket发送消息的超时时长为10秒,默认为20秒userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);}
}
3.保存websocket session的接口
package com.xxx.robot.websocket;import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;import javax.websocket.Session;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public interface WebSocketSessionManager {Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);String PING = "ping";String PONG = "pong";Session get (String key);List<String> keys();void add (String key, Session session);Session remove (String key);/*** ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法*/default void pingBatch () {List<String> keyList = keys();log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));try {Thread.sleep(10);} catch (InterruptedException e1) {}} catch (Exception e) {log.error("WebSocket-ping异常", e);}}}}}/*** 消除所有websocket客户端*/default void clearAllSession () {List<String> keyList = keys();int i = 0;for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {remove(key);i++;session.close();} catch (IOException e1) {log.error("WebSocket-移除并关闭session异常", e1);}if (i % 10 == 0) {try {Thread.sleep(0);} catch (InterruptedException e1) {}}}}}log.info("WebSocket-移除并关闭session数量为:{}", i);}
}
4.保存websocket session的类
package com.xxx.robot.websocket.robot.manager;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;import javax.websocket.Session;import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;import com.xxx.robot.websocket.WebSocketSessionManager;/*** 机器人模块WebSocket Session管理器*/
@Component
public class RobotSessionManager implements WebSocketSessionManager {/*** key = userId + '-' + managerId* userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端* 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap*/private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();public static final String joinKey (String userId, String managerId) {return userId + '-' + managerId;}public static final String joinKey (Long userId, String managerId) {return userId.toString() + '-' + managerId;}public static final String[] splitKey (String key) {return StringUtils.split(key, '-');}@Overridepublic Session get(String key) {return SESSION_POOL.get(key);}/*** 根据用户ID查询所有websocket session的key* @param userId* @param excludeManagerId 排除的key, 可为空* @return*/public List<String> keysByUserId(String userId, String excludeManagerId) {//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');NavigableSet<String> keySet = subMap.navigableKeySet();List<String> list = new ArrayList<>();if (StringUtils.isBlank(excludeManagerId)) {for (String key : keySet) {if (key != null) {list.add(key);}}} else {for (String key : keySet) {if (key != null && !key.equals(excludeManagerId)) {list.add(key);}}}return list;}@Overridepublic List<String> keys() {NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();List<String> list = new ArrayList<>();for (String key : keySet) {if (key != null) {list.add(key);}}return list;}@Overridepublic synchronized void add(String key, Session session) {removeAndClose(key);SESSION_POOL.put(key, session);}@Overridepublic synchronized Session remove(String key) {return SESSION_POOL.remove(key);}/*** 必须key和value都匹配才能删除*/public synchronized void remove(String key, Session session) {SESSION_POOL.remove(key, session);}private void removeAndClose (String key) {Session session = remove(key);if (session != null) {try {session.close();} catch (IOException e) {}}}}
5.定义websocket 端点
package com.xxx.robot.websocket.robot.endpoint;import java.util.Map;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;import lombok.extern.slf4j.Slf4j;/*** 机器人模块WebSocket接口* 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的* 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean*/
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {private volatile User user;private volatile String id;private volatile Session session;private volatile Map<String, RobotCoreService> robotCoreServiceMap;/*** 所有初始化操作都写在@OnOpen注释的方法中* 连接成功* @param session*/@OnOpenpublic void onOpen(@PathParam("id") String id, Session session) {WebSocketSessionUtils.setProperties(session);this.user = WebSocketSessionUtils.findUser(session);this.id = id;this.session = session;log.info("连接成功:{}, {}", id, this.user.getUserCode());//使用BeanUtils代替@Autowired获取bean, //RobotCoreService为业务类,不必关心robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//保存websocket sessionrobotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);}/*** 连接关闭* @param session*/@OnClosepublic void onClose() {log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//连接关闭时,使用两个参数的remove方法,多线程下安全删除robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}@OnErrorpublic void onError(Throwable error) {log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//websocket异常时,使用两个参数的remove方法,多线程下安全删除//比如ping客户端超时,触发此方法,删除该客户端robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}/*** 接收到消息* @param message*/@OnMessagepublic void onMessage(String message) {log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);if (WebSocketSessionManager.PING.equals(message)) {//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);return;}//用 try...catch 包裹防止抛出异常导致websocket关闭try {//业务层,使用jackson反序列化json,不必关心具体的业务JsonNode root = BaseJsonUtils.readTree(message);String apiType = root.at("/apiType").asText();//业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);} catch (Exception e) {log.error("处理消息错误", e);}}}
6.创建定时任务 ping websocket 客户端
package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;/*** 启用定时任务功能* 因为websocket session是有状态的,只能保存在各自的服务端,* 所以只能使用单机式的定时任务,而不能使用分布式定时任务,* 因此 springboot自带的定时任务功能成为了首选* springboot定时任务线程池*/
@Configuration
@EnableScheduling
public class TaskExecutorConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(5);executor.setQueueCapacity(10);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("scheduler-executor-");return executor;}}
package com.xxx.robot.websocket;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;/*** @author Sunzhihua*/
@Slf4j
@Component
public class WebSocketSchedulerTask {/*** 注入所有的 websocket session 管理器*/@Autowiredprivate List<WebSocketSessionManager> webSocketSessionManagers;/*** initialDelay 表示 延迟60秒初始化* fixedDelay 表示 上一次任务结束后,再延迟30秒执行*/@Scheduled(initialDelay = 60000, fixedDelay = 30000)public void clearInvalidSession() {try {log.info("pingBatch 开始。。。");for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {webSocketSessionManager.pingBatch();}log.info("pingBatch 完成。。。");} catch (Exception e) {log.error("pingBatch异常", e);}}
}
相关文章:

Springboot项目使用原生Websocket
目录 1.启用Websocket功能2.封装操作websocket session的工具3.保存websocket session的接口4.保存websocket session的类5.定义websocket 端点6.创建定时任务 ping websocket 客户端 1.启用Websocket功能 package com.xxx.robot.config;import org.springframework.context.a…...
在Vue 3中如何实现服务端渲染(SSR)
今天我要给你们介绍一个很酷的功能——在Vue 3中实现服务端渲染(SSR) 首先,我们来聊聊SSR是什么。它就像是一个魔术师,能让你的网页在服务器上就预先渲染好,然后发送到客户端。想象一下,你在浏览一个网页&…...

【计算机组成原理期末课设作业】16位微型计算机实现——MOVS串传送扩展指令设计
16位微型计算机实现——MOVS串传送扩展指令设计😎 前言🙌教学目的:1、数据**加粗样式**通路分析2、微程序控制器分析3、指令系统分析4、微程序控制器指令周期流程图5、微指令编码6、测试程序和运行结果(1)首先先在内存…...

CodeMirror 对 XML 文档熟悉及元素控制自定义
CodeMirror 是一个网络代码编辑器组件。它可以在网站中用于实现支持多种编辑功能的文本输入字段,并具有丰富的编程接口以允许进一步扩展。 本文为 xml 格式的代码提示约束格式规范的自定义示例内容。 先看效果,如下: 官方 Demo 的完整代码如…...
Jetpack - ViewModel
一、概念 二、使用 2.1 创建ViewModel //无参 class MainViewModel : ViewModel() {} //有参 class MainViewModel(mainRepository: MainRepository) : ViewModel() {} class MainViewModelFactory(private val mainRepository: MainRepository) : ViewModelProvider.Factor…...
【新版系统架构】第十三章-层次式架构设计理论与实践
软考-系统架构设计师知识点提炼-系统架构设计师教程(第2版) 第一章-绪论第二章-计算机系统基础知识(一)第二章-计算机系统基础知识(二)第三章-信息系统基础知识第四章-信息安全技术基础知识第五章-软件工程…...

剖析Linux文件系统
Linux 文件系统体系结构是一个对复杂系统进行抽象化的有趣例子。通过使用一组通用的 API 函数,Linux 可以在许多种存储设备上支持许多种文件系统。例如,read 函数调用可以从指定的文件描述符读取一定数量的字节。read 函数不了解文件系统的类型ÿ…...

简介Maven结构与配置方法
一、Maven是什么 Maven是apache旗下的一个开源项目,是一款用于管理和构建java项目的工具。 它有什么用呢? 比如我以前要IOUtils这个包,那要到网站下去下载下来,再导入。 当jar包多的时候,一个一个导出,…...
好用的网址6
PPT课件网:http://www.pptkj.net/ ImgUpscaler:AI Image Upscaler - Upscale Photo, Cartoons in Batch Free 加强图片 AI Draw:AI Draw | Convert Images to One-Line Drawings with AI ZToDoList:https://www.ztodolis…...

MySQL数据库---笔记5
MySQL数据库---笔记5 一、锁1.1、介绍1.2、全局锁1.2.1、全局锁介绍1.2.2、一致性数据备份 1.3、表级锁1.3.1、表锁1.3.2、元数据锁(meta data lock , MDL)1.3.3、意向锁 1.4、行级锁1.4.1、介绍1.4.2、行锁1.4.3、间隙锁/临建锁 二、InnoDB引擎2.1、逻辑…...
Yocto:初始
1.构建Yocto项目前,需要先安装其所依赖的一些组件及工具 1 System Requirements — The Yocto Project 4.2.999 documentation 需要依次安装: $ sudo apt install gawk wget git diffstat unzip texinfo gcc build-essential chrpath socat cpio python3 python3-pip python…...

autodl算力租用平台应用于pycharm
一、GPU租用选择 1、创建实例 首先进入算力市场 博客以2080为例,选择计费方式,选择合适的主机,选择要创建实例中的GPU数量,选择镜像(内置了不同的深度学习框架),最后创建即可 2、SSH远程连…...

高德地图的使用
JS API 结合 Vue 使用 高德地图 jsapi 下载、引入 npm add amap/amap-jsapi-loaderimport AMapLoader from amap/amap-jsapi-loader 使用2.0版本的loader需要在window对象下先配置 securityJsCode JS API 安全密钥使用 JS API 使用 script 标签同步加载增加代理服务器设置…...
<List<Map<String,String>>> 删除元素常见的误区以及删除方法
看到这么标题可能觉得这个真是太easy了,不就remove吗,分分钟搞定。 但结果却出乎意料,下面我们来j简单说说list删除数据可能遇到的坑: 先说明我们可能会遇到的两个问题: 1.java.lang.IndexOutOfBoundsException(索引越…...
Linux下的编辑器——vim的简单上手指南
文章目录 一.概念1. 什么是 vim2. Vim 的模式①命令模式② 插入模式③底线命令模式 二.vim的基本操作1.如何启动vim?2. [命令模式」切换至 「插入模式」3.「插入模式」 切换至 「命令模式」4.「命令模式」切换至 「底行模式」5. 如何退出 vim? 三.vim指令…...
C++多线程学习(二、多线程的几种创造方式【有返回值的之后讲】)
目录 创建多线程 1.普通函数充当线程处理函数创造线程 2.Lambda表达式充当线程处理函数 3.带参函数创建线程 3.1普通参数 3.2传入引用 3.3智能指针充当函数参数 4.通过类中的成员函数创建 4.1仿函数方式创建:类名的方式调用 4.2普通类中的成员函数 创建多…...
前端开发框架生命周期详解:Vue、React和Angular
引言 作为前端开发者,掌握前端开发框架的生命周期是非常重要的。在现代Web应用开发中,Vue.js、React和Angular是三个最流行的前端开发框架。本篇博客将详细解读这三个框架的生命周期,包括每个阶段的含义、用途以及如何最大限度地利用它们。通…...

【Java从入门到大牛】程序流程控制
🔥 本文由 程序喵正在路上 原创,CSDN首发! 💖 系列专栏:Java从入门到大牛 🌠 首发时间:2023年7月7日 🦋 欢迎关注🖱点赞👍收藏🌟留言🐾…...
UML学习统一建模语言
unified modeling language 统一建模语言 面向对象软件分析与设计建模的事实标准 类命名:帕斯卡特命名 类之间的关系 关联关系:班级和学生,一个类的对象作为另一个类的成员变量; 通过非构造和setter注入的方式建立联系…...

【C++学习笔记】RAII思想——智能指针
智能指针 1 内存泄漏问题2 RAII(Resource Acquisition Is Initialization)2.1 使用RAII思想设计的SmartPtr类2.2 智能指针的原理2.3 小总结智能指针原理 3 智能指针的拷贝问题3.1 std::auto_ptr3.2 std::unique_ptr3.3 std::shared_ptr3.3.1 拷贝构造函数…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
规则与人性的天平——由高考迟到事件引发的思考
当那位身着校服的考生在考场关闭1分钟后狂奔而至,他涨红的脸上写满绝望。铁门内秒针划过的弧度,成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定",构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...

解析“道作为序位生成器”的核心原理
解析“道作为序位生成器”的核心原理 以下完整展开道函数的零点调控机制,重点解析"道作为序位生成器"的核心原理与实现框架: 一、道函数的零点调控机制 1. 道作为序位生成器 道在认知坐标系$(x_{\text{物}}, y_{\text{意}}, z_{\text{文}}…...
写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里
写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...

【多线程初阶】单例模式 指令重排序问题
文章目录 1.单例模式1)饿汉模式2)懒汉模式①.单线程版本②.多线程版本 2.分析单例模式里的线程安全问题1)饿汉模式2)懒汉模式懒汉模式是如何出现线程安全问题的 3.解决问题进一步优化加锁导致的执行效率优化预防内存可见性问题 4.解决指令重排序问题 1.单例模式 单例模式确保某…...