netty入门(二十六)任务加入异步线程池源码剖析
1.handler中加入线程池和Context添加线程池
1.1 源码剖析目的
(1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。
(2)而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2中方式,而且这2种方式实现的区别也蛮大的。
(3)处理耗时业务的第一种方式 -- handler 中加入线程池
(4)处理耗时业务的第二种方式 -- Context 中添加线程池
1.2 处理耗时业务的第一种方式--handler 种加入线程池
对前面的 Netty demo 源码进行修改,在 EchoServerHandler 的 channelRead 方法进行异步
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
说明:
在 channelRead 方法,模拟了一个耗时 10 秒的操作,这里,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。
这样处理之后,整个程序的逻辑如图
说明:
(1)解释上图,当 IO 线程轮询到一个 socket 事件,然后,IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。
(2)当耗时任务执行完毕再执行 pipeline write 方法的时候,(代码中使用的是 context 的 write 方法,上图画的是执行 pipeline 方法,是一个意思)会将任务交给 IO 线程
write 方法的源码(在AbstractChannelHandlerContext 类)
private void write(Object msg, boolean flush, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound();final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);} else {task = WriteTask.newInstance(next, m, promise);}safeExecute(executor, task, promise, m);}}
说明:
(1)当判定下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务。
(2)这里可以Debug 来验证(提醒:Debug时,服务器端Debug ,客户端Run的方式),当我们使用了 group.submit(new Callable<Object>(){} 在handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m); (说明:普通方式执行耗时代码,看我准备好的案例即可)
1.3 处理耗时业务的第一种方式--Context 中添加线程池
在添加 pipeline 中的 handler 时候,添加一个线程池
public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));// 创建业务线程池// 创建2个子线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}// Configure the server.EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new LoggingHandler(LogLevel.INFO));
// p.addLast(new EchoServerHandler());// 说明:如果在 addLast 添加 handler,前面有指定 EventExecutorGroup,那么该 handler 会优先加入到该线程池中p.addLast(group, new EchoServerHandler());}});// Start the server.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池/*group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});*/// 普通方式// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("普通调用方式的线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
说明:
(1)handler 中的代码就使用普通的方式来处理耗时业务。
(2)当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。
(3)当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程Context(也就是 Handler) 的 executor 是业务线程,所以会异步执行, debug 下源码。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() { //执行run@Overridepublic void run() {next.invokeChannelRead(m);}});}}
(4)验证时,我们如果去掉 p.addLast(group,new EchoServerHandler() ); 改成 p.addLastnew EchoServerHandler() ); 你会发现代码不会进行异步执行。
(5)后面的整个流程就变成和第一个方式一样了
1.4 两种方式的比较
- 第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进 mpscTask 中。如果IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标。
- 第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。
- 各有优劣,从灵活性考虑,第一种较好。
相关文章:

netty入门(二十六)任务加入异步线程池源码剖析
1.handler中加入线程池和Context添加线程池 1.1 源码剖析目的 (1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。 (2)而解决方法就是…...

神经网络算法入门和代码
文章内容 感知机(Perceptron)反向传播算法(Back Propagation algorithm)RBF(Radial Basis Function,径向基函数) 网络:单一层前馈网络,它使用径向基作为隐层神经元激活函数ART(Adaptive Resona…...

如何用一个端口同时暴露 HTTP1/2、gRPC、Dubbo 协议?
作者:华钟明 本文我们将介绍 Apache Dubbo 灵活的多协议设计原则,基于这一设计,在 Dubbo 框架底层可灵活的选用 HTTP/2、HTTP/REST、TCP、gRPC、JsonRPC、Hessian2 等任一 RPC 通信协议,同时享用统一的 API 与对等的服务治理能力。…...

ToBeWritten之杂项2
也许每个人出生的时候都以为这世界都是为他一个人而存在的,当他发现自己错的时候,他便开始长大 少走了弯路,也就错过了风景,无论如何,感谢经历 转移发布平台通知:将不再在CSDN博客发布新文章,敬…...

Linux三剑客之awk命令详解
1、概述 Linux三剑客:grep、sed、awk。grep主打查找功能,sed主要是编辑行,awk主要是分割列处理。本篇文章我们详细介绍awk命令。 awk其名称得自于它的创始人 Alfred Aho 、Peter Weinberger 和 Brian Kernighan 姓氏的首个字母。awk是一种编…...
C++异常处理:掌握高效、健壮代码的秘密武器
C异常处理全面解析:底层原理、编译器技巧与实用场景C异常机制:让我们迈向更安全、更可靠的代码C异常处理:掌握基本概念什么是异常?异常处理的重要性C异常处理的组成部分:try、catch、throw探索C异常处理的核心…...

Jetpack Compose基础组件之按钮组件
概述 按钮组件Button是用户和系统交互的重要组件之一,它按照Material Design风格实现,我们先看下Button的参数列表,通过参数列表了解下Button的整体功能 Composable fun Button(onClick: () -> Unit, // 点击按钮时的回调modifier: Modi…...

利用json-server快速在本地搭建一个JSON服务
1,json-server介绍 一个在前端本地运行,可以存储json数据的server。 通俗来说,就是模拟服务端接口数据,一般用在前后端分离后,前端人员可以不依赖API开发,而在本地搭建一个JSON服务,自己产生测…...

可重入函数与线程安全
指令乱序和线程安全 先来看什么是指令乱序问题以及为什么有指令乱序。程序的代码执行顺序有可能被编译器或CPU根据某种策略打乱指令执行顺序,目的是提升程序的执行性能,让程序的执行尽可能并行,这就是所谓指令乱序问题。理解指令乱序的策略是…...
一文彻底读懂异地多活
文章目录 系统可用性单机架构主从副本风险不可控同城灾备同城双活两地三中心伪异地双活真正的异地双活如何实施异地双活1、按业务类型分片2、直接哈希分片3、按地理位置分片异地多活总结系统可用性 要想理解异地多活,我们需要从架构设计的原则说起。 现如今,我们开发一个软件…...

孕酮PEG偶联物:mPEG Progestrone,PEG Progestrone,甲氧基聚乙二醇孕酮
中文名称:甲氧基聚乙二醇孕酮 英文名称:mPEG Progestrone,PEG Progestrone 一、反应机理: 孕酮-PEG衍生物是一类具有生物活性的类固醇-PEG偶联物,可用于药物发现或生物测定开发。孕酮是一种女性性激素,负…...

网络系统集成实验(一)| 网络系统集成基础
目录 一、前言 二、实验目的 三、实验需求 四、实验步骤与现象 (1)网络设置、网络命令的使用 ① 在华为设备中,常用指令的使用 ② 在思科设备中,常用指令的使用 ③ 在Windows设备中,常用网络指令的使用 …...

php composer 如何安装windows电脑
在 Windows 电脑上安装 PHP Composer,你需要按照以下步骤操作: 安装 PHP 确保你的电脑上已经安装了 PHP。如果还没有安装,可以从 PHP 官网(https://www.php.net/downloads.php)下载安装包并安装。 设置环境变量 将 P…...

API 鉴权插件上线!支持用户自定义鉴权插件
0.4.0 版本更新主要围绕这几个方面: 分组独立的 UI,支持分组 API 鉴权 API 测试支持继承 API 鉴权 支持用户自定义鉴权插件,仅需部分配置即可发布鉴权插件 开始介绍功能之前,我想先和大家分享一下鉴权功能设计的一些思考。 其实…...
2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1)
2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1) *1.Python自带的编程环境是? A、PyScripter B、Spyder C、Notepad++ D、IDLE *2.假设a=20,b-3,那么a or b的结果是? () A、20 B、0 C.1 D.3 *3.假设a=2,b=3,那么a-b*b的值是? A、 3 B、-2 C、-7 D、-11 *4.…...
day21—编程题
文章目录1.第一题1.1题目1.2思路1.3解题2.第二题2.1题目2.2思路2.3解题1.第一题 1.1题目 描述: 洗牌在生活中十分常见,现在需要写一个程序模拟洗牌的过程。 现在需要洗2n张牌,从上到下依次是第1张,第2张,第3张一直到…...

【数据结构】栈与队列经典选择题
🚀write in front🚀 📜所属专栏: 🛰️博客主页:睿睿的博客主页 🛰️代码仓库:🎉VS2022_C语言仓库 🎡您的点赞、关注、收藏、评论,是对我最大的激励…...

Linux常用命令详细示例演示
一、Linux 常用命令一览表 Linux 下命令格式: command [-options] [parameter] 命令 [选项] [参数] command 是命令 例如:ls cd copy[-options] 带方括号的都是可选的 一些选项 例如:ls -l 中的 -l[parameter] 可选参数,可以是 0…...

9-数据可视化-动态柱状图
文章目录1.基础柱状图2.基础时间线柱状图3.动态柱状图1.基础柱状图 from pyecharts.charts import Bar bar Bar() # 构建柱状图对象 bar.add_xaxis(["中国","美国","英国"]) bar.add_yaxis("GDP",[30,20,10]) bar.render()反转xy轴…...

Linux系统【centos7x】安装宝塔面板教程
1. 下载宝塔面板安装包 在宝塔官网下载最新版的安装包,下载完后上传到服务器。 2. 安装宝塔面板 在终端中输入以下命令: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...

基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...
接口 RESTful 中的超媒体:REST 架构的灵魂驱动
在 RESTful 架构中,** 超媒体(Hypermedia)** 是一个核心概念,它体现了 REST 的 “表述性状态转移(Representational State Transfer)” 的本质,也是区分 “真 RESTful API” 与 “伪 RESTful AP…...
【大厂机试题解法笔记】矩阵匹配
题目 从一个 N * M(N ≤ M)的矩阵中选出 N 个数,任意两个数字不能在同一行或同一列,求选出来的 N 个数中第 K 大的数字的最小值是多少。 输入描述 输入矩阵要求:1 ≤ K ≤ N ≤ M ≤ 150 输入格式 N M K N*M矩阵 输…...

以太网PHY布局布线指南
1. 简介 对于以太网布局布线遵循以下准则很重要,因为这将有助于减少信号发射,最大程度地减少噪声,确保器件作用,最大程度地减少泄漏并提高信号质量。 2. PHY设计准则 2.1 DRC错误检查 首先检查DRC规则是否设置正确,然…...