当前位置: 首页 > news >正文

netty入门(二十六)任务加入异步线程池源码剖析

1.handler中加入线程池和Context添加线程池

1.1 源码剖析目的

(1)在 Netty 中做耗时的,不可预料的操作,比如:数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。

(2)而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2中方式,而且这2种方式实现的区别也蛮大的。

(3)处理耗时业务的第一种方式 -- handler 中加入线程池

(4)处理耗时业务的第二种方式 -- Context 中添加线程池

1.2 处理耗时业务的第一种方式--handler 种加入线程池

对前面的 Netty demo 源码进行修改,在 EchoServerHandler 的 channelRead 方法进行异步

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

说明:

在 channelRead 方法,模拟了一个耗时 10 秒的操作,这里,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。

这样处理之后,整个程序的逻辑如图

 说明:

(1)解释上图,当 IO 线程轮询到一个 socket 事件,然后,IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。

(2)当耗时任务执行完毕再执行 pipeline write 方法的时候,(代码中使用的是 context 的 write 方法,上图画的是执行 pipeline 方法,是一个意思)会将任务交给 IO 线程

write 方法的源码(在AbstractChannelHandlerContext 类)

private void write(Object msg, boolean flush, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound();final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);}  else {task = WriteTask.newInstance(next, m, promise);}safeExecute(executor, task, promise, m);}}

说明:

(1)当判定下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务。

(2)这里可以Debug 来验证(提醒:Debug时,服务器端Debug ,客户端Run的方式),当我们使用了 group.submit(new Callable<Object>(){} 在handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m); (说明:普通方式执行耗时代码,看我准备好的案例即可)

1.3 处理耗时业务的第一种方式--Context 中添加线程池

在添加 pipeline 中的 handler 时候,添加一个线程池

public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));// 创建业务线程池// 创建2个子线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}// Configure the server.EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new LoggingHandler(LogLevel.INFO));
//                     p.addLast(new EchoServerHandler());// 说明:如果在 addLast 添加 handler,前面有指定 EventExecutorGroup,那么该 handler 会优先加入到该线程池中p.addLast(group, new EchoServerHandler());}});// Start the server.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 创建了 16 个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {System.out.println("EchoServerHandler 的线程是:" + Thread.currentThread().getName());/*ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);// 输出线程名System.out.println("EchoServerHandler execute 的线程是:" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常 " + e.getMessage());e.printStackTrace();}}});*/// 将任务提交到 group 线程池/*group.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("group.submit 的 call 线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));return null;}});*/// 普通方式// 接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, Charset.forName("utf-8"));// 休眠10秒Thread.sleep(10 * 1000);System.out.println("普通调用方式的线程是" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 2喵喵喵喵", CharsetUtil.UTF_8));System.out.println("go on.....");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

说明:

(1)handler 中的代码就使用普通的方式来处理耗时业务。

(2)当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。

(3)当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程Context(也就是 Handler) 的 executor 是业务线程,所以会异步执行, debug 下源码。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() { //执行run@Overridepublic void run() {next.invokeChannelRead(m);}});}}

(4)验证时,我们如果去掉 p.addLast(group,new EchoServerHandler() ); 改成 p.addLastnew EchoServerHandler() ); 你会发现代码不会进行异步执行。

(5)后面的整个流程就变成和第一个方式一样了

1.4 两种方式的比较

  1. 第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进 mpscTask 中。如果IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标。
  2. 第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。
  3. 各有优劣,从灵活性考虑,第一种较好。

相关文章:

netty入门(二十六)任务加入异步线程池源码剖析

1.handler中加入线程池和Context添加线程池 1.1 源码剖析目的 &#xff08;1&#xff09;在 Netty 中做耗时的&#xff0c;不可预料的操作&#xff0c;比如&#xff1a;数据库、网络请求、会严重影响 Netty 对 Socket 的处理速度。 &#xff08;2&#xff09;而解决方法就是…...

神经网络算法入门和代码

文章内容 感知机&#xff08;Perceptron&#xff09;反向传播算法&#xff08;Back Propagation algorithm&#xff09;RBF(Radial Basis Function&#xff0c;径向基函数) 网络&#xff1a;单一层前馈网络&#xff0c;它使用径向基作为隐层神经元激活函数ART(Adaptive Resona…...

如何用一个端口同时暴露 HTTP1/2、gRPC、Dubbo 协议?

作者&#xff1a;华钟明 本文我们将介绍 Apache Dubbo 灵活的多协议设计原则&#xff0c;基于这一设计&#xff0c;在 Dubbo 框架底层可灵活的选用 HTTP/2、HTTP/REST、TCP、gRPC、JsonRPC、Hessian2 等任一 RPC 通信协议&#xff0c;同时享用统一的 API 与对等的服务治理能力。…...

ToBeWritten之杂项2

也许每个人出生的时候都以为这世界都是为他一个人而存在的&#xff0c;当他发现自己错的时候&#xff0c;他便开始长大 少走了弯路&#xff0c;也就错过了风景&#xff0c;无论如何&#xff0c;感谢经历 转移发布平台通知&#xff1a;将不再在CSDN博客发布新文章&#xff0c;敬…...

