Netty笔记
本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频
学习netty之前要先打好NIO的基础,可以先去看我的另一篇文章
一、概述
不想看的可以直接跳过
Netty 的地位
Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
-
Cassandra - nosql 数据库
-
Spark - 大数据分布式计算框架
-
Hadoop - 大数据分布式存储框架
-
RocketMQ - ali 开源的消息队列
-
ElasticSearch - 搜索引擎
-
gRPC - rpc 框架
-
Dubbo - rpc 框架
-
Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
-
Zookeeper - 分布式协调框架
Netty 的优势
-
Netty vs NIO,工作量大,bug 多
-
需要自己构建协议
-
解决 TCP 传输问题,如粘包、半包
-
epoll 空轮询导致 CPU 100%
-
对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
-
-
Netty vs 其它网络应用框架
-
Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
-
久经考验,16年,Netty 版本
-
2.x 2004
-
3.x 2008
-
4.x 2013
-
5.x 已废弃(没有明显的性能提升,维护成本高)
-
-
二、入门案例
首次看netty的代码会比较乱,不要慌,多看看多学学,就会很熟悉的。
最重要的是要理解每一步的作用
开发一个简单的服务器端和客户端
-
客户端向服务器端发送 hello, world
-
服务器仅接收,不返回
依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>
服务端
new ServerBootstrap().group(new NioEventLoopGroup()) // 1.channel(NioServerSocketChannel.class) // 2.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 5ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}}).bind(8080); // 4
-
1 处,创建 NioEventLoopGroup,可以简单理解为
线程池 + Selector
后面会详细展开 -
2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有
-
3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
-
4 处,ServerSocketChannel 绑定的监听端口
-
5 处,SocketChannel 的处理器,解码 ByteBuf => String
-
6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
客户端
new Bootstrap().group(new NioEventLoopGroup()) // 1.channel(NioSocketChannel.class) // 2.handler(new ChannelInitializer<Channel>() { // 3@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder()); // 8}}).connect("127.0.0.1", 8080) // 4.sync() // 5.channel() // 6.writeAndFlush(new Date() + ": hello world!"); // 7
-
1 处,创建 NioEventLoopGroup,同 Server
-
2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有
-
3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
-
4 处,指定要连接的服务器和端口
-
5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
-
6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
-
7 处,写入消息并清空缓冲区
-
8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
-
数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程
流程分析
其实黑马使用链式编程对初学者来说并不友好,我下面对代码进行拆分
乍一看怎么全是ServerBootstrap,其实这些操作都是围绕着这个类在转的。
下一章组件 将会对每一个组件进行详细的分析,到时候就没有这么乱了
可以说明的是NioEventLoopGroup 是一个EventLoop的集合,EventLoop相当于NIO里面处理读写时间的工作者,都可以单开线程的。类似于NIO里面的多线程优化。
ServerBootstrap serverBootstrap = new ServerBootstrap();//得到启动类NioEventLoopGroup group = new NioEventLoopGroup();// EventLoop的集合ServerBootstrap serverBootstrap1 = serverBootstrap.group(group);// 将EventLoopGroup交给启动类ServerBootstrap serverBootstrap2 = serverBootstrap1.channel(NioServerSocketChannel.class);// 指定channel的类型ServerBootstrap serverBootstrap3 = serverBootstrap2.childHandler(new ChannelInitializer<NioSocketChannel>() {// 设置子通道(Channel)的处理器(ChannelHandler)的protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 添加处理器ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 添加自定义处理器@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}});ChannelFuture channelFuture = serverBootstrap1.bind(8080);// 绑定端口
三、组件
1、EventLoop
我在前面提过,这就相当于处理读写事件的工作者。维护着自己的Selector。
我们之前提到过一个Selector能监听多个channel。一个服务端有多个Selector
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
-
一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法(这也就让他有处理定时任务的能力)
-
另一条线是继承自 netty 自己的 OrderedEventExecutor,
-
提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
-
提供了 parent 方法来看看自己属于哪个 EventLoopGroup
-
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
-
继承自 netty 自己的 EventExecutorGroup
-
实现了 Iterable 接口提供遍历 EventLoop 的能力
-
另有 next 方法获取集合中下一个 EventLoop
-
可以自己指定group的大小,但是没有必要
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
结果:
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98
优雅关闭
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
演示 NioEventLoop 处理 io 事件
下面主要演示的是 工人是轮流工作的,但是对于同一个channel,多次来进行读写,为他服务的是同一个工作(也就是EventLoop)
服务器端两个 nio worker 工人
new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if (byteBuf != null) {byte[] buf = new byte[16];ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());log.debug(new String(buf));}}});}}).bind(8080).sync();
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup(1)).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.println("init...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}}).channel(NioSocketChannel.class).connect("localhost", 8080).sync().channel();channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));Thread.sleep(2000);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
最后输出
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
2、channelFuture
3、future和promise
4、bytebuf
工具类:用于调试 展示bytebuf的内容
private static void log(ByteBuf buffer) {int length = buffer.readableBytes();int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;StringBuilder buf = new StringBuilder(rows * 80 * 2).append("read index:").append(buffer.readerIndex()).append(" write index:").append(buffer.writerIndex()).append(" capacity:").append(buffer.capacity()).append(NEWLINE);appendPrettyHexDump(buf, buffer);System.out.println(buf.toString());
}
1、内存模式和池化
堆内存vs直接内存
堆内存 分配效率高,但读写效率低。直接内存反之。
使用ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
默认方式创建出来的是使用的直接内存
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
-
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
-
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf,优点有
-
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
-
有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
-
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
-
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
-
4.1 之前,池化功能还不成熟,默认是非池化实现
2、组成、写入、读取
相比于ByteBuffer的优点是①可扩容 ②使用读写指针,不用切换,逻辑更简单
读过的内存就废除 。
如果没有指定初始大小,默认是256个字节
扩容
再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容
buffer.writeInt(6); log(buffer);
扩容规则是
-
如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
-
如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
-
扩容不能超过 max capacity 会报错
使用mark配合reset实现读取多次,其实就是做个标记,然后跳回标记的地方。
public static void main(String[] args) throws ExecutionException, InterruptedException {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);buffer.writeBytes(new byte[]{1,2,3,4,5});log(buffer);buffer.markReaderIndex();System.out.println(buffer.readByte());buffer.resetReaderIndex();System.out.println(buffer.readByte());log(buffer);}
3、slice 和composite
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
例,原始 ByteBuf 进行一些初始操作
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10); origin.writeBytes(new byte[]{1, 2, 3, 4}); origin.readByte(); System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
切片的正确使用
就是每个切片要执行retain,防止内存被释放。等到自己用完之后再执行release
composite合并
public static void main(String[] args) {ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();buffer1.writeBytes(new byte[]{1,2,3});ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();buffer2.writeBytes(new byte[]{4,5,6});CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();compositeBuffer.addComponents(true,buffer1,buffer2);log(compositeBuffer);}
5、实现一个简单的回响功能
客户端发送什么,服务端就回复什么
这是 服务端的代码
package cn.itcast.mytest.homework;import com.sun.corba.se.internal.CosNaming.BootstrapServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;@Slf4j
//回声服务器 返回客户端发送的消息
public class Server {public static void main(String[] args) throws InterruptedException {log.debug("启动中。。。");new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("收到消息===>{}",msg);nioSocketChannel.writeAndFlush(msg);}});nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("发送消息===>{}",msg);super.write(ctx, msg, promise);}});}}).bind(8080);}
}
这是客户端
package cn.itcast.mytest.homework;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.java.Log;
import lombok.extern.slf4j.Slf4j;import java.util.Scanner;@Slf4j
public class Client {public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("收到消息==>{}",msg);}});}}).connect("localhost",8080).sync();Scanner sc=new Scanner(System.in);while(true){System.out.println("请输入要发送的消息(输入q退出)");String s = sc.nextLine();if(!"q".equals(s)){channelFuture.channel().writeAndFlush(s);}else{break;}}log.debug("结束对话");}
}
四、黏包和半包
1、黏包现象
所谓黏包现象,就是发送方在短时间内发送多条数据,接收方无法准确分辨出每个独立数据包的边界。这种情况常见于基于流的传输协议(如TCP),因为TCP是面向字节流的协议,数据在网络中以流的形式发送,而不是分包的形式。
就是多个数据包黏在一起了
channel建立成功之后,会触发active事件。
案例 :客户端连续发送10次16字节的数据,服务方接收数据之后打印出来,我们会发现,服务端试将这10条数据当成1条数据了。
接收方(服务端)
@Slf4j
public class HelloWorldServer {static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new HelloWorldServer().start();}
}
发送方(客户端)
@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");Random r = new Random();char c = 'a';for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
2、半包现象
半包现象指的是在网络通信中,一个逻辑上的数据包被拆分成多个部分进行接收。这种现象通常发生在基于流的协议(如 TCP)中,由于 TCP 是面向字节流的协议,数据在传输时并不会被强制分包,而是以流的方式发送和接收。
案例 :客户端一次发送1600字节的数据,但是服务端一次只能接受1024,要将一份数据分成两份,出现了半包的现象。
@Slf4j
public class HelloWorldServer {static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new HelloWorldServer().start();}
}
@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();for (int i = 0; i < 100; i++) {buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
出现原因
本质是因为 TCP 是流式协议,消息无边界
滑动窗口
-
TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差
-
为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
-
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
-
图中深色的部分即要发送的数据,高亮的部分即窗口
-
窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
-
如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
-
接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
-
3、解决方案
(1)短连接
每一次接收完就断开连接,分10次发送
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {for (int i = 0; i < 10; i++) {send();}}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);ctx.close();}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的
(2)定长解码器
本质就是发送一系列消息,以最长的那个消息为固定值。然后将发送的消息都以这个固定的长度为准,如果长度不够就进行填充。
前提是服务端和客户端要约定好长度。
在服务端代码中要加上 将收到的数据进行解码
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
客户端代码
@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 发送内容随机的数据包Random r = new Random();char c = 'a';ByteBuf buffer = ctx.alloc().buffer();for (int i = 0; i < 10; i++) {byte[] bytes = new byte[8];for (int j = 0; j < r.nextInt(8); j++) {bytes[j] = (byte) c;}c++;buffer.writeBytes(bytes);}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
服务端接收定长的数据
(3)行解码器
消息之间以换行符作为分隔符 LineBasedFrameDecoder
参数是指定最大长度,要是超过这个最大长度还没找到分隔符就报错
服务端加上
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
客户端代码
package cn.itcast.mytest;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Random;
@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 发送内容随机的数据包ByteBuf buffer = ctx.alloc().buffer();char c='0';Random r=new Random();StringBuilder sb=new StringBuilder();for (int i = 0; i < 10; i++) {sb.setLength(0);for(int j=0;j<r.nextInt(256);j++){sb.append(c);}sb.append("\n");buffer.writeBytes(sb.toString().getBytes());c++;}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}
这种方法性能低,用得比较少。
(4)LTC解码器
lengthFieldOffset 长度字段偏移量
lengthFieldLength 长度字段 的长度
lengthAdjustment 长度字段跳到正式内容的偏移值
initialBytesToStrip 消息剖离
例子1:
长度字段偏移量是0,因为一开始就是长度字段 0
长度字段占两个字节 2
长度字段之后跳0字节就是正式内容 0
消息不剖离 0
例子2:
消息头可能携带其他信息,比如版本号、协议。比如下面的hdr1、hdr2
长度字段偏移量是1 从开始到长度字段有1个字节 1
长度字段占两个字节 2
长度字段之后跳0字节就是正式内容 hdr2占1个字节 1
消息不剖离 3 从头到第3个字节的部分不要。
服务端加上 第一个参数是消息的最大长度
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));
客户端代码
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 发送内容随机的数据包ByteBuf buffer = ctx.alloc().buffer();String content="hello world";//将内容加入到缓冲区buffer.writeInt(content.length());//先写入长度buffer.writeBytes(content.getBytes());String content2="hi";//将内容加入到缓冲区buffer.writeInt(content2.length());//先写入长度buffer.writeBytes(content2.getBytes());ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}
注意:如果buffer中的消息长度不一致的时候,参数要以最长的消息制定
假如我将上面代码的长度字段改成2,也就是意味着长度字段所占的字节数只有2
hello world消息中 长度字段是00 00 00 0b,这样他只读取到00,消息长度才0
第二次读取到 00 0b 才会去真正读hello world
这是错误的 (int 占4字节)
五、协议设计与解析
1、redis
其实我们可以用过netty发送消息给redis,比如set name zhangsan
解析出来就是 下面这图
使用下面的代码可以向redis发送消息并得到响应
@Slf4j
public class testRedis {public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();byte[] LINE = {13, 10};try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*2".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("get".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("bbb".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}
2、http
http协议相对来说比较复杂,好在netty已经帮我们封装了编解码器
我们只需要加上ch.pipeline().addLast(new HttpServerCodec());即可
首先我们先打印出消息的类型
浏览器访问http://localhost:8080/index.html之后(index.html可以换成其他的)
11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.DefaultHttpRequest
11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.LastHttpContent$1
我们会发现消息有两个,一个是请求头、请求行 ,另一个是请求体。这是http编解码器解析的
我们可以使用
if (msg instanceof HttpRequest) { // 请求行,请求头
} else if (msg instanceof HttpContent) { //请求体
}
进行判断。
或者我们可以使用 SimpleChannelInboundHandler只处理我们关心的消息类型
public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {log.debug("{}",msg.uri());DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);byte[] m="<h1>Hello World</h1>".getBytes();response.content().writeBytes(m);response.headers().setInt(CONTENT_LENGTH,m.length);//消息头写入 正文的长度,这样浏览器就不用一直空转ctx.writeAndFlush(response);}});/* ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}",msg.getClass());}});*/}}).bind(8080).sync().channel();}
3、自定义
要素
-
魔数,用来在第一时间判定是否是无效数据包
-
版本号,可以支持协议的升级
-
序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
-
指令类型,是登录、注册、单聊、群聊... 跟业务相关
-
请求序号,为了双工通信,提供异步能力
-
正文长度
-
消息正文
编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
注意
channel能不能共享Handler。看源码有没有@Sharable注解,否则有线程安全问题
ByteToMessageCodec的子类不能标注为@Sharable
想要使用这个注解,就得继承MessageToMessageCodec,这个类默认是消息转化为消息,不会出现黏包和半包的情况。所以要使用LengthFieldBasedFrameDecoder来确保不会出现黏包半包的情况
@ChannelHandler.Sharable /*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/ public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message>
自定义类没有保存状态就是线程安全的
六、聊天业务
1、登录功能
使用最简单的控制台输入,还有固定数据
注意:全是采用我们之前自定义的编解码器
MessageCodecSharable 相关资料,我上传上去了
服务端代码
添加后处理器 在连接建立之后执行校验用户名、密码的功能
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();boolean b = UserServiceFactory.getUserService().login(username, password);LoginResponseMessage result =null;if(b){//登录成功result = new LoginResponseMessage(true, "登录成功");}else{result = new LoginResponseMessage(false, "用户名或密码错误");}ctx.writeAndFlush(result);}});}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
客户端代码
连接建立之后,控制台输入用户名、密码。发送给客服端
@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); //打印日志MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(()->{Scanner sc=new Scanner(System.in);System.out.println("请输入用户名:");String username = sc.nextLine();System.out.println("请输入密码:");String password = sc.nextLine();LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);ctx.writeAndFlush(loginRequestMessage);}).start();}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
线程通信
我们使用CountDownLatch来处理 登陆成功和登陆失败的情况,实现
两个线程在channelRead和channelActive之间的通信
@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); //打印日志MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码CountDownLatch countDownLatch=new CountDownLatch(1);//到0表示可以继续运行AtomicBoolean b=new AtomicBoolean(false);//响应是成功还是失败try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(()->{Scanner sc=new Scanner(System.in);System.out.println("请输入用户名:");String username = sc.nextLine();System.out.println("请输入密码:");String password = sc.nextLine();LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);ctx.writeAndFlush(loginRequestMessage);try {countDownLatch.await();//线程阻塞在这里} catch (InterruptedException e) {throw new RuntimeException(e);}if(!b.get()){//如果登录失败。关闭连接ctx.channel().close();}log.debug("=================菜单=================");}).start();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {super.channelRead(ctx,msg);if (msg instanceof LoginResponseMessage) {LoginResponseMessage responseMessage = (LoginResponseMessage) msg;if (responseMessage.isSuccess()) {//登录成功b.set(true);}else{b.set(false);}countDownLatch.countDown();}}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
2、业务消息发送
没什么,看看就行
while(true){System.out.println("=======================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("=======================================");String choice = sc.nextLine();String[] s = choice.split(" ");switch (s[0]){case "send": ctx.writeAndFlush(new ChatRequestMessage(username,s[1],s[2]));break;case "gsend": ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));break;case "gcreate":String[] members = s[2].split(",");Set<String> set=new HashSet<>( Arrays.asList(members));ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],set));break;case "gmembers":ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));break;case "gjoin":ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));break;case "gquit":ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));break;case "quit":ctx.channel().close();return ;}}
3、单聊消息处理
为了让我们的代码更清晰
我们先对匿名内部类转化为内部类
再转化为外部类
第一步
模仿上面的结构编写自定义ChatRequestMessageHandler,我们写在单独的类下
然后使用addLast加入到channel
package cn.itcast.server.Handler;import cn.itcast.message.ChatRequestMessage;
import cn.itcast.message.ChatResponseMessage;
import cn.itcast.server.service.UserServiceFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;//处理单聊的业务
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {String from = msg.getFrom();String to = msg.getTo();Channel channel = SessionFactory.getSession().getChannel(to);//查看对方是否在线if(channel!=null){//在线 发送给接收方channel.writeAndFlush(new ChatResponseMessage(from,msg.getContent()));}else{//发送给 发送方channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"用户不存在或者用户不在线"));}}
}
这是服务端的代码
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();ChatRequestMessageHandler CHAT_HANDLER =new ChatRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器ch.pipeline().addLast(LOGIN_HANDLER);ch.pipeline().addLast(CHAT_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}
4、群聊创建
在服务端代码中加上这个handler,这里就不做演示了。
package cn.itcast.server.Handler;import cn.itcast.message.GroupCreateRequestMessage;
import cn.itcast.message.GroupCreateResponseMessage;
import cn.itcast.server.session.Group;
import cn.itcast.server.session.GroupSession;
import cn.itcast.server.session.GroupSessionFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.List;
import java.util.Set;@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx,GroupCreateRequestMessage msg) throws Exception {String groupName = msg.getGroupName();Set<String> members = msg.getMembers();GroupSession groupSession = GroupSessionFactory.getGroupSession();Group group = groupSession.createGroup(groupName, members);//不存在的话,创建并返回nullif(group==null){ctx.writeAndFlush(new GroupCreateResponseMessage(true,"群聊创检查成功!"));//创建成功List<Channel> membersChannel =groupSession.getMembersChannel(groupName);for (Channel channel : membersChannel) {channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入"+groupName));}}else{//创建失败ctx.writeAndFlush(new GroupCreateResponseMessage(false,"群聊已存在"));}}
}
这里有一个缺陷,就是群主并没有进去群聊。所以要在客户端加上
set.add(username);
其他业务就不一一写了,都是编写自定义的Handler。
5、空闲检测、心跳
连接假死:可能因为网络的原因,连接已经断开了。
我们可以做空闲检测,就是服务端和客户端定时检测是否还有消息传输
如果超过5秒就判断连接假死,但是这样是不合理的,客户端可能没有及时发送消息导致连接被断开。 我们可以在客户端进行定时发送心跳,如果心跳没有传输到服务端可以证明连接确实出现问题。如果能传输到,服务端也不会断开连接;
这里要介绍netty提供的IdleStateHandler,他有三个参数
参数1:读空闲时间 参数2:写空闲时间 参数3:读写空闲时间
时间到了之后会触发响应的事件
我们可以使用ChannelDuplexHandler的userEventTriggered方法来处理各种事件
客户端加上ch.pipeline().addLast(new IdleStateHandler(0,3,0));ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if(event.state()== IdleState.WRITER_IDLE){log.debug("心跳");ctx.writeAndFlush(new PingMessage());}}});服务端加上//参数1:读空闲时间 参数2:写空闲时间 参数3:读写空闲时间ch.pipeline().addLast(new IdleStateHandler(5,0,0));ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if(event==IdleStateEvent.READER_IDLE_STATE_EVENT){log.debug("5秒没有读事件");ctx.channel().close();}}});
如果客户端还是断开了,是因为这里有个bug,等下一章才学习到。
相关文章:

Netty笔记
本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频 学习netty之前要先打好NIO的基础,可以先去看我的另一篇文章 一、概述 不想看的可以直接跳过 Netty 的地位 Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位…...

管道燃气监管系统
一、建设目标 建立一个高效、智能、安全的管道燃气监管系统,实现对管道燃气的实时监测、风险预警、事故应急处理和数据分析,确保燃气供应的安全稳定,提高燃气管理的效率和水平。 二、系统架构 感知层 安装压力传感器、流量传感器、温度传感…...

Python语法结构(三)(Python Syntax Structure III)
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…...

08_Linux文件查找技巧:locate、find 和 grep 命令详解
系列文章导航:01_Linux基础操作CentOS7学习笔记-CSDN博客 文章目录 1. locate命令2. grep命令3. find命令 在Linux系统中,文件查找是一项常见的任务。本文将详细介绍三种强大的文件查找命令:locate、find 和 grep,以及它们的使用…...

JAVA 实验六
一: (1) 运行以上尟序并尣以上尟序尜尢一行尥码添加注解,将尟序保存尣e601.java (2) 以上尟序尣类变量是哪一个变量,类尠尞是哪一个尠尞,请找出类变量和类尠尞被使用尜语…...

电脑查不到IP地址是什么原因?怎么解决
在日常使用电脑的过程中,有时会遇到无法查询到电脑IP地址的情况,这可能会影响到网络的正常使用。本文将探讨电脑查不到IP地址的可能原因,并提供相应的解决方案。 一、原因分析 网络连接问题:首先,网络连接不稳定或…...

Axure重要元件三——中继器修改数据
亲爱的小伙伴,在您浏览之前,烦请关注一下,在此深表感谢! 课程主题:中继器修改数据 主要内容:显示编辑内容、表格赋值、修改数据 应用场景:更新行、表单数据行修改 案例展示: 正文…...

应用层——电子邮件、MIME、简单网络管理协议SNMP
电子邮件 电子邮件系统采用三个主要构件组成:用户代理、邮件服务器、电子邮件所需的协议 我们可以简单的认为邮件服务器中有很多邮箱,还有用来缓存再转发邮件的缓存,发送方使用用户代理通过邮件发送协议。例如SMTP将邮件发送给发送方。 邮件服…...

我与C语言二周目邂逅vlog——8.编译和链接
C语言中的编译和链接过程详细总结 1. 概述 C 语言是一种经典的系统级编程语言,其开发过程包括多个阶段,其中最关键的就是编译和链接过程。编译和链接的理解对于掌握 C 语言程序的构建至关重要。在本篇文章中,我们将深入讲解 C 语言的编译和…...

Views Page 视图页面
下图中显示的 Views 页面允许自定义网格级别及其相应的 View。 Views (视图) 页面包含两个主要部分: 关卡设计师;请注意,其他设计器页面为在关卡设计器中选择的 View 提供设置;Properties (属性) 窗口&…...

Win10 IDEA远程连接HBase
Win10 IDEA远程连接HBase Win10 IDEA连接虚拟机中的Hadoop(HDFS) 关闭Hadoop和Hbase 如果已经关闭不需要走这一步 cd /usr/local/hbase bin/stop-hbase.sh cd /usr/local/hadoop ./sbin/stop-dfs.sh获取虚拟机的ip 虚拟机终端输入 ip a关闭虚拟机…...

1.centos 镜像
centos 它有官网的下载地址:https://vault.centos.org/ 选择想要的版本,我选择 centos7.8 进入到镜像目录 isos 选择 x86_64 选择想要的版本,我选择 CentOS-7-x86_64-DVD-2003.iso 安装就正常安装就行。我选择虚拟机安装。这个参考&…...

electron 操作 cookie
前言:在 Electron 中操作 Cookie 可以使用electron模块提供的session对象来实现。 一、获取 Cookie 通过defaultSession获取默认会话对象,然后调用cookies.get方法并传入要获取 Cookie 的 URL 地址,以获取该 URL 对应的 Cookie。 const el…...

黑马软件测试第一篇_Linux
Linux 操作系统 说明: 所有硬件设备组装完成后的第⼀一层软件, 能够使⽤用户使⽤用硬件设备的软件 即为操作系统 常见分类 桌⾯面操作系统: Windows/macOS/Linux移动端操作系统: Android(安卓)/iOS(苹果)服务器器操作系统: Linux/Windows Server嵌⼊入式操作系统: Android(底…...

npm run dev 启动前端项目的原理
在一个使用 Vite 构建工具的 Vue 项目中,当你运行 npm run dev 时,实际执行的命令是 vite。为了理解这一过程,我们需要了解几个关键点: package.json 文件中的 scripts 字段: "scripts": {"dev": "vite&…...

【2024年SCI一区新算法:黑翅鸢优化算法 】分布式电网故障定位
1 场景介绍 使用10节点网络 2 故障设置 分为单重故障和两重故障 %% 2 故障设置 %% 1)单重故障 I[1,-1,0,0,-1,-1,0,0,-1,-1]; % 区段1故障 节点状态实际编码(是否流过故障电流) % I[1,1,0,0,-1,-1,0,0,-1,-1]; % 区段2故障 % I[…...

PyTorch 中 12 种张量操作详解
创作不易,还请各位同学三连点赞!!收藏!!转发!!! 对于刚入门学习Python还找不到方向的小伙伴可以试试我的这份学习方法和籽料,免费自取!! PyTorc…...

雷池WAF自动化实现安全运营实操案例终极篇
免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停…...

微信小程序实现canvas电子签名
一、先看效果 小程序canvas电子签名 二、文档 微信小程序canvas 组件文档 微信小程序canvas API文档 H5Canvas文档 三、分析 1、初始话Canvas容器 2、Canvas触摸事件,bindtouchstart(手指触摸动作开始)、bindtouchmove(手指触摸…...

【SpringCloud】Seata微服务事务
Seata微服务事务 分布式事务问题:本地事务分布式事务演示分布式事务问题:示例1 分布式事务理论CAP定理一致性可用性分区容错矛盾 Base理论解决分布式事务的思路 初识SeataSeata的架构部署TC服务微服务集成Seata引入依赖配置TC地址 其他服务 动手实践XA模…...

重新阅读《马说》,感悟“伯乐相马”背后的被选择与选择的大智慧
“初闻不识曲中意,再听已是曲终人”。世有伯乐,然后有千里马。千里马常有,而伯乐不常有。无论你是考研考公等考试大军中的一员,还是已步入社会的打工人或者领导,当你面临被人选择或者选择人时,皆可从《马说…...

深入拆解TomcatJetty(三)
深入拆解Tomcat&Jetty(三) 专栏地址:https://time.geekbang.org/column/intro/100027701 1 Tomcat组件生命周期 Tomcat如何如何实现一键式启停 Tomcat 架构图和请求处理流程如图所示: 对组件之间的关系进行分析,…...

MySQL 实现简单的性能优化
一:硬件优化 更高的网络带宽:在处理大规模的远程请求时可以提高MySQL服务器的响应速度; 更大的内存空间:有助于缓存更多的数据库数据,减少磁盘I/O操作,提高整体性能; 换用企业级SSD࿱…...

AB包资源管理器
简介 ABMgr(Asset Bundle Manager)类是一个用于管理 Unity 中 AssetBundle 资源加载的管理器。它通过字典缓存和管理加载的 AB 包,同时支持同步和异步加载。还包含了卸载和清理 AB 包的方法。 功能解析: 主包加载与依赖管理&…...

Centos7源报错问题
原因:是因为centos7在024年6月份停止维护,导致默认镜像不能使用,更改镜像即可mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/…...

Openlayers高级交互(2/20):清除所有图层的有效方法
Openlayers项目中,经常会放置很多的图层,在业务操作的时候,会做出删除所有图层的行为。这里面给出了一个详细的方法,能够有效的解决 清除所有图层的问题。 效果图 专栏名称内容介绍Openlayers基础实战 (72篇ÿ…...

黑马JavaWeb-day02
什么是JavaScript? JavaScript:简称Js,是一门跨平台、面向对象的脚本语言。是用来控制网页行为的,它能使网页可交互 JavaScript和Java是完全不同的语言,无论是概念还是设计。但是基础语法类似。 JavaScript JavaScript引入方式…...

laravel清除不同缓存
1、清除应用程序缓存: php artisan cache:clear2、清除路由缓存: php artisan route:cache3、清除配置缓存: php artisan config:cache4、清除编译后的视图文件: php artisan view:clear5、清除事件和监听器缓存: ph…...

【Git】解决分支冲突、分支合并、版本回退、版本管理
解决本地冲突 1. 合并分支 假设你正在 main 分支上,想要合并 feature 分支。 git checkout main git merge feature如果两个分支都对同一文件做了不同的修改,Git 会提示你有冲突,并显示冲突文件。 2. 查看冲突文件 使用以下命令查看冲突…...

linux file结构体与inode结构体
在 Linux 系统中,inode 结构体和 file 结构体都是与文件系统相关的重要数据结构。它们各自承担着不同的角色,帮助操作系统管理文件和目录。以下是它们的异同点: inode 结构体 1.定义:inode(索引节点)是文件…...