基于Netty构建Websocket服务端
除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。
项目目录:
引入pom依赖:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.69.Final</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
编写SocketServer:
package com.lzq.websocket.config;import com.lzq.websocket.handlers.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.TimeUnit;@Slf4j
@Configuration
public class WebSocketConfig implements CommandLineRunner {private static final Integer PORT = 8888;@Overridepublic void run(String... args) throws Exception {new WebSocketConfig().start();}public void start() {// 创建EventLoopGroupEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec());// 最大数据长度pipeline.addLast(new HttpObjectAggregator(65536));// 添加接收websocket请求的url匹配路径pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));// 10秒内收不到消息强制断开连接// pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));pipeline.addLast(new WebSocketHandler());}});ChannelFuture future = serverBootstrap.bind(PORT).sync();log.info("websocket server started, port={}", PORT);// 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭// 阻塞future.channel().closeFuture().sync();} catch (Exception e) {log.error("websocket server exception", e);throw new RuntimeException(e);} finally {log.info("websocket server close");// 关闭EventLoopGroupbossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
编写WebSocketHandler:
package com.lzq.websocket.handlers;import com.lzq.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {private WebSocketServerHandshaker webSocketServerHandshaker;private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 创建连接时执行NettyConfig.group.add(ctx.channel());log.info("client channel active, id={}", ctx.channel().id().toString());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 关闭连接时执行NettyConfig.group.remove(ctx.channel());log.info("client channel disconnected, id={}", ctx.channel().id().toString());}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 服务端接收客户端发送过来的数据结束之后调用ctx.flush();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt;log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri());}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {// 处理客户端http握手请求handlerHttpRequest(ctx, (FullHttpRequest) msg);} else if (msg instanceof WebSocketFrame) {// 处理websocket连接业务handlerWebSocketFrame(ctx, (WebSocketFrame) msg);}}/*** 处理websocket连接业务** @param ctx* @param frame*/private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName());// 判断是否是关闭websocket的指令if (frame instanceof CloseWebSocketFrame) {webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());return;}// 判断是否是ping消息if (frame instanceof PingWebSocketFrame) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}if (!(frame instanceof TextWebSocketFrame)) {throw new RuntimeException("不支持消息类型:" + frame.getClass().getName());}String text = ((TextWebSocketFrame) frame).text();if ("ping".equals(text)) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}log.info("WebSocket message received: {}", text);/*** 可通过客户传输的text,设计处理策略:* 如:text={"type": "messageHandler", "userId": "111"}* 服务端根据type,采用策略模式,自行派发处理** 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型),* Handler已经是线程处理,每个用户的请求是线程隔离的*/// 返回WebSocket响应ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text));/*// 群发TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString()+ ctx.channel().id()+ " : "+ text);NettyConfig.group.writeAndFlush(twsf);*/}/*** 处理客户端http握手请求** @param ctx* @param request*/private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {log.info("handlerHttpRequest>>>>class={}", request.getClass().getName());// 判断是否采用WebSocket协议if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) {sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);webSocketServerHandshaker = wsFactory.newHandshaker(request);if (webSocketServerHandshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {webSocketServerHandshaker.handshake(ctx.channel(), request);}}private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {if (response.getStatus().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);response.content().writeBytes(buf);buf.release();}// 服务端向客户端发送数据ChannelFuture f = ctx.channel().writeAndFlush(response);if (response.getStatus().code() != 200) {f.addListener(ChannelFutureListener.CLOSE);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 非正常断开时调用log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause);ctx.close();}
}
NettyConfig:
package com.lzq.websocket.config;import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;public class NettyConfig {/*** 存储接入的客户端的channel对象*/public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
使用Apifox测试:
相关文章:

基于Netty构建Websocket服务端
除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。 项目…...
基于Rocket MQ扩展的无限延迟消息队列
基于Rocket MQ扩展的无限延迟消息队列 背景: Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的…...
Python办公自动化 – 日志分析和自动化FTP操作
Python办公自动化 – 日志分析和自动化FTP操作 以下是往期的文章目录,需要可以查看哦。 Python办公自动化 – Excel和Word的操作运用 Python办公自动化 – Python发送电子邮件和Outlook的集成 Python办公自动化 – 对PDF文档和PPT文档的处理 Python办公自动化 – 对…...

MyBatis 关联查询
目录 一、一对一查询(sqlMapper配置文件) 1、需求: 2、创建account和user实体类 3、创建AccountMapper 接口 4、创建并配置AccountMapper.xml 5、测试 二、一对多查询(sqlMapper配置文件) 1、需求:…...

NVIDIA NCCL 源码学习(十二)- double binary tree
上节我们以ring allreduce为例看到了集合通信的过程,但是随着训练任务中使用的gpu个数的扩展,ring allreduce的延迟会线性增长,为了解决这个问题,NCCL引入了tree算法,即double binary tree。 double binary tree 朴素…...

.net core webapi 大文件上传到wwwroot文件夹
1.配置staticfiles(program文件中) app.UseStaticFiles();2.在wwwroot下创建upload文件夹 3.返回结果封装 namespace webapi;/// <summary> /// 统一数据响应格式 /// </summary> public class Results<T> {/// <summary>/// 自定义的响应码ÿ…...

C++设计模式 #3策略模式(Strategy Method)
动机 在软件构建过程中,某些对象使用的的算法可能多种多样,经常改变。如果将这些算法都写在类中,会使得类变得异常复杂;而且有时候支持不频繁使用的算法也是性能负担。 如何在运行时根据需求透明地更改对象的算法?将…...
金融知识——OMS、EMS和PMS分别是什么意思
金融知识——OMS、EMS和PMS分别是什么意思 OMSEMSPMS OMS OMS(Order Management System)是为了管理头寸,以多种方式创建订单,并进行订单屈从检验以使得用户在订单创建时收到一些约束。在交易管理方面,OMS提供交易组合…...

Docker——微服务的部署
Docker——微服务的部署 文章目录 Docker——微服务的部署初识DockerDocker与虚拟机Docker架构安装DockerCentOS安装Docker卸载(可选)安装docker启动docker配置镜像加速 Docker的基本操作Docker的基本操作——镜像Docker基本操作——容器Docker基本操作—…...

AI时代架构设计新模式
云原生架构原则 云原生架构本身作为一种架构,也有若干架构原则作为应用架构的核心架构控制面,通过遵从这些架构原则可以让技术主管和架构师在做技术选择时不会出现大的偏差。 服务化原则 当代码规模超出小团队的合作范围时,就有必要进行服务…...
速盾网络:高防IP的好处
随着互联网的快速发展,网络安全问题日益突出,越来越多的企业和个人开始关注网络安全防护。其中,高防IP作为一种高效的防御手段,越来越受到用户的青睐。本文将介绍速盾网络高防IP的好处,帮助您了解其优势和应用场景。一…...

创建Maven Web工程
目录下也会有对应的生命周期。其中常用的是:clean、compile、package、install。 比如这里install ,如果其他项目需要将这里的模块作为依赖使用,那就可以 install 。安装到本地仓库的位置: Java的Web工程,所以我们要选…...

【PHP入门】2.2 流程控制
-流程控制- 流程控制:代码执行的方向 2.2.1控制分类 顺序结构:代码从上往下,顺序执行。(代码执行的最基本结构) 分支结构:给定一个条件,同时有多种可执行代码(块)&am…...

springCould中的zookeeper-从小白开始【3】
目录 1.启动zookeeper❤️❤️❤️ 2.创建8004模块 ❤️❤️❤️ 3.临时节点还是永久节点❤️❤️❤️ 4.创建zk80消费模块❤️❤️❤️ 1.启动zookeeper❤️❤️❤️ 进入自己zookeeper的bin目录下 分别使用命令: ./zkServer.sh start 和 ./zkCli.sh -serve…...

Node.js-模块化(二)
1. 模块化的基本概念 1.1 什么是模块化 模块化是指解决一个复杂问题时,自顶向下逐层将系统拆分成若干模块的过程。对于整个系统来说,模块是可组合、分解和更换的单元。 1.2 编程领域中的模块化 编程领域中的模块化,就是遵守固定的规则&…...
MAC 安装nginx
使用Homebrew方式进行安装 步骤: 1、更新 Homebrew brew update 2、下载并安装 Nginx brew install nginx 3、查看 nginx 配置信息 brew info nginx zhanghuaBreeze ~ % brew info nginx // 版本信息 > nginx: stable 1.25.1 (bottled), HEAD HTTP(S) se…...

开源 AI 新秀崛起:Bittensor 更像是真正的“OpenAI”
强大的人工智能正在飞速发展,而完全由 OpenAI、Midjourney、Google(Bard)这样的少数公司控制 AI 不免让人感到担忧。在这样的背景下,试图用创新性解决方案处理人工智能中心化问题、权力集中于少数公司的 Bittensor,可谓…...

设计模式:循序渐进走入工厂模式
文章目录 前言一、引入二、简单工厂模式1.实现2.优缺点3.扩展 三、工厂方法模式1.实现2.优缺点 四、抽象工厂模式1.实现2.优缺点3.使用场景 五、模式扩展六、JDK源码解析总结 前言 软件设计模式之工厂模式。 一、引入 需求:设计一个咖啡店点餐系统。 设计一个咖啡类…...

如何将图片(matlab、python)无损放入word论文
许多论文对插图有要求,直接插入png、jpg一般是不行的,这是一篇顶刊文章(pdf)的插图,放大2400%后依旧清晰,搜罗了网上的方法,总结了一下如何将图片无损放入论文中。 这里主要讨论的是数据生成的图…...

在Next.js和React中搭建Cesium项目
在Next.js和React中搭建Cesium项目,需要确保Cesium能够与服务端渲染(SSR)兼容,因为Next.js默认是SSR的。Cesium是一个基于WebGL的地理信息可视化库,通常用于在网页中展示三维地球或地图。下面是一个基本的步骤,用于在Next.js项目中…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...