Netty服务端请求接受过程源码剖析

目标
- 服务器启动后,客户端进行连接,服务器端此时要接受客户端请求,并且返回给客户端想要的请求,下面我们的目标就是分析Netty 服务器端启动后是怎么接受到客户端请求的。
- 我们的代码依然与上一篇中用同一个demo, 用io.netty.example下的echo包下的代码
- 我们直接debug模式启动Server端,让后在浏览器输入Http://localhost:8007,接着以下代码分析
源码剖析
- 在上一篇文章Netty启动过程源码分析中,我们知道了服务器最终注册 一个Accept事件等待客户端的连接,同时将NioServerSocketChannel注册到boss单例线程池中,也就是EventLoop如上图左边黄色区域部分
- 因此我们想要分析接受client连接的代码,先找到对应的EventLoop源码,如上图中NioEventLoop 循环,找到如下源码
//代码位置 NioEventLoop --- >  run()
@Overrideprotected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {.......// 处理各种strategy类型default:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {//对strategy事件进行处理processSelectedKeys();}} finally {ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {.......}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}.......} catch (CancelledKeyException e) {.......} catch (Throwable t) {handleLoopException(t);}.......}}
- 如上代码中 strategy 更具请求的类型走不同的策略,最后处理策略的方法是 processSelectedKeys();,我们继续根核心方法 processSelectedKeys();,如下源码
//进入processSelectedKeys ---》processSelectedKeysOptimized(); ---〉processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {return;}if (eventLoop == this) {unsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}- 第一个if中最事件合法性验证,接着获取readyOps,我们debug得到是16,如下图

- 找到SelectionKey中16 代码的意义
 /*** Operation-set bit for socket-accept operations.** <p> Suppose that a selection key's interest set contains* <tt>OP_ACCEPT</tt> at the start of a <a* href="Selector.html#selop">selection operation</a>.  If the selector* detects that the corresponding server-socket channel is ready to accept* another connection, or has an error pending, then it will add* <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its* selected-key set.  </p>*/public static final int OP_ACCEPT = 1 << 4;
- 术语连接请求,这就是我们拿到了之前用Http://localhost:8007 请求的连接,接着继续跟进代码 EventLoopGroup —> processSelectedKey —> unsafe.read(); 其中unsafe是NioMessageUnsafed,上一篇中有过分析用来处理消息接收
- 继续跟进AbstractNioMessageChannel —> read() ,得到如下源码,删了一些对本次无关的一些代码,如下
public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);......allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}......if (exception != null) {......}if (closed) {......}} finally {......}}}-  assert eventLoop().inEventLoop(); 判断改eventLoop线程是否当前线程 
-  ChannelConfig config = config(); 获取NioServerSocketChannelConfig 
-  ChannelPipeline pipeline = pipeline(); 获取DefaultChannelPipeline。他是一个双向链表,可以看到内部包含 LoggingHandler,ServerBootStraptHandler 
-  继续跟进 NioServersocketChannel —> doMessage(buf),可以进入到NioServerSocketChannel,找到doMessage方法 
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}
-  参数buf是一个静态队列。private final List readBuf = new ArrayList(); 读取boss线程中的NioServerSocketChannel接受到的请求,并且将请求放到buf容器中 
-  SocketChannel ch = SocketUtils.accept(javaChannel()); 通过Nio中工具类的建立连接,其实底层是调用了ServerSocketChannelImpl —> accept()方法建立TCP连接,并返回一个Nio中的SocketChannel 
-  buf.add(new NioSocketChannel(this, ch)); 将获取到的Nio中SocketCHannel包装成Netty中的NioSocketChannel 并且添加到buf队列中(list) 
-  doReadMessages到这分析完。 
-  我们回到回到EventLoopGroup —> ProcessSelectedKey 
-  循环遍历之前doReadMessage中获取的buf中的所有请求,调用Pipeline的firstChannelRead方法,用于处理这些接受的请求或者其他事件,在read方法中,循环调用ServerSocket的Pipeline的fireChannelRead方法,开始执行管道中的handler的ChannelRead方法,如下 

- 继续跟进,进入 pipeline.fireChannelRead(readBuf.get(i)); 一直跟到AbstracChannelHandlerContext —> invokeChannelRead
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}}
- 进入 handler() 中,DefaultChannelPipeline —> handler()
- debug源码可以看到,在管道中添加了多个Handler,分别是:HeadContext,LoggingContext,ServerBootStrapAcceptor,TailContext 因此debug时候会依次进入每一个Handler中。我们重点看ServerBootStrapAcceptor中的channelRead方法
 @Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}
