Netty-ChannelPipeline
EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程中打交道最多的组件,为用户提供了 I/O 事件的全部控制权。
文章目录
- 一、ChannelPipeline 是什么?🤔️
- 二、ChannelPipeline 的内部结构🔍
- 1、HeadContext
- 2、TailContext
- 3、addLiast() 方法🔍
- 三、ChannelPipeline 事件传播机制
- 四、ChannelPipeline 异常传播机制
- 五、统一的异常处理器
一、ChannelPipeline 是什么?🤔️
pipeline 有管道,流水线的意思,最早使用在 Unix 操作系统中,可以让不同功能的程序相互通讯,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不同的程序或组件,使它们组成一条直线的工作流。
ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性。ChannelPipeline维护着处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,可以增加或删除ChannelHandler来实现对不同业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。
二、ChannelPipeline 的内部结构🔍
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}
从 ChannelPipeline 的构造函数可以看出,ChannelPipeline 维护了一组 ChannelHandlerContext 实例组成双向链表。默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理。我们自定义的ChannelHandler会插入到 head 和 tail 之间,这两个节点在 Netty 中已经默认实现了,它们在ChannelPipeline 中起到了至关重要的作用。
那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?
其实这是一种比较常用的编程思想。ChannelHandlerContext用于保存ChannelHandler。ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。
可以试想一下,如果没有ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候。前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。
首先我们看下 HeadContext 和 TailContext 的继承关系

1、HeadContext
通过集成关系我们发现 HeadContext 分别实现了ChannelInboundHandler 和 ChannelOutboundHandler,即 HeadContext 既是 入站处理器,也是出站处理器。
HeadContext是入站的第一站,出站的最后一站。对于1个请求先由HeadContext处理入栈,经过一系列的入栈处理器然后传递到TailContext,由TailContext往下传递经过一系列的出栈处理器,最后再经过HeadContext返回。
2、TailContext
TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 入站事件传播,例如释放 Message 数据资源等。
TailContext是入站的最后一站,出站的第一站。TailContext节点作为出站事件传播的第一站,仅仅是将出站事件传递给下一个节点。
从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。
3、addLiast() 方法🔍
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理,关于ChannelHandler将会在下文中详细讲解!

三、ChannelPipeline 事件传播机制
入站事件是由I/O线程被动触发,由入站处理器按自下而上的方向处理,在中途可以被拦截丢弃,出站事件由用户handler主动触发,由出站处理器按自上而下的方向处理。

接下来用一个示例来讲解~
服务端代码,
public class PipelineServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup(2);new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(1);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(2);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(3);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(4);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(5);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(6);super.write(ctx, msg, promise);}});}}).bind(8080);}
}
客户端代码,
public class PipelineClient {public static void main(String[] args) throws InterruptedException {new Bootstrap().group( new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080).sync().channel().writeAndFlush("Hello,server!");}
}
依次启动服务端和客户端,服务端打印如下:
1
2
3
以上我们通过 Pipeline 的 addLast 方法分别添加了三个 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,添加顺序分别是 1 -> 2 -> 3,4 -> 5 -> 6。
此时为什么没有打印 4、5、6呢,即没有触发出站的操作❓
出站处理器只有向channel中写入数据才会触发,我们在第三个 ChannelInboundHandlerAdapter 实现类中加入以下代码!
通过依次点入,我们发现最终是调用了 tail节点 的writeAndFlush 方法,即TailContext节点作为出站事件传播的第一站!
最终服务端打印如下:
1
2
3
6
5
4
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

- 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 1 处代码,则仅会打印 1
- 如果注释掉 2 处代码,则仅会打印 1 2
- 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 3 处代码,则仅会打印 1 2 3
- 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 6 处代码,则仅会打印 1 2 3 6
- ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己
如图,服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

四、ChannelPipeline 异常传播机制
ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 第二个 ChannelInboundHandlerAdapter 实现类 的实现来模拟业务逻辑异常:

由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点。

如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:

五、统一的异常处理器
在 Netty 应用开发的过程中,良好的异常处理机制会让开发在排查问题的时候事半功倍。虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?
小编个人推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。
通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器!
/*** 自定义异常处理器*/
public static class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof RuntimeException) {System.out.println();log.error("业务异常处理,异常信息:{}", cause.getMessage());}}
}

