netty-学习
Netty
- Netty 的核心概念
- Netty 的主要特性
- Netty 的应用场景
- Netty 的基本使用
- 服务器端
- 处理器
- 总结
- 代码分析
- 1.心跳检测
- 代码解析
- 类和成员变量
- `userEventTriggered`方法
- 总结
- 4.参数详解
- `ChannelHandlerContext ctx`
- `Object evt`
- 事件来源
- 示例:配置 `IdleStateHandler`
- 事件处理
- 示例回调
- 2.Netty WebSocket 服务器启动器类
- 1.代码解析
- 类和成员变量
- 资源关闭方法
- `run` 方法
- 2.总结
- 3.Netty WebSocket 处理器 HandlerWebSocket
- 代码解析
- 类和成员变量
- `channelActive` 方法
- `channelInactive` 方法
- `channelRead0` 方法
- `userEventTriggered` 方法
- `getToken` 方法
- 总结
- 完整过程
- 1. 客户端连接到 WebSocket 服务器
- 2. 服务器端初始化和配置
- Netty 服务器启动
- 3. 通道初始化
- 4. 握手和连接事件
- WebSocket 协议处理和心跳检测
- 心跳检测
- 5. 消息处理
- 6. 连接关闭
- 总结
Netty 是一个基于 Java 的异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络应用。Netty 提供了丰富的 API,支持多种传输协议和多种编解码方式,广泛应用于高性能的网络服务器和客户端的开发。
Netty 的核心概念
-
Channel:Netty 中的基本网络操作抽象。它代表一个打开的连接,比如一个到远程服务器的 TCP 连接。Channel 提供了异步的网络 I/O 操作,如读、写、连接和绑定。
-
EventLoop:Netty 中处理 I/O 操作的核心。EventLoop 是一个处理 I/O 事件、运行任务和处理定时任务的循环。每个 Channel 都会分配给一个 EventLoop。
-
ChannelFuture:Netty 中的异步操作结果。ChannelFuture 提供了在操作完成时通知的机制,允许你在操作完成后执行一些特定的操作(如写操作完成后的回调处理)。
-
ChannelHandler:用于处理 Channel 的 I/O 事件和数据。ChannelHandler 是处理网络事件和数据的核心接口。你可以实现 ChannelHandler 来处理入站和出站数据。
-
ChannelPipeline:Netty 中的责任链模式实现。每个 Channel 都有一个 ChannelPipeline,它持有一个 ChannelHandler 的链。当一个 I/O 事件发生时,事件在 ChannelPipeline 中传播,通过其中的 ChannelHandler 进行处理。
-
ByteBuf:Netty 中的数据容器,比 Java 的 NIO ByteBuffer 更高效和灵活。ByteBuf 提供了丰富的 API,支持动态扩展、零拷贝等特性。
Netty 的主要特性
-
异步和事件驱动:Netty 基于异步和事件驱动的编程模型,能够处理大量并发连接,并提供高吞吐量和低延迟。
-
多协议支持:Netty 支持多种协议,如 HTTP、WebSocket、TCP、UDP 等。你可以轻松实现自定义协议。
-
高效的内存管理:Netty 提供了高效的内存管理机制,避免了频繁的垃圾回收,提升了应用的性能。
-
丰富的工具类和扩展点:Netty 提供了大量的工具类和扩展点,方便开发者定制和扩展。
Netty 的应用场景
- 高性能的网络服务器:如 HTTP 服务器、游戏服务器、聊天服务器等。
- 分布式系统中的网络通信:如 RPC 框架、消息中间件等。
- 代理服务器:如反向代理、网关等。
- 任何需要高并发和低延迟的网络应用。
Netty 的基本使用
以下是一个简单的 Netty 服务器示例:
服务器端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {private final int port;public NettyServer(int port) {this.port = port;}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 处理连接事件EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理I/O事件try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture f = b.bind(port).sync(); // 绑定端口f.channel().closeFuture().sync(); // 等待服务器关闭} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;new NettyServer(port).start();}
}
处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg); // 将接收到的消息写回去}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush(); // 刷新所有写入到客户端的消息}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close(); // 发生异常时关闭连接}
}
总结
Netty 是一个强大的网络编程框架,适用于构建高性能、高可用性的网络应用。它提供了丰富的 API 和灵活的扩展机制,使得开发高性能的网络应用变得更加简单和高效。通过异步事件驱动模型和高效的内存管理,Netty 可以处理大量并发连接并提供低延迟的服务。
代码分析
1.心跳检测
public class HandlerHeartBeat extends ChannelDuplexHandler {private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}}
}
代码解析
这是一个Netty的处理器类HandlerHeartBeat,继承了ChannelDuplexHandler。这个类主要用于处理心跳检测逻辑,以确保连接的存活性。以下是对代码的详细解析:
类和成员变量
public class HandlerHeartBeat extends ChannelDuplexHandler {private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);
}
HandlerHeartBeat:继承自ChannelDuplexHandler,它是Netty提供的一个用于处理双向事件的处理器类。
userEventTriggered方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}
}
-
userEventTriggered:这是Netty中的一个回调方法,当用户事件触发时会调用这个方法。在心跳检测中,当连接变为空闲时,Netty会触发一个IdleStateEvent事件。 -
if (evt instanceof IdleStateEvent):检查事件是否为IdleStateEvent。IdleStateEvent是Netty提供的一个特殊事件,用于表示连接的空闲状态。 -
IdleStateEvent e = (IdleStateEvent) evt:将事件强制转换为IdleStateEvent。 -
处理
READER_IDLE状态:if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close(); }IdleState.READER_IDLE:表示读取通道空闲,即长时间没有从客户端读取到数据。ctx.channel().attr(...):获取与通道相关的属性。在这里,通过通道的ID作为键来获取用户ID。logger.info(...):记录日志,说明哪个用户没有发送心跳包导致连接断开。ctx.close():关闭连接。
-
处理
WRITER_IDLE状态:else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart"); }IdleState.WRITER_IDLE:表示写入通道空闲,即长时间没有向客户端发送数据。ctx.writeAndFlush("heart"):向客户端发送一个心跳消息"heart",保持连接的活跃状态。
总结
这个处理器类HandlerHeartBeat主要用于处理心跳检测逻辑,以确保客户端和服务器之间的连接在长时间没有数据交互时保持活跃或及时关闭:
- 如果长时间没有读取到数据(
READER_IDLE),则关闭连接并记录日志。 - 如果长时间没有向客户端发送数据(
WRITER_IDLE),则发送一个心跳消息"heart"以保持连接活跃。
在Netty中,userEventTriggered方法的参数是ChannelHandlerContext和Object类型的事件。以下是详细解释:
4.参数详解
ChannelHandlerContext ctx
- 类型:
ChannelHandlerContext - 作用:
ChannelHandlerContext提供了各种操作以触发 IO 操作和事件处理(如读取、写入、连接、断开等)。它关联了一个Channel,并且允许访问ChannelPipeline中的其他ChannelHandler。
Object evt
- 类型:
Object - 作用:这是传递给该方法的事件对象。在心跳检测的场景下,这个事件对象通常是
IdleStateEvent,它表示连接的空闲状态(读空闲、写空闲、读写空闲)。
事件来源
在Netty中,空闲状态检测通常是通过 IdleStateHandler 来实现的。IdleStateHandler 会监测通道的读写操作,如果通道在指定的时间内没有读或写操作,就会触发 IdleStateEvent 事件。
示例:配置 IdleStateHandler
以下是一个示例,展示如何将 IdleStateHandler 添加到 ChannelPipeline 中,以便在通道空闲时触发 IdleStateEvent:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;public class NettyServer {public static void main(String[] args) throws Exception {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 添加 IdleStateHandler,配置读、写空闲时间p.addLast(new IdleStateHandler(60, 30, 0)); // 读空闲60秒,写空闲30秒// 添加自定义的心跳检测处理器p.addLast(new HandlerHeartBeat());}});b.bind(8080).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
事件处理
当 IdleStateHandler 发现通道空闲时,会触发 IdleStateEvent,并调用 HandlerHeartBeat 中的 userEventTriggered 方法。此时,ctx 和 evt 参数的具体信息如下:
ctx:当前通道的上下文,提供了通道的各种操作方法。evt:具体的事件对象,这里是IdleStateEvent,表示通道的空闲状态。
示例回调
假设配置的读空闲时间是60秒,写空闲时间是30秒:
- 如果60秒内没有读取到任何数据,
IdleStateHandler会触发IdleStateEvent,其中状态为IdleState.READER_IDLE。 - 如果30秒内没有向通道写入数据,
IdleStateHandler会触发IdleStateEvent,其中状态为IdleState.WRITER_IDLE。
在 HandlerHeartBeat 中:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}
}
IdleStateEvent:包含了通道的空闲状态(读空闲、写空闲、读写空闲)。IdleState.READER_IDLE:表示读空闲事件。IdleState.WRITER_IDLE:表示写空闲事件。
2.Netty WebSocket 服务器启动器类
@Component
public class NettyWebSocketStarter implements Runnable {private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;/*** boss线程组,用于处理连接*/private EventLoopGroup bossGroup = new NioEventLoopGroup(1);/*** work线程组,用于处理消息*/private EventLoopGroup workerGroup = new NioEventLoopGroup();/*** 资源关闭——在容器销毁时关闭*/@PreDestroypublic void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void run() {try {//创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();//设置几个重要的处理器// 对http协议的支持,使用http的编码器,解码器pipeline.addLast(new HttpServerCodec());//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest//保证接收的http请求的完整性pipeline.addLast(new HttpObjectAggregator(64 * 1024));//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit// readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息// writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息//allIdleTime 所有类型的超时时间pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HandlerHeartBeat());//将http协议升级为ws协议,对websocket支持pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});//启动ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
1.代码解析
这段代码定义了一个 Netty WebSocket 服务器启动器类 NettyWebSocketStarter,该类实现了 Runnable 接口,可以在独立的线程中运行。这个类负责配置并启动 Netty WebSocket 服务器。
类和成员变量
@Component
public class NettyWebSocketStarter implements Runnable {private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;/*** boss线程组,用于处理连接*/private EventLoopGroup bossGroup = new NioEventLoopGroup(1);/*** work线程组,用于处理消息*/private EventLoopGroup workerGroup = new NioEventLoopGroup();
}
NettyWebSocketStarter:实现了Runnable接口,使得该类可以在独立的线程中运行。logger:用于记录日志的 Logger 对象。appConfig:注入的应用配置类,用于获取配置项,如 WebSocket 端口。handlerWebSocket:注入的 WebSocket 处理器,用于处理 WebSocket 消息。bossGroup和workerGroup:分别用于处理连接和处理消息的线程组。
资源关闭方法
/*** 资源关闭——在容器销毁时关闭*/
@PreDestroy
public void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();
}
@PreDestroy:在 Spring 容器销毁之前调用close方法,优雅地关闭bossGroup和workerGroup,释放资源。
run 方法
@Override
public void run() {try {//创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();//设置几个重要的处理器// 对http协议的支持,使用http的编码器,解码器pipeline.addLast(new HttpServerCodec());//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest//保证接收的http请求的完整性pipeline.addLast(new HttpObjectAggregator(64 * 1024));//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit// readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息// writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息//allIdleTime 所有类型的超时时间pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HandlerHeartBeat());//将http协议升级为ws协议,对websocket支持pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});//启动ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}
-
创建并配置
ServerBootstrap:ServerBootstrap是 Netty 用于引导服务器的启动助手类。serverBootstrap.group(bossGroup, workerGroup)设置了用于处理连接的bossGroup和用于处理消息的workerGroup。serverBootstrap.channel(NioServerSocketChannel.class)设置了服务器通道类型为NioServerSocketChannel,它适用于 NIO 传输。
-
添加处理器:
serverBootstrap.handler(new LoggingHandler(LogLevel.DEBUG)):添加一个日志处理器,用于记录调试级别的日志。serverBootstrap.childHandler(new ChannelInitializer<Channel>() {...}):添加一个子处理器,用于初始化每个新连接的通道。
-
初始化通道:
HttpServerCodec:添加 HTTP 编解码器,支持 HTTP 协议。HttpObjectAggregator:添加 HTTP 对象聚合器,确保接收完整的 HTTP 请求。IdleStateHandler:添加空闲状态处理器,用于检测读超时(60秒)。HandlerHeartBeat:添加心跳检测处理器,用于处理空闲事件。WebSocketServerProtocolHandler:添加 WebSocket 协议处理器,将 HTTP 协议升级为 WebSocket 协议。handlerWebSocket:添加自定义的 WebSocket 消息处理器。
-
启动服务器:
ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();:绑定端口并启动服务器。logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());:记录服务器启动成功的日志。channelFuture.channel().closeFuture().sync();:等待服务器通道关闭。
-
异常处理和资源释放:
- 如果发生异常,记录堆栈跟踪并优雅地关闭
bossGroup和workerGroup,释放资源。
- 如果发生异常,记录堆栈跟踪并优雅地关闭
2.总结
NettyWebSocketStarter 是一个用于启动 Netty WebSocket 服务器的类。它通过配置一系列处理器(如 HTTP 编解码器、心跳检测处理器、WebSocket 协议处理器等)来初始化服务器,并在指定端口上启动服务器。同时,通过 @PreDestroy 注解确保在 Spring 容器销毁时优雅地关闭资源。该类实现了 Runnable 接口,使其可以在独立线程中运行,通常可以用于多线程环境中启动 Netty 服务器。
3.Netty WebSocket 处理器 HandlerWebSocket
/*** 设置通道共享*/
@ChannelHandler.Sharable
@Component("handlerWebSocket")
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);// @Resource
// private ChannelContextUtils channelContextUtils;@Resourceprivate RedisComponet redisComponet;/*** 当通道就绪后会调用此方法,通常我们会在这里做一些初始化操作** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// Channel channel = ctx.channel();logger.info("有新的连接加入。。。");}/*** 当通道不再活跃时(连接关闭)会调用此方法,我们可以在这里做一些清理工作** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("有连接已经断开。。。");//channelContextUtils.removeContext(ctx.channel());}/*** 读就绪事件 当有消息可读时会调用此方法,我们可以在这里读取消息并处理。** @param ctx* @param textWebSocketFrame* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {//接收心跳Channel channel = ctx.channel();logger.info("接收到消息,{}", textWebSocketFrame.text());// Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));//String userId = attribute.get();//redisComponet.saveUserHeartBeat(userId);}//用于处理用户自定义的事件 当有用户事件触发时会调用此方法,例如连接超时,异常等。@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String url = complete.requestUri();String token = getToken(url);if (token == null) {ctx.channel().close();return;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);if (null == tokenUserInfoDto) {ctx.channel().close();return;}/*** 用户加入*///channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}}/*** 获取url中的token** @param url* @return*/private String getToken(String url) {if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {return null;}String[] queryParams = url.split("\\?");if (queryParams.length < 2) {return url;}String[] params = queryParams[1].split("=");if (params.length != 2) {return url;}return params[1];}
}
代码解析
这段代码定义了一个 Netty WebSocket 处理器 HandlerWebSocket,继承自 SimpleChannelInboundHandler<TextWebSocketFrame>。该处理器主要用于处理 WebSocket 的各种事件,如连接建立、消息接收、连接关闭等。以下是对代码的详细解析:
类和成员变量
@ChannelHandler.Sharable
@Component("handlerWebSocket")
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);@Resourceprivate RedisComponet redisComponet;
}
@ChannelHandler.Sharable:注解表明这个处理器是可以在多个 Channel 之间共享的。@Component("handlerWebSocket"):将这个类注册为 Spring 的一个组件,并指定组件名称为handlerWebSocket。logger:用于记录日志的 Logger 对象。redisComponet:注入的 Redis 组件,用于与 Redis 进行交互。
channelActive 方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("有新的连接加入。。。");
}
channelActive:当通道就绪时调用这个方法,通常用于初始化操作。logger.info:记录有新的连接加入的日志。
channelInactive 方法
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("有连接已经断开。。。");//channelContextUtils.removeContext(ctx.channel());
}
channelInactive:当通道不再活跃(连接关闭)时调用这个方法,通常用于清理工作。logger.info:记录有连接断开的日志。
channelRead0 方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {Channel channel = ctx.channel();logger.info("接收到消息,{}", textWebSocketFrame.text());// Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));// String userId = attribute.get();// redisComponet.saveUserHeartBeat(userId);
}
channelRead0:当有消息可读时调用这个方法,读取并处理消息。Channel:获取当前的通道。logger.info:记录接收到的消息。- 注释掉的部分代码:可以用于获取用户 ID 并保存心跳到 Redis。
userEventTriggered 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String url = complete.requestUri();String token = getToken(url);if (token == null) {ctx.channel().close();return;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);if (null == tokenUserInfoDto) {ctx.channel().close();return;}// 用户加入// channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}
}
userEventTriggered:处理用户自定义事件,如连接超时、异常等。WebSocketServerProtocolHandler.HandshakeComplete:处理 WebSocket 握手完成事件。getToken:从 URL 中获取 token。ctx.channel().close():关闭通道,如果 token 为空或无效。
getToken 方法
private String getToken(String url) {if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {return null;}String[] queryParams = url.split("\\?");if (queryParams.length < 2) {return url;}String[] params = queryParams[1].split("=");if (params.length != 2) {return url;}return params[1];
}
getToken:从 URL 中提取 token 参数。StringTools.isEmpty(url):检查 URL 是否为空。url.indexOf("?"):检查 URL 是否包含查询参数。url.split("\\?"):分割 URL 获取查询参数部分。params.length:检查参数长度是否符合预期。
总结
HandlerWebSocket 是一个 Netty WebSocket 处理器,用于处理各种 WebSocket 事件。它包含以下功能:
channelActive:当通道就绪时,记录新的连接加入。channelInactive:当通道不再活跃时,记录连接断开。channelRead0:读取并处理接收到的 WebSocket 消息,记录消息内容。userEventTriggered:处理 WebSocket 握手完成事件,从 URL 中提取 token 并验证,如果无效则关闭连接。getToken:从 URL 中提取 token 参数,用于验证用户身份。
通过这些方法,HandlerWebSocket 能够有效地管理 WebSocket 连接的生命周期和消息处理,确保连接的可靠性和安全性。
完整过程
当然可以,我会详细解释从访问 WebSocket 接口到完成所有步骤的整个流程,包括各个代码片段是如何协同工作的。
1. 客户端连接到 WebSocket 服务器
当客户端尝试连接到 WebSocket 服务器时,例如通过 ws://localhost:5051/ws?token=16515151,Netty 会接受这个连接请求并触发一系列的事件来处理这个连接。
2. 服务器端初始化和配置
在服务器端,我们使用了 NettyWebSocketStarter 类来初始化和配置 Netty 服务器。
Netty 服务器启动
@Component
public class NettyWebSocketStarter implements Runnable {private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workerGroup = new NioEventLoopGroup();@PreDestroypublic void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void run() {try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(64 * 1024));pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HandlerHeartBeat());pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
3. 通道初始化
当客户端连接到服务器时,Netty 会初始化通道并调用配置的处理器。这里的处理器包括 HttpServerCodec、HttpObjectAggregator、IdleStateHandler、HandlerHeartBeat 和 WebSocketServerProtocolHandler 以及我们的 HandlerWebSocket。
4. 握手和连接事件
WebSocket 协议处理和心跳检测
WebSocketServerProtocolHandler 处理 WebSocket 协议的握手。当握手完成时,会触发 userEventTriggered 方法。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String url = complete.requestUri();String token = getToken(url);if (token == null) {ctx.channel().close();return;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);if (null == tokenUserInfoDto) {ctx.channel().close();return;}// 用户加入// channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}
}
- 握手完成:当 WebSocket 握手完成时,
WebSocketServerProtocolHandler触发HandshakeComplete事件。 - 提取 Token:从 URL 中提取
token。 - 验证 Token:从 Redis 中验证 Token 是否有效。
- 管理连接:将用户信息和通道关联(注释部分)。
心跳检测
IdleStateHandler 会检测连接的空闲状态(例如60秒内没有读取到数据),触发相应的事件。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;if (e.state() == IdleState.READER_IDLE) {Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush("heart");}}
}
- 读空闲:如果长时间未读取到数据(
READER_IDLE),则关闭连接。 - 写空闲:如果长时间未写入数据(
WRITER_IDLE),发送心跳消息。
5. 消息处理
当客户端发送消息时,Netty 会触发 channelRead0 方法,处理接收到的 WebSocket 消息。
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {Channel channel = ctx.channel();logger.info("接收到消息,{}", textWebSocketFrame.text());// Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));// String userId = attribute.get();// redisComponet.saveUserHeartBeat(userId);
}
- 接收消息:记录接收到的消息内容。
- 心跳更新(注释部分):可以从消息中提取用户ID,并在 Redis 中更新心跳记录。
6. 连接关闭
当连接关闭时,Netty 会调用 channelInactive 方法。
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("有连接已经断开。。。");//channelContextUtils.removeContext(ctx.channel());
}
- 记录日志:记录连接断开的日志。
- 清理资源(注释部分):从上下文中移除断开的连接。
总结
整个过程包括以下步骤:
- 客户端连接:客户端通过 WebSocket URL 连接到服务器(如
ws://localhost:5051/ws?token=16515151)。 - 服务器初始化:
NettyWebSocketStarter启动服务器,并配置各个处理器。 - 握手和协议升级:
WebSocketServerProtocolHandler处理握手并升级协议。 - 事件触发和处理:
- 握手完成:在
userEventTriggered中处理握手完成事件,提取并验证 token。 - 心跳检测:在
HandlerHeartBeat中处理读空闲和写空闲事件。
- 握手完成:在
- 消息处理:在
HandlerWebSocket的channelRead0方法中处理接收到的 WebSocket 消息。 - 连接关闭:在
HandlerWebSocket的channelInactive方法中处理连接关闭事件。
通过这些步骤,Netty 能够高效地管理 WebSocket 连接,确保连接的可靠性和数据的实时性。
相关文章:
netty-学习
Netty Netty 的核心概念Netty 的主要特性Netty 的应用场景Netty 的基本使用服务器端处理器 总结 代码分析1.心跳检测代码解析类和成员变量userEventTriggered方法总结 4.参数详解ChannelHandlerContext ctxObject evt 事件来源示例:配置 IdleStateHandler事件处理示…...
无线和移动网络
背景 两个重要的挑战 无线:通过无线链路通信移动:需要网络处理移动(不同变换所接入的网络)用户 无线网络中的组件 无线主机(无线并不总是意味着移动的)基站(base station 或者叫AP࿰…...
快团团账号被封,大团长帮卖团长如何避免违规操作
去年末到现在有部分小伙伴反馈,自己的快团团账号资金提现受到限制,也有个别的快团团大团长账号直接被查封了,有些团长是明知是违规行为还抱有侥幸的心理,比如有个做房产中间的小伙,知道套现违规还频频套现,…...
Github Copilot登录账号,完美支持chat
Github Copilot 代码补全等功能,提高写代码的效率 https://web.52shizhan.cn/activity/copilot 登录授权后,已经可以使用,完美。如图...
Ubuntu系统中Apache Web服务器的配置与实战
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...
如何在路由器上安装代理服务:详细教程
如何在路由器上安装代理服务:详细教程 步骤一:通过漏洞进入路由器系统开启Telnet服务使用Telnet登录路由器系统查看系统信息和CPU信息步骤二:交叉编译MIPS程序 Go对MIPS的支持 安装TFTP Server使用BusyBox tftp传输文件在路由器系统中下载编译…...
JavaScript html css前端 日期对象 date对象 日期格式化 时间戳
日期对象 Date对象 Date 对象和 Math 对象不一样,他是一个构造函数,所以我们需要实例化后才能使用 Date 实例用来处理日期和时间 Date()使用方法 示例:获取当前时间 let now new Date() console.log(now) 示例:获取指定时间…...
【再探】设计模式—备忘录模式与解释器模式
备忘录模式是用于保存对象在某个时刻的状态,来实现撤销操作。而解释器模式则是将文本按照定义的文法规则解析成对应的命令。 1 备忘录模式 需求:保存对象在某个时刻的状态,后面可以对该对象实行撤销操作。 1.1 备忘录模式介绍 提供一种状…...
SpringCloud网关-gateway
一 什么是网关?为什么选择 Gateway? 网关功能如下: 身份认证和权限校验服务路由、负载均衡请求限流 在 Spring Cloud 中网关的实现包含两种: Gateway(推荐):是基于 Spring5 中提供的 WebFlux ÿ…...
LiveData是如何感知Room数据变化的
一 Room数据变化LiveData如何收到onChanged回调的? 1.1 LiveData是如何创建的 这里讨论的LiveData的创建是特指Dao定义的方法的返回类型,而不是所有的LiveData。 以NoteDao 举例: Dao public interface NoteDao {Query("select * fr…...
【自动化】WebUI自动化通过读取用户数据的方式启动浏览器实现绕过相关登录验证的方法。
背景说明 我相信做自动化测试或者实现UI自动化相关功能的同学肯定碰到过,每次写好脚本执行时都是默认打开一个 “新”的浏览器,我的意思是就跟刚下载的浏览器一样。而不是平时日常使用着的浏览器的状态,日常使用浏览器时只要近期登录过&…...
信号:干扰类别及特征提取
目录 第一部分:干扰类别 1.压制干扰 1.1噪声调幅瞄准式干扰(单音干扰) 1.2噪声调频阻塞式干扰(宽带噪声干扰) 1.3噪声调频扫频式干扰(线性调频) 2.欺骗干扰 2.1距离欺骗干扰(幅度调制干扰࿰…...
【推荐】用scss循环zoom缩放比例,解决可视化大屏在不同分辨率屏幕下的适配问题
方法1: 指定几种常规屏幕宽度(用这种方式就必须要强制用户全屏查看页面,在固定的宽度下才能达到比较不错的显示效果) // 适配不同分辨率的页面---------------------------------------- html {overflow: hidden;width: 1920px;…...
23中设计模式之一— — — —命令模式的详细介绍
命令模式 Command Pattern讲解 概念描述模式结构主要角色模式的UIM类图模式优点模式缺点应用场景实例演示类图代码演示运行结果 概念 命令模式(别名:动作,事务) 命令模式是一种行为设计模式,将一个请求封装为一个对象…...
解决 Mac Django 连接Mysql 出现 image not found 问题
最近在使用 Django 框架,因为升级到4.2版本了,对应的本机 Mysql 5.7 就不适用了,于是升级到了 Mysql 8.0,写好代码之后出现如下错误: 仔细分析一下错误的描述: ImportError: dlopen(/Library/Frameworks/P…...
EitbaseEX香港业务开展,提升用户友好交易体验
在全球范围内备受瞩目的加密货币交易平台Coinbase,宣布正式入驻香港市场,并命名为EitbaseEX。这一战略性扩展举措,旨在为香港提供先进的加密货币交易技术和服务,同时将香港打造为其在亚太地区的重要枢纽。 作为国际金融中心&#…...
ROS学习记录:自定义消息类型
前言 当我们需要传输一些特殊的数据时,且官方的消息包无法满足需求,我们便可以自己定义一个消息类型。 实验步骤 一、在终端输入cd ~/catkin_ws1/src进入工作空间中src目录 二、输入catkin_create_pkg qq_msgs roscpp rospy std_msgs message_generati…...
创新实训2024.06.06日志:部署web服务
1. 运行web项目前后端服务 首先我们要先在服务器上运行客户端以及服务端的应用程序。随后再考虑如何通过公网/局域网访问的问题。 如何启动服务在仓库对应分支下的Readme文件中已经有详细描述了。 1.1. 启动服务端 对于服务端,即(要求你在服务端子项…...
使用C++实现YOLO图像分类:从环境搭建到性能评估的完整指南
⭐️我叫忆_恒心,一名喜欢书写博客的研究生👨🎓。 如果觉得本文能帮到您,麻烦点个赞👍呗! 近期会不断在专栏里进行更新讲解博客~~~ 有什么问题的小伙伴 欢迎留言提问欧,喜欢的小伙伴给个三连支…...
Linux中安装Docker,并使用Docker安装MySQL和Redis
1、安装docker 1卸载系统之前的docker yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine2、安装Docker-CE #安装必须的依赖 sudo yum install -y yum-utils \device-map…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
Xela矩阵三轴触觉传感器的工作原理解析与应用场景
Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知,帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量,能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度,还为机器人、医疗设备和制造业的智…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...
Python常用模块:time、os、shutil与flask初探
一、Flask初探 & PyCharm终端配置 目的: 快速搭建小型Web服务器以提供数据。 工具: 第三方Web框架 Flask (需 pip install flask 安装)。 安装 Flask: 建议: 使用 PyCharm 内置的 Terminal (模拟命令行) 进行安装,避免频繁切换。 PyCharm Terminal 配置建议: 打开 Py…...
RLHF vs RLVR:对齐学习中的两种强化方式详解
在语言模型对齐(alignment)中,强化学习(RL)是一种重要的策略。而其中两种典型形式——RLHF(Reinforcement Learning with Human Feedback) 与 RLVR(Reinforcement Learning with Ver…...
linux设备重启后时间与网络时间不同步怎么解决?
linux设备重启后时间与网络时间不同步怎么解决? 设备只要一重启,时间又错了/偏了,明明刚刚对时还是对的! 这在物联网、嵌入式开发环境特别常见,尤其是开发板、树莓派、rk3588 这类设备。 解决方法: 加硬件…...