Linux三剑客之awk命令详解

1、概述 Linux三剑客&#xff1a;grep、sed、awk。grep主打查找功能&#xff0c;sed主要是编辑行&#xff0c;awk主要是分割列处理。本篇文章我们详细介绍awk命令。 awk其名称得自于它的创始人 Alfred Aho 、Peter Weinberger 和 Brian Kernighan 姓氏的首个字母。awk是一种编…...

C++异常处理:掌握高效、健壮代码的秘密武器

C异常处理全面解析&#xff1a;底层原理、编译器技巧与实用场景C异常机制&#xff1a;让我们迈向更安全、更可靠的代码C异常处理&#xff1a;掌握基本概念什么是异常&#xff1f;异常处理的重要性C异常处理的组成部分&#xff1a;try、catch、throw探索C异常处理的核心&#xf…...

Jetpack Compose基础组件之按钮组件

概述 按钮组件Button是用户和系统交互的重要组件之一&#xff0c;它按照Material Design风格实现&#xff0c;我们先看下Button的参数列表&#xff0c;通过参数列表了解下Button的整体功能 Composable fun Button(onClick: () -> Unit, // 点击按钮时的回调modifier: Modi…...

利用json-server快速在本地搭建一个JSON服务

1&#xff0c;json-server介绍 一个在前端本地运行&#xff0c;可以存储json数据的server。 通俗来说&#xff0c;就是模拟服务端接口数据&#xff0c;一般用在前后端分离后&#xff0c;前端人员可以不依赖API开发&#xff0c;而在本地搭建一个JSON服务&#xff0c;自己产生测…...

可重入函数与线程安全

指令乱序和线程安全 先来看什么是指令乱序问题以及为什么有指令乱序。程序的代码执行顺序有可能被编译器或CPU根据某种策略打乱指令执行顺序&#xff0c;目的是提升程序的执行性能&#xff0c;让程序的执行尽可能并行&#xff0c;这就是所谓指令乱序问题。理解指令乱序的策略是…...

一文彻底读懂异地多活

文章目录 系统可用性单机架构主从副本风险不可控同城灾备同城双活两地三中心伪异地双活真正的异地双活如何实施异地双活1、按业务类型分片2、直接哈希分片3、按地理位置分片异地多活总结系统可用性 要想理解异地多活,我们需要从架构设计的原则说起。 现如今,我们开发一个软件…...

孕酮PEG偶联物:mPEG Progestrone,PEG Progestrone,甲氧基聚乙二醇孕酮

中文名称&#xff1a;甲氧基聚乙二醇孕酮 英文名称&#xff1a;mPEG Progestrone&#xff0c;PEG Progestrone 一、反应机理&#xff1a; 孕酮-PEG衍生物是一类具有生物活性的类固醇-PEG偶联物&#xff0c;可用于药物发现或生物测定开发。孕酮是一种女性性激素&#xff0c;负…...

网络系统集成实验(一)| 网络系统集成基础

目录 一、前言 二、实验目的 三、实验需求 四、实验步骤与现象 &#xff08;1&#xff09;网络设置、网络命令的使用 ① 在华为设备中&#xff0c;常用指令的使用 ② 在思科设备中&#xff0c;常用指令的使用 ③ 在Windows设备中&#xff0c;常用网络指令的使用 &#xf…...

php composer 如何安装windows电脑

在 Windows 电脑上安装 PHP Composer&#xff0c;你需要按照以下步骤操作&#xff1a; 安装 PHP 确保你的电脑上已经安装了 PHP。如果还没有安装&#xff0c;可以从 PHP 官网&#xff08;https://www.php.net/downloads.php&#xff09;下载安装包并安装。 设置环境变量 将 P…...

API 鉴权插件上线!支持用户自定义鉴权插件

0.4.0 版本更新主要围绕这几个方面&#xff1a; 分组独立的 UI&#xff0c;支持分组 API 鉴权 API 测试支持继承 API 鉴权 支持用户自定义鉴权插件&#xff0c;仅需部分配置即可发布鉴权插件 开始介绍功能之前&#xff0c;我想先和大家分享一下鉴权功能设计的一些思考。 其实…...

2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1)

2023年NOC大赛加码未来编程赛道-初赛-Python(初中组-卷1) *1.Python自带的编程环境是? A、PyScripter B、Spyder C、Notepad++ D、IDLE *2.假设a=20,b-3,那么a or b的结果是? () A、20 B、0 C.1 D.3 *3.假设a=2,b=3,那么a-b*b的值是? A、 3 B、-2 C、-7 D、-11 *4.…...

day21—编程题

文章目录1.第一题1.1题目1.2思路1.3解题2.第二题2.1题目2.2思路2.3解题1.第一题 1.1题目 描述&#xff1a; 洗牌在生活中十分常见&#xff0c;现在需要写一个程序模拟洗牌的过程。 现在需要洗2n张牌&#xff0c;从上到下依次是第1张&#xff0c;第2张&#xff0c;第3张一直到…...

