当前位置: 首页 > news >正文

netty入门(二十六)任务加入异步线程池源码剖析

1.handler中加入线程池和Context添加线程池

1.1 源码剖析目的

(1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。

(2)而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2中方式,而且这2种方式实现的区别也蛮大的。

(3)处理耗时业务的第一种方式 -- handler 中加入线程池

(4)处理耗时业务的第二种方式 -- Context 中添加线程池

1.2 处理耗时业务的第一种方式--handler 种加入线程池

对前面的 Netty demo 源码进行修改,在 EchoServerHandler 的 channelRead 方法进行异步

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

说明:

在 channelRead 方法,模拟了一个耗时 10 秒的操作,这里,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。

这样处理之后,整个程序的逻辑如图

 说明:

(1)解释上图,当 IO 线程轮询到一个 socket 事件,然后,IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。

(2)当耗时任务执行完毕再执行 pipeline write 方法的时候,(代码中使用的是 context 的 write 方法,上图画的是执行 pipeline 方法,是一个意思)会将任务交给 IO 线程

write 方法的源码(在AbstractChannelHandlerContext 类)

private void write(Object msg, boolean flush, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound();final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);}  else {task = WriteTask.newInstance(next, m, promise);}safeExecute(executor, task, promise, m);}}

说明:

(1)当判定下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务。

(2)这里可以Debug 来验证(提醒:Debug时,服务器端Debug ,客户端Run的方式),当我们使用了 group.submit(new Callable<Object>(){} 在handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m); (说明:普通方式执行耗时代码,看我准备好的案例即可)

1.3 处理耗时业务的第一种方式--Context 中添加线程池

在添加 pipeline 中的 handler 时候,添加一个线程池

public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));// 创建业务线程池// 创建2个子线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}// Configure the server.EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new LoggingHandler(LogLevel.INFO));
//                     p.addLast(new EchoServerHandler());// 说明:如果在 addLast 添加 handler,前面有指定 EventExecutorGroup,那么该 handler 会优先加入到该线程池中p.addLast(group, new EchoServerHandler());}});// Start the server.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池/*group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});*/// 普通方式// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("普通调用方式的线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

说明:

(1)handler 中的代码就使用普通的方式来处理耗时业务。

(2)当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。

(3)当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程Context(也就是 Handler) 的 executor 是业务线程,所以会异步执行, debug 下源码。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() { //执行run@Overridepublic void run() {next.invokeChannelRead(m);}});}}

(4)验证时,我们如果去掉 p.addLast(group,new EchoServerHandler() ); 改成 p.addLastnew EchoServerHandler() ); 你会发现代码不会进行异步执行。

(5)后面的整个流程就变成和第一个方式一样了

1.4 两种方式的比较

  1. 第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进 mpscTask 中。如果IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标。
  2. 第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。
  3. 各有优劣,从灵活性考虑,第一种较好。

相关文章:

netty入门(二十六)任务加入异步线程池源码剖析

1.handler中加入线程池和Context添加线程池 1.1 源码剖析目的 &#xff08;1&#xff09;在 Netty 中做耗时的&#xff0c;不可预料的操作&#xff0c;比如&#xff1a;数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。 &#xff08;2&#xff09;而解决方法就是…...

神经网络算法入门和代码

文章内容 感知机&#xff08;Perceptron&#xff09;反向传播算法&#xff08;Back Propagation algorithm&#xff09;RBF(Radial Basis Function&#xff0c;径向基函数) 网络&#xff1a;单一层前馈网络&#xff0c;它使用径向基作为隐层神经元激活函数ART(Adaptive Resona…...

如何用一个端口同时暴露 HTTP1/2、gRPC、Dubbo 协议?

作者&#xff1a;华钟明 本文我们将介绍 Apache Dubbo 灵活的多协议设计原则&#xff0c;基于这一设计&#xff0c;在 Dubbo 框架底层可灵活的选用 HTTP/2、HTTP/REST、TCP、gRPC、JsonRPC、Hessian2 等任一 RPC 通信协议&#xff0c;同时享用统一的 API 与对等的服务治理能力。…...

ToBeWritten之杂项2

也许每个人出生的时候都以为这世界都是为他一个人而存在的&#xff0c;当他发现自己错的时候&#xff0c;他便开始长大 少走了弯路&#xff0c;也就错过了风景&#xff0c;无论如何&#xff0c;感谢经历 转移发布平台通知&#xff1a;将不再在CSDN博客发布新文章&#xff0c;敬…...

Linux三剑客之awk命令详解

1、概述 Linux三剑客&#xff1a;grep、sed、awk。grep主打查找功能&#xff0c;sed主要是编辑行&#xff0c;awk主要是分割列处理。本篇文章我们详细介绍awk命令。 awk其名称得自于它的创始人 Alfred Aho 、Peter Weinberger 和 Brian Kernighan 姓氏的首个字母。awk是一种编…...

C++异常处理:掌握高效、健壮代码的秘密武器

C异常处理全面解析&#xff1a;底层原理、编译器技巧与实用场景C异常机制&#xff1a;让我们迈向更安全、更可靠的代码C异常处理&#xff1a;掌握基本概念什么是异常&#xff1f;异常处理的重要性C异常处理的组成部分&#xff1a;try、catch、throw探索C异常处理的核心&#xf…...

Jetpack Compose基础组件之按钮组件

概述 按钮组件Button是用户和系统交互的重要组件之一&#xff0c;它按照Material Design风格实现&#xff0c;我们先看下Button的参数列表&#xff0c;通过参数列表了解下Button的整体功能 Composable fun Button(onClick: () -> Unit, // 点击按钮时的回调modifier: Modi…...