- 因为参数msg是NioSocketChannel,此处强制转成channel,
- child.pipeline().addLast(childHandler); 将我们在main方法中设置的EchoServerHandler添加到pipeline的handler链表中
- setChannelOptions 对TCP参数赋值
- setAttributes 设置各种属性
- childGroup.register(child).addListener(…) 将NioSocketChannel注册到 NioEventLoopGroup中的一个EventLoop中,并且添加一个监听器
- 以上NioEventLoopGroup就是我们main方法创建的数组workerGroup
- 进入register方法, MultithreadEventLoopGroup —>register , SingleThreadEventLoop —>register , AbstractChannel —> register,如下
- 首先看MultithreadEventLoopGroup中的register
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
- 进入next()方法中,最终我们可以追到 DefaultEventExecutorChooserFactory — > PowerOfTwoEventExecutorChooser — > next() 内部类中的next
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}
-  入上我们通过debug可以看到,next返回的就是我们在workerGroup中创建的线程数组中的某一个子线程EventExecutor 
  
-  接下来我们在回到register方法: AbstractChannel —> register 方法,如下: 
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {......AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {......}}}
- 关键方法register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
- 进入 doRegister(); 方法:AbstractNioChannel —> doRegister
@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}
- 上代码,selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);此处我们将bossGroup中的EventLoop的channel 注册到workerGroup中的EventLoop中的 select中,方法中会得到一个selectionKey
- 我们可以看register方法的注视,如下:
Registers this channel with the given selector, returning a selectionkey.
使用给定的选择器注册此通道,并返回选择键。
- 接着debug,最终会到 AbstractNioChannel 中的doBeginRead方法
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
- 此方法比较难进入,包含了几个异步,将之前的断电去掉,再次http请求,可以到这个方法中
- 追到这里,针对客户的连接已经完成,接下来是读取监听事件,也就是bossGroup的连接建立,注册步骤已近完成了,接下来就是workerGroup中的事件处理了
Netty接收请求过程梳理
-  总流程:接收连接 — 》创建一个新的NioSocketChannel —〉 注册到一个WorkerEventLoop上 —》 注册selecotRead事件 - 服务器沦陷Accept事件(文中最开始的那个for循环),获取事件后调用unsafe的read方法,这个unsafe是ServerSocket的内部类,改方法内部由2部分组成
- doReadMessage 用于创建NioSocketChannel对象,改对象包装JDK的NioChannel客户端,该方法创建一个ServerSocketChannel
- 之后执行pipeline.firstChannelRead方法,并且将自己绑定到一个chooser选择器选择的workerGroup中的某个EventLoop上,并且注册一个0(连接),表示注册成功,但是并没有注册1 (读取)
 
-  上一篇:Netty启动流程源码剖析 
相关文章:
 
Netty服务端请求接受过程源码剖析
目标 服务器启动后,客户端进行连接,服务器端此时要接受客户端请求,并且返回给客户端想要的请求,下面我们的目标就是分析Netty 服务器端启动后是怎么接受到客户端请求的。我们的代码依然与上一篇中用同一个demo, 用io.…...
 
金三银四春招特供|高质量面试攻略
🔰 全文字数 : 1万5千 🕒 阅读时长 : 20min 📋 关键词 : 求职规划、面试准备、面试技巧、谈薪职级 👉 公众号 : 大摩羯先生 本篇来聊聊一个老生常谈的话题————“面试”。利用近三周工作午休时间整理了这篇洋洋洒洒却饱含真诚…...
 
搭建Hexo博客-第4章-绑定自定义域名
搭建Hexo博客-第4章-绑定自定义域名 搭建Hexo博客-第4章-绑定自定义域名 搭建Hexo博客-第4章-绑定自定义域名 在这一篇文章中,我将会介绍如何给博客绑定你自己的域名。其实绑定域名本应该很简单的,但我当初在这上走了不少弯路,所以我觉得有…...
lightdb-sql拦截
文章目录LightDB - sql 审核拦截一 简介二 参数2.1 lightdb_sql_mode2.2 lt_firewall.lightdb_business_time三 规则介绍及使用3.1 select_without_where3.1.1 案例3.2 update_without_where/delete_without_where3.2.1 案例3.3 high_risk_ddl3.3.1 案例LightDB - sql 审核拦截…...
 
