当前位置: 首页 > news >正文

【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理

一、背景

项目中需要建立客户端与服务端之间的长链接,首先就考虑用WebSocket,再来SpringBoot原来整合WebSocket方式并不高效,因此找到了netty-websocket-spring-boot-starter 这款脚手架,它能让我们在SpringBoot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单

二、netty-websocket-spring-boot-starter 入门介绍

2.1 核心注解

2.1.1 @ServerEndpoint

当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类 被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint(“/ws”) )

2.1.2 @OnOpen

当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders…

2.1.3 @OnClose

当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session

2.1.4 @OnError

当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable

2.1.5 @OnMessage

当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String

2.2 核心配置

属性属性说明
pathWebSocket的path,也可以用value来设置
hostWebSocket的host,"0.0.0.0"即是所有本地地址
portWebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务)
maxFramePayloadLength最大允许帧载荷长度
allIdleTimeSeconds与IdleStateHandler中的allIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler

三、实践netty-websocket-spring-boot-starter

3.1引入POM文件

主要添加包括以下依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><dependency><groupId>org.yeauty</groupId><artifactId>netty-websocket-spring-boot-starter</artifactId><version>0.9.5</version>
</dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.4.6</version>
</dependency>

3.2 在主程序类中排除数据库使用

/*** 主程序启动类*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class WebsocketApplication {public static void main(String[] args) {SpringApplication.run(WebsocketApplication.class, args);}}

3.3 开启WebSocket支持

@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

3.4 定义WebSocketServer服务器(核心代码)

在端点类上加上@ServerEndpoint注解,并在相应的方法上加上@OnOpen、@OnMessage、@OnError、@OnClose注解, 代码如下:

@ServerEndpoint(port = "${ws.port}", path = "/demo/{version}", maxFramePayloadLength = "6553600", allIdleTimeSeconds = "300")
public class WebSocketServer {private static Log LOGGER = LogFactory.get();// concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();// 与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;// 接收用户IDprotected StringBuilder userInfo = new StringBuilder();@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session,HttpHeaders headers,@RequestParam String req,@RequestParam MultiValueMap<String, Object> reqMap,@PathVariable String arg,@PathVariable Map<String, Object> pathMap) {this.session = session;// 加入set中webSocketSet.add(this);// 在线数加1addOnlineCount();LOGGER.debug("UserId = {}, 通道ID={}, 当前连接人数={}", userInfo.toString(), getSessionId(session), getOnlineCount());}/*** 收到客户端消息后调用的方法*/@OnMessagepublic void onMessage(Session session, String message) {JSONObject jsonData = JSONUtil.parseObj(message);if (!jsonData.containsKey("command")) {LOGGER.debug("UserId = {}, 通道ID={}, 上行内容={}, 上行请求非法,缺少command参数, 处理结束",userInfo.toString(), getSessionId(session), message);return;}String userId = jsonData.getStr("userId");this.userInfo = new StringBuilder(userId);String command = jsonData.getStr("command");Class<?> service = Command.getService(command);if (Objects.isNull(service)) {errorMessage(command);LOGGER.error("UserId = {}, 通道ID={}, 解析指令执行出错!", userInfo.toString(), getSessionId(session));return;}LOGGER.info("UserId = {}, 通道ID={}, 处理类={}, 开始处理,请求内容={}",userInfo.toString(), getSessionId(session), service, jsonData.toString());BaseMessage baseMessage = getBaseMessage(service, session, command);if (baseMessage == null) {return;}try {jsonData.set("SessionId", getSessionId(session));JSON resp = baseMessage.handlerMessage(userInfo, jsonData);resp.putByPath("command", command);resp.putByPath("userId", userId);String value = resp.toString();//将结果写回客户端, 实现服务器主动推送ChannelFuture future = sendMessage(value);LOGGER.info("UserId = {}, 通道ID = {}, 返回内容 = {}, future = {}, 处理结束",userInfo.toString(), getSessionId(session), value, future.toString());} catch (Exception e) {LOGGER.error("UserId = {}, 通道ID={}, 解析执行出错信息={}", userInfo.toString(), getSessionId(session), e.getMessage());}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(Session session) {// 从set中删除webSocketSet.remove(this);// 在线数减1subOnlineCount();String userId = this.userInfo.toString();LOGGER.warn("UserId = {}, 通道ID = {}, 有一连接关闭!当前在线人数={}", userId, getSessionId(session), getOnlineCount());userInfo.delete(0, userInfo.length());if (ObjectUtil.isNotNull(userId)) {String keyStr = ConstDef.ONLINE_USER_TYPE + userId;redisTemplate.delete(keyStr);}session.close();}/*** 出错方法*/@OnErrorpublic void onError(Session session, Throwable cause) {if (Objects.nonNull(this.session) && Objects.nonNull(cause) && !(cause instanceof EOFException)) {LOGGER.error("UserId = {}, 通道ID={}, 出错信息={}", userInfo.toString(), this.session.id(), cause.toString());}if (Objects.nonNull(session) && session.isOpen()) {session.close();}}/*** 通过class获取Bean*/private BaseMessage getBaseMessage(Class<?> service, Session session, String command) {BaseMessage baseMessage;try {baseMessage = (BaseMessage) SpringUtils.getBean(service);} catch (Exception e) {LOGGER.error("UserId = {}, 通道ID = {}, 未找到协议头 = {} 的处理类", userInfo.toString(), getSessionId(session), service);errorMessage(command);return null;}return baseMessage;}/*** 获取通道ID*/private String getSessionId(Session session) {return session.id().asShortText();}/*** 协议错误*/public void errorMessage(String command) {JSONObject retObj = new JSONObject();retObj.set("code", ConstDef.ERROR_CODE_10001);retObj.set("msg", ConstDef.ERROR_CODE_10001_DESP);retObj.set("command", command);try {sendMessage(retObj.toString());} catch (IOException e) {LOGGER.error("UserId = {}, 通道ID={}, 解析执行出错信息={}", userInfo.toString(), getSessionId(session), e.getMessage());}}/*** 实现服务器主动推送*/public ChannelFuture sendMessage(String message) throws IOException {return this.session.sendText(message);}/*** 在线用户数*/public long getOnlineCount() {String onlineCountValue = redisTemplate.opsForValue().get(ConstDef.ONLINE_COUNT_KEY);if (StrUtil.isBlank(onlineCountValue) || !NumberUtil.isNumber(onlineCountValue)) {return 0L;}return Long.parseLong(onlineCountValue);}/*** 在线数+1*/private void addOnlineCount() {redisTemplate.opsForValue().increment(ConstDef.ONLINE_COUNT_KEY);}/*** 在线数-1*/private void subOnlineCount() {redisTemplate.opsForValue().decrement(ConstDef.ONLINE_COUNT_KEY);}
}

3.5 定义接口

/*** 消息处理接口*/
public interface BaseMessage {Log LOGGER = LogFactory.get();/*** 处理类、处理方法*/JSON handlerMessage(StringBuilder vin, JSONObject jsonData);
}

3.6 定义接口实现类 (业务处理逻辑)

该类是各业务的处理逻辑类,是接口类的具体实现。

@Component
@Configuration
public class QueryAllActivityListMessage implements BaseMessage {@Overridepublic JSON handlerMessage(StringBuilder userId, JSONObject jsonData) {LOGGER.debug("开始处理QueryAllActivityListMessage请求, 参数={}", JSONUtil.toJsonStr(jsonData));String resp = "我是服务器端返回的处理结果!";LOGGER.info("UserId = {}, param={}, QueryAllActivityListMessage回复 = {}", userId.toString(), jsonData, resp);JSONObject respStr = new JSONObject();return respStr.set("handleResult", resp);}
}

3.7 定义枚举Command

每增加一个业务接口的实现,就需要在这个枚举类注册一下。

/*** 指令-服务 枚举*/
public enum Command {/*** 业务1处理逻辑*/queryAllActivityList("queryAllActivityList", QueryAllActivityListMessage.class, "业务1处理逻辑");/*** 业务2处理逻辑*///略/*** 业务3处理逻辑*///略/*** 服务编码*/private String processCode;/*** 服务接口类*/private Class<?> service;/*** 接口描述*/private String desc;Command(String processCode, Class<?> service, String desc) {this.processCode = processCode;this.service = service;this.desc = desc;}public Class<?> getService() {return service;}public static Class<?> getService(String processCode) {for (Command command : Command.values()) {if (command.processCode.equals(processCode)) {return command.getService();}}return null;}
}

3.8 编写SpringUtils 工具类

用于搜索Bean, 通过class获取Bean

/*** SpringUtils 工具类,用于搜索*/
@Component
public class SpringUtils implements ApplicationContextAware {private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {if (SpringUtils.applicationContext == null) {SpringUtils.applicationContext = applicationContext;}}/*** 获取applicationContext*/public static ApplicationContext getApplicationContext() {return applicationContext;}/*** 通过class获取Bean*/public static <T> T getBean(Class<T> clazz) {return getApplicationContext().getBean(clazz);}/*** 通过name获取 Bean.*/public static Object getBean(String name) {return getApplicationContext().getBean(name);}/*** 通过name,以及Clazz返回指定的Bean*/public static <T> T getBean(String name, Class<T> clazz) {return getApplicationContext().getBean(name, clazz);}}

3.9 定义常量定义类 + 返回码

/*** 常量定义类 + 返回码*/
public class ConstDef {/*** 返回码*/public static final int ERROR_CODE_10001 = 10001;public static final String ERROR_CODE_10001_DESP = "请求参数不合法";/*** 按UserId决定,用户在线类型,车机或者手机*/public static final String ONLINE_USER_TYPE = "ONLINE_USER_TYPE_";/*** 在线用户数*/public static final String ONLINE_COUNT_KEY = "ONLINE_COUNT_KEY";
}

四、功能验证

打开WebSocket客户端,连接到ws://127.0.0.1:9095/demo/1

从截图来看,WebSocket服务端能正常接受并处理来自客户端的请求,验证成功!

在这里插入图片描述

相关文章:

【项目实战】基于netty-websocket-spring-boot-starter实现WebSocket服务器长链接处理

一、背景 项目中需要建立客户端与服务端之间的长链接&#xff0c;首先就考虑用WebSocket&#xff0c;再来SpringBoot原来整合WebSocket方式并不高效&#xff0c;因此找到了netty-websocket-spring-boot-starter 这款脚手架&#xff0c;它能让我们在SpringBoot中使用Netty来开发…...

BC双驱、ChatGPT大火,AI独角兽撬开盈利大门?

配图来自Canva可画 放眼AI行业&#xff0c;各大AI玩家长期亏损、“钱”景堪忧。 回看过去一年&#xff0c;部分AI独角兽的亏损问题愈发尖锐——云从科技2022年净亏损同比扩大至8.5亿元&#xff1b;寒武纪2022年净亏损11.6亿元&#xff0c;较上年同期扩大41.4%&#xff1b;地平…...

1/4车、1/2车、整车悬架H2/H∞控制仿真合集

目录 前言 1. 1/4悬架系统 1.1数学模型 1.2 H2/H∞求解反馈阵阵 1.3仿真分析 2. 1/2悬架系统 2.1数学模型 2.2 H2/H∞求解反馈阵阵 2.3仿真分析 3. 整车悬架系统 3.1数学模型 整车7自由度主动悬架数学模型 3.2 H2/H∞求解反馈阵阵 3.3仿真分析 4.总结 参考文献 …...

Git使用教程、命令

Git使用教程、命令 基本配置 git的配置文件位置&#xff1a; win: c:\users\<userName>\.gitconfig linux: /home/<userName>/.gitconfig # 个人/etc/gitconfig # 系统全局# 修改git init时的默认分支为master&#x…...

《c++ primer笔记》第九章 顺序容器

前言 知识点很多&#xff0c;这里只记录遗忘的。从这章开始会对前面章节的内容进行一个扩充&#xff0c;如果以前的忘了读起来会有点吃力。总的来说&#xff0c;本章节难度不大。 文章目录一、概述二、容器库概览2.1容器定义和初始化2.2赋值三、顺序容器操作3.1添加元素3.2删除…...

QML动画(弹动和翻转效果)

Flickable&#xff08;弹动&#xff09; QML中提供了一个Flickable元素&#xff0c;可以将其子项设置在一个可以拖拽和弹动的界面上&#xff0c;使得子项目的视图可以滚动。在传统的用户界面中&#xff0c;可以使用标准控件&#xff08;如滚动条和箭头按钮&#xff09;滚动视图…...

GPS启动方式、定位速度、定位精度介绍

前面文章介绍了GPS定位基础知识 GPS定位知识介绍 (qq.com) 本文主要介绍GPS启动方式。 定位过程中最重要的辅助信息是时间、星历、位置。 根据辅助信息不同,...

深度学习零基础学习之路——第五章 个人数据集的制作

Python深度学习入门 第一章 Python深度学习入门之环境软件配置 第二章 Python深度学习入门之数据处理Dataset的使用 第三章 数据可视化TensorBoard和TochVision的使用 第四章 UNet-Family中Unet、Unet和Unet3的简介 第五章 个人数据集的制作 深度学习数据集的制作Python深度学…...

女神节 | PHP和Java算什么,女工程师才是最美最好的语言!

世界上第一个程序员是女性 第一个发现Bug的也是女性 在智领云有一群追求快乐和独立的女性工程师 她们多有魅力&#xff1f; 工位上她们专注于数据与代码 平日里郊游、瑜伽、插花、科学养娃一件不落 不仅用0和1编织数字世界 也在用心装点自己的生活 今天是国际劳动妇女节…...

【Python】装饰器

一、装饰器的作用 装饰器能够为已经存在的对象添加额外的功能。 二、什么是装饰器 装饰器本质是一个python函数&#xff0c;它可以让其他函数在不需要做任何代码变动的前提下增加额外功能&#xff0c;装饰器的返回值也是一个函数对象。 三、装饰器的应用场景 插入日志、性能…...

Spring事务及传播机制

概念 在MySQL中介绍过&#xff0c;当同一时间出现一起读写数据的情况&#xff0c;可能会导致最终的结果出错&#xff0c;因此可以使用事务来提高隔离级别 而Spring中也可以实现事务 手动添加事务 使用SpringBoot中的DataSourceTransactionManager对象可以获取事务&#xff0…...

43-Golang中的goroutine!!!

Golang中的goroutine进程和线程说明并发和并行并发并行Go协程和Go主线程案例小结goroutine的调度机制MPG模式基本介绍MPG模式运行的状态1MPG模式运行的状态2设置GOlang运行的CPU数不同 goroutine之间如何通讯使用全局变量加锁同步改进程序进程和线程说明 1.进程就是程序在操作…...

[深入理解SSD系列 闪存实战2.1.5] NAND FLASH基本读操作及原理_NAND FLASH Read Operation源码实现

前言 上面是我使用的NAND FLASH的硬件原理图,面对这些引脚,很难明白他们是什么含义, 下面先来个热身: 问1. 原理图上NAND FLASH只有数据线,怎么传输地址? 答1.在DATA0~DATA7上既传输数据,又传输地址 当ALE为高电平时传输的是地址, 问2. 从NAND FLASH芯片手册可知,要…...

pandas库中的read_csv函数读取数据时候的路径问题详解(ValueError: embedded null character)

read_csv()函数不仅是R语言中的一个读取csv文件的函数&#xff0c;也是pandas库中的一个函数。pandas是一个用于数据分析和处理的python库。它的read_csv函数可以读取csv文件里的数据&#xff0c;并将其转化为pandas里面的DataFrame对象。它由很多参数可以设置&#xff0c;例如…...

【量化交易笔记】4.移动平均值的实现

上一讲已经讲A股的数据下载到本地或保存数据库&#xff0c;我们可以随时使用。 移动平均MA(Moving Average) &#xff0c;是用统计分析的方法&#xff0c;将一定时期内的证券价格&#xff08;指数&#xff09;加以平均&#xff0c;并把不同时间的平均值连接起来&#xff0c;形成…...

2023年3月份的野兔在线工具系统版本更新

这个是野兔在线工具系统中文版更新&#xff0c;这次更新的功能&#xff0c;和修改的问题还是比较多的&#xff0c;也修复系统部分功能&#xff0c;应该也是目前市面上在线工具比较多的一个系统了。系统名称&#xff1a;野兔在线工具系统系统语言&#xff1a;中文版系统源码&…...

科技成果赋智中小企业深度行 边界无限靖云甲ADR入选十大优秀案例

近日&#xff0c;国家工业信息安全发展研究中心、青岛市工业和信息化局、青岛市民营经济发展局、青岛市即墨区人民政府、青岛蓝谷管理局联合举办的科技成果赋智中小企业“深度行”活动&#xff08;青岛站&#xff09;成功举办&#xff0c;同步举行了赋智“深度行”活动&#xf…...

我们的理性何处安放

每天工作压力和各种人相处都让我们非常忙碌&#xff0c;我们上大学&#xff0c;努力工作&#xff0c;都是想获得更好的人生场景&#xff0c;素养&#xff0c;提升自身的认知&#xff0c;这样就是对我们大多数人生最负责任。如何让自己理性与人为善&#xff0c;并能被人温柔以待…...

RecyclerView的详细使用

首先就是了解ListView和RecyclerView的区别1.ListView相比RecycleView的优点a.ListView实现添加HeaderView和FooderView有直接的方法b.分割线可以直接设置c.ListView实现onItemClickListence和onItemLongClickListence有直接的方法2.RecyclerView相比ListView的优点a.封装了Vie…...

一、向量及其线性运算

&#x1f64c;作者简介&#xff1a;数学与计算机科学学院出身、在职高校高等数学专任教师&#xff0c;分享学习经验、生活、 努力成为像代码一样有逻辑的人&#xff01; &#x1f319;个人主页&#xff1a;阿芒的主页 ⭐ 高等数学专栏介绍&#xff1a;本专栏系统地梳理高等数学…...

云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?

大家好&#xff0c;欢迎来到《云原生核心技术》系列的第七篇&#xff01; 在上一篇&#xff0c;我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在&#xff0c;我们就像一个拥有了一块崭新数字土地的农场主&#xff0c;是时…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

【kafka】Golang实现分布式Masscan任务调度系统

要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...