netty入门(二十六)任务加入异步线程池源码剖析
1.handler中加入线程池和Context添加线程池
1.1 源码剖析目的
(1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。
(2)而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2中方式,而且这2种方式实现的区别也蛮大的。
(3)处理耗时业务的第一种方式 -- handler 中加入线程池
(4)处理耗时业务的第二种方式 -- Context 中添加线程池
1.2 处理耗时业务的第一种方式--handler 种加入线程池
对前面的 Netty demo 源码进行修改,在 EchoServerHandler 的 channelRead 方法进行异步
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
说明:
在 channelRead 方法,模拟了一个耗时 10 秒的操作,这里,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。
这样处理之后,整个程序的逻辑如图

说明:
(1)解释上图,当 IO 线程轮询到一个 socket 事件,然后,IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。
(2)当耗时任务执行完毕再执行 pipeline write 方法的时候,(代码中使用的是 context 的 write 方法,上图画的是执行 pipeline 方法,是一个意思)会将任务交给 IO 线程
write 方法的源码(在AbstractChannelHandlerContext 类)
private void write(Object msg, boolean flush, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound();final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);} else {task = WriteTask.newInstance(next, m, promise);}safeExecute(executor, task, promise, m);}}
说明:
(1)当判定下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务。
(2)这里可以Debug 来验证(提醒:Debug时,服务器端Debug ,客户端Run的方式),当我们使用了 group.submit(new Callable<Object>(){} 在handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m); (说明:普通方式执行耗时代码,看我准备好的案例即可)
1.3 处理耗时业务的第一种方式--Context 中添加线程池
在添加 pipeline 中的 handler 时候,添加一个线程池
public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));// 创建业务线程池// 创建2个子线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}// Configure the server.EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new LoggingHandler(LogLevel.INFO));
// p.addLast(new EchoServerHandler());// 说明:如果在 addLast 添加 handler,前面有指定 EventExecutorGroup,那么该 handler 会优先加入到该线程池中p.addLast(group, new EchoServerHandler());}});// Start the server.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池/*group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});*/// 普通方式// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("普通调用方式的线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
说明:
(1)handler 中的代码就使用普通的方式来处理耗时业务。
(2)当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。
(3)当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程Context(也就是 Handler) 的 executor 是业务线程,所以会异步执行, debug 下源码。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() { //执行run@Overridepublic void run() {next.invokeChannelRead(m);}});}}
(4)验证时,我们如果去掉 p.addLast(group,new EchoServerHandler() ); 改成 p.addLastnew EchoServerHandler() ); 你会发现代码不会进行异步执行。
(5)后面的整个流程就变成和第一个方式一样了
1.4 两种方式的比较
- 第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进 mpscTask 中。如果IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标。
- 第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。
- 各有优劣,从灵活性考虑,第一种较好。
相关文章:
netty入门(二十六)任务加入异步线程池源码剖析
1.handler中加入线程池和Context添加线程池 1.1 源码剖析目的 (1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。 (2)而解决方法就是…...
神经网络算法入门和代码
文章内容 感知机(Perceptron)反向传播算法(Back Propagation algorithm)RBF(Radial Basis Function,径向基函数) 网络:单一层前馈网络,它使用径向基作为隐层神经元激活函数ART(Adaptive Resona…...
如何用一个端口同时暴露 HTTP1/2、gRPC、Dubbo 协议?
作者:华钟明 本文我们将介绍 Apache Dubbo 灵活的多协议设计原则,基于这一设计,在 Dubbo 框架底层可灵活的选用 HTTP/2、HTTP/REST、TCP、gRPC、JsonRPC、Hessian2 等任一 RPC 通信协议,同时享用统一的 API 与对等的服务治理能力。…...
ToBeWritten之杂项2
也许每个人出生的时候都以为这世界都是为他一个人而存在的,当他发现自己错的时候,他便开始长大 少走了弯路,也就错过了风景,无论如何,感谢经历 转移发布平台通知:将不再在CSDN博客发布新文章,敬…...
Linux三剑客之awk命令详解
1、概述 Linux三剑客:grep、sed、awk。grep主打查找功能,sed主要是编辑行,awk主要是分割列处理。本篇文章我们详细介绍awk命令。 awk其名称得自于它的创始人 Alfred Aho 、Peter Weinberger 和 Brian Kernighan 姓氏的首个字母。awk是一种编…...
C++异常处理:掌握高效、健壮代码的秘密武器
C异常处理全面解析:底层原理、编译器技巧与实用场景C异常机制:让我们迈向更安全、更可靠的代码C异常处理:掌握基本概念什么是异常?异常处理的重要性C异常处理的组成部分:try、catch、throw探索C异常处理的核心…...
Jetpack Compose基础组件之按钮组件
概述 按钮组件Button是用户和系统交互的重要组件之一,它按照Material Design风格实现,我们先看下Button的参数列表,通过参数列表了解下Button的整体功能 Composable fun Button(onClick: () -> Unit, // 点击按钮时的回调modifier: Modi…...
利用json-server快速在本地搭建一个JSON服务
1,json-server介绍 一个在前端本地运行,可以存储json数据的server。 通俗来说,就是模拟服务端接口数据,一般用在前后端分离后,前端人员可以不依赖API开发,而在本地搭建一个JSON服务,自己产生测…...
可重入函数与线程安全
指令乱序和线程安全 先来看什么是指令乱序问题以及为什么有指令乱序。程序的代码执行顺序有可能被编译器或CPU根据某种策略打乱指令执行顺序,目的是提升程序的执行性能,让程序的执行尽可能并行,这就是所谓指令乱序问题。理解指令乱序的策略是…...
一文彻底读懂异地多活
文章目录 系统可用性单机架构主从副本风险不可控同城灾备同城双活两地三中心伪异地双活真正的异地双活如何实施异地双活1、按业务类型分片2、直接哈希分片3、按地理位置分片异地多活总结系统可用性 要想理解异地多活,我们需要从架构设计的原则说起。 现如今,我们开发一个软件…...
孕酮PEG偶联物:mPEG Progestrone,PEG Progestrone,甲氧基聚乙二醇孕酮
中文名称:甲氧基聚乙二醇孕酮 英文名称:mPEG Progestrone,PEG Progestrone 一、反应机理: 孕酮-PEG衍生物是一类具有生物活性的类固醇-PEG偶联物,可用于药物发现或生物测定开发。孕酮是一种女性性激素,负…...
网络系统集成实验(一)| 网络系统集成基础
目录 一、前言 二、实验目的 三、实验需求 四、实验步骤与现象 (1)网络设置、网络命令的使用 ① 在华为设备中,常用指令的使用 ② 在思科设备中,常用指令的使用 ③ 在Windows设备中,常用网络指令的使用 …...
php composer 如何安装windows电脑
在 Windows 电脑上安装 PHP Composer,你需要按照以下步骤操作: 安装 PHP 确保你的电脑上已经安装了 PHP。如果还没有安装,可以从 PHP 官网(https://www.php.net/downloads.php)下载安装包并安装。 设置环境变量 将 P…...
API 鉴权插件上线!支持用户自定义鉴权插件
0.4.0 版本更新主要围绕这几个方面: 分组独立的 UI,支持分组 API 鉴权 API 测试支持继承 API 鉴权 支持用户自定义鉴权插件,仅需部分配置即可发布鉴权插件 开始介绍功能之前,我想先和大家分享一下鉴权功能设计的一些思考。 其实…...
2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1)
2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1) *1.Python自带的编程环境是? A、PyScripter B、Spyder C、Notepad++ D、IDLE *2.假设a=20,b-3,那么a or b的结果是? () A、20 B、0 C.1 D.3 *3.假设a=2,b=3,那么a-b*b的值是? A、 3 B、-2 C、-7 D、-11 *4.…...
day21—编程题
文章目录1.第一题1.1题目1.2思路1.3解题2.第二题2.1题目2.2思路2.3解题1.第一题 1.1题目 描述: 洗牌在生活中十分常见,现在需要写一个程序模拟洗牌的过程。 现在需要洗2n张牌,从上到下依次是第1张,第2张,第3张一直到…...
【数据结构】栈与队列经典选择题
🚀write in front🚀 📜所属专栏: 🛰️博客主页:睿睿的博客主页 🛰️代码仓库:🎉VS2022_C语言仓库 🎡您的点赞、关注、收藏、评论,是对我最大的激励…...
Linux常用命令详细示例演示
一、Linux 常用命令一览表 Linux 下命令格式: command [-options] [parameter] 命令 [选项] [参数] command 是命令 例如:ls cd copy[-options] 带方括号的都是可选的 一些选项 例如:ls -l 中的 -l[parameter] 可选参数,可以是 0…...
9-数据可视化-动态柱状图
文章目录1.基础柱状图2.基础时间线柱状图3.动态柱状图1.基础柱状图 from pyecharts.charts import Bar bar Bar() # 构建柱状图对象 bar.add_xaxis(["中国","美国","英国"]) bar.add_yaxis("GDP",[30,20,10]) bar.render()反转xy轴…...
Linux系统【centos7x】安装宝塔面板教程
1. 下载宝塔面板安装包 在宝塔官网下载最新版的安装包,下载完后上传到服务器。 2. 安装宝塔面板 在终端中输入以下命令: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】
大家好,我是java1234_小锋老师,看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】,分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
WebRTC调研
WebRTC是什么,为什么,如何使用 WebRTC有什么优势 WebRTC Architecture Amazon KVS WebRTC 其它厂商WebRTC 海康门禁WebRTC 海康门禁其他界面整理 威视通WebRTC 局域网 Google浏览器 Microsoft Edge 公网 RTSP RTMP NVR ONVIF SIP SRT WebRTC协…...
鸿蒙HarmonyOS 5军旗小游戏实现指南
1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发,采用DevEco Studio实现,包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...
怎么开发一个网络协议模块(C语言框架)之(六) ——通用对象池总结(核心)
+---------------------------+ | operEntryTbl[] | ← 操作对象池 (对象数组) +---------------------------+ | 0 | 1 | 2 | ... | N-1 | +---------------------------+↓ 初始化时全部加入 +------------------------+ +-------------------------+ | …...