二进制中1的个数-剑指Offer-java位运算
一、题目描述编写一个函数,输入是一个无符号整数(以二进制串的形式),返回其二进制表达式中数字位数为 1 的个数(也被称为 汉明重量).)。提示:请注意,在某些语言(如 Java&…...
 
学自动化测试可以用这几个练手项目
练手项目的业务逻辑比较简单,只适合练手,不能代替真实项目。 学习自动化测试最难的是没有合适的项目练习。 测试本身既要讲究科学,又有艺术成分,单单学几个 api 的调用很难应付工作中具体的问题。 你得知道什么场景下需要添加显…...
 
2023年保健饮品行业分析:市场规模不断攀升,年度销额增长近140%
随着人们健康意识的不断增强,我国保健品市场需求持续增长,同时,保健饮品的市场规模也在不断攀升。 根据鲸参谋电商数据显示,2022年度,京东平台上保健饮品的年度销量超60万件,同比增长了约124%;该…...
 
2023-02-17 学习记录--TS-邂逅TS(一)
TS-邂逅TS(一) 不积跬步,无以至千里;不积小流,无以成江海。💪🏻 一、TypeScript在线编译器 https://www.typescriptlang.org/play/ 二、类型 1、普通类型 number(数值型ÿ…...
 
SpringMVC创建异步回调请求的4种方式
首先要明确一点,同步请求和异步请求对于客户端用户来讲是一样的,都是需客户端等待返回结果。不同之处在于请求到达服务器之后的处理方式,下面用两张图解释一下同步请求和异步请求在服务端处理方式的不同:同步请求异步请求两个流程…...
 
MySQL(二)表的操作
一、创建表 CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; 说明: field 表示列名 datatype 表示列的类型 character set 字符集,如…...
 
SpringCloud - 入门
目录 服务架构演变 单体架构 分布式架构 分布式架构要考虑的问题 微服务 初步认识 案例Demo 服务拆分注意事项 服务拆分示例 服务调用 服务架构演变 单体架构 将业务的所有功能集中在一个项目中开发,打成一个包部署优点: 架构简单部署成本低缺…...
 
进一步了解C++函数的各种参数以及重载,了解C++部分的内存模型,C++独特的引用方式,巧妙替换指针,初步了解类与对象。满满的知识,希望大家能多多支持
C的编程精华,走过路过千万不要错过啊!废话少说,我们直接进入正题!!!! 函数高级 C的函数提高 函数默认参数 在C中,函数的形参列表中的形参是可以有默认值的。 语法:返…...
 
Chapter6:机器人SLAM与自主导航
ROS1{\rm ROS1}ROS1的基础及应用,基于古月的课,各位可以去看,基于hawkbot{\rm hawkbot}hawkbot机器人进行实际操作。 ROS{\rm ROS}ROS版本:ROS1{\rm ROS1}ROS1的Melodic{\rm Melodic}Melodic;实际机器人:Ha…...
Sass的使用要点
Sass 是一个 CSS 预处理器,完全兼容所有版本的 CSS。实际上,Sass 并没有真正为 CSS 语言添加任何新功能。只是在许多情况下可以可以帮助我们减少 CSS 重复的代码,节省开发时间。 一、注释 方式一:双斜线 // 方式二:…...
计算机启动过程,从按下电源按钮到登录界面的详细步骤
1、背景 自接触计算机以来,一直困扰着我一个问题。当我们按下电脑的开机键后,具体发生了哪些过程呢?计算机启动的具体步骤是什么? 计算机启动过程通常分为五个步骤:电源自检、BIOS自检、引导设备选择、引导程序加载和…...
 
LeetCode 刷题之 BFS 广度优先搜索【Python实现】
1. BFS 算法框架 BFS:用来搜索 最短路径 比较合适,如:求二叉树最小深度、最少步数、最少交换次数,一般与 队列 搭配使用,空间复杂度比 DFS 大很多DFS:适合搜索全部的解,如:寻找最短…...
 
Hadoop01【尚硅谷】
大数据学习笔记 大数据概念 大数据:指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。 主要解决,海量数据的存储…...
 
Echarts 配置横轴竖轴指示线,更换颜色、线型和大小
第018个点击查看专栏目录本示例是描述如何在Echarts上配置横轴竖轴指示线,更换颜色、线型和大小。方法很简单,参考示例源代码。 文章目录示例效果示例源代码(共85行)相关资料参考专栏介绍示例效果 示例源代码(共85行&a…...
OpenAI 官方API Java版SDK,两行代码即可调用。包含GhatGPT问答接口。
声明:这是一个非官方的社区维护的库。 已经支持OpenAI官方的全部api,有bug欢迎朋友们指出,互相学习。 注意:由于这个接口: https://platform.openai.com/docs/api-reference/files/retrieve-content 免费用户无法使…...
 
SpringBoot 日志文件
(一)日志文件有什么用?除了发现和定位问题之外,我们还可以通过日志实现以下功能:记录用户登录日志,以便分析用户是正常登录还是恶意破解用户。记录系统的操作日志,以便数据恢复和定位操作 。记录程序的执行时间&#x…...
Cesium1.95中高性能加载1500个点
一、基本方式: 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
 
【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...
 
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
 
HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
 
面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...
 
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
 
DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...
2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案
一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...
linux设备重启后时间与网络时间不同步怎么解决?
linux设备重启后时间与网络时间不同步怎么解决? 设备只要一重启,时间又错了/偏了,明明刚刚对时还是对的! 这在物联网、嵌入式开发环境特别常见,尤其是开发板、树莓派、rk3588 这类设备。 解决方法: 加硬件…...
