当前位置: 首页 > news >正文

Netty笔记

本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频

学习netty之前要先打好NIO的基础,可以先去看我的另一篇文章

一、概述

不想看的可以直接跳过

Netty 的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra - nosql 数据库

  • Spark - 大数据分布式计算框架

  • Hadoop - 大数据分布式存储框架

  • RocketMQ - ali 开源的消息队列

  • ElasticSearch - 搜索引擎

  • gRPC - rpc 框架

  • Dubbo - rpc 框架

  • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端

  • Zookeeper - 分布式协调框架

Netty 的优势

  • Netty vs NIO,工作量大,bug 多

    • 需要自己构建协议

    • 解决 TCP 传输问题,如粘包、半包

    • epoll 空轮询导致 CPU 100%

    • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

  • Netty vs 其它网络应用框架

    • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀

    • 久经考验,16年,Netty 版本

      • 2.x 2004

      • 3.x 2008

      • 4.x 2013

      • 5.x 已废弃(没有明显的性能提升,维护成本高)

二、入门案例

首次看netty的代码会比较乱,不要慌,多看看多学学,就会很熟悉的。
最重要的是要理解每一步的作用

开发一个简单的服务器端和客户端

  • 客户端向服务器端发送 hello, world

  • 服务器仅接收,不返回

依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>

服务端

new ServerBootstrap().group(new NioEventLoopGroup()) // 1.channel(NioServerSocketChannel.class) // 2.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 5ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}}).bind(8080); // 4

  • 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开

  • 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有

  • 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

  • 4 处,ServerSocketChannel 绑定的监听端口

  • 5 处,SocketChannel 的处理器,解码 ByteBuf => String

  • 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果

客户端

new Bootstrap().group(new NioEventLoopGroup()) // 1.channel(NioSocketChannel.class) // 2.handler(new ChannelInitializer<Channel>() { // 3@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder()); // 8}}).connect("127.0.0.1", 8080) // 4.sync() // 5.channel() // 6.writeAndFlush(new Date() + ": hello world!"); // 7
  • 1 处,创建 NioEventLoopGroup,同 Server

  • 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有

  • 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

  • 4 处,指定要连接的服务器和端口

  • 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕

  • 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作

  • 7 处,写入消息并清空缓冲区

  • 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出

  • 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程

流程分析

其实黑马使用链式编程对初学者来说并不友好,我下面对代码进行拆分

乍一看怎么全是ServerBootstrap,其实这些操作都是围绕着这个类在转的。

下一章组件 将会对每一个组件进行详细的分析,到时候就没有这么乱了

可以说明的是NioEventLoopGroup 是一个EventLoop的集合,EventLoop相当于NIO里面处理读写时间的工作者,都可以单开线程的。类似于NIO里面的多线程优化。

  ServerBootstrap serverBootstrap = new ServerBootstrap();//得到启动类NioEventLoopGroup group = new NioEventLoopGroup();// EventLoop的集合ServerBootstrap serverBootstrap1 = serverBootstrap.group(group);// 将EventLoopGroup交给启动类ServerBootstrap serverBootstrap2 = serverBootstrap1.channel(NioServerSocketChannel.class);// 指定channel的类型ServerBootstrap serverBootstrap3 = serverBootstrap2.childHandler(new ChannelInitializer<NioSocketChannel>() {// 设置子通道(Channel)的处理器(ChannelHandler)的protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 添加处理器ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 添加自定义处理器@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}});ChannelFuture channelFuture = serverBootstrap1.bind(8080);// 绑定端口

三、组件 

1、EventLoop

我在前面提过,这就相当于处理读写事件的工作者。维护着自己的Selector。

我们之前提到过一个Selector能监听多个channel。一个服务端有多个Selector

事件循环对象

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法(这也就让他有处理定时任务的能力)

  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,

    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop

    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力

    • 另有 next 方法获取集合中下一个 EventLoop

可以自己指定group的大小,但是没有必要

// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

结果: 

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98

优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

演示 NioEventLoop 处理 io 事件