利用json-server快速在本地搭建一个JSON服务

1&#xff0c;json-server介绍 一个在前端本地运行&#xff0c;可以存储json数据的server。 通俗来说&#xff0c;就是模拟服务端接口数据&#xff0c;一般用在前后端分离后&#xff0c;前端人员可以不依赖API开发&#xff0c;而在本地搭建一个JSON服务&#xff0c;自己产生测…...

可重入函数与线程安全

指令乱序和线程安全 先来看什么是指令乱序问题以及为什么有指令乱序。程序的代码执行顺序有可能被编译器或CPU根据某种策略打乱指令执行顺序&#xff0c;目的是提升程序的执行性能&#xff0c;让程序的执行尽可能并行&#xff0c;这就是所谓指令乱序问题。理解指令乱序的策略是…...

一文彻底读懂异地多活

文章目录 系统可用性单机架构主从副本风险不可控同城灾备同城双活两地三中心伪异地双活真正的异地双活如何实施异地双活1、按业务类型分片2、直接哈希分片3、按地理位置分片异地多活总结系统可用性 要想理解异地多活,我们需要从架构设计的原则说起。 现如今,我们开发一个软件…...

孕酮PEG偶联物:mPEG Progestrone,PEG Progestrone,甲氧基聚乙二醇孕酮

中文名称&#xff1a;甲氧基聚乙二醇孕酮 英文名称&#xff1a;mPEG Progestrone&#xff0c;PEG Progestrone 一、反应机理&#xff1a; 孕酮-PEG衍生物是一类具有生物活性的类固醇-PEG偶联物&#xff0c;可用于药物发现或生物测定开发。孕酮是一种女性性激素&#xff0c;负…...

网络系统集成实验(一)| 网络系统集成基础

目录 一、前言 二、实验目的 三、实验需求 四、实验步骤与现象 &#xff08;1&#xff09;网络设置、网络命令的使用 ① 在华为设备中&#xff0c;常用指令的使用 ② 在思科设备中&#xff0c;常用指令的使用 ③ 在Windows设备中&#xff0c;常用网络指令的使用 &#xf…...

php composer 如何安装windows电脑

在 Windows 电脑上安装 PHP Composer&#xff0c;你需要按照以下步骤操作&#xff1a; 安装 PHP 确保你的电脑上已经安装了 PHP。如果还没有安装&#xff0c;可以从 PHP 官网&#xff08;https://www.php.net/downloads.php&#xff09;下载安装包并安装。 设置环境变量 将 P…...

API 鉴权插件上线!支持用户自定义鉴权插件

0.4.0 版本更新主要围绕这几个方面&#xff1a; 分组独立的 UI&#xff0c;支持分组 API 鉴权 API 测试支持继承 API 鉴权 支持用户自定义鉴权插件&#xff0c;仅需部分配置即可发布鉴权插件 开始介绍功能之前&#xff0c;我想先和大家分享一下鉴权功能设计的一些思考。 其实…...

2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1)

2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1) *1.Python自带的编程环境是? A、PyScripter B、Spyder C、Notepad++ D、IDLE *2.假设a=20,b-3,那么a or b的结果是? () A、20 B、0 C.1 D.3 *3.假设a=2,b=3,那么a-b*b的值是? A、 3 B、-2 C、-7 D、-11 *4.…...

day21—编程题

文章目录1.第一题1.1题目1.2思路1.3解题2.第二题2.1题目2.2思路2.3解题1.第一题 1.1题目 描述&#xff1a; 洗牌在生活中十分常见&#xff0c;现在需要写一个程序模拟洗牌的过程。 现在需要洗2n张牌&#xff0c;从上到下依次是第1张&#xff0c;第2张&#xff0c;第3张一直到…...

【数据结构】栈与队列经典选择题

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a; &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对我最大的激励…...

Linux常用命令详细示例演示

一、Linux 常用命令一览表 Linux 下命令格式&#xff1a; command [-options] [parameter] 命令 [选项] [参数] command 是命令 例如&#xff1a;ls cd copy[-options] 带方括号的都是可选的 一些选项 例如&#xff1a;ls -l 中的 -l[parameter] 可选参数&#xff0c;可以是 0…...

9-数据可视化-动态柱状图

文章目录1.基础柱状图2.基础时间线柱状图3.动态柱状图1.基础柱状图 from pyecharts.charts import Bar bar Bar() # 构建柱状图对象 bar.add_xaxis(["中国","美国","英国"]) bar.add_yaxis("GDP",[30,20,10]) bar.render()反转xy轴…...

Linux系统【centos7x】安装宝塔面板教程

1. 下载宝塔面板安装包 在宝塔官网下载最新版的安装包&#xff0c;下载完后上传到服务器。 2. 安装宝塔面板 在终端中输入以下命令&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh…...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域&#xff0c;准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具&#xff0c;正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

算法岗面试经验分享-大模型篇

文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer &#xff08;1&#xff09;资源 论文&a…...

现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?

现有的 Redis 分布式锁库&#xff08;如 Redisson&#xff09;相比于开发者自己基于 Redis 命令&#xff08;如 SETNX, EXPIRE, DEL&#xff09;手动实现分布式锁&#xff0c;提供了巨大的便利性和健壮性。主要体现在以下几个方面&#xff1a; 原子性保证 (Atomicity)&#xff…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!

目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...

【UE5 C++】通过文件对话框获取选择文件的路径

目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 &#xff0c;这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器&#xff0c;右键点击 .uproject 文件&#xff0c;选择 "Generate Visual Studio project files"&#xff0c;重…...