当前位置: 首页 > article >正文

Netty源码—5.Pipeline和Handler一

大纲

1.Pipeline和Handler的作用和构成

2.ChannelHandler的分类

3.几个特殊的ChannelHandler

4.ChannelHandler的生命周期

5.ChannelPipeline的事件处理

6.关于ChannelPipeline的问题整理

7.ChannelPipeline主要包括三部分内容

8.ChannelPipeline的初始化

9.ChannelPipeline添加ChannelHandler

10.ChannelPipeline删除ChannelHandler

11.Inbound事件的传播

12.Outbound事件的传播

13.ChannelPipeline中异常的传播

14.ChannelPipeline总结

1.Pipeline和Handler的作用和构成

(1)Pipeline和Handler的作用

(2)Pipeline和Handler的构成

(1)Pipeline和Handler的作用

可以在处理复杂的业务逻辑时避免if else的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。

Netty通过责任链模式来组织代码逻辑,能够支持逻辑的动态添加和删除,能够支持各类协议的扩展。

(2)Pipeline和Handler的构成

在Netty里,一个连接对应着一个Channel。这个Channel的所有处理逻辑都在一个叫ChannelPipeline的对象里,ChannelPipeline是双向链表结构,它和Channel之间是一对一的关系。

ChannelPipeline里的每个结点都是一个ChannelHandlerContext对象,这个ChannelHandlerContext对象能够获得和Channel相关的所有上下文信息。每个ChannelHandlerContext对象都包含一个逻辑处理器ChannelHandler,每个逻辑处理器ChannelHandler都处理一块独立的逻辑。

2.ChannelHandler的分类

ChannelHandler有两大子接口,分别为Inbound和Outbound类型:第一个子接口是ChannelInboundHandler,用于处理读数据逻辑,最重要的方法是channelRead()。第二个子接口是ChannelOutboundHandler,用于处理写数据逻辑,最重要的方法是write()。

这两个子接口默认的实现分别是:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。它们分别实现了两个子接口的所有功能,在默认情况下会把读写事件传播到下一个Handler。

InboundHandler的事件通常只会传播到下一个InboundHandler,OutboundHandler的事件通常只会传播到下一个OuboundHandler,InboundHandler的执行顺序与实际addLast的添加顺序相同,OutboundHandler的执行顺序与实际addLast的添加顺序相反。

Inbound事件通常由IO线程触发,如TCP链路的建立事件、关闭事件、读事件、异常通知事件等。其触发方法一般带有fire字眼,如下所示:

ctx.fireChannelRegister()、

ctx.fireChannelActive()、

ctx.fireChannelRead()、

ctx.fireChannelReadComplete()、

ctx.fireChannelInactive()。

Outbound事件通常由用户主动发起的网络IO操作触发,如用户发起的连接操作、绑定操作、消息发送等操作。其触发方法一般如:ctx.bind()、ctx.connect()、ctx.write()、ctx.flush()、ctx.read()、ctx.disconnect()、ctx.close()。

3.几个特殊的ChannelHandler

(1)ChannelInboundHandlerAdapter

(2)ChannelOutboundHandlerAdapter

(3)ByteToMessageDecoder

(4)SimpleChannelInboundHandler

(5)MessageToByteEncoder

(1)ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter主要用于实现ChannelInboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。

//Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline.
public interface ChannelHandler {//Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.void handlerAdded(ChannelHandlerContext ctx) throws Exception;//Gets called after the ChannelHandler was removed from the actual context and it doesn't handle events anymore.void handlerRemoved(ChannelHandlerContext ctx) throws Exception;//Gets called if a Throwable was thrown.@Deprecatedvoid exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;//Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.@Inherited@Documented@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@interface Sharable {// no value}
}//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {// Not using volatile because it's used only for a sanity check.boolean added;//Return true if the implementation is Sharable and so can be added to different ChannelPipelines.public boolean isSharable() {Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}//Do nothing by default, sub-classes may override this method.@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {// NOOP}//Do nothing by default, sub-classes may override this method.@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// NOOP}//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}
}//ChannelHandler which adds callbacks for state changes. 
//This allows the user to hook in to state changes easily.
public interface ChannelInboundHandler extends ChannelHandler {//The Channel of the ChannelHandlerContext was registered with its EventLoopvoid channelRegistered(ChannelHandlerContext ctx) throws Exception;//The Channel of the ChannelHandlerContext was unregistered from its EventLoopvoid channelUnregistered(ChannelHandlerContext ctx) throws Exception;//The Channel of the ChannelHandlerContext is now activevoid channelActive(ChannelHandlerContext ctx) throws Exception;//The Channel of the ChannelHandlerContext was registered is now inactive and reached its end of lifetime.void channelInactive(ChannelHandlerContext ctx) throws Exception;//Invoked when the current Channel has read a message from the peer.void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;//Invoked when the last message read by the current read operation has been consumed by #channelRead(ChannelHandlerContext, Object).//If ChannelOption#AUTO_READ is off, no further attempt to read an inbound data from the current Channel will be made until ChannelHandlerContext#read() is called.void channelReadComplete(ChannelHandlerContext ctx) throws Exception;//Gets called if an user event was triggered.void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;//Gets called once the writable state of a Channel changed. //You can check the state with Channel#isWritable().void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;//Gets called if a Throwable was thrown.@Override@SuppressWarnings("deprecation")void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}//Abstract base class for ChannelInboundHandler implementations which provide implementations of all of their methods.
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {//Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();}//Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();}//Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();}//Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();}//Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}//Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();}//Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);}//Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();}//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}
}

(2)ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter主要用于实现ChannelOutboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。

//ChannelHandler which will get notified for IO-outbound-operations.
public interface ChannelOutboundHandler extends ChannelHandler {//Called once a bind operation is made.void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;//Called once a connect operation is made.void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;//Called once a disconnect operation is made.void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;//Called once a close operation is made.void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;//Called once a deregister operation is made from the current registered EventLoop.void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;//Intercepts ChannelHandlerContext#read().void read(ChannelHandlerContext ctx) throws Exception;//Called once a write operation is made. The write operation will write the messages through the ChannelPipeline.//Those are then ready to be flushed to the actual Channel once Channel#flush() is called.void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;//Called once a flush operation is made. The flush operation will try to flush out all previous written messages that are pending.void flush(ChannelHandlerContext ctx) throws Exception;
}//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {//Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);}//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Overridepublic void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.disconnect(promise);}//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.close(promise);}//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);}//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void read(ChannelHandlerContext ctx) throws Exception {ctx.read();}//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();}
}