下面主要演示的是 工人是轮流工作的,但是对于同一个channel,多次来进行读写,为他服务的是同一个工作(也就是EventLoop)

服务器端两个 nio worker 工人

new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if (byteBuf != null) {byte[] buf = new byte[16];ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());log.debug(new String(buf));}}});}}).bind(8080).sync();

客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup(1)).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.println("init...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}}).channel(NioSocketChannel.class).connect("localhost", 8080).sync().channel();channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));Thread.sleep(2000);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

最后输出

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu        
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu         

可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定

2、channelFuture

3、future和promise

4、bytebuf

工具类:用于调试 展示bytebuf的内容

private static void log(ByteBuf buffer) {int length = buffer.readableBytes();int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;StringBuilder buf = new StringBuilder(rows * 80 * 2).append("read index:").append(buffer.readerIndex()).append(" write index:").append(buffer.writerIndex()).append(" capacity:").append(buffer.capacity()).append(NEWLINE);appendPrettyHexDump(buf, buffer);System.out.println(buf.toString());
}

1、内存模式和池化

堆内存vs直接内存

堆内存 分配效率高,但读写效率低。直接内存反之。

使用ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
默认方式创建出来的是使用的直接内存

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用

  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化 vs 非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力

  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率

  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现

  • 4.1 之前,池化功能还不成熟,默认是非池化实现

2、组成、写入、读取

相比于ByteBuffer的优点是①可扩容  ②使用读写指针,不用切换,逻辑更简单

读过的内存就废除 。

如果没有指定初始大小,默认是256个字节

扩容

再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容

buffer.writeInt(6);
log(buffer);

扩容规则是

  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16

  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)

  • 扩容不能超过 max capacity 会报错

使用mark配合reset实现读取多次,其实就是做个标记,然后跳回标记的地方。

 public static void main(String[] args) throws ExecutionException, InterruptedException {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);buffer.writeBytes(new byte[]{1,2,3,4,5});log(buffer);buffer.markReaderIndex();System.out.println(buffer.readByte());buffer.resetReaderIndex();System.out.println(buffer.readByte());log(buffer);}

3、slice  和composite

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

例,原始 ByteBuf 进行一些初始操作

ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

         +-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

切片的正确使用

就是每个切片要执行retain,防止内存被释放。等到自己用完之后再执行release

composite合并

public static void main(String[] args) {ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();buffer1.writeBytes(new byte[]{1,2,3});ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();buffer2.writeBytes(new byte[]{4,5,6});CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();compositeBuffer.addComponents(true,buffer1,buffer2);log(compositeBuffer);}

5、实现一个简单的回响功能

客户端发送什么,服务端就回复什么

这是 服务端的代码

package cn.itcast.mytest.homework;import com.sun.corba.se.internal.CosNaming.BootstrapServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;@Slf4j
//回声服务器  返回客户端发送的消息
public class Server {public static void main(String[] args) throws InterruptedException {log.debug("启动中。。。");new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("收到消息===>{}",msg);nioSocketChannel.writeAndFlush(msg);}});nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("发送消息===>{}",msg);super.write(ctx, msg, promise);}});}}).bind(8080);}
}

这是客户端

package cn.itcast.mytest.homework;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.java.Log;
import lombok.extern.slf4j.Slf4j;import java.util.Scanner;@Slf4j
public class Client {public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("收到消息==>{}",msg);}});}}).connect("localhost",8080).sync();Scanner sc=new Scanner(System.in);while(true){System.out.println("请输入要发送的消息(输入q退出)");String s = sc.nextLine();if(!"q".equals(s)){channelFuture.channel().writeAndFlush(s);}else{break;}}log.debug("结束对话");}
}

四、黏包和半包

1、黏包现象

所谓黏包现象,就是发送方在短时间内发送多条数据,接收方无法准确分辨出每个独立数据包的边界。这种情况常见于基于流的传输协议(如TCP),因为TCP是面向字节流的协议,数据在网络中以流的形式发送,而不是分包的形式。


就是多个数据包黏在一起了