相关文章:
Netty-ChannelPipeline
EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程…...
从入门到精通,30天带你学会C++【第六天:与或非三兄弟和If判断语句(博主目前最长文章,2514字)】(学不会你找我)
目录 前言 计算机里的真和假 与或非三兄弟 编辑与运算(&&) 具体说明表格: 举个栗子1: 或运算(||) 具体说明表格: 举个栗子2: 非运算(!)…...
如何快速找出占用空间最大的文件?
分析&回答 使用 find 命令找到大于指定大小的文件: 当前目录大于500M文件 find ./ -size 500M用户目录大于500M文件 find ~ -type f -size 500M根目录大于500M文件 find / -type f -size 500M 复制代码 让文件按大小排序 du -h * | sort -n 喵呜面试助手&am…...
算法:分治思想处理归并递归问题
文章目录 算法原理实现思路典型例题排序数组数组中的逆序对计算右侧小于当前元素的个数 总结 算法原理 利用归并思想进行分治也是很重要的一种思路,在解决逆序对的问题上有很大的需求空间 于是首先归并排序是首先的,归并排序要能写出来: c…...
小白学Go 基础02-了解Go语言的诞生与演进
Go语言诞生于何时?它的最初设计者是谁?它为什么被命名为Go?它的设计目标是什么?它如今发展得怎么样?带着这些问题,我们一起穿越时空,回到2007年9月Go语言诞生的那一历史时刻吧。 Go语言的诞生 …...
python中如何将十进制转成二进制
python中如何将十进制转成二进制 在 Python 中,你可以使用内置的 bin() 函数将十进制数转换为二进制表示形式。以下是使用 bin() 函数进行转换的示例: decimal_number 10binary_number bin(decimal_number)print(binary_number) # 输出:…...
数据结构--5.0.1图的存储结构
目录 一、邻接矩阵(无向图) 二、邻接矩阵(有向图) 三、邻接矩阵(网) 四、邻接表(无向图) 五、邻接表(有向图) ——图的存储结构相比较线性表与树来说就复…...
解决win10 wsl子系统安装的ubuntu环境中lsof,netstat命令查看端口没有任何输出的问题
最近有个以前的ssm项目需要在新电脑上运行测试一下,发现需要redis环境,看了官网说:有两种选择: 1. 要么在虚拟机比如vmware安装linux基础环境,然后再安装redis 2. 要么可以利用win10的wsl linux子系统安装ubuntu&…...
【OpenFeign】OpenFeign结合Hystrix和Sentinel实现熔断降级
OpenFeign可以与Hystrix和Sentinel结合使用,实现降级和熔断。 OpenFeign与Hystrix结合使用 使用OpenFeign需要引入OpenFeign的依赖: <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-sta…...
软件工程(十) 需求工程之需求开发与管理
前面我们学习到了需求工程的概念与分类,我们知道了需求工程主要分为需求开发和需求管理,但是没有说明到底该如何开发需求,有哪些方法去开发需求。到底该如何进行需求管理,又有哪些进行需求管理的方式。具体是如何去做的。下面我们将会详细进行描述。 1、需求开发 1.1、需…...
C++网狐服务器引入开源日志库spdlog
很多人对日志库不以为然,包括网狐这种十几年的公司都不重视,其实日志库记录的东西能在线上出问题时高效解决,特别是别人写的东西,人又走了,出了问题,还可以用日志分析快速解决。要是没有日志记录࿰…...
【C++】—— c++11之智能指针
前言: 本期,我们将要学习的是在c11中新提出的概念——异常指针! 目录 (一)智能指针的引入 (二)内存泄漏 1、什么是内存泄漏,内存泄漏的危害 2、内存泄漏分类(了解&a…...
html5——前端笔记
html 一、html51.1、理解html结构1.2、h1 - h6 (标题标签)1.3、p (段落和换行标签)1.4、br 换行标签1.5、文本格式化1.6、div 和 span 标签1.7、img 图像标签1.8、a 超链接标签1.9、table表格标签1.9.1、表格标签1.9.2、表格结构标签1.9.3、合并单元格 1.10、列表1.10.1、ul无序…...
如何在 Vue TypeScript 项目使用 emits 事件
Vue是构建出色的Web应用程序的最灵活、灵活和强大的JavaScript框架之一。Vue中最重要的概念和关键特性之一是能够促进应用程序组件之间的通信。让我们深入探讨一下Vue中的“emits”概念,并了解它们如何以流畅和无缝的方式实现父子组件之间的通信。 Vue中的emits是什…...
文件操作 黑马教程(04)
1.文本文件 写文件 #include "iostream" #include "fstream" using namespace std; /** 文件操作** 程序运行时产生的数据都属于临时数据,程序一旦结束都会被释放* 通过文件可以将数据持久化* C中对文件操作需要包含头文件<fstream>** 文…...
Jmeter(二十七):BeanShell PostProcessor跨线程全局变量使用
在性能测试中,两个相关联的接口不一定都在同一个线程组,遇见这种情况时,我们要进行跨线程组传参,此处用登录和查询配送单两个请求举例; 1、登录请求中配置json提取器,将接口返回的token保存在变量中&#…...
手写表格OCR识别并与大模型ChatGPT交互?
这是一张手写表格,姓名做了脱敏处理。现在需要对其识别,并分析。 直接粘贴剪切板中的表格原始图片,在网页中ctlV进行识别。识别结果列用分隔符|,可以直接粘贴到excel,进行数据列分隔。为了美观期间,也可以用…...
使用 v-for 指令和数组来实现在 Uni-app 中动态增减表单项并渲染多个数据
在 data 中定义一个数组,用于存储表单项的数据: data() {return {formItems: []} } 在模板中使用 v-for 指令渲染表单项: <template><div><div v-for"(item, index) in formItems" :key"index"><…...
xml
1.xml 1.1概述【理解】 万维网联盟(W3C) 万维网联盟(W3C)创建于1994年,又称W3C理事会。1994年10月在麻省理工学院计算机科学实验室成立。 建立者: Tim Berners-Lee (蒂姆伯纳斯李)。 是Web技术领域最具权威和影响力的国际中立性技术标准机构。 到目前为…...
Java中的动态代理(JDK Proxy VS CGLib)
前言 动态代理可以说是Java基础中一个比较重要的内容,这块内容关系到Spring框架中的AOP实现原理,所以特别写了一篇作为个人对这块知识的总结。这部分内容主要包括:JDK Proxy和CGLib的基本介绍、二者的实现原理、代码示例等。 什么是动态代理…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
机器学习的数学基础:线性模型
线性模型 线性模型的基本形式为: f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法,得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...
Python 高级应用10:在python 大型项目中 FastAPI 和 Django 的相互配合
无论是python,或者java 的大型项目中,都会涉及到 自身平台微服务之间的相互调用,以及和第三发平台的 接口对接,那在python 中是怎么实现的呢? 在 Python Web 开发中,FastAPI 和 Django 是两个重要但定位不…...