(3)ByteToMessageDecoder

基于这个ChannelHandler可以实现自定义解码,而不用关心ByteBuf的强转和解码结果的传递。Netty里的ByteBuf默认下使用的是堆外内存,ByteToMessageDecoder会自动进行内存的释放,不用操心内存管理。我们自定义的ChannelHandler继承了ByteToMessageDecoder后,需要实现decode()方法。

(4)SimpleChannelInboundHandler

基于这个ChannelHandler可以实现每一种指令的处理,不再需要强转、不再有冗长的if else逻辑、不再需要手动传递对象。

同时还可以自动释放没有往下传播的ByteBuf,因为我们编写指令处理ChannelHandler时,可能会编写不用关心的if else判断,然后手动传递无法处理的对象至下一个指令处理器。

//xxxHandler.java
if (packet instanceof xxxPacket) {//进行处理
} else {ctx.fireChannelRead(packet);
}

(5)MessageToByteEncoder

基于这个ChannelHandler可以实现自定义编码,而不用关心ByteBuf的创建,不用把创建完的ByteBuf进行返回。

4.ChannelHandler的生命周期

(1)ChannelHandler回调方法的执行顺序

ChannelHandler回调方法的执行顺序可以称为ChannelHandler的生命周期。

新建连接时ChannelHandler回调方法的执行顺序是:handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()。

关闭连接时ChannelHandler回调方法的执行顺序是:channelInactive() -> channelUnregistered() -> handlerRemoved()。

接下来是ChannelHandler具体的回调方法说明,其中一二三的顺序可以参考AbstractChannel的内部类AbstractUnsafe的register0()方法。

一.handlerAdded()

检测到新连接后调用"ch.pipeline().addLast(...)"之后的回调,表示当前Channel已成功添加一个ChannelHandler。

二.channelRegistered()

表示当前Channel已和某个NioEventLoop线程建立了绑定关系,已经创建了一个Reactor线程来处理当前这个Channel的读写。

三.channelActive()

当Channel的Pipeline已经添加完所有的ChannelHandler以及绑定好一个NioEventLoop线程,这个Channel对应的连接才算真正被激活,接下来就会回调该方法。

四.channelRead()

服务端每次收到客户端发送的数据时都会回调该方法,表示有数据可读。

五.channelReadComplete()

服务端每次读完一条完整的数据都会回调该方法,表示数据读取完毕。

六.channelInactive()

表示这个连接已经被关闭,该连接在TCP层已经不再是ESTABLISH状态。

七.channelUnregister()

表示与这个连接对应的NioEventLoop线程移除了对这个连接的处理。

八.handlerRemoved()

表示给这个连接添加的所有的ChannelHandler都被移除了。

(2)ChannelHandler回调方法的应用场景

一.handlerAdded()方法与handlerRemoved()方法通常可用于一些资源的申请和释放。

二.channelActive()方法与channelInactive()方法表示的是TCP连接的建立与释放,可用于统计单机连接数或IP过滤。

三.channelRead()方法可用于根据自定义协议进行拆包。每次读到一定数据就累加到一个容器里,然后看看能否拆出完整的包。

四.channelReadComplete()方法可用于实现批量刷新。如果每次向客户端写数据都通过writeAndFlush()方法写数据并刷新到底层,其实并不高效。所以可以把调用writeAndFlush()方法的地方换成调用write()方法,然后再在channelReadComplete()方法里调用ctx.channel().flush()。

5.ChannelPipeline的事件处理

(1)消息读取和发送被Pipeline处理的过程

(2)ChannelPipeline的主要特征

(1)消息读取和发送被Pipeline处理的过程

消息的读取和发送被ChannelPipeline的ChannelHandler链拦截和处理的全过程:

一.首先AbstractNioChannel内部类NioUnsafe的read()方法读取ByteBuf时会触发ChannelRead事件,也就是由NioEventLoop线程调用ChannelPipeline的fireChannelRead()方法将ByteBuf消息传输到ChannelPipeline中。

二.然后ByteBuf消息会依次被HeadContext、xxxChannelHandler、...、TailContext拦截处理。在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递。

三.接着用户可能会调用ChannelHandlerContext的write()方法发送ByteBuf消息。此时ByteBuf消息会从TailContext开始,途径xxxChannelHandler、...、HeadContext,最终被添加到消息发送缓冲区中等待刷新和发送。在这个过程中,任何ChannelHandler都可以中断当前的流程,中断消息的传递。

(2)ChannelPipeline的主要特征

一.ChannelPipeline支持运行时动态地添加或者删除ChannelHandler

例如业务高峰时对系统做拥塞保护。处于业务高峰期时,则动态地向当前的ChannelPipeline添加ChannelHandler。高峰期过后,再移除ChannelHandler。

二.ChannelPipeline是线程安全的

多个业务线程可以并发操作ChannelPipeline,因为使用了synchronized关键字。但ChannelHandler却不一定是线程安全的,这由用户保证。

6.关于ChannelPipeline的问题整理

一.Netty是如何判断ChannelHandler类型的?

即如何判断一个ChannelHandler是Inbound类型还是Outbound类型?

答:当调用Pipeline去添加一个ChannelHandler结点时,旧版Netty会使用instanceof关键字来判断该结点是Inbound类型还是Outbound类型,并分别用一个布尔类型的变量来进行标识。新版Netty则使用一个整形的executionMask来具体区分详细的Inbound事件和Outbound事件。这个executionMask对应一个16位的二进制数,是哪一种事件就对应哪一个Mask。

//Inbound事件的Mask
MASK_EXEPTION_CAUGHT = 1;
MASK_CHANNEL_REGISTER = 1 << 1;
MASK_CHANNEL_UNREGISTER = 1 << 2;
MASK_CHANNEL_ACTIVE = 1 << 3;
MASK_CHANNEL_INACTIVE = 1 << 4;
MASK_CHANNEL_READ = 1 << 5;
MASK_CHANNEL_READ_COMPLETE = 1 << 6;
MASK_CHANNEL_USER_EVENT_TRIGGERED = 1 << 7;
MASK_CHANNEL_WRITABLITY_CHANGED = 1 << 8;//Outbound事件的Mask
MASK_BIND = 1 << 9;
MASK_CONNECT = 1 << 10;
MASK_DISCONNECT = 1 << 11;
MASK_CLOSE = 1 << 12;
MASK_DEREGISTER = 1 << 13;
MASK_READ = 1 << 14;
MASK_WRITE = 1 << 15;
MASK_FLUSH = 1 << 16;

二.添加ChannelHandler时应遵循什么样的顺序?

答:Inbound类型的事件传播跟添加ChannelHandler的顺序一样,Outbound类型的事件传播跟添加ChannelHandler的顺序相反。

三.用户手动触发事件传播的两种方式有什么区别?

这两种方式是分别是:ctx.writeAndFlush()和ctx.channel().writeAndFlush()。

答:当通过Channel去触发一个事件时,那么该事件会沿整个ChannelPipeline传播。如果是Inbound类型事件,则从HeadContext结点开始向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从TailContext结点开始向前传播到第一个Outbound类型的结点。当通过当前结点去触发一个事件时,那么该事件只会从当前结点开始传播。如果是Inbound类型事件,则从当前结点开始一直向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从当前结点开始一直向前传播到第一个Outbound类型的结点。

7.ChannelPipeline主要包括三部分内容

一.ChannelPipeline的初始化

服务端Channel和客户端Channel在何时初始化ChannelPipeline?在初始化时又做了什么事情?

二.添加和删除ChannelHandler

Netty是如何实现业务逻辑处理器动态编织的?

三.事件和异常的传播

读写事件和异常在ChannelPipeline中的传播。

8.ChannelPipeline的初始化

(1)ChannelPipeline的初始化时机

(2)ChannelPipeline的初始化内容

(3)ChannelPipeline的说明

(1)ChannelPipeline的初始化时机

在服务端启动和客户端连接接入的过程中,在创建NioServerSocketChannel和NioSocketChannel时,会逐层执行父类的构造方法,最后执行到AbstractChannel的构造方法。AbstractChannel的构造方法会将Netty的核心组件创建出来。而核心组件中就包含了DefaultChannelPipeline类型的ChannelPipeline组件。

//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent;private final ChannelId id;private final Unsafe unsafe;private final DefaultChannelPipeline pipeline;...//Creates a new instance.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}//Returns a new DefaultChannelPipeline instance.protected DefaultChannelPipeline newChannelPipeline() {//创建ChannelPipeline组件return new DefaultChannelPipeline(this);}...
}//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;private final Channel channel;//保存了Channel的引用...protected DefaultChannelPipeline(Channel channel) {//保存Channel的引用到Pipeline组件的成员变量this.channel = ObjectUtil.checkNotNull(channel, "channel");...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}...
}

(2)ChannelPipeline的初始化内容

ChannelPipeline的初始化主要涉及三部分内容:

一.Pipeline在创建Channel时被创建

二.Pipeline的结点是ChannelHandlerContext

三.Pipeline两大哨兵HeadContext和TailContext

(3)ChannelPipeline的说明

ChannelPipeline中保存了Channel的引用,ChannelPipeline中每个结点都是一个ChannelHandlerContext对象,每个ChannelHandlerContext结点都包裹着一个ChannelHandler执行器,每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline。由于ChannelPipeline又保存了Channel的引用,所以每个ChannelHandlerContext结点都可以拿到所有的上下文信息。

ChannelHandlerContext接口多继承自AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker。

ChannelHandlerContext的关键方法有:channel()、executor()、handler()、pipeline()、alloc()。ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能。

ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表。HeadContext结点会比TailContext结点多一个unsafe成员变量。

public class DefaultChannelPipeline implements ChannelPipeline {//ChannelPipeline中每个结点都是一个ChannelHandlerContext对象final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;private final Channel channel;//ChannelPipeline中保存了Channel的引用...protected DefaultChannelPipeline(Channel channel) {//保存Channel的引用到Pipeline组件的成员变量this.channel = ObjectUtil.checkNotNull(channel, "channel");...//ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {//HeadContext结点会比TailContext结点多一个unsafe成员变量private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}...}final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, true, false);setAddComplete();}...}...
}//ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { //每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipelineprivate final DefaultChannelPipeline pipeline;...AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;ordered = executor == null || executor instanceof OrderedEventExecutor;}
}public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {//Return the Channel which is bound to the ChannelHandlerContext.Channel channel();//Returns the EventExecutor which is used to execute an arbitrary task.EventExecutor executor();//The unique name of the ChannelHandlerContext.//The name was used when then ChannelHandler was added to the ChannelPipeline. //This name can also be used to access the registered ChannelHandler from the ChannelPipeline.String name();//The ChannelHandler that is bound this ChannelHandlerContext.ChannelHandler handler();//Return true if the ChannelHandler which belongs to this context was removed from the ChannelPipeline. //Note that this method is only meant to be called from with in the EventLoop.boolean isRemoved();ChannelHandlerContext fireChannelRegistered();ChannelHandlerContext fireChannelUnregistered();ChannelHandlerContext fireChannelActive();ChannelHandlerContext fireChannelInactive();ChannelHandlerContext fireExceptionCaught(Throwable cause);ChannelHandlerContext fireUserEventTriggered(Object evt);ChannelHandlerContext fireChannelRead(Object msg);ChannelHandlerContext fireChannelReadComplete();ChannelHandlerContext fireChannelWritabilityChanged();ChannelHandlerContext read();ChannelHandlerContext flush();//Return the assigned ChannelPipelineChannelPipeline pipeline();//Return the assigned ByteBufAllocator which will be used to allocate ByteBufs.ByteBufAllocator alloc();...
}

9.ChannelPipeline添加ChannelHandler

(1)常见的客户端代码

(2)ChannelPipeline添加ChannelHandler入口

(3)DefaultChannelPipeline的addLast()方法

(4)检查是否重复添加ChannelHandler结点

(5)创建ChannelHandlerContext结点

(6)添加ChannelHandlerContext结点

(7)回调handerAdded()方法

(8)ChannelPipeline添加ChannelHandler总结

(1)常见的客户端代码

首先用一个拆包器Spliter对二进制数据流进行拆包,然后解码器Decoder会将拆出来的包进行解码,接着业务处理器BusinessHandler会处理解码出来的Java对象,最后编码器Encoder会将业务处理完的结果编码成二进制数据进行输出。

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();   p.addLast(newSpliter());p.addLast(new Decoder());p.addLast(new BusinessHandler());p.addLast(new Encoder());          }
});

整个ChannelPipeline的结构如下所示:

图片

这里共有两种不同类型的结点,结点之间通过双向链表连接。一种是ChannelInboundHandler,用来处理Inbound事件,比如读取数据流进行加工处理。一种是ChannelOutboundHandler,用来处理Outbound事件,比如当调用writeAndFlush()方法时就会经过这种类型的Handler。

(2)ChannelPipeline添加ChannelHandler入口

当服务端Channel的Reactor线程轮询到新连接接入的事件时,就会调用AbstractNioChannel的内部类NioUnsafe的read()方法,也就是调用AbstractNioMessageChannel的内部类NioMessageUnsafe的read()方法。

然后会触发执行代码pipeline.fireChannelRead()传播ChannelRead事件,从而最终触发调用ServerBootstrapAcceptor接入器的channelRead()方法。