channel建立成功之后,会触发active事件。

案例 :客户端连续发送10次16字节的数据,服务方接收数据之后打印出来,我们会发现,服务端试将这10条数据当成1条数据了。

接收方(服务端)


@Slf4j
public class HelloWorldServer {static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new HelloWorldServer().start();}
}

发送方(客户端)

@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");Random r = new Random();char c = 'a';for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

2、半包现象 

半包现象指的是在网络通信中,一个逻辑上的数据包被拆分成多个部分进行接收。这种现象通常发生在基于流的协议(如 TCP)中,由于 TCP 是面向字节流的协议,数据在传输时并不会被强制分包,而是以流的方式发送和接收。

案例 :客户端一次发送1600字节的数据,但是服务端一次只能接受1024,要将一份数据分成两份,出现了半包的现象。


@Slf4j
public class HelloWorldServer {static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new HelloWorldServer().start();}
}

@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();for (int i = 0; i < 100; i++) {buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

 

出现原因 

本质是因为 TCP 是流式协议,消息无边界

滑动窗口

  • TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差

为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值

  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用

    • 图中深色的部分即要发送的数据,高亮的部分即窗口

    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动

    • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动

    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

3、解决方案

(1)短连接  

每一次接收完就断开连接,分10次发送

public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {for (int i = 0; i < 10; i++) {send();}}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);ctx.close();}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

 半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的

(2)定长解码器

本质就是发送一系列消息,以最长的那个消息为固定值。然后将发送的消息都以这个固定的长度为准,如果长度不够就进行填充。

前提是服务端和客户端要约定好长度。

在服务端代码中要加上  将收到的数据进行解码

ch.pipeline().addLast(new FixedLengthFrameDecoder(8));

客户端代码 

@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 发送内容随机的数据包Random r = new Random();char c = 'a';ByteBuf buffer = ctx.alloc().buffer();for (int i = 0; i < 10; i++) {byte[] bytes = new byte[8];for (int j = 0; j < r.nextInt(8); j++) {bytes[j] = (byte) c;}c++;buffer.writeBytes(bytes);}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

服务端接收定长的数据

(3)行解码器

消息之间以换行符作为分隔符  LineBasedFrameDecoder

参数是指定最大长度,要是超过这个最大长度还没找到分隔符就报错

 服务端加上

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

客户端代码

package cn.itcast.mytest;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Random;
@Slf4j
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 发送内容随机的数据包ByteBuf buffer = ctx.alloc().buffer();char c='0';Random r=new Random();StringBuilder sb=new StringBuilder();for (int i = 0; i < 10; i++) {sb.setLength(0);for(int j=0;j<r.nextInt(256);j++){sb.append(c);}sb.append("\n");buffer.writeBytes(sb.toString().getBytes());c++;}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}

 

这种方法性能低,用得比较少。

(4)LTC解码器 

 lengthFieldOffset   长度字段偏移量
  lengthFieldLength    长度字段 的长度
  lengthAdjustment    长度字段跳到正式内容的偏移值
  initialBytesToStrip   消息剖离

例子1:

长度字段偏移量是0,因为一开始就是长度字段    0
长度字段占两个字节  2
长度字段之后跳0字节就是正式内容   0
消息不剖离  0

例子2:

消息头可能携带其他信息,比如版本号、协议。比如下面的hdr1、hdr2

长度字段偏移量是1  从开始到长度字段有1个字节    1 
长度字段占两个字节  2
长度字段之后跳0字节就是正式内容   hdr2占1个字节    1
消息不剖离  3    从头到第3个字节的部分不要。

服务端加上   第一个参数是消息的最大长度

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));

客户端代码 

public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 发送内容随机的数据包ByteBuf buffer = ctx.alloc().buffer();String content="hello world";//将内容加入到缓冲区buffer.writeInt(content.length());//先写入长度buffer.writeBytes(content.getBytes());String content2="hi";//将内容加入到缓冲区buffer.writeInt(content2.length());//先写入长度buffer.writeBytes(content2.getBytes());ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}

 

