Netty深入浅出Java网络编程学习笔记(二) Netty进阶应用篇
目录
四、应用
1、粘包与半包
现象分析
粘包
半包
本质
解决方案
短链接
定长解码器
行解码器
长度字段解码器——LTC
2、协议设计与解析
协议的作用
Redis协议
HTTP协议
自定义协议
组成要素
编码器与解码器
编写测试类
@Sharable注解
自定义编解码器能否使用@Sharable注解
3、在线聊天室
聊天室业务
用户登录接口
用户会话接口
群聊会话接口
整体结构
客户端代码结构
服务器代码结构
登录
客户端代码
服务器代码
运行结果
单聊
群聊
创建群聊
群聊聊天
加入群聊
退出
查看群聊成员
退出聊天室
连接假死
解决方法
四、应用
1、粘包与半包
粘包和半包问题是数据传输中比较常见的问题,所谓的粘包问题是指数据在传输时,在一条消息中读取到了另一条消息的部分数据,这种现象就叫做粘包。比如发送了两条消息,分别为“ABC”和“DEF”,那么正常情况下接收端也应该收到两条消息“ABC”和“DEF”,但接收端却收到的是“ABCD”,像这种情况就叫做粘包,半包问题是指接收端只收到了部分数据,而非完整的数据的情况就叫做半包。比如发送了一条消息是“ABC”,而接收端却收到的是“AB”和“C”两条信息,这种情况就叫做半包
只要是TCP协议的网络交互都有粘包和半包问题,因为TCP的传输是基于字节的传输方式,数据是以字节的形式进行传输的,并没有明确的边界。因此,在传输过程中,TCP没有办法直接识别数据包的边界,并且在流量控制下,TCP的字节传输还不稳定,当发送方连续发送多个数据包时,这些数据包可能会在网络传输的过程中合并或拆分,导致粘包和半包问题的出现,而UDP则没有这个问题,因为UDP的传输是基于数据报的
现象分析
粘包
现象
- 发送 abc def,接收 abcdef
原因
- 应用层
- 接收方 ByteBuf 设置太大(Netty 默认 1024)
- 传输层-网络层
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
Nagle算法的原理如下:当TCP发送方需要发送一个小数据包时,Nagle算法会将这个数据包缓存起来,不立即发送。然后,TCP发送方会继续等待其他数据,直到以下两个条件中的任意一个满足后再发送数据:
- 接收到之前发送的数据的确认ACK。
- 发送方的发送缓冲区中的数据量达到一定的阈值(一般是MSS,即最大报文长度)。
之所以要缓存起来是因为一个TCP的请求都是要进行数据报头的添加,而IP的报头+TCP的报头 = 40字节,哪怕你只是发送了1个字节的数据,也会被封装为一个41字节的传输内容,那这样子粘包现象就很严重了,为此解决方法就是,缓存多点字节再一起发过来。然而,当发送方连续发送多个小数据包时,这些数据包可能会在网络传输的过程中被合并成一个大数据包,导致粘包问题的出现。这是因为Nagle算法本身不考虑数据包的边界,只是简单地将小数据包缓存起来,直到条件满足后发送。
半包
现象
- 发送 abcdef,接收 abc def
原因
- 应用层
- 接收方 ByteBuf 小于实际发送数据量
- 传输层-网络层
- 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
- 数据链路层
- MSS 限制:当发送的数据超过 MSS (最大报文长度)限制后,会将数据切分发送,就会造成半包
本质
发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界
解决方案
解决方案的思路和我这篇文章的处理方式类似,可以先看一下这个大概思路https://blog.csdn.net/weixin_73077810/article/details/131843387
短链接和长连接是描述客户端与服务器之间TCP连接持续时间的概念。
- 短链接:短链接通常指的是一次性的临时连接。在短链接中,客户端与服务器建立连接、交换数据后,连接就会关闭。在每次通信之前,需要重新建立连接,进行握手和协商。
短链接的优点是简单、轻量,适用于临时的、低频率的通信。但在高并发或频繁通信的场景中,频繁的连接建立和关闭会增加网络开销和延迟。
- 长连接:长连接指的是客户端与服务器之间持久的TCP连接。在长连接中,连接一经建立,客户端和服务器可以多次、长时间地进行双向通信。在连接建立后,数据可以实时、便捷地传输。
长连接的优点是减少连接建立和断开的开销,节省网络资源,减少延迟,提高通信效率。长连接常用于需要实时交互的应用,如即时通信、实时数据传输等。
需要注意的是,长连接可能会带来一些管理上的挑战。服务器需要维护大量的长连接,消耗资源,需要适当管理连接数和超时机制,防止资源浪费和死连接问题。
短链接
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
客户端代码改进
修改channelActive方法
public void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer(16);buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);// 使用短链接,每次发送完毕后就断开连接ctx.channel().close();
}
定长解码器
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder
对数据进行定长解码,具体使用方法如下
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
客户端代码
客户端发送数据的代码如下
// 约定最大长度为16
final int maxLength = 16;
// 被发送的数据
char c = 'a';
// 向服务器发送10个报文
for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(maxLength);// 定长byte数组,未使用部分会以0进行填充byte[] bytes = new byte[maxLength];// 生成长度为0~15的数据for (int j = 0; j < (int)(Math.random()*(maxLength-1)); j++) {bytes[j] = (byte) c;}buffer.writeBytes(bytes);c++;// 将数据发送给服务器ctx.writeAndFlush(buffer);
}
服务器代码
使用FixedLengthFrameDecoder
对粘包数据进行拆分,该handler需要添加在LoggingHandler
之前,保证数据被打印时已被拆分
// 通过定长解码器对粘包数据进行拆分
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
行解码器
行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的
- 可以通过
LineBasedFrameDecoder(int maxLength)
来拆分以换行符(\n or \r\n)为分隔符的数据 - 可以通过
DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
来指定通过什么分隔符来拆分数据(可以传入多个分隔符)
两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出TooLongFrameException
异常
以换行符 \n 为分隔符
客户端代码
// 约定最大长度为 64
final int maxLength = 64;
// 被发送的数据
char c = 'a';
for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(maxLength);// 生成长度为0~62的数据Random random = new Random();StringBuilder sb = new StringBuilder();for (int j = 0; j < (int)(random.nextInt(maxLength-2)); j++) {sb.append(c);}// 数据以 \n 结尾sb.append("\n");buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));c++;// 将数据发送给服务器ctx.writeAndFlush(buffer);
}
服务器代码
// 通过行解码器对粘包数据进行拆分,以 \n 为分隔符
// 需要指定最大长度
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
以自定义分隔符 \c 为分隔符
客户端代码
// 数据以 \c 结尾
sb.append("\\c");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
服务器代码
// 将分隔符放入ByteBuf中
ByteBuf bufSet = ch.alloc().buffer().writeBytes("\\c".getBytes(StandardCharsets.UTF_8));
// 通过行解码器对粘包数据进行拆分,以 \c 为分隔符
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64, ch.alloc().buffer().writeBytes(bufSet)));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
长度字段解码器——LTC
在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的
LengthFieldBasedFrameDecoder
解码器可以提供更为丰富的拆分方法,其构造方法有五个参数
public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip)
参数解析
-
maxFrameLength 数据最大长度
- 表示数据的最大长度(包括附加信息、长度标识等内容)
-
lengthFieldOffset 数据长度标识的起始偏移量
- 用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
-
lengthFieldLength 数据长度标识所占字节数(用于指明有用数据的长度)
- 数据中用于表示有用数据长度的标识所占的字节数
-
lengthAdjustment 长度表示为有用数据的偏移量
- 用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
-
initialBytesToStrip 数据读取起点
- 读取起点,不读取 0 ~ initialBytesToStrip 之间的数据
参数图解
使用
通过 EmbeddedChannel 对 handler 进行测试
public class EncoderStudy {public static void main(String[] args) {// 模拟服务器// 使用EmbeddedChannel测试handlerEmbeddedChannel channel = new EmbeddedChannel(// 数据最大长度为1KB,长度标识前后各有1个字节的附加信息,长度标识长度为4个字节(int)new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),new LoggingHandler(LogLevel.DEBUG));// 模拟客户端,写入数据ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();send(buffer, "Hello");channel.writeInbound(buffer);send(buffer, "World");channel.writeInbound(buffer);}private static void send(ByteBuf buf, String msg) {// 得到数据的长度int length = msg.length();byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);// 将数据信息写入buf// 写入长度标识前的其他信息buf.writeByte(0xCA);// 写入数据长度标识buf.writeInt(length);// 写入长度标识后的其他信息buf.writeByte(0xFE);// 写入具体的数据buf.writeBytes(bytes);}
}
运行结果
146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 48 65 6c 6c 6f |......Hello |
+--------+-------------------------------------------------+----------------+146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 57 6f 72 6c 64 |......World |
+--------+-------------------------------------------------+----------------+
2、协议设计与解析
协议的作用
TCP/IP 中消息传输基于字节流的方式,没有边界
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
Redis协议
如果我们要向Redis服务器发送一条set name Nyima
的指令,需要遵守如下协议
// 该指令一共有3部分,每条指令之后都要添加回车与换行符
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n
客户端代码如下
public class RedisClient {static final Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();try {ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 打印日志ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 回车与换行符final byte[] LINE = {'\r','\n'};// 获得ByteBufByteBuf buffer = ctx.alloc().buffer();// 连接建立后,向Redis中发送一条指令,注意添加回车与换行// set name Nyimabuffer.writeBytes("*3".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$3".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("set".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$4".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("name".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$5".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("Nyima".getBytes());buffer.writeBytes(LINE);ctx.writeAndFlush(buffer);}});}}).connect(new InetSocketAddress("localhost", 6379));channelFuture.sync();// 关闭channelchannelFuture.channel().close().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 关闭groupgroup.shutdownGracefully();}}
}
Redis中查询执行结果
HTTP协议
HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec
作为服务器端的解码器与编码器,来处理HTTP请求
// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>implements HttpServerUpgradeHandler.SourceCodec
服务器代码
public class HttpServer {static final Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();new ServerBootstrap().group(group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 作为服务器,使用 HttpServerCodec 作为编码器与解码器ch.pipeline().addLast(new HttpServerCodec());// 服务器只处理HTTPRequest,具体的限定取决于泛型ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {// 获得请求urilog.debug(msg.uri());// 获得完整响应,设置版本号与状态码DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);// 设置响应内容byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);// 设置响应体长度,避免浏览器一直接收响应内容response.headers().setInt(CONTENT_LENGTH, bytes.length);// 设置响应体response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});}}).bind(8080);}
}
服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可
// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse
,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH
而一直空转,需要添加CONTENT_LENGTH
字段,表明响应体中数据的具体长度
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
运行结果
浏览器
自定义协议
组成要素
- 魔数:用来在第一时间判定接收的数据是否为无效数据包
- 版本号:可以支持协议的升级
- 序列化算法:消息正文到底采用哪种序列化反序列化方式
- 如:json、protobuf、hessian、jdk
- 指令类型:是登录、注册、单聊、群聊… 跟业务相关
- 请求序号:为了双工通信,提供异步能力
- 正文长度
- 消息正文
编码器与解码器
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 设置魔数 4个字节out.writeBytes(new byte[]{'N','Y','I','M'});// 设置版本号 1个字节out.writeByte(1);// 设置序列化方式 1个字节out.writeByte(1);// 设置指令类型 1个字节out.writeByte(msg.getMessageType());// 设置请求序号 4个字节out.writeInt(msg.getSequenceId());// 为了补齐为2的次幂个字节,填充1个字节的数据,满足为16字节out.writeByte(0xff);// 获得序列化后的msgByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 获得并设置正文长度 长度用4个字节标识out.writeInt(bytes.length);// 设置消息正文out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 获取魔数int magic = in.readInt();// 获取版本号byte version = in.readByte();// 获得序列化方式byte seqType = in.readByte();// 获得指令类型byte messageType = in.readByte();// 获得请求序号int sequenceId = in.readInt();// 移除补齐字节in.readByte();// 获得正文长度int length = in.readInt();// 获得正文byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();// 将信息放入List中,传递给下一个handlerout.add(message);// 打印获得的信息正文System.out.println("===========魔数===========");System.out.println(magic);System.out.println("===========版本号===========");System.out.println(version);System.out.println("===========序列化方法===========");System.out.println(seqType);System.out.println("===========指令类型===========");System.out.println(messageType);System.out.println("===========请求序号===========");System.out.println(sequenceId);System.out.println("===========正文长度===========");System.out.println(length);System.out.println("===========正文===========");System.out.println(message);}
}
-
编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>
- 编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中
-
解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler
编写测试类
public class TestCodec {static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel();// 添加解码器,避免粘包半包问题channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));// 开启控制台日志channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 绑定自定义编码器与解码器,其内部重写父类的encode和decode两个handler方法channel.pipeline().addLast(new MessageCodec());// 自定义的封装dto类LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");// 测试编码与解码ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();// 内部将user正文数据存储到byteBuf的正文位置上new MessageCodec().encode(null, user, byteBuf);channel.writeInbound(byteBuf);}
}
- 测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
- 通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码
运行结果
@Sharable注解
为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder
。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题
- channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
- 此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误
为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable
注解来标明,该handler能否在多个channel中共享。
只有带有该注解,才能通过对象的方式被共享,否则无法被共享
自定义编解码器能否使用@Sharable注解
这需要根据自定义的handler的处理逻辑进行分析
我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的
但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
-
因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下
-
这就意味着ByteToMessageCodec不能被多个channel所共享的
- 原因:因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题
如果想要共享,需要怎么办呢?
继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似
@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {...}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {...}
}
3、在线聊天室
聊天室业务
用户登录接口
public interface UserService {/*** 登录* @param username 用户名* @param password 密码* @return 登录成功返回 true, 否则返回 false*/boolean login(String username, String password);
}
用户会话接口
public interface Session {/*** 绑定会话* @param channel 哪个 channel 要绑定会话* @param username 会话绑定用户*/void bind(Channel channel, String username);/*** 解绑会话* @param channel 哪个 channel 要解绑会话*/void unbind(Channel channel);/*** 获取属性* @param channel 哪个 channel* @param name 属性名* @return 属性值*/Object getAttribute(Channel channel, String name);/*** 设置属性* @param channel 哪个 channel* @param name 属性名* @param value 属性值*/void setAttribute(Channel channel, String name, Object value);/*** 根据用户名获取 channel* @param username 用户名* @return channel*/Channel getChannel(String username);
}
群聊会话接口
public interface GroupSession {/*** 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null* @param name 组名* @param members 成员* @return 成功时返回组对象, 失败返回 null*/Group createGroup(String name, Set<String> members);/*** 加入聊天组* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group joinMember(String name, String member);/*** 移除组成员* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeMember(String name, String member);/*** 移除聊天组* @param name 组名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeGroup(String name);/*** 获取组成员* @param name 组名* @return 成员集合, 如果群不存在或没有成员会返回 empty set*/Set<String> getMembers(String name);/*** 获取组成员的 channel 集合, 只有在线的 channel 才会返回* @param name 组名* @return 成员 channel 集合*/List<Channel> getMembersChannel(String name);/*** 判断群聊是否一被创建* @param name 群聊名称* @return 是否存在*/boolean isCreated(String name);
}
整体结构
-
client包:存放客户端相关类
-
message包:存放各种类型的消息
-
protocol包:存放自定义协议
-
server包:存放服务器相关类
- service包:存放用户相关类
- session包:单聊及群聊相关会话类
客户端代码结构
public class ChatClient {static final Logger log = LoggerFactory.getLogger(ChatClient.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);MessageSharableCodec messageSharableCodec = new MessageSharableCodec();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 自定义的协议解码粘半包处理器ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(loggingHandler);ch.pipeline().addLast(messageSharableCodec);}});Channel channel = bootstrap.connect().sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}
服务器代码结构
public class ChatServer {static final Logger log = LoggerFactory.getLogger(ChatServer.class);public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);MessageSharableCodec messageSharableCodec = new MessageSharableCodec();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker);bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(loggingHandler);ch.pipeline().addLast(messageSharableCodec);}});Channel channel = bootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
登录
客户端代码
客户端添加如下handler,分别处理登录、聊天等操作
@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// 这是一个计数锁,只有当其维护的value减为0的时候才会释放CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);// 原子变量AtomicBoolean LOGIN = new AtomicBoolean(false);try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 创建连接时执行的处理器,用于执行登陆操作*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 开辟额外线程(不要让nio的线程被录入阻塞),用于用户登陆及后续操作new Thread(()->{Scanner scanner = new Scanner(System.in);System.out.println("请输入用户名");String username = scanner.next();System.out.println("请输入密码");String password = scanner.next();// 创建包含登录信息的请求体LoginRequestMessage message = new LoginRequestMessage(username, password);// 发送到channel中,注意这里用ctx写出,因为他要从这里找前面的那些处理器进行加工ctx.writeAndFlush(message);// 校验登录结果,如果能获取到锁就说明登录成功if (!loginStatus.get()) {// 登陆失败,关闭channel并返回ctx.channel().close();return;}// 登录成功后,执行其他操作while (true) {System.out.println("==================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("==================================");String command = scanner.nextLine();// 获得指令及其参数,并发送对应类型消息// 注意这里!!!!!你发送的消息类型决定了在服务器端处理的handelerString[] commands = command.split(" ");switch (commands[0]){case "send":ctx.writeAndFlush(new ChatRequestMessage(username, commands[1], commands[2]));break;case "gsend":ctx.writeAndFlush(new GroupChatRequestMessage(username,commands[1], commands[2]));break;case "gcreate":// 分割,获得群员名String[] members = commands[2].split(",");Set<String> set = new HashSet<>(Arrays.asList(members));// 把自己加入到群聊中set.add(username);ctx.writeAndFlush(new GroupCreateRequestMessage(commands[1],set));break;case "gmembers":ctx.writeAndFlush(new GroupMembersRequestMessage(commands[1]));break;case "gjoin":ctx.writeAndFlush(new GroupJoinRequestMessage(username, commands[1]));break;case "gquit":ctx.writeAndFlush(new GroupQuitRequestMessage(username, commands[1]));break;case "quit":ctx.channel().close();return;default:System.out.println("指令有误,请重新输入");continue;}}}, "login channel").start();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 注意噢,这个消息的接收是在这里进行输出控制台log.debug("{}", msg);if (msg instanceof LoginResponseMessage) {// 如果是登录响应信息LoginResponseMessage message = (LoginResponseMessage) msg;boolean isSuccess = message.isSuccess();// 登录成功,设置登陆标记if (isSuccess) {loginStatus.set(true);}// 登陆后,唤醒登陆线程,原始计数为1,减了一个后就变为0,释放锁waitLogin.countDown();}}});
服务器代码
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());// 日志ch.pipeline().addLast(LOGGING_HANDLER);// 自定义的协议编解码操作ch.pipeline().addLast(MESSAGE_CODEC);// 只对LoginRequestMessage解码结果进行操作ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();// 拿着账号密码去后端做校验,校验通过还要把用户名和channel的对应关系也要存储起来,用来实现单聊的时候看对方在不在线boolean login = UserServiceFactory.getUserService().login(username, password);LoginResponseMessage message;if(login) {message = new LoginResponseMessage(true, "登录成功");} else {message = new LoginResponseMessage(false, "用户名或密码不正确");}ctx.writeAndFlush(message);}});}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
// 该handler处理登录请求
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ch.pipeline().addLast(new LoginRequestMessageHandler());
运行结果
客户端
5665 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 1, 0, 279
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:AbstractResponseMessage{success=true, reason='登陆成功'}
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='登陆成功'}
success
服务器
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 0, 0, 217
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:LoginRequestMessage{username='Nyima', password='123'}7946 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x8e7c07f6, L:/127.0.0.1:8080 - R:/127.0.0.1:60572] WRITE: 295B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4e 59 49 4d 01 01 01 00 00 00 00 ff 00 00 01 17 |NYIM............|
|00000010| ac ed 00 05 73 72 00 31 63 6e 2e 6e 79 69 6d 61 |....sr.1cn.nyima|
|00000020| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000030| 73 61 67 65 2e 4c 6f 67 69 6e 52 65 73 70 6f 6e |sage.LoginRespon|
|00000040| 73 65 4d 65 73 73 61 67 65 e2 34 49 24 72 52 f3 |seMessage.4I$rR.|
|00000050| 07 02 00 00 78 72 00 34 63 6e 2e 6e 79 69 6d 61 |....xr.4cn.nyima|
|00000060| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000070| 73 61 67 65 2e 41 62 73 74 72 61 63 74 52 65 73 |sage.AbstractRes|
|00000080| 70 6f 6e 73 65 4d 65 73 73 61 67 65 b3 7e 19 32 |ponseMessage.~.2|
|00000090| 9b 88 4d 7b 02 00 02 5a 00 07 73 75 63 63 65 73 |..M{...Z..succes|
|000000a0| 73 4c 00 06 72 65 61 73 6f 6e 74 00 12 4c 6a 61 |sL..reasont..Lja|
|000000b0| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 78 |va/lang/String;x|
|000000c0| 72 00 24 63 6e 2e 6e 79 69 6d 61 63 2e 73 74 75 |r.$cn.nyimac.stu|
|000000d0| 64 79 2e 64 61 79 38 2e 6d 65 73 73 61 67 65 2e |dy.day8.message.|
|000000e0| 4d 65 73 73 61 67 65 dd e9 84 b7 21 db 18 52 02 |Message....!..R.|
|000000f0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|00000100| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|00000110| 00 00 00 00 00 00 00 01 74 00 0c e7 99 bb e9 99 |........t.......|
|00000120| 86 e6 88 90 e5 8a 9f |....... |
+--------+-------------------------------------------------+----------------+
单聊
客户端输入send username content
即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler
@ChannelHandler.Sharable // 必须添加该注解
// 表明只对ChatRequestMessage的消息进行加工
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {// 获得user所在的channelChannel channel = SessionFactory.getSession().getChannel(msg.getTo());// 如果双方都在线if (channel != null) {// 通过接收方与服务器之间的channel发送信息,注意,这里不是写到byteBuf去channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));} else {// 通过发送方与服务器之间的channel发送消息ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或离线,发送失败"));}}
}
// 该handler处理单聊请求
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
ch.pipeline().addLast(chatRequestMessageHandler);
运行结果
发送方(zhangsan)
send Nyima hello
接收方(Nyima)
// 收到zhangsan发来的消息
20230 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - ChatResponseMessage{from='zhangsan', content='hello'}
群聊
创建群聊
添加处理GroupCreateRequestMessage
的handler
@ChannelHandler.Sharable
// 表明只对GroupCreateRequestMessage的消息进行加工
public class GroupCreateMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {// 获得要创建的群聊名,ctx对应的是发送这个创建群聊的业务请求的人的这个channelString groupName = msg.getGroupName();// 获得要创建的群聊的成员组(首次拉起形成群聊的那几个人,包含自身才行)Set<String> members = msg.getMembers();// 判断该群聊是否创建过,未创建返回null并创建群聊Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members);if (group == null) {// 向群的创建者发送创建成功消息GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(true, groupName + "创建成功");ctx.writeAndFlush(groupCreateResponseMessage);// 获得在线群员的channel,给群员发送入群聊消息List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);groupCreateResponseMessage = new GroupCreateResponseMessage(true, "您已被拉入"+groupName);// 给每个在线群员发送消息for(Channel channel : membersChannel) {channel.writeAndFlush(groupCreateResponseMessage);}} else {// 发送失败消息给创建人GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(false, groupName + "已存在");ctx.writeAndFlush(groupCreateResponseMessage);}}
}
// 该handler处理创建群聊请求
GroupCreateMessageHandler groupCreateMessageHandler = new GroupCreateMessageHandler();
ch.pipeline().addLast(groupCreateMessageHandler);
运行结果
创建者客户端
// 首次创建
gcreate Netty学习 zhangsan,lisi31649 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='Netty学习创建成功'}
15244 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}// 再次创建
gcreate Netty学习 zhangsan,lisi
40771 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='Netty学习已存在'}
群员客户端
28788 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}
群聊聊天
@ChannelHandler.Sharable
// 表明只对GroupChatRequestMessage的消息进行加工
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {String groupName = msg.getGroupName();GroupSession groupSession = GroupSessionFactory.getGroupSession();// 判断群聊是否存在boolean isCreated = groupSession.isCreated(groupName);if (isCreated) {// 给群员发送信息List<Channel> membersChannel = groupSession.getMembersChannel(groupName);for(Channel channel : membersChannel) {channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));}} else {ctx.writeAndFlush(new GroupChatResponseMessage(false, "群聊不存在"));}}
}
// 该handler处理群聊聊天
GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler();
ch.pipeline().addLast(groupChatMessageHandler);
运行结果
发送方(群聊存在)
gsend Netty学习 你们好45408 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}
接收方
48082 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}
发送方(群聊不存在)
gsend Spring学习 你们好25140 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在'}
加入群聊
@ChannelHandler.Sharable
public class GroupJoinMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {GroupSession groupSession = GroupSessionFactory.getGroupSession();// 判断该用户是否在群聊中Set<String> members = groupSession.getMembers(msg.getGroupName());boolean joinFlag = false;// 群聊存在且用户未加入,才能加入if (!members.contains(msg.getUsername()) && groupSession.isCreated(msg.getGroupName())) {joinFlag = true;}if (joinFlag) {// 加入群聊groupSession.joinMember(msg.getGroupName(), msg.getUsername());ctx.writeAndFlush(new GroupJoinResponseMessage(true,"加入"+msg.getGroupName()+"成功"));} else {ctx.writeAndFlush(new GroupJoinResponseMessage(false, "加入失败,群聊未存在或您已加入该群聊"));}}
}
// 该handler处理加入群聊
GroupJoinMessageHandler groupJoinMessageHandler = new GroupJoinMessageHandler();
ch.pipeline().addLast(groupJoinMessageHandler);
运行结果
正常加入群聊
94921 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='加入Netty学习成功'}
加入不能存在或已加入的群聊
44025 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='加入失败,群聊未存在或您已加入该群聊'}
退出
@ChannelHandler.Sharable
public class GroupQuitMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {GroupSession groupSession = GroupSessionFactory.getGroupSession();String groupName = msg.getGroupName();Set<String> members = groupSession.getMembers(groupName);String username = msg.getUsername();// 判断用户是否在群聊中以及群聊是否存在boolean joinFlag = false;if (groupSession.isCreated(groupName) && members.contains(username)) {// 可以退出joinFlag = true;}if (joinFlag) {// 退出成功groupSession.removeMember(groupName, username);ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出"+groupName+"成功"));} else {// 退出失败ctx.writeAndFlush(new GroupQuitResponseMessage(false, "群聊不存在或您未加入该群,退出"+groupName+"失败"));}}
}
// 该handler处理退出群聊
GroupQuitMessageHandler groupQuitMessageHandler = new GroupQuitMessageHandler();
ch.pipeline().addLast(groupQuitMessageHandler);
运行结果
正常退出
32282 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='退出Netty学习成功'}
退出不存在或未加入的群聊
67404 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在或您未加入该群,退出Netty失败'}
查看群聊成员
@ChannelHandler.Sharable
public class GroupMembersMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {ctx.writeAndFlush(new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName())));}
}
// 该handler处理查看成员
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
ch.pipeline().addLast(groupMembersMessageHandler);
运行结果
46557 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupMembersResponseMessage{members=[zhangsan, Nyima]}
退出聊天室
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {/*** 断开连接时触发 Inactive事件*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 解绑SessionFactory.getSession().unbind(ctx.channel());}/*** 异常退出,需要解绑*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 解绑SessionFactory.getSession().unbind(ctx.channel());}
}
// 该handler处理退出聊天室
ch.pipeline().addLast(quitHandler);
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
退出时,客户端会关闭channel并返回
case "quit":// 关闭channel并返回ctx.channel().close();return;
连接假死
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,会白白地消耗资源
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
解决方法
可以添加
IdleStateHandler
对空闲时间进行检测,通过构造函数可以传入三个参数
- readerIdleTimeSeconds 读空闲经过的秒数
- writerIdleTimeSeconds 写空闲经过的秒数
- allIdleTimeSeconds 读和写空闲经过的秒数
当指定时间内未发生读或写事件时,会触发特定事件
- 读空闲会触发
READER_IDLE
- 写空闲会触发
WRITE_IDLE
- 读和写空闲会触发
ALL_IDEL
将定时任务的周期设置为 0,这意味着不会触发该空闲状态事件。
想要处理这些事件,需要自定义事件处理函数
服务器端代码
// 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// 添加双向处理器,负责处理READER_IDLE事件
/*ChannelDuplexHandler 是 Netty 框架中的一个特殊类,它是用来处理网络通信中的读写事件的双向处理器。它扩展
了ChannelInboundHandler和ChannelOutboundHandler,同时负责处理从网络中读取到的数据以及将数据写入到网络中。*/
ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 获得事件IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {// 断开连接ctx.channel().close();}}
});
- 使用
IdleStateHandler
进行空闲检测 - 使用双向处理器
ChannelDuplexHandler
对入站与出站事件进行处理IdleStateHandler
中的事件为特殊事件,需要实现ChannelDuplexHandler
的userEventTriggered
方法,判断事件类型并自定义处理方式,来对事件进行处理
为避免因非网络等原因引发的WRITER_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生WRITER_IDLE事件,直接让服务器断开连接是不可取的
为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds
,一般设置为其值的一半
客户端代码
// 发送心跳包,让服务器知道客户端在线
// 3s未发生WRITER_IDLE,就像服务器发送心跳包
// 该值为服务器端设置的READER_IDLE触发时间的一半左右
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.WRITER_IDLE) {// 发送心跳包ctx.writeAndFlush(new PingMessage());}}
});
相关文章:

Netty深入浅出Java网络编程学习笔记(二) Netty进阶应用篇
目录 四、应用 1、粘包与半包 现象分析 粘包 半包 本质 解决方案 短链接 定长解码器 行解码器 长度字段解码器——LTC 2、协议设计与解析 协议的作用 Redis协议 HTTP协议 自定义协议 组成要素 编码器与解码器 编写测试类 Sharable注解 自定义编解码器能否使用Sharable注解 3、在…...

机器学习基础之《回归与聚类算法(1)—线性回归》
一、线性回归的原理 1、线性回归应用场景 如何判定一个问题是回归问题的,目标值是连续型的数据的时候 房价预测 销售额度预测 贷款额度预测、利用线性回归以及系数分析因子 2、线性回归定义 线性回归(Linear regression)是利用回归方程(函数)对一个或多个自变量(…...

如何实现制造业信息化转型?
一、制造业信息化历史 (1)1930年代 库存控制、管理 当时计算机系统尚未出现,人们为了解决库存管控的难题,提出了订货点法——当库存量降低到某一预先设定的点时,即开始发出订货单补充库存,直至库存量降低…...

stable diffusion艰难炼丹之路
文章目录 概要autoDL系统盘爆满autoDL python3.8切换python3.10dreambooth训练大模型完成后报错 概要 主要是通过autoDL服务器部署stable diffusion,通过dreambooth训练大模型。 问题: autoDL系统盘爆满autoDL python3.8切换python3.10dreambooth训练大…...

竞赛 深度学习 opencv python 实现中国交通标志识别
文章目录 0 前言1 yolov5实现中国交通标志检测2.算法原理2.1 算法简介2.2网络架构2.3 关键代码 3 数据集处理3.1 VOC格式介绍3.2 将中国交通标志检测数据集CCTSDB数据转换成VOC数据格式3.3 手动标注数据集 4 模型训练5 实现效果5.1 视频效果 6 最后 0 前言 🔥 优质…...

用Python实现数据透视表、音频文件格式转换
用Python实现数据透视表、音频文件格式转换 1.用Python实现数据透视表 import pandas as pdif __name__ __main__:# df pd.read_excel(广告-资源位变现效率监测看板-1.xlsx, sheet_name各业务在该资源位的明细数据)df pd.read_excel(填充率分析-Q3.xlsx, sheet_name库存底…...

java枚举中写抽象方法
之前写java枚举时,都是中规中矩的写,从来没见过在枚举中写抽象方法的,但最近换了新公司,接手了新项目,发现枚举中竟然写了抽象方法,由于之前没接触过这种写法,所以这里记录下 实体类student代码…...

麒麟操作系统提示“默认密钥环已上锁”的解决办法
在国产麒麟操作系统上,有的时候不知道为啥,打开vscode或者其他应用软件时,总是提示“密钥环已上锁”,该怎么处理呢? 需要点击“开始”,在搜索框中输入“password” 点击打开“密码和密钥”,看到如下图。 然后点击左上角的箭头,回退,打开如下图:...

云原生周刊:Docker 推出 Docker Debug | 2023.10.9
开源项目推荐 SchemaHero SchemaHero 是一个 Kubernetes Operator,用于各种数据库的声明式架构管理。SchemaHero 有以下目标: 数据库表模式可以表示为可以部署到集群的 Kubernetes 资源。可以编辑数据库模式并将其部署到集群。SchemaHero 将计算所需的…...

设计模式 - 行为型模式考点篇:迭代器模式(概述 | 案例实现 | 优缺点 | 使用场景)
目录 一、行为型模式 一句话概括行为型模式 1.1、迭代器模式 1.1.1、概述 1.1.2、案例实现 1.1.3、优缺点 1.1.4、使用场景 一、行为型模式 一句话概括行为型模式 行为型模式:类或对象间如何交互、如何划分职责,从而更好的完成任务. 1.1、迭代器…...

Spark任务优化分析
一、背景 首先需要掌握 Spark DAG、stage、task的相关概念 Spark的job、stage和task的机制论述 - 知乎 task数量和rdd 分区数相关 二、任务慢的原因分析 找到运行时间比较长的stage 再进去看里面的task 可以看到某个task 读取的数据量明显比其他task 较大。 如果是sql 任…...

最新数据库流行度最新排名(每月更新)
2023年10月数据库流行度最新排名 TOP DB顶级数据库索引是通过分析在谷歌上搜索数据库名称的频率来创建的 一个数据库被搜索的次数越多,这个数据库就被认为越受欢迎。这是一个领先指标。原始数据来自谷歌Trends 如果您相信集体智慧,那么TOP DB索引可以帮…...

Python:如何在一个月内学会爬取大规模数据
Python爬虫为什么受欢迎 如果你仔细观察,就不难发现,懂爬虫、学习爬虫的人越来越多,一方面,互联网可以获取的数据越来越多,另一方面,像 Python这样的编程语言提供越来越多的优秀工具,让爬虫变得…...

K8S云计算系列-(4)
K8s Dashboard UI 部署实操 Kubernetes实现的最重要的工作是对Docker容器集群统一的管理和调度,通常使用命令行来操作Kubernetes集群及各个节点,命令行操作非常不方便,如果使用UI界面来可视化操作,会更加方便的管理和维护。如下为…...

【Mybatis源码】IDEA中Mybatis源码环境搭建
一、Mybatis源码源 在github中找到Mybatis源码地址:https://github.com/mybatis/mybatis-3 找到Mybatis git地址 二、IDEA导入Mybatis源码 点击Clone下载Mybatis源码 三、选择Mybatis分支 选择Mybatis分支,这里我选择的是3.4.x分支...

VUE如何使得大屏自适应的几种方法?
VUE学习大屏自适应的几种方法 1.自适屏幕,始终保持16:9的比例 <!-- 大屏固定比例16:9自适应 --> <template><div class"container"><div class"content" :style"getAspectRatioStyle"><!-- …...

API接口安全运营研究(内附官方开发平台api接口接入方式)
摘 要 根据当前API技术发展的趋势,从实际应用中发生的安全事件出发,分析并讨论相关API安全运营问题。从风险角度阐述了API接口安全存在的问题,探讨了API检测技术在安全运营中起到的作用,同时针对API安全运营实践,提出…...

信钰证券:股票交易费用计算方法?
股票生意是股市参加者之间进行的买入和卖出股票的进程。其中,股票生意费用是参加股市生意的重要组成部分。本文将从多个视点分析股票生意费用计算方法。 首先,股票生意费用一般包含三部分。分别是佣钱、印花税和过户费。佣钱是证券公司为代理股票生意而收…...

通过js获取用户网络ip地址
<!DOCTYPE html> <html><head><meta charset"utf-8"><title>js获取本地ip</title> </head><body><script>var xmlhttp;if (window.XMLHttpRequest) {xmlhttp new XMLHttpRequest();} else {xmlhttp new Act…...

微信小程序wxml使用过滤器
微信小程序wxml使用过滤器 1. 新建wxs2. 引用和使用 如何在微信小程序wxml使用过滤器? 犹如Angular使用pipe管道这样子方便,用的最多就是时间格式化。 下面是实现时间格式化的方法和步骤: 1. 新建wxs 插入代码: /*** 管道过滤工…...

内网渗透面试问题
文章目录 1、熟悉哪些域渗透的手段2、详细说明哈希传递的攻击原理NTLM认证流程哈希传递 3、聊一下黄金票据和白银票据4、shiro反序列化漏洞的形成原因,尝试使用burp抓包查看返回包内容安装环境漏洞验证 5、log4j组件的命令执行漏洞是如何造成的6、画图描述Kerberos协…...

Go语言函数进阶:值传递、引用传递、函数式编程
文章目录 值传递和引用传递闭包柯里化defer go语言教程: 安装入门➡️ for循环➡️ 数组、切片和指针➡️ switch和map 值传递和引用传递 go语言中,函数通过关键字func定义,对于传入和返回的参数需要做类型的定义,其返回值可…...

数据结构 堆——详细动画图解,形象理解
作者主页 📚lovewold少个r博客主页 ➡️栈和队列博客传送门 🌳参天大树充满生命力,其根深叶茂,分枝扶疏,为我们展示了数据分治的生动形态 目录 🌳 树 树的常见概念 📒树的表示 二叉树 一…...

使用pymodbus进行modbus-TCP通信
模拟modbus-slave 创建slave 设置 完成 安装pymodbus pip3 install pymodbus2.5.3代码 from pymodbus.client.sync import ModbusTcpClient from pymodbus.bit_read_message import ReadCoilsResponse from pymodbus.register_read_message import ReadInputRegistersRe…...

2. redis常见数据类型
一、Redis 数据类型 Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合…...

多测师肖sir_高级金牌讲师_python之结构语句005
python中常见语句: 一、目录: 1、if语句 2、while 循环语句 3、for循环语句 4、continue 语句 5、break 语句 二、语句详解 1、if判断语句 (1)if单分支 格式:if 判断条件: 语句块1…… else: 语…...

用3-8译码器实现全减器
描述 请使用3-8译码器和必要的逻辑门实现全减器,全减器接口图如下,A是被减数,B是减数,Ci是来自低位的借位,D是差,Co是向高位的借位。 3-8译码器代码如下,可将参考代码添加并例化到本题答案中。 …...

招投标系统简介 企业电子招投标采购系统源码之电子招投标系统 —降低企业采购成本
功能描述 1、门户管理:所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含:招标公告、非招标公告、系统通知、政策法规。 2、立项管理:企业用户可对需要采购的项目进行立项申请,并提交审批,查看所…...

Linux Centos7 下使用yum安装的nginx平滑升级
1. 查看当前nginx版本 1nginx -v2. 查看centos版本 1cat /etc/redhat-release3. 创建一个新的文件nginx.repo,其中第三行的7是因为我的centos版本是7点多的,你看自己是多少就改多少 1vim /etc/yum.repos.d/nginx.repo23[nginx]4namenginx repo 5baseu…...

C/S架构学习之多线程实现TCP并发服务器
并发概念:并发是指两个或多个事件在同一时间间隔发生;多线程实现TCP并发服务器的实现流程:一、创建套接字(socket函数):通信域选择IPV4网络协议、套接字类型选择流式; int sockfd socket(AF_IN…...