在ServerBootstrapAcceptor的channelRead()方法中,便会通过执行代码channel.pipeline().addLast()添加ChannelHandler,也就是通过调用DefaultChannelPipeline的addLast()方法添加ChannelHandler。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {Selector selector;private SelectedSelectionKeySet selectedKeys;private boolean needsToSelectAgain;private int cancelledKeys;...@Overrideprotected void run() {for (;;) {...//1.调用select()方法执行一次事件轮询select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}...//2.处理产生IO事件的ChannelneedsToSelectAgain = false;processSelectedKeys();...//3.执行外部线程放入TaskQueue的任务runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}private void processSelectedKeys() {if (selectedKeys != null) {//selectedKeys.flip()会返回一个数组processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {//1.首先取出IO事件final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//Help GC//2.然后获取对应的Channel和处理该Channel//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {//网络事件的处理processSelectedKey(k, (AbstractNioChannel) a);} else {//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}//3.最后判断是否应该再进行一次轮询if (needsToSelectAgain) {for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();//selectedKeys.flip()会返回一个数组selectedKeys = this.selectedKeys.flip();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();...try {int readyOps = k.readyOps();...//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入//此时将调用Channel的unsafe变量来进行实际操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//调用AbstractNioMessageChannel的NioMessageUnsafe.read()方法//进行新连接接入处理unsafe.read();if (!ch.isOpen()) {return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}...
}//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {//临时存放读到的连接NioSocketChannelprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {//断言确保该read()方法必须来自Reactor线程调用assert eventLoop().inEventLoop();//获得Channel对应的Pipelinefinal ChannelPipeline pipeline = pipeline();//获得Channel对应的RecvByteBufAllocator.Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();do {//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channelint localRead = doReadMessages(readBuf);if (localRead == 0) {break;}} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接//2.设置并绑定NioSocketChannelint size = readBuf.size();for (int i = 0; i < size; i ++) {//调用DefaultChannelPipeline的fireChannelRead()方法pipeline.fireChannelRead(readBuf.get(i));}//3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法readBuf.clear();pipeline.fireChannelReadComplete();}}...
}//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {//从Pipeline的第一个HeadContext处理器开始调用AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//调用AbstractChannelHandlerContext的fireChannelRead()方法ctx.fireChannelRead(msg);}@Overridepublic ChannelHandler handler() {return this;}...}...
}public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...//初始化服务端Channel时,会向其Pipeline添加ServerBootstrapAcceptor处理器@Overridevoid init(Channel channel) throws Exception {//1.设置服务端Channel的Option与Attrfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}//2.设置客户端Channel的Option与Attrfinal EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}//3.配置服务端启动逻辑ChannelPipeline p = channel.pipeline();//p.addLast()用于定义服务端启动过程中需要执行哪些逻辑p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {//一.添加用户自定义的Handler,注意这是handler,而不是childHandlerfinal ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) pipeline.addLast(handler);//二.添加一个特殊的Handler用于接收新连接//自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptorch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//调用DefaultChannelPipeline的addLast()方法pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;...//channelRead()方法在新连接接入时被调用@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;//1.给新连接的Channel添加用户自定义的Handler处理器//这里的childHandler其实是一个特殊的Handler: ChannelInitializerchild.pipeline().addLast(childHandler);//2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关for (Entry<ChannelOption<?>, Object> e: childOptions) {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}}//3.设置新连接Channel的属性for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}//4.绑定Reactor线程//childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法childGroup.register(child);}...}...
}

(3)DefaultChannelPipeline的addLast()方法

使用synchronized关键字是为了防止多线程并发操作ChannelPipeline底层的双向链表,添加ChannelHandler结点的过程主要分为4个步骤:

步骤一:判断ChannelHandler是否重复添加

步骤二:创建结点

步骤三:添加结点到链表

步骤四:回调添加完成事件

这个结点便是ChannelHandlerContext,Pipeline里每个结点都是一个ChannelHandlerContext。addLast()方法便是把ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表。

public class DefaultChannelPipeline implements ChannelPipeline {...@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {if (handlers == null) throw new NullPointerException("handlers");for (ChannelHandler h: handlers) {if (h == null) break;addLast(executor, null, h);}return this;}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {//1.检查是否有重复的ChannelHandler结点checkMultiplicity(handler);//2.创建ChannelHandlerContext结点newCtx = newContext(group, filterName(name, handler), handler);//3.添加ChannelHandlerContext结点addLast0(newCtx);...}//4.回调用户方法//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了callHandlerAdded0(newCtx);return this;}...
}

(4)检查是否重复添加ChannelHandler结点

Netty使用了一个成员变量added来表示一个ChannelHandler是否已经添加。如果当前要添加的ChannelHandler是非共享的并且已经添加过,那么抛出异常,否则标识该ChannelHandler已添加。

如果一个ChannelHandler支持共享,那么它就可以无限次被添加到ChannelPipeline中。如果要让一个ChannelHandler支持共享,只需要加一个@Sharable注解即可。而ChannelHandlerAdapter的isSharable()方法正是通过判断该ChannelHandler对应的类是否标有@Sharable注解来实现的。

Netty为了性能优化,还使用了ThreadLocal来缓存ChannelHandler是否共享的情况。在高并发海量连接下,每次有新连接添加ChannelHandler都会调用isSharable()方法,从而优化性能。

public class DefaultChannelPipeline implements ChannelPipeline {...private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}...
}//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {//Not using volatile because it's used only for a sanity check.boolean added;//Return true if the implementation is Sharable and so can be added to different ChannelPipelines.public boolean isSharable() {//Cache the result of Sharable annotation detection to workaround a condition. //We use a ThreadLocal and WeakHashMap to eliminate the volatile write/reads. //Using different WeakHashMap instances per Thread is good enough for us and the number of Threads are quite limited anyway.Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}...
}

(5)创建ChannelHandlerContext结点

根据ChannelHandler创建ChannelHandlerContext类型的结点时,会将该ChannelHandler的引用保存到结点的成员变量中。

public class DefaultChannelPipeline implements ChannelPipeline {...    @Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {//1.检查是否有重复的ChannelHandler结点checkMultiplicity(handler);//2.创建ChannelHandlerContext结点newCtx = newContext(group, filterName(name, handler), handler);//3.添加ChannelHandlerContext结点addLast0(newCtx);...}//4.回调用户方法//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了callHandlerAdded0(newCtx);return this;}//给ChannelHandler创建一个唯一性的名字private String filterName(String name, ChannelHandler handler) {if (name == null) {return generateName(handler);}checkDuplicateName(name);return name;}//根据ChannelHandler创建一个ChannelHandlerContext结点private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}...
}final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {private final ChannelHandler handler;DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {...AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;ordered = executor == null || executor instanceof OrderedEventExecutor;}...
}

(6)添加ChannelHandlerContext结点

使用尾插法向双向链表添加结点。

public class DefaultChannelPipeline implements ChannelPipeline {...private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {volatile AbstractChannelHandlerContext next;volatile AbstractChannelHandlerContext prev;...
}

(7)回调handerAdded()方法

向ChannelPipeline添加完新结点后,会使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成,然后执行ctx.handler().handlerAdded(ctx),回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。

public class DefaultChannelPipeline implements ChannelPipeline {...private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {//使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成ctx.setAddComplete();//回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法ctx.handler().handlerAdded(ctx);}...
}//DemoHandler是用户定义的ChannelHandler
public class DemoHandler extends SimpleChannelInboundHandler<...> {@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {//这个DemoHandler结点被添加到ChannelPipeline之后,就会回调这里的方法}...
}

最典型的一个回调就是用户代码的ChannelInitializer被添加完成后,会先调用其initChannel()方法将用户自定义的ChannelHandler添加到ChannelPipeline,然后再调用pipeline.remove()方法将自身结点进行删除。

public class NettyServer {private int port;public NettyServer(int port) {this.port = port;}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel.option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true)//设置一个ChannelInitializer类型的childHandler//新连接接入时,会执行ServerBootstrapAcceptor.channelRead()中的代码"child.pipeline().addLast(childHandler)"//也就是会把这个ChannelInitializer类型的结点会被添加到新连接Channel的Pipeline中//添加完这个结点后会回调ChannelInitializer的handlerAdded()方法//其中会调用ChannelInitializer的initChannel()方法给Pipeline添加真正的结点//执行完initChannel()方法后,就会移除ChannelInitializer这个结点.childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new NettyServerHandler());//针对网络请求的处理逻辑}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果} catch (Exception e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception{System.out.println("Starting Netty Server...");int port = 8998;if (args.length > 0) {port = Integer.parseInt(args[0]);}new NettyServer(port).start();}
}@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {...@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {initChannel(ctx);}}@SuppressWarnings("unchecked")private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {remove(ctx);}return true;}return false;}private void remove(ChannelHandlerContext ctx) {try {ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {//ChannelPipeline删除ChannelHandler结点(ChannelInitializer)pipeline.remove(this);}} finally {initMap.remove(ctx);}}...
}

(8)ChannelPipeline添加ChannelHandler总结

一.判断ChannelHandler是否重复添加的依据是:如果该ChannelHandler不是共享的且已被添加过,则拒绝添加。

二.否则就创建一个ChannelHandlerContext结点(ctx),并把这个ChannelHandler包装进去,也就是保存ChannelHandler的引用到ChannelHandlerContext的成员变量中。由于创建ctx时保存了ChannelHandler的引用、ChannelPipeline的引用到成员变量,ChannelPipeline又保存了Channel的引用,所以每个ctx都拥有一个Channel的所有信息。

三.接着通过双向链表的尾插法,将这个ChannelHandlerContext结点添加到ChannelPipeline中。

四.最后回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。

相关文章:

Netty源码—5.Pipeline和Handler一

大纲 1.Pipeline和Handler的作用和构成 2.ChannelHandler的分类 3.几个特殊的ChannelHandler 4.ChannelHandler的生命周期 5.ChannelPipeline的事件处理 6.关于ChannelPipeline的问题整理 7.ChannelPipeline主要包括三部分内容 8.ChannelPipeline的初始化 9.ChannelPi…...

Netlify 的深度解析及使用指南

以下是关于 Netlify 的深度解析及使用指南&#xff0c;结合其核心功能与用户需求&#xff0c;提供一站式解决方案&#xff1a; 一、Netlify 核心优势 全托管静态网站服务Netlify 提供从代码托管、自动化构建到全球 CDN 加速的全流程服务&#xff0c;支持 HTML/CSS/JS 静态资源及…...

MySQL小练习

目录 一、单表查询 二、多表查询 一、单表查询 素材&#xff1a; 表名&#xff1a;worker-- 表中字段均为中文&#xff0c;比如 部门号 工资 职工号 参加工作 等 CREATE TABLE worker ( 部门号 int(11) NOT NULL, 职工号 int(11) NOT NULL, 工作时间 date NOT NULL, 工资 float…...

Apache Hive:基于Hadoop的分布式数据仓库

Apache Hive 是一个基于 Apache Hadoop 构建的开源分布式数据仓库系统&#xff0c;支持使用 SQL 执行 PB 级大规模数据分析与查询。 主要功能 Apache Hive 提供的主要功能如下。 HiveServer2 HiveServer2 服务用于支持接收客户端连接和查询请求。 HiveServer2 支持多客户端…...

推荐算法分析

一、性能分析指标 1. 准确性指标&#xff08;Accuracy Metrics&#xff09; 衡量推荐系统预测评分的准确性&#xff0c;包括&#xff1a; ✅ RMSE&#xff08;均方根误差, Root Mean Squared Error&#xff09; 解释&#xff1a;衡量预测评分 (\hat{r}_i) 和真实评分 (r_i)…...

vllm 离线推理Qwen2.5-VL-Instruct,API部署,支持max_pixels

使用这里的最新镜像: https://www.dong-blog.fun/post/1799 启动环境 docker run -it --rm --gpus "device=1,2" \ --net host \ -v ./zizhi_merge_2025-1/:/Qwen2.5-VL-Instruct \ -v ./test:/test \...

检波、限幅、钳位电路

检波电路&#xff1a; 类似调制收音机信号&#xff1a;输入的基波和载波叠加成调制信号&#xff08;信号需要长距离里传输&#xff0c;频率要高&#xff0c;M级别的频率&#xff0c;所以要把低频信号叠在高频信号&#xff0c;才能把低频信号长距离传输&#xff0c;最后到达接收…...

学习threejs,使用TextGeometry文本几何体

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.TextGeometry1.1.1 ☘…...

Go红队开发—CLI框架(一)

CLI开发框架 命令行工具开发&#xff0c;主要是介绍开发用到的包&#xff0c;集成了一个框架&#xff0c;只要学会了基本每个人都能开发安全工具了。 该文章先学flags包&#xff0c;是比较经典的一个包&#xff0c;相比后面要学习的集成框架这个比较自由比较细化点&#xff0…...

解决点击按钮页面自动刷新

在React中&#xff0c;当你点击按钮时&#xff0c;如果按钮的type属性没有明确指定&#xff0c;它的默认值是submit。这意味着如果这个按钮被放置在一个<form>表单中&#xff0c;点击它会触发表单的提交行为&#xff0c;导致页面刷新。 在你的代码中&#xff0c;展开/折叠…...

高效团队开发的工具与方法 引言

引言 在现代软件开发领域&#xff0c;团队协作的效率和质量直接决定了项目的成败。随着项目规模的扩大和技术复杂度的增加&#xff0c;如何实现高效团队开发成为每个开发团队必须面对的挑战。高效团队开发不仅仅是个人技术能力的简单叠加&#xff0c;更需要借助合适的工具和方…...

【Java全栈进阶架构师实战:从设计模式到SpringCloudAlibaba,打造高可用系统】

&#x1f31f; 分享一个教程&#xff0c;助刚踏入IT行业、工作几年的老油条、或热爱学习的工作党们更上一层楼的&#xff01; &#x1f31f; ​适合人群&#xff1a;初中级Java开发者、求职面试备战者、技术提升党&#xff01; &#x1f4da; ​内容亮点&#xff1a; 1️⃣ ​…...

[蓝桥杯 2023 省 A] 异或和之和

题目来自洛谷网站&#xff1a; 暴力思路&#xff1a; 先进性预处理&#xff0c;找到每个点位置的前缀异或和&#xff0c;在枚举区间。 暴力代码&#xff1a; #include<bits/stdc.h> #define int long long using namespace std; const int N 1e520;int n; int arr[N…...

TDengine 3.3.2.0 集群报错 Post “http://buildkitsandbox:6041/rest/sql“

修复&#xff1a; vi /etc/hosts将buildkitsandbox映射为本机节点...

vue数据重置

前言 大家在开发后台管理系统的过程中&#xff0c;一定会遇到一个表格的条件查询重置功能吧&#xff0c;如果说查询条件少&#xff0c;重置起来还算是比较简单&#xff0c;如果元素特别多呢&#xff0c;那玩意写起来可遭老罪喽&#xff0c;那今天就给大家整一个如何快速重置数…...

22、web前端开发之html5(三)

六. 离线存储与缓存 在网络环境不稳定或需要优化资源加载速度的场景下&#xff0c;离线存储与缓存技术显得尤为重要。HTML5引入了多种离线存储和缓存机制&#xff0c;帮助开发者提升用户体验。本节将详细介绍Application Cache、localStorage、sessionStorage以及IndexedDB等技…...

git revert 用法实战:撤销一个 commit 或 merge

git revert 1 区别 • 常规的 commit &#xff08;使用 git commit 提交的 commit&#xff09; • merge commit 2 首先构建场景 master上的代码 dev开发分支上&#xff0c;添加一个a标签&#xff0c;并commit这次提交 切到master上&#xff0c;再次进行改动和提交 将de…...

修形还是需要再研究一下

最近有不少小伙伴问到修形和蜗杆砂轮的问题&#xff0c;之前虽然研究过一段时间&#xff0c;但是由于时间问题放下了&#xff0c;最近想再捡起来。 之前计算的砂轮齿形是一整段的&#xff0c;但是似乎这种对于有些小伙伴来说不太容易接受&#xff0c;希望按照修形的区域进行分…...

AI本地部署之dify

快捷目录 Windows 系统一、环境准备:首先windows 需要准备docker 环1. 安装Docker desktop2. 安装Docker3. 配置Docker 镜像路径4. 配置Docker 下载镜像源5. 重启Docker服务二、Dify 下载和安装1. Dify下载2. Dify 配置3. Dify 安装附件知识:4. Dify创建账号三、下载Ollama d…...

安恒春招一面

《网安面试指南》https://mp.weixin.qq.com/s/RIVYDmxI9g_TgGrpbdDKtA?token1860256701&langzh_CN 5000篇网安资料库https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247486065&idx2&snb30ade8200e842743339d428f414475e&chksmc0e4732df793fa3bf39…...

GPIO八种模式的应用场景总结

以下是 GPIO不同模式及其适用应用场景的详细总结&#xff1a; GPIO模式分类及适用场景 GPIO模式电气特性典型应用场景关键注意事项浮空输入引脚电平由外部信号决定&#xff0c;无内部上拉/下拉电阻• 按键检测&#xff08;外接物理上拉/下拉&#xff09;• 数字信号输入&#…...

Python应用指南:利用高德地图API获取POI数据(关键词版)

都说“为一杯奶茶愿意赴一座城”&#xff0c;曾经不知有多少年轻人为了一口茶颜悦色不惜跨越千里来到长沙打卡&#xff0c;而如今也有不少年轻人被传说中的“奶茶界的海底捞”所吸引&#xff0c;千里迢迢来到安徽只为尝一口卡旺卡奶茶。要说起来这卡旺卡奶茶&#xff0c;尽管这…...

【零基础入门unity游戏开发——2D篇】2D物理系统 —— 2D刚体组件(Rigidbody 2d)

考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、流程控制、面向对象等,适合没有编程基础的…...

Linux网络相关概念和重要知识(2)(UDP套接字编程、聊天室的实现、观察者模式)

目录 1.UDP套接字编程 &#xff08;1&#xff09;socket编程 &#xff08;2&#xff09;UDP的使用 ①socket ②bind ③recvfrom ④sendto 2.聊天室的实现 &#xff08;1&#xff09;整体逻辑 &#xff08;2&#xff09;对sockaddr_in的封装 &#xff08;3&#xff09…...

机器学习之条件概率

1. 引言 概率模型在机器学习中广泛应用于数据分析、模式识别和推理任务。本文将调研几种重要的概率模型,包括EM算法、MCMC、朴素贝叶斯、贝叶斯网络、概率图模型(CRF、HMM)以及最大熵模型,介绍其基本原理、算法流程、应用场景及优势。 2. EM算法(Expectation-Maximizati…...

国科云:浅谈DNS在IPv6改造过程中的重要性

在IPv6改造过程中&#xff0c;DNS&#xff08;域名系统&#xff09;起着至关重要的作用&#xff0c;主要体现在以下几个方面&#xff1a; 地址解析与映射 IPv6地址采用128位的地址空间&#xff0c;与IPv4的32位地址相比&#xff0c;地址长度大大增加&#xff0c;这使得手动记…...

JVM 03

今天是2025/03/24 15:21 day 11 总路线请移步主页Java大纲相关文章 今天进行JVM 5,6 个模块的归纳 首先是JVM的相关内容概括的思维导图 5. 优化技术 JVM通过多种优化技术提升程序执行效率&#xff0c;核心围绕热点代码检测和编译优化实现动态性能提升。 热点代码检测 JVM…...

【VUE】day07 路由

【VUE】day07 路由 1. 路由2. 前端路由的工作方式3. 实现简易的前端路由4. 安装和配置路由4.1 安装vue-router包4.2 创建路由模块4.3 导入并挂在路由模块 5. 在路由模块中声明路由的对应关系5.1 router-view 1. 路由 在 Vue.js 中&#xff0c;路由&#xff08;Routing&#xf…...

内网穿透的应用-本地部署ChatTTS教程:Windows搭建AI语音合成服务的全流程配置

文章目录 前言1. 下载运行ChatTTS模型2. 安装Cpolar工具3. 实现公网访问4. 配置ChatTTS固定公网地址 前言 各位开发者小伙伴们&#xff01;今天我要给大家推荐一个超级火的AI项目——ChatTTS。这个开源文本转语音&#xff08;TTS&#xff09;项目的火爆程度简直让人难以置信&a…...

2025-03-21 Unity 网络基础3——TCP网络通信准备知识

文章目录 1 IP/端口类1.1 IPAddress1.2 IPEndPoint 2 域名解析2.1 IPHostEntry2.2 Dns 3 序列化与反序列化3.1 序列化3.1.1 内置类型 -> 字节数组3.1.2 字符串 -> 字节数组3.1.3 类对象 -> 字节数组 3.2 反序列化3.2.1 字节数组 -> 内置类型3.2.2 字节数组 -> 字…...