注意:如果buffer中的消息长度不一致的时候,参数要以最长的消息制定

假如我将上面代码的长度字段改成2,也就是意味着长度字段所占的字节数只有2
hello world消息中 长度字段是00 00 00 0b,这样他只读取到00,消息长度才0
第二次读取到 00 0b 才会去真正读hello world
这是错误的   (int 占4字节)

五、协议设计与解析 

1、redis

其实我们可以用过netty发送消息给redis,比如set name zhangsan
解析出来就是 下面这图

使用下面的代码可以向redis发送消息并得到响应 

@Slf4j
public class testRedis {public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();byte[] LINE = {13, 10};try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*2".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("get".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("bbb".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}
}

2、http

http协议相对来说比较复杂,好在netty已经帮我们封装了编解码器

我们只需要加上ch.pipeline().addLast(new HttpServerCodec());即可

首先我们先打印出消息的类型

浏览器访问http://localhost:8080/index.html之后(index.html可以换成其他的)

11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.DefaultHttpRequest
11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.LastHttpContent$1 

我们会发现消息有两个,一个是请求头、请求行  ,另一个是请求体。这是http编解码器解析的

我们可以使用

 if (msg instanceof HttpRequest) { // 请求行,请求头

                    } else if (msg instanceof HttpContent) { //请求体

                    }

进行判断。


或者我们可以使用 SimpleChannelInboundHandler只处理我们关心的消息类型

 public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {log.debug("{}",msg.uri());DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);byte[] m="<h1>Hello World</h1>".getBytes();response.content().writeBytes(m);response.headers().setInt(CONTENT_LENGTH,m.length);//消息头写入 正文的长度,这样浏览器就不用一直空转ctx.writeAndFlush(response);}});/* ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}",msg.getClass());}});*/}}).bind(8080).sync().channel();}

3、自定义

要素

  • 魔数,用来在第一时间判定是否是无效数据包

  • 版本号,可以支持协议的升级

  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk

  • 指令类型,是登录、注册、单聊、群聊... 跟业务相关

  • 请求序号,为了双工通信,提供异步能力

  • 正文长度

  • 消息正文

编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}

注意

channel能不能共享Handler。看源码有没有@Sharable注解,否则有线程安全问题

ByteToMessageCodec的子类不能标注为@Sharable

想要使用这个注解,就得继承MessageToMessageCodec,这个类默认是消息转化为消息,不会出现黏包和半包的情况。所以要使用LengthFieldBasedFrameDecoder来确保不会出现黏包半包的情况

@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> 

自定义类没有保存状态就是线程安全的

六、聊天业务

1、登录功能

使用最简单的控制台输入,还有固定数据

注意:全是采用我们之前自定义的编解码器

MessageCodecSharable   
相关资料,我上传上去了

服务端代码

添加后处理器 在连接建立之后执行校验用户名、密码的功能


@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();boolean b = UserServiceFactory.getUserService().login(username, password);LoginResponseMessage result =null;if(b){//登录成功result = new LoginResponseMessage(true, "登录成功");}else{result = new LoginResponseMessage(false, "用户名或密码错误");}ctx.writeAndFlush(result);}});}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

客户端代码 

连接建立之后,控制台输入用户名、密码。发送给客服端


@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);  //打印日志MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(()->{Scanner sc=new Scanner(System.in);System.out.println("请输入用户名:");String username = sc.nextLine();System.out.println("请输入密码:");String password = sc.nextLine();LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);ctx.writeAndFlush(loginRequestMessage);}).start();}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

线程通信

我们使用CountDownLatch来处理 登陆成功和登陆失败的情况,实现

两个线程在channelRead和channelActive之间的通信