【数据结构】栈与队列经典选择题

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a; &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对我最大的激励…...

Linux常用命令详细示例演示

一、Linux 常用命令一览表 Linux 下命令格式&#xff1a; command [-options] [parameter] 命令 [选项] [参数] command 是命令 例如&#xff1a;ls cd copy[-options] 带方括号的都是可选的 一些选项 例如&#xff1a;ls -l 中的 -l[parameter] 可选参数&#xff0c;可以是 0…...

9-数据可视化-动态柱状图

文章目录1.基础柱状图2.基础时间线柱状图3.动态柱状图1.基础柱状图 from pyecharts.charts import Bar bar Bar() # 构建柱状图对象 bar.add_xaxis(["中国","美国","英国"]) bar.add_yaxis("GDP",[30,20,10]) bar.render()反转xy轴…...

Linux系统【centos7x】安装宝塔面板教程

1. 下载宝塔面板安装包 在宝塔官网下载最新版的安装包&#xff0c;下载完后上传到服务器。 2. 安装宝塔面板 在终端中输入以下命令&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh…...

蓝易云:Linux系统【Centos7】top命令详细解释

top命令是一个非常常用的Linux系统性能监控工具&#xff0c;它可以实时动态地查看系统的各项性能指标&#xff0c;并且可以按照不同的排序方式进行排序&#xff0c;方便用户查找信息。 下面是top命令的详细解释&#xff1a; 1. 第一行&#xff1a;显示系统的运行时间、当前登…...

Muduo库源码剖析(一)——Channel

Muduo库源码剖析(一)——Channel 说明 本源码剖析是在muduo基础上&#xff0c;保留关键部分进行改写分析。 要点总结 事件分发器 event dispatcher中最重要的两个类型 channel 和 Poller Channel可理解为通道&#xff0c;poller往通道传输数据(事件发生情况)。 EventLoop…...

Java多线程:定时器Timer

前言 定时/计划功能在Java应用的各个领域都使用得非常多&#xff0c;比方说Web层面&#xff0c;可能一个项目要定时采集话单、定时更新某些缓存、定时清理一批不活跃用户等等。定时计划任务功能在Java中主要使用的就是Timer对象&#xff0c;它在内部使用多线程方式进行处理&am…...

设计模式---装饰模式

目录 介绍 实现 优缺点 装饰模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其结构。这种类型的设计模式属于结构型模式&#xff0c;它是作为现有类的一个包装。这种模式创建了一个装饰类&#xff0c;用来包装原有…...

跨时钟域传输数据——单bit和多bit信号(总结)

文章目录前言一、慢时钟域到快时钟域1、单bit信号2、多bit信号二、快时钟域到慢时钟域1、单bit信号2、多bit信号三、多bit信号跨时钟域传输1、多个信号合并2、多周期路径 Multi-cycle Path/MCP3、使用格雷码4、使用异步FIFO5、使用DMUX电路结构6、握手信号传输四、简答题1、跨时…...

高并发下如何保证接口幂等

文章目录 1. insert前先select2. 加悲观锁3. 加乐观锁4. 加唯一索引5. 建防重表6. 根据状态机7. 加分布式锁8. 获取token接口幂等性问题,对于开发人员来说,是一个跟语言无关的公共问题。本文分享了一些解决这类问题非常实用的办法,绝大部分内容我在项目中实践过的,给有需要…...

Retrofit源码分析小结

Retrofit源码分析&小结 简介 Retrofit是对Okhttp网络请求的二次封装&#xff0c;通过注解动态代理的方式&#xff0c;简化了Okhttp的使用&#xff0c;使得通过简单的配置就可以像调用接口一样去请求网络接口&#xff1b;除此之外Retrofit还支持RxJava和kotlin的协程 基本…...

【从零开始学习 UVM】11.4、UVM Register Layer —— UVM Register Model 实战项目(RAL实战,交通灯为例)

文章目录 DesignInterfaceRegister Model ExampleRegister EnvironmentAPB Agent ExampleTestbench EnvironmentSequencesTest在之前的几篇文章中,我们已经了解了寄存器模型是什么以及如何使用它来访问给定设计中的寄存器。现在让我们看一个完整的例子,展示如何为给定设计编写…...

session和token的登录机制

做登录的时候遇到了token &#xff0c;web和smtp的登录情况&#xff0c;这里 记录一下我所学习的两种登录方式&#xff0c;一种是token &#xff0c;一种是session session 登录机制 当用户请求登录接口进行登录服务端 获得登录的信息&#xff0c;从而在数据库中查到相应的用…...

大厂研发成本大曝光,研发占大头

近日&#xff0c;腾讯发布《2022 年腾讯研发大数据报告》&#xff0c;披露了 2022 年腾讯在研发投入、研发效能、开源协同等方面的重要数据。 《报告》显示&#xff0c;2022 年腾讯内部研发人员占比达到 74%&#xff0c;这意味着&#xff0c;平均每四个腾讯员工中&#xff0c;…...