Netty-ChannelPipeline
EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程中打交道最多的组件,为用户提供了 I/O 事件的全部控制权。
文章目录
- 一、ChannelPipeline 是什么?🤔️
- 二、ChannelPipeline 的内部结构🔍
- 1、HeadContext
- 2、TailContext
- 3、addLiast() 方法🔍
- 三、ChannelPipeline 事件传播机制
- 四、ChannelPipeline 异常传播机制
- 五、统一的异常处理器
一、ChannelPipeline 是什么?🤔️
pipeline 有管道,流水线的意思,最早使用在 Unix 操作系统中,可以让不同功能的程序相互通讯,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不同的程序或组件,使它们组成一条直线的工作流。
ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性。ChannelPipeline维护着处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,可以增加或删除ChannelHandler来实现对不同业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。
二、ChannelPipeline 的内部结构🔍
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}
从 ChannelPipeline 的构造函数可以看出,ChannelPipeline 维护了一组 ChannelHandlerContext 实例组成双向链表。默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理。我们自定义的ChannelHandler会插入到 head 和 tail 之间,这两个节点在 Netty 中已经默认实现了,它们在ChannelPipeline 中起到了至关重要的作用。
那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?
其实这是一种比较常用的编程思想。ChannelHandlerContext用于保存ChannelHandler。ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。
可以试想一下,如果没有ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候。前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。
首先我们看下 HeadContext 和 TailContext 的继承关系

1、HeadContext
通过集成关系我们发现 HeadContext 分别实现了ChannelInboundHandler 和 ChannelOutboundHandler,即 HeadContext 既是 入站处理器,也是出站处理器。
HeadContext是入站的第一站,出站的最后一站。对于1个请求先由HeadContext处理入栈,经过一系列的入栈处理器然后传递到TailContext,由TailContext往下传递经过一系列的出栈处理器,最后再经过HeadContext返回。
2、TailContext
TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 入站事件传播,例如释放 Message 数据资源等。
TailContext是入站的最后一站,出站的第一站。TailContext节点作为出站事件传播的第一站,仅仅是将出站事件传递给下一个节点。
从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。
3、addLiast() 方法🔍
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理,关于ChannelHandler将会在下文中详细讲解!

三、ChannelPipeline 事件传播机制
入站事件是由I/O线程被动触发,由入站处理器按自下而上的方向处理,在中途可以被拦截丢弃,出站事件由用户handler主动触发,由出站处理器按自上而下的方向处理。

接下来用一个示例来讲解~
服务端代码,
public class PipelineServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup(2);new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(1);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(2);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(3);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(4);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(5);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(6);super.write(ctx, msg, promise);}});}}).bind(8080);}
}
客户端代码,
public class PipelineClient {public static void main(String[] args) throws InterruptedException {new Bootstrap().group( new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080).sync().channel().writeAndFlush("Hello,server!");}
}
依次启动服务端和客户端,服务端打印如下:
1
2
3
以上我们通过 Pipeline 的 addLast 方法分别添加了三个 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,添加顺序分别是 1 -> 2 -> 3,4 -> 5 -> 6。
此时为什么没有打印 4、5、6呢,即没有触发出站的操作❓
出站处理器只有向channel中写入数据才会触发,我们在第三个 ChannelInboundHandlerAdapter 实现类中加入以下代码!
通过依次点入,我们发现最终是调用了 tail节点 的writeAndFlush 方法,即TailContext节点作为出站事件传播的第一站!
最终服务端打印如下:
1
2
3
6
5
4
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

- 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 1 处代码,则仅会打印 1
- 如果注释掉 2 处代码,则仅会打印 1 2
- 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 3 处代码,则仅会打印 1 2 3
- 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 6 处代码,则仅会打印 1 2 3 6
- ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己
如图,服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

四、ChannelPipeline 异常传播机制
ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 第二个 ChannelInboundHandlerAdapter 实现类 的实现来模拟业务逻辑异常:

由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点。

如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:

五、统一的异常处理器
在 Netty 应用开发的过程中,良好的异常处理机制会让开发在排查问题的时候事半功倍。虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?
小编个人推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。
通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器!
/*** 自定义异常处理器*/
public static class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof RuntimeException) {System.out.println();log.error("业务异常处理,异常信息:{}", cause.getMessage());}}
}