@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);  //打印日志MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码CountDownLatch countDownLatch=new CountDownLatch(1);//到0表示可以继续运行AtomicBoolean b=new AtomicBoolean(false);//响应是成功还是失败try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(()->{Scanner sc=new Scanner(System.in);System.out.println("请输入用户名:");String username = sc.nextLine();System.out.println("请输入密码:");String password = sc.nextLine();LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);ctx.writeAndFlush(loginRequestMessage);try {countDownLatch.await();//线程阻塞在这里} catch (InterruptedException e) {throw new RuntimeException(e);}if(!b.get()){//如果登录失败。关闭连接ctx.channel().close();}log.debug("=================菜单=================");}).start();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {super.channelRead(ctx,msg);if (msg instanceof LoginResponseMessage) {LoginResponseMessage responseMessage = (LoginResponseMessage) msg;if (responseMessage.isSuccess()) {//登录成功b.set(true);}else{b.set(false);}countDownLatch.countDown();}}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

2、业务消息发送

没什么,看看就行

while(true){System.out.println("=======================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("=======================================");String choice = sc.nextLine();String[] s = choice.split(" ");switch (s[0]){case "send":  ctx.writeAndFlush(new ChatRequestMessage(username,s[1],s[2]));break;case "gsend": ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));break;case "gcreate":String[] members = s[2].split(",");Set<String> set=new HashSet<>( Arrays.asList(members));ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],set));break;case "gmembers":ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));break;case "gjoin":ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));break;case "gquit":ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));break;case "quit":ctx.channel().close();return ;}}

3、单聊消息处理

为了让我们的代码更清晰

我们先对匿名内部类转化为内部类

 再转化为外部类

第一步

模仿上面的结构编写自定义ChatRequestMessageHandler,我们写在单独的类下

然后使用addLast加入到channel

package cn.itcast.server.Handler;import cn.itcast.message.ChatRequestMessage;
import cn.itcast.message.ChatResponseMessage;
import cn.itcast.server.service.UserServiceFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;//处理单聊的业务
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {String from = msg.getFrom();String to = msg.getTo();Channel channel = SessionFactory.getSession().getChannel(to);//查看对方是否在线if(channel!=null){//在线  发送给接收方channel.writeAndFlush(new ChatResponseMessage(from,msg.getContent()));}else{//发送给  发送方channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"用户不存在或者用户不在线"));}}
}

这是服务端的代码


@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();ChatRequestMessageHandler CHAT_HANDLER =new ChatRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器ch.pipeline().addLast(LOGIN_HANDLER);ch.pipeline().addLast(CHAT_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}

4、群聊创建

在服务端代码中加上这个handler,这里就不做演示了。

package cn.itcast.server.Handler;import cn.itcast.message.GroupCreateRequestMessage;
import cn.itcast.message.GroupCreateResponseMessage;
import cn.itcast.server.session.Group;
import cn.itcast.server.session.GroupSession;
import cn.itcast.server.session.GroupSessionFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.List;
import java.util.Set;@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx,GroupCreateRequestMessage msg) throws Exception {String groupName = msg.getGroupName();Set<String> members = msg.getMembers();GroupSession groupSession = GroupSessionFactory.getGroupSession();Group group = groupSession.createGroup(groupName, members);//不存在的话,创建并返回nullif(group==null){ctx.writeAndFlush(new GroupCreateResponseMessage(true,"群聊创检查成功!"));//创建成功List<Channel> membersChannel =groupSession.getMembersChannel(groupName);for (Channel channel : membersChannel) {channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入"+groupName));}}else{//创建失败ctx.writeAndFlush(new GroupCreateResponseMessage(false,"群聊已存在"));}}
}

这里有一个缺陷,就是群主并没有进去群聊。所以要在客户端加上

set.add(username);

其他业务就不一一写了,都是编写自定义的Handler。

5、空闲检测、心跳

连接假死:可能因为网络的原因,连接已经断开了。

我们可以做空闲检测,就是服务端和客户端定时检测是否还有消息传输

如果超过5秒就判断连接假死,但是这样是不合理的,客户端可能没有及时发送消息导致连接被断开。   我们可以在客户端进行定时发送心跳,如果心跳没有传输到服务端可以证明连接确实出现问题。如果能传输到,服务端也不会断开连接;

这里要介绍netty提供的IdleStateHandler,他有三个参数

参数1:读空闲时间  参数2:写空闲时间  参数3:读写空闲时间

时间到了之后会触发响应的事件

我们可以使用ChannelDuplexHandler的userEventTriggered方法来处理各种事件

客户端加上ch.pipeline().addLast(new IdleStateHandler(0,3,0));ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if(event.state()== IdleState.WRITER_IDLE){log.debug("心跳");ctx.writeAndFlush(new PingMessage());}}});服务端加上//参数1:读空闲时间  参数2:写空闲时间  参数3:读写空闲时间ch.pipeline().addLast(new IdleStateHandler(5,0,0));ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if(event==IdleStateEvent.READER_IDLE_STATE_EVENT){log.debug("5秒没有读事件");ctx.channel().close();}}});

如果客户端还是断开了,是因为这里有个bug,等下一章才学习到。

相关文章:

Netty笔记

本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频 学习netty之前要先打好NIO的基础&#xff0c;可以先去看我的另一篇文章 一、概述 不想看的可以直接跳过 Netty 的地位 Netty 在 Java 网络应用框架中的地位就好比&#xff1a;Spring 框架在 JavaEE 开发中的地位…...

管道燃气监管系统

一、建设目标 建立一个高效、智能、安全的管道燃气监管系统&#xff0c;实现对管道燃气的实时监测、风险预警、事故应急处理和数据分析&#xff0c;确保燃气供应的安全稳定&#xff0c;提高燃气管理的效率和水平。 二、系统架构 感知层 安装压力传感器、流量传感器、温度传感…...

Python语法结构(三)(Python Syntax Structure III)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…...

08_Linux文件查找技巧:locate、find 和 grep 命令详解

系列文章导航&#xff1a;01_Linux基础操作CentOS7学习笔记-CSDN博客 文章目录 1. locate命令2. grep命令3. find命令 在Linux系统中&#xff0c;文件查找是一项常见的任务。本文将详细介绍三种强大的文件查找命令&#xff1a;locate、find 和 grep&#xff0c;以及它们的使用…...

JAVA 实验六

一&#xff1a; &#xff08;1&#xff09; 运行以上尟序并尣以上尟序尜尢一行尥码添加注解&#xff0c;将尟序保存尣e601.java &#xff08;2&#xff09; 以上尟序尣类变量是哪一个变量&#xff0c;类尠尞是哪一个尠尞&#xff0c;请找出类变量和类尠尞被使用尜语…...

电脑查不到IP地址是什么原因?怎么解决

在日常使用电脑的过程中&#xff0c;有时会遇到无法查询到电脑IP地址的情况&#xff0c;这可能会影响到网络的正常使用。本文将探讨电脑查不到IP地址的可能原因&#xff0c;并提供相应的解决方案。 一、原因分析 ‌网络连接问题‌&#xff1a;首先&#xff0c;网络连接不稳定或…...

Axure重要元件三——中继器修改数据

亲爱的小伙伴&#xff0c;在您浏览之前&#xff0c;烦请关注一下&#xff0c;在此深表感谢&#xff01; 课程主题&#xff1a;中继器修改数据 主要内容&#xff1a;显示编辑内容、表格赋值、修改数据 应用场景&#xff1a;更新行、表单数据行修改 案例展示&#xff1a; 正文…...

应用层——电子邮件、MIME、简单网络管理协议SNMP

电子邮件 电子邮件系统采用三个主要构件组成&#xff1a;用户代理、邮件服务器、电子邮件所需的协议 我们可以简单的认为邮件服务器中有很多邮箱&#xff0c;还有用来缓存再转发邮件的缓存&#xff0c;发送方使用用户代理通过邮件发送协议。例如SMTP将邮件发送给发送方。 邮件服…...

我与C语言二周目邂逅vlog——8.编译和链接

C语言中的编译和链接过程详细总结 1. 概述 C 语言是一种经典的系统级编程语言&#xff0c;其开发过程包括多个阶段&#xff0c;其中最关键的就是编译和链接过程。编译和链接的理解对于掌握 C 语言程序的构建至关重要。在本篇文章中&#xff0c;我们将深入讲解 C 语言的编译和…...

Views Page 视图页面

下图中显示的 Views 页面允许自定义网格级别及其相应的 View。 Views &#xff08;视图&#xff09; 页面包含两个主要部分&#xff1a; 关卡设计师;请注意&#xff0c;其他设计器页面为在关卡设计器中选择的 View 提供设置;Properties &#xff08;属性&#xff09; 窗口&…...

Win10 IDEA远程连接HBase

Win10 IDEA远程连接HBase Win10 IDEA连接虚拟机中的Hadoop&#xff08;HDFS&#xff09; 关闭Hadoop和Hbase 如果已经关闭不需要走这一步 cd /usr/local/hbase bin/stop-hbase.sh cd /usr/local/hadoop ./sbin/stop-dfs.sh获取虚拟机的ip 虚拟机终端输入 ip a关闭虚拟机…...

1.centos 镜像

centos 它有官网的下载地址&#xff1a;https://vault.centos.org/ 选择想要的版本&#xff0c;我选择 centos7.8 进入到镜像目录 isos 选择 x86_64 选择想要的版本&#xff0c;我选择 CentOS-7-x86_64-DVD-2003.iso 安装就正常安装就行。我选择虚拟机安装。这个参考&…...

electron 操作 cookie

前言&#xff1a;在 Electron 中操作 Cookie 可以使用electron模块提供的session对象来实现。 一、获取 Cookie 通过defaultSession获取默认会话对象&#xff0c;然后调用cookies.get方法并传入要获取 Cookie 的 URL 地址&#xff0c;以获取该 URL 对应的 Cookie。 const el…...

黑马软件测试第一篇_Linux

Linux 操作系统 说明: 所有硬件设备组装完成后的第⼀一层软件, 能够使⽤用户使⽤用硬件设备的软件 即为操作系统 常见分类 桌⾯面操作系统: Windows/macOS/Linux移动端操作系统: Android(安卓)/iOS(苹果)服务器器操作系统: Linux/Windows Server嵌⼊入式操作系统: Android(底…...

npm run dev 启动前端项目的原理

在一个使用 Vite 构建工具的 Vue 项目中&#xff0c;当你运行 npm run dev 时&#xff0c;实际执行的命令是 vite。为了理解这一过程&#xff0c;我们需要了解几个关键点&#xff1a; package.json 文件中的 scripts 字段: "scripts": {"dev": "vite&…...

【2024年SCI一区新算法:黑翅鸢优化算法 】分布式电网故障定位

1 场景介绍 使用10节点网络 2 故障设置 分为单重故障和两重故障 %% 2 故障设置 %% 1&#xff09;单重故障 I[1,-1,0,0,-1,-1,0,0,-1,-1]; % 区段1故障 节点状态实际编码&#xff08;是否流过故障电流&#xff09; % I[1,1,0,0,-1,-1,0,0,-1,-1]; % 区段2故障 % I[…...

PyTorch 中 12 种张量操作详解

创作不易&#xff0c;还请各位同学三连点赞&#xff01;&#xff01;收藏&#xff01;&#xff01;转发&#xff01;&#xff01;&#xff01; 对于刚入门学习Python还找不到方向的小伙伴可以试试我的这份学习方法和籽料&#xff0c;免费自取&#xff01;&#xff01; PyTorc…...

雷池WAF自动化实现安全运营实操案例终极篇

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…...

微信小程序实现canvas电子签名

一、先看效果 小程序canvas电子签名 二、文档 微信小程序canvas 组件文档 微信小程序canvas API文档 H5Canvas文档 三、分析 1、初始话Canvas容器 2、Canvas触摸事件&#xff0c;bindtouchstart&#xff08;手指触摸动作开始&#xff09;、bindtouchmove&#xff08;手指触摸…...

【SpringCloud】Seata微服务事务

Seata微服务事务 分布式事务问题&#xff1a;本地事务分布式事务演示分布式事务问题&#xff1a;示例1 分布式事务理论CAP定理一致性可用性分区容错矛盾 Base理论解决分布式事务的思路 初识SeataSeata的架构部署TC服务微服务集成Seata引入依赖配置TC地址 其他服务 动手实践XA模…...

重新阅读《马说》,感悟“伯乐相马”背后的被选择与选择的大智慧

“初闻不识曲中意&#xff0c;再听已是曲终人”。世有伯乐&#xff0c;然后有千里马。千里马常有&#xff0c;而伯乐不常有。无论你是考研考公等考试大军中的一员&#xff0c;还是已步入社会的打工人或者领导&#xff0c;当你面临被人选择或者选择人时&#xff0c;皆可从《马说…...

深入拆解TomcatJetty(三)

深入拆解Tomcat&Jetty&#xff08;三&#xff09; 专栏地址&#xff1a;https://time.geekbang.org/column/intro/100027701 1 Tomcat组件生命周期 Tomcat如何如何实现一键式启停 Tomcat 架构图和请求处理流程如图所示&#xff1a; 对组件之间的关系进行分析&#xff0c;…...

MySQL 实现简单的性能优化

一&#xff1a;硬件优化 更高的网络带宽&#xff1a;在处理大规模的远程请求时可以提高MySQL服务器的响应速度&#xff1b; 更大的内存空间&#xff1a;有助于缓存更多的数据库数据&#xff0c;减少磁盘I/O操作&#xff0c;提高整体性能&#xff1b; 换用企业级SSD&#xff1…...

AB包资源管理器

简介 ABMgr&#xff08;Asset Bundle Manager&#xff09;类是一个用于管理 Unity 中 AssetBundle 资源加载的管理器。它通过字典缓存和管理加载的 AB 包&#xff0c;同时支持同步和异步加载。还包含了卸载和清理 AB 包的方法。 功能解析&#xff1a; 主包加载与依赖管理&…...

Centos7源报错问题

原因&#xff1a;是因为centos7在024年6月份停止维护&#xff0c;导致默认镜像不能使用&#xff0c;更改镜像即可mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/…...

Openlayers高级交互(2/20):清除所有图层的有效方法

Openlayers项目中&#xff0c;经常会放置很多的图层&#xff0c;在业务操作的时候&#xff0c;会做出删除所有图层的行为。这里面给出了一个详细的方法&#xff0c;能够有效的解决 清除所有图层的问题。 效果图 专栏名称内容介绍Openlayers基础实战 &#xff08;72篇&#xff…...

黑马JavaWeb-day02

什么是JavaScript&#xff1f; JavaScript&#xff1a;简称Js,是一门跨平台、面向对象的脚本语言。是用来控制网页行为的&#xff0c;它能使网页可交互 JavaScript和Java是完全不同的语言&#xff0c;无论是概念还是设计。但是基础语法类似。 JavaScript JavaScript引入方式…...

laravel清除不同缓存

1、清除应用程序缓存&#xff1a; php artisan cache:clear2、清除路由缓存&#xff1a; php artisan route:cache3、清除配置缓存&#xff1a; php artisan config:cache4、清除编译后的视图文件&#xff1a; php artisan view:clear5、清除事件和监听器缓存&#xff1a; ph…...

【Git】解决分支冲突、分支合并、版本回退、版本管理

解决本地冲突 1. 合并分支 假设你正在 main 分支上&#xff0c;想要合并 feature 分支。 git checkout main git merge feature如果两个分支都对同一文件做了不同的修改&#xff0c;Git 会提示你有冲突&#xff0c;并显示冲突文件。 2. 查看冲突文件 使用以下命令查看冲突…...

linux file结构体与inode结构体

在 Linux 系统中&#xff0c;inode 结构体和 file 结构体都是与文件系统相关的重要数据结构。它们各自承担着不同的角色&#xff0c;帮助操作系统管理文件和目录。以下是它们的异同点&#xff1a; inode 结构体 1.定义&#xff1a;inode&#xff08;索引节点&#xff09;是文件…...