Netty核心组件EventLoop源码解析
源码解析目标
- 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现
源码解析
- 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup = new NioEventLoopGroup(1);,我们先来看看NioEventLoop的UML图
- 首先我们看到ScheduledEecutorService接口,这个接口是concurrent包下的一个定时任务接口,EventLoop实现了这个接口,因此可以接受定时任务,所以我们在Debug的时候,能在EventLoop中找到一个scheduledTaskQueue
- EventLoop接口我们看下源码,如下,从注释中我们了解到,EventLoop中一旦注册了Channel,就会处理该Channel对应的所有I/O操作
/*** Will handle all the I/O operations for a {@link Channel} once registered.** One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on* implementation details and internals.**/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {@OverrideEventLoopGroup parent();
}
- SingleThreadEventExecutor 也是一个比较重要的类,看源码注释,说明了SingleThreadEventExecutor 是一个单个线程的线程池
/*** Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.**/
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......}
- 在SingleThreadEventExecutor 类中实现了很多对线程池的操作,例如runAllTask,executer,takeTask,pollTask,看下其中一个构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);this.executor = ObjectUtil.checkNotNull(executor, "executor");taskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}
//其中taskQueue初始化protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {return new LinkedBlockingQueue<Runnable>(maxPendingTasks);}
- 如上,SingleThreadEventExecutor 队列中元素是实现了Runnable接口的对象,线程池中最重要的方法当然是executer方法,EventLoop是SingleThreadEventExecutor 的子类,那么EventLoop 类也可以直接调用executer方法来完成对事件的执行,我们来看源码
@Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}
- inEventLoop(); 首先判断EventLoop中线程是否是当前线程,如果是,则直接将task添加到线程池队列中
- 如果不是则尝试启动一个线程(因为是单个线程的线程池,所以只能且只需要启动一次),之后在将任务添加到队列中去
- isShutdown() && removeTask(task)) 中逻辑 如果线程已经停止并且删除任务失败,则直接拒绝策略
- 接着看下addTask的实现
/*** Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown* before.*/protected void addTask(Runnable task) {if (task == null) {throw new NullPointerException("task");}if (!offerTask(task)) {reject(task);}}final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);}
- 从注释中可以看出,addTask方法会添加一个task任务到队列中,如果当前线程是shutdown的状态,那么直接抛出异常RejectedExecutionException
- 接着来看executer方法中的startThread(); ,当我们判断当前线程不是EventLoop中的线程的时候会执行这个方法,他是NioEventLoop中的核心方法,如下源码
private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}
-
state == ST_NOT_STARTED 首先通过状态判断是否执行过,保证EventLoop只有一个线程
-
如果没有启动 用cas的方式STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED) 去修改状态为ST_STARTED,直接调用doStartThread方法
-
如果失败就回滚
-
接着分析doStartThread方法,首先会调用Executor的execute方法,这个Executor 是我们在创建EventLoopGroup时候创建的是一个ThreadPerTaskExecutor类,如下图是在channel中对应的EventLoop找到的对象信息,该execute方法会将Runable包装成Netty的FastThreadLocalThread
- 接着通过Thread.currentThread() 判断线程是否中断
- updateLastExecutionTime(); 然后设置最后一次执行的事件
- 核心方法是:SingleThreadEventExecutor.this.run(); 执行单曲NioEventLoop的run方法,等会重点关注
- 接着完成run方法的事物处理后,在finally中使用cas不断的修改state状态,设置为ST_SHUTTING_DOWN,也就是当loop中run方法结束运行后,关闭线程,最后还会通过不断轮询来二次确认是否关闭,否则不会break跳出
- 接下来分析EventLoop中的Run方法,我们进入Run方法,就到了我们之前分析过的NioEventLoop中的run方法,此方法做了三件事情,如下源码
@Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
-
NioEventLoop 中的loop轮询是依靠run方法来执行的,在方法中可以看到是一个for循环其中三件事情,如下图中EventLoop部分
- case SelectStrategy.SELECT: 当事件类似是SELECT 时候, 通过select(wakenUp.getAndSet(false));方法获取感兴趣的事件
- processSelectedKeys(); 处理选中的事件
- runAllTasks 执行队列中的任务。
-
上图不管是bossGroup还是WorkerGroup中的EventLoop都是次run方法执行流程
-
select方法实现
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}}}
-
关注点在于 select方法如何体现出非阻塞,如下图中,选择器获取对应事件debug
-
在select中传如参数是1 秒,也就是默认情况下阻塞1秒中,具体的算法: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
-
其中 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 表示当前时间 - 定时任务的时间,那么timeoutMillis 意思就是,当有定时任务的时候 delayNanos(currentTimeNanos) 时间就部位空,那么定时任务剩余时间 t +0.5秒阻塞的时间,否则就默认1秒中阻塞时间
-
接着判断: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())
- 如果1秒(或者t+0.5)后能获取到selectedKeys :selectedKeys != 0
- 或者select被用户唤醒 :oldWakenUp, wakenUp.get()
- 或者任务队列中有任务存在 : hasTasks()
- 或者有定时任务即将被执行 : hasScheduledTasks()
-
有以上任何情况则跳出循环,否则继续沦陷,直到满足其中一个条件为止
-
接着processSelectedKeys对获取到的selectKey处理
-
在接着runAllTasks 执行队列任务
总结
- 每次执行execute方法就会向队列中添加任务。当第一次添加时候就启动线程,执行run方法,run方法是EventLoop的核心实现,负责轮询获取事件,处理事件,执行队列中任务
- 其中调用selector的select方法默认阻塞一秒,有定时任务就t+0.5,t是定时任务剩余时间,当执行execute方法时候,也就是添加任务的时候,唤醒selector,防止selector阻塞事件过长
- 当selector返回的时候,会调用processSelectedKeys对selectKey进行处理
- 当processSelectedKeys 方法执行结束,按照ioRatio比例执行runAllTasks方法默认是 IO 任务时间和非 IO 任务时间是相同的代码如下
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
相关文章:

Netty核心组件EventLoop源码解析
源码解析目标 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现 源码解析 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup new NioEventLoopGroup(…...
排障命令-汇总
目录 日志查询 1. grep 2. zgrep cpu 1. top 内存 1. free tcp相关 1. netstat 2. ulimit 3. lsof jvm常用 1. jps 2. jinfo 3. jstack 4. jmap 5. jstat 进制转换 1. 十进制转16进制 日志查询 1. grep 定义:(global regular expression) 命令用于查…...

python+pytest接口自动化(4)-requests发送get请求
python中用于请求http接口的有自带的urllib和第三方库requests,但 urllib 写法稍微有点繁琐,所以在进行接口自动化测试过程中,一般使用更为简洁且功能强大的 requests 库。下面我们使用 requests 库发送get请求。requests库简介requests 库中…...

开源电子书工具Calibre 6.3 发布
Calibre 开源项目是 Calibre 官方出的电子书管理工具。它可以查看,转换,编辑和分类所有主流格式的电子书。Calibre 是个跨平台软件,可以在 Linux、Windows 和 macOS 上运行。Calibre 6.3 正式发布,此次更新内容如下:新…...
C++ STL:适配器 Adapter
文章目录1、容器适配器1.1、stack1.2、queue1.3、priority_queue2、迭代器适配器2.1、插入迭代器2.2、反向迭代器2.3、流迭代器3、函数适配器3.1、* bindbind 使用方法bind 简化原理3.2、mem_fn适配器就是接口,对容器、迭代器、算法进行包装,但其实质还是…...
防抖和节流
防抖和节流的区别?防抖:触发高频事件后n 秒内 函数只会执行一次,如果n秒内 高频事件在在次触发,则会重新计算节流:高频事件触发,但在n 秒内 只会执行一次,所以节流会稀释函数的执行频率下面就是…...
vue3 微信扫码登录及获取个人信息实现的三种方法
一、流程: 微信提供的扫码方式有两种,分别是: 跳转二维码扫描页面 内嵌式二维码根据文档我们可以知道关于扫码授权的模式整体流程为: 1. 第三方发起微信授权登录请求,微信用户允许授权第三方应用后,微信会拉起应用或重定向到第三方网站&…...

Java8 新特性强大的Stream API
一、Stream API 说明 Java8中有两大最为重要的改变。第一个是 Lambda 表达式;另外一个则是 Stream API。 Stream API ( java.util.stream) 把真正的函数式编程风格引入到Java中。这是目前为止对Java类库最好的补充,因为Stream API可以极大提供Ja…...

day22_IO
今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 一、作业 二、缓冲流 三、字符流 四、缓冲字符流 五、匿名内部类 零、 复习昨日 File: 通过路径代表一个文件或目录 方法: 创建型,查找类,判断类,其他 IO …...

第三十八章 linux-并发解决方法二(信号量)
第三十八章 linux-并发解决方法二(信号量) 文章目录第三十八章 linux-并发解决方法二(信号量)信号量的定义DOWN操作UP操作相对于自旋锁,信号量的最大特点是允许调用它的线程进入睡眠状态这意味着试图获得某一信号的进程…...

数据结构-考研难点代码突破(C++实现树型查找 - B树插入与遍历,B+树基本概念)
数据结构(C)[B树(B-树)插入与中序遍历,效率分析]、B树、B*树、B树系列应用 文章目录1. B树B树的插入与删除流程2. B树(MySQL)3. B树与B树对比4. C实现B树插入,中序遍历1. B树 B树类…...

Python可视化界面编程入门
Python可视化界面编程入门具体实现代码如所示: (1)普通可视化界面编程代码入门: import sys from PyQt5.QtWidgets import QWidget,QApplication #导入两个类来进行程序界面编程if __name__"__main__":#创建一个Appl…...

基于Java+SpringBoot+Vue前后端分离书店购书系统设计与实现
博主介绍:✌全网粉丝3W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战✌ 博主作品:《微服务实战》专栏是本人的实战经验总结,《Spring家族及…...
Android:截屏/视频截图
需求描述 实现截取Android应用当前界面的功能,需包含界面中视频(此博客的参考代码以存储在设备本地的视频为例,未检验在线视频的情况)当前的播放帧截图。 调研准备 首先应用需要获取设备存储的读写权限,需要在Andro…...

leecode-C语言实现-28. 找出字符串中第一个匹配项的下标
一、题目给你两个字符串 haystack 和 needle ,请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标(下标从 0 开始)。如果 needle 不是 haystack 的一部分,则返回 -1 。示例 1:输入:haystack …...

使用 Postman 实现 API 自动化测试
目录:导读 背景介绍 名词解析 使用说明 执行 API 测试 集成 CI 实现 API 自动化测试 写在最后 背景介绍 相信大部分开发人员和测试人员对 postman 都十分熟悉,对于开发人员和测试人员而言,使用 postman 来编写和保存测试用例会是一种比…...

k8s环境jenkins发布vue项目指定nodejs版本
k8s环境jenkins发布vue项目指定nodejs版本1、背景2、分析3、解决方法3.1、 找到配置镜像位置3.2、 制作新镜像3.3、 推送镜像到私有仓库3.4、 修改配置文件1、背景 发布一个前端项目,它需要nodejs 16.9.0版本支持,而kubesphere 3.2.0集成的jenkins 的镜…...

我应该把毕业设计做到什么程度才能过关?
本篇博客包含了狗哥多年职业生涯对于软件项目的一丢丢理解,也讲述了对于大学生毕业设计的一些理解。如果你还是懵懵懂懂就要离开学校了,被老师告知不得不做出一套毕业设计的时候,希望你可以看到这篇博客,让你有点头绪,…...

力扣-合作过至少三次的演员和导演
大家好,我是空空star,本篇带大家了解一道简单的力扣sql练习题。 文章目录前言一、题目:1050. 合作过至少三次的演员和导演二、解题1.正确示范①提交SQL运行结果2.正确示范②提交SQL运行结果3.正确示范③提交SQL运行结果4.正确示范④提交SQL运…...

【 PMU】信号生成、采样、分割、估计器应用和误差计算(Matlab代码实现)
👨🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...

均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...

技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
在树莓派上添加音频输入设备的几种方法
在树莓派上添加音频输入设备可以通过以下步骤完成,具体方法取决于设备类型(如USB麦克风、3.5mm接口麦克风或HDMI音频输入)。以下是详细指南: 1. 连接音频输入设备 USB麦克风/声卡:直接插入树莓派的USB接口。3.5mm麦克…...
Vue 模板语句的数据来源
🧩 Vue 模板语句的数据来源:全方位解析 Vue 模板(<template> 部分)中的表达式、指令绑定(如 v-bind, v-on)和插值({{ }})都在一个特定的作用域内求值。这个作用域由当前 组件…...