相关文章:
Netty-ChannelPipeline
EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程…...
从入门到精通,30天带你学会C++【第六天:与或非三兄弟和If判断语句(博主目前最长文章,2514字)】(学不会你找我)
目录 前言 计算机里的真和假 与或非三兄弟 编辑与运算(&&) 具体说明表格: 举个栗子1: 或运算(||) 具体说明表格: 举个栗子2: 非运算(!)…...
如何快速找出占用空间最大的文件?
分析&回答 使用 find 命令找到大于指定大小的文件: 当前目录大于500M文件 find ./ -size 500M用户目录大于500M文件 find ~ -type f -size 500M根目录大于500M文件 find / -type f -size 500M 复制代码 让文件按大小排序 du -h * | sort -n 喵呜面试助手&am…...
算法:分治思想处理归并递归问题
文章目录 算法原理实现思路典型例题排序数组数组中的逆序对计算右侧小于当前元素的个数 总结 算法原理 利用归并思想进行分治也是很重要的一种思路,在解决逆序对的问题上有很大的需求空间 于是首先归并排序是首先的,归并排序要能写出来: c…...
小白学Go 基础02-了解Go语言的诞生与演进
Go语言诞生于何时?它的最初设计者是谁?它为什么被命名为Go?它的设计目标是什么?它如今发展得怎么样?带着这些问题,我们一起穿越时空,回到2007年9月Go语言诞生的那一历史时刻吧。 Go语言的诞生 …...
python中如何将十进制转成二进制
python中如何将十进制转成二进制 在 Python 中,你可以使用内置的 bin() 函数将十进制数转换为二进制表示形式。以下是使用 bin() 函数进行转换的示例: decimal_number 10binary_number bin(decimal_number)print(binary_number) # 输出:…...
数据结构--5.0.1图的存储结构
目录 一、邻接矩阵(无向图) 二、邻接矩阵(有向图) 三、邻接矩阵(网) 四、邻接表(无向图) 五、邻接表(有向图) ——图的存储结构相比较线性表与树来说就复…...
解决win10 wsl子系统安装的ubuntu环境中lsof,netstat命令查看端口没有任何输出的问题
最近有个以前的ssm项目需要在新电脑上运行测试一下,发现需要redis环境,看了官网说:有两种选择: 1. 要么在虚拟机比如vmware安装linux基础环境,然后再安装redis 2. 要么可以利用win10的wsl linux子系统安装ubuntu&…...
【OpenFeign】OpenFeign结合Hystrix和Sentinel实现熔断降级
OpenFeign可以与Hystrix和Sentinel结合使用,实现降级和熔断。 OpenFeign与Hystrix结合使用 使用OpenFeign需要引入OpenFeign的依赖: <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-sta…...
软件工程(十) 需求工程之需求开发与管理
前面我们学习到了需求工程的概念与分类,我们知道了需求工程主要分为需求开发和需求管理,但是没有说明到底该如何开发需求,有哪些方法去开发需求。到底该如何进行需求管理,又有哪些进行需求管理的方式。具体是如何去做的。下面我们将会详细进行描述。 1、需求开发 1.1、需…...
C++网狐服务器引入开源日志库spdlog
很多人对日志库不以为然,包括网狐这种十几年的公司都不重视,其实日志库记录的东西能在线上出问题时高效解决,特别是别人写的东西,人又走了,出了问题,还可以用日志分析快速解决。要是没有日志记录࿰…...
【C++】—— c++11之智能指针
前言: 本期,我们将要学习的是在c11中新提出的概念——异常指针! 目录 (一)智能指针的引入 (二)内存泄漏 1、什么是内存泄漏,内存泄漏的危害 2、内存泄漏分类(了解&a…...
html5——前端笔记
html 一、html51.1、理解html结构1.2、h1 - h6 (标题标签)1.3、p (段落和换行标签)1.4、br 换行标签1.5、文本格式化1.6、div 和 span 标签1.7、img 图像标签1.8、a 超链接标签1.9、table表格标签1.9.1、表格标签1.9.2、表格结构标签1.9.3、合并单元格 1.10、列表1.10.1、ul无序…...
如何在 Vue TypeScript 项目使用 emits 事件
Vue是构建出色的Web应用程序的最灵活、灵活和强大的JavaScript框架之一。Vue中最重要的概念和关键特性之一是能够促进应用程序组件之间的通信。让我们深入探讨一下Vue中的“emits”概念,并了解它们如何以流畅和无缝的方式实现父子组件之间的通信。 Vue中的emits是什…...
文件操作 黑马教程(04)
1.文本文件 写文件 #include "iostream" #include "fstream" using namespace std; /** 文件操作** 程序运行时产生的数据都属于临时数据,程序一旦结束都会被释放* 通过文件可以将数据持久化* C中对文件操作需要包含头文件<fstream>** 文…...
Jmeter(二十七):BeanShell PostProcessor跨线程全局变量使用
在性能测试中,两个相关联的接口不一定都在同一个线程组,遇见这种情况时,我们要进行跨线程组传参,此处用登录和查询配送单两个请求举例; 1、登录请求中配置json提取器,将接口返回的token保存在变量中&#…...
手写表格OCR识别并与大模型ChatGPT交互?
这是一张手写表格,姓名做了脱敏处理。现在需要对其识别,并分析。 直接粘贴剪切板中的表格原始图片,在网页中ctlV进行识别。识别结果列用分隔符|,可以直接粘贴到excel,进行数据列分隔。为了美观期间,也可以用…...
使用 v-for 指令和数组来实现在 Uni-app 中动态增减表单项并渲染多个数据
在 data 中定义一个数组,用于存储表单项的数据: data() {return {formItems: []} } 在模板中使用 v-for 指令渲染表单项: <template><div><div v-for"(item, index) in formItems" :key"index"><…...
xml
1.xml 1.1概述【理解】 万维网联盟(W3C) 万维网联盟(W3C)创建于1994年,又称W3C理事会。1994年10月在麻省理工学院计算机科学实验室成立。 建立者: Tim Berners-Lee (蒂姆伯纳斯李)。 是Web技术领域最具权威和影响力的国际中立性技术标准机构。 到目前为…...
Java中的动态代理(JDK Proxy VS CGLib)
前言 动态代理可以说是Java基础中一个比较重要的内容,这块内容关系到Spring框架中的AOP实现原理,所以特别写了一篇作为个人对这块知识的总结。这部分内容主要包括:JDK Proxy和CGLib的基本介绍、二者的实现原理、代码示例等。 什么是动态代理…...
IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)
漏洞概览 漏洞名称:Apache Flink REST API 任意文件读取漏洞CVE编号:CVE-2020-17519CVSS评分:7.5影响版本:Apache Flink 1.11.0、1.11.1、1.11.2修复版本:≥ 1.11.3 或 ≥ 1.12.0漏洞类型:路径遍历&#x…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...


