Spring架构篇--2.5 远程通信基础Select 源码篇--window--Select.open()
前言:在Socket通信中使用Select 来对NIO 进行实现,那么它们的实现方式是怎样的呢,本文从 Selector.open() 进行第一步的分析;
Selector.open() :
Selector 类:
public static Selector open() throws IOException {// 通过 SelectorProvider.provider() 获取SelectorProvider实例// 通过openSelector() 获取不同系统的Selector 实现return SelectorProvider.provider().openSelector();}
先看:SelectorProvider.provider():
SelectorProvider 类:
private static final Object lock = new Object();
private static SelectorProvider provider = null;public static SelectorProvider provider() {synchronized (lock) {// 通过 lock 对象锁,保证当前进程只会有一个SelectorProvider 对象if (provider != null)// 如果发现进程中已经实例化过SelectorProvider 对象则直接返回return provider;// 进程下没有过SelectorProvider 对象则进行加载return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}
SelectorProvider.provider()方法会在System Property中不存在java.nio.channels.spi.SelectorProvider属性和不能找到SelectorProvider的实现类时,创建一个默认的sun.nio.ch.DefaultSelectorProvider来作为SelectorProvide;DefaultSelectorProvider 会根据不同的操作系统返回:
window 下的DefaultSelectorProvider:
package sun.nio.ch;import java.nio.channels.spi.SelectorProvider;public class DefaultSelectorProvider {private DefaultSelectorProvider() {}public static SelectorProvider create() {return new WindowsSelectorProvider();}
}
DefaultSelectorProvider.create() 创建方法:可以看到返回了WindowsSelectorProvider一个实例对象;
package sun.nio.ch;import java.io.IOException;
import java.nio.channels.spi.AbstractSelector;public class WindowsSelectorProvider extends SelectorProviderImpl {public WindowsSelectorProvider() {}public AbstractSelector openSelector() throws IOException {return new WindowsSelectorImpl(this);}
}
在回到Selector 中的 SelectorProvider.provider().openSelector(), SelectorProvider.provider() 实际上返回了WindowsSelectorProvider一个实例对象,然后调用WindowsSelectorProvider的openSelector()方法,可以看到这里创建了一个WindowsSelectorImpl 实例对象并进行了返回:
这里看下对象的关系:
1 ) WindowsSelectorImpl extends SelectorImpl;
2) SelectorImpl extends AbstractSelector;
3) AbstractSelector extends Selector
4) Selector implements Closeable
WindowsSelectorImpl 类:
//poll数组和channel数组的初始容量
private final int INIT_CAP = 8;
//select操作时,每个线程处理的最大FD数量。为INIT_CAP乘以2的幂
private final static int MAX_SELECTABLE_FDS = 1024;
//由这个选择器服务的SelectableChannel的列表
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
//存放所有FD的包装器,主要用于poll操作
private PollArrayWrapper pollWrapper;
//注册到当前选择器上总的通道数量,初始化为1是因为实例化选择器时加入了wakeupSourceFd
private int totalChannels = 1;
//选择操作所需要的辅助线程数量。每增加一组MAX_SELECTABLE_FDS - 1个通道,就需要一个线程。
private int threadsCount = 0;
//辅助线程列表
private final List<SelectThread> threads = new ArrayList();
//创建一个Pipe实例,用于实现唤醒选择器的功能
private final Pipe wakeupPipe ;
//管道的read端FD,用于实现唤醒选择器的功能
private final int wakeupSourceFd;
//管道的write端FD,用于实现唤醒选择器的功能
private final int wakeupSinkFd;
//关闭锁,通常在注册、注销,关闭,修改选择键的interestOps时都存在竞态条件,主要保护channelArray、pollWrapper等
private Object closeLock = new Object();
//FD为键,SelectionKeyImpl为value的内部map,方便通过FD查找SelectionKeyImpl
private final FdMap fdMap = new FdMap();
//内部类SubSelector中封装了发起poll调用和处理poll调用结果的细节。由主线程调用
private final SubSelector subSelector = new SubSelector();
//选择器每次选择的超时参数
private long timeout;
//中断锁,用于保护唤醒选择器使用的相关竞态资源,如interruptTriggered
private final Object interruptLock = new Object();
//是否触发中断,唤醒选择器的重要标志,由interruptLock保护
private volatile boolean interruptTriggered = false;
//启动锁,当使用多线程处理选择器上Channel的就绪事件时,用于协调这些线程向内核发起系统调用
//辅助线程会在该锁上等待
private final WindowsSelectorImpl.StartLock startLock = new WindowsSelectorImpl.StartLock();
//完成锁,当使用多线程处理选择器上Channel的就绪事件时,用于协调这些线程从系统调用中返回
//主线程会在该锁上等待
private final WindowsSelectorImpl.FinishLock finishLock = new WindowsSelectorImpl.FinishLock();
//updateSelectedKeys调用计数器
//SubSelector.fdsMap中的每个条目都有一个的updateCount值。调用processFDSet时,当我们增加numKeysUpdated,
//会同步将updateCount设置为当前值。 这用于避免多次计算同一个选择键更新多次numKeysUpdated。
//同一个选择键可能出现在readfds和writefds中。
private long updateCount = 0L;WindowsSelectorImpl(SelectorProvider var1) throws IOException {// 调用 SelectorImpl 的父类构造方法 // 将 上一步WindowsSelectorProvider的openSelector() SelectorProvider 实例传入super(var1);// Pipe wakeupPipe = Pipe.open() 的fd 文件描述符赋值this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();// 禁用 Nagle 算法,当 sink 端写入 1 字节数据时,将立即发送,而不必等到将较小的包组合成较大的包再发送,// 这样 source 端就可以立马读取数据var2.sc.socket().setTcpNoDelay(true);// Pipe wakeupPipe = Pipe.open() 的fd 文件描述符赋值this.wakeupSinkFd = var2.getFDVal();// Pipe wakeupPipe = Pipe.open() 的fd 文件描述符赋值加入到管道数组中// 把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
WindowsSelectorImpl 类中通过socket 建立了管道连接,并将管道符进行保存;
super(var1) 调用 SelectorImpl 的父类构造方法:
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {// 调用 AbstractSelector 的父类构造方法super(var1);// publicKeys 和 publicSelectedKeys 进行映射赋值if (Util.atBugLevel("1.4")) {this.publicKeys = this.keys;this.publicSelectedKeys = this.selectedKeys;} else {this.publicKeys = Collections.unmodifiableSet(this.keys);this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);}}
SelectorImpl 的构造,对publicKeys和publicSelectedKeys 集合做了映射,方便操作selectedKeys和keys 集合可以直接影响数据;
super(var1) 调用 AbstractSelector的父类构造方法:
private final SelectorProvider provider;
protected AbstractSelector(SelectorProvider provider) {this.provider = provider;
}
因为WindowsSelectorImpl 的实例处理构造方法的调用就是对Pipe wakeupPipe = Pipe.open() 产生的文件操作符进行赋值,这里看下 Pipe.open():
Pipe 类:
public static Pipe open() throws IOException {// SelectorProvider.provider() 返回直接创建好的WindowsSelectorProvider的openSelector() SelectorProvider 实例传入// 然后调用openPipe()return SelectorProvider.provider().openPipe();
}
SelectorProviderImpl类中的openPipe():
public Pipe openPipe() throws IOException {// 返回 PipeImpl 的实例,传入创建好的WindowsSelectorProvider的openSelector() SelectorProvider 实例return new PipeImpl(this);
}
PipeImpl 的构造方法:
private static final int NUM_SECRET_BYTES = 16;
private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
private SourceChannel source;
private SinkChannel sink;PipeImpl(SelectorProvider var1) throws IOException {try {// AccessController.doPrivileged() 借用权限完成方法的执行AccessController.doPrivileged(new PipeImpl.Initializer(var1));} catch (PrivilegedActionException var3) {throw (IOException)var3.getCause();}
}
重点看下PipeImpl 内部类中 new PipeImpl.Initializer(var1),在进行构造之后,执行run方法:
private class Initializer implements PrivilegedExceptionAction<Void> {private final SelectorProvider sp;private IOException ioe;private Initializer(SelectorProvider var2) {// 异常原因this.ioe = null;// SelectorProvider 实例this.sp = var2;}public Void run() throws IOException {// 启动线程执行LoopbackConnector 的run方法PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector();var1.run();if (this.ioe instanceof ClosedByInterruptException) {// 如果建立通道过程中发生了关闭异常则重新发起线程调用LoopbackConnector 的run方法this.ioe = null;Thread var2 = new Thread(var1) {public void interrupt() {}};var2.start();while(true) {try {var2.join();break;} catch (InterruptedException var4) {}}Thread.currentThread().interrupt();}if (this.ioe != null) {// 发生异常抛出异常throw new IOException("Unable to establish loopback connection", this.ioe);} else {return null;}}private class LoopbackConnector implements Runnable {private LoopbackConnector() {}public void run() {// 声明服务端的 ServerSocketChannel ServerSocketChannel var1 = null;// 声明客户端的SocketChannel SocketChannel var2 = null;SocketChannel var3 = null;try {// 声明var4 和 var5 两个内存空间分别为16个字节大小ByteBuffer var4 = ByteBuffer.allocate(16);ByteBuffer var5 = ByteBuffer.allocate(16);// 声明本机的地址InetAddress var6 = InetAddress.getByName("127.0.0.1");assert var6.isLoopbackAddress();// 声明var7 地址InetSocketAddress var7 = null;while(true) {if (var1 == null || !var1.isOpen()) {// 初始化服务端的ServerSocketChannel var1 = ServerSocketChannel.open();// 绑定服务端监听的端口ip 和端口var1.socket().bind(new InetSocketAddress(var6, 0));// 将地址赋值给var7 var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());}// 打开客户端SocketChannel ,地址为var7 建立连接var2 = SocketChannel.open(var7);// 向var4 写入随机数PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());do {// 向SocketChannel 建立的连接写入数据var2.write(var4);} while(var4.hasRemaining());// 返回此缓冲区var4.rewind();// 服务端阻塞等待连接连接var3 = var1.accept();do {// 从服务端连接中读取数据var3.read(var5);} while(var5.hasRemaining());// 返回此缓冲区var5.rewind();// 如果客户端发送的数据和服务端接收到的数据相同说通道完成建立if (var5.equals(var4)) {// 放入通道描述PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);// 跳出循环break;}// 关闭 var3.close();var2.close();}} catch (IOException var18) {try {if (var2 != null) {var2.close();}if (var3 != null) {var3.close();}} catch (IOException var17) {}// 异常原因Initializer.this.ioe = var18;} finally {try {if (var1 != null) {// 关闭var1.close();}} catch (IOException var16) {}}}}
}
- windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。
- source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0));
- 改通道的建立并不是为了确保连接,而是为了后续Select在获取内核准备好的数据时,一旦有socket返回可读/可写时间,方便唤醒对主线程的唤醒使用;
pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0):
PollArrayWrapper类:
void putDescriptor(int var1, int var2) {this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}void putEventOps(int var1, int var2) {this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}void addWakeupSocket(int var1, int var2) {this.putDescriptor(var2, var1);this.putEventOps(var2, Net.POLLIN);
}
这里将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述符wakeupSourceFd就会处于就绪状态;
总结:
- window–Select.open() 通过加锁的方式获取进程下的唯一一个 WindowsSelectorImpl对象实例。
- WindowsSelectorImpl 的对象实例中通过 对127.0.0.1 端口为0 ,建立两个连接的方式拿到文件操作符,并把 把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里;
参考:
Java NIO 之 Selector(第一部分Selector.open());
相关文章:
Spring架构篇--2.5 远程通信基础Select 源码篇--window--Select.open()
前言:在Socket通信中使用Select 来对NIO 进行实现,那么它们的实现方式是怎样的呢,本文从 Selector.open() 进行第一步的分析; Selector.open() : Selector 类: public static Selector open() throws IOEx…...
WEB静态交互展示【数据mock】
文章目录背景需求分析实现过程1.爬取原有项目数据2.将数据引入项目3.打包收工后记背景 接到公司一个【离谱】的需求,要求把已有的项目做一个演示版本(静态文件版本);本人觉得前端、后端搞个容器包,一个演示版本不就有…...
(4)C#传智:分支Switch与循环While(第四天)
一、异常捕获 定义:语法无错,程序因某些原因出现的错误,而不能正常运行。 用try-catch进行捕获。哪行代码可能出现异常,你就踹它一脚。 try { 可能会出现异常的代码; ---- …...
Stable-Baselines 3 部分源代码解读 2 on_policy_algorithm.py
Stable-Baselines 3 部分源代码解读 ./common/on_policy_algorithm.py 前言 阅读PPO相关的源码,了解一下标准库是如何建立PPO算法以及各种tricks的,以便于自己的复现。 在Pycharm里面一直跳转,可以看到PPO类是最终继承于基类,也…...

15. Qt中OPenGL的参数传递问题
1. 说明 在OPenGL中,需要使用GLSL语言来编写着色器的函数,在顶点着色器和片段着色器之间需要参数值的传递,且在CPU中的数据也需要传递到顶点着色器中进行使用。本文简单介绍几种参数传递的方式: (本文内容仅个人理解&…...

注意,这本2区SCI期刊最快18天录用,还差一步录用只因犯了这个错
发表案例分享: 2区医学综合类SCI,仅18天录用,录用后28天见刊 2023.02.10 | 见刊 2023.01.13 | Accepted 2023.01.11 | 提交返修稿 2022.12.26 | 提交论文至期刊部系统 录用截图来源:期刊部投稿系统 见刊截图来源:…...

Could not find resource jdbc.properties问题的解决
以如下开头的内容: Exception in thread "main" org.apache.ibatis.exceptions.PersistenceException: ### Error building SqlSession. ### The error may exist in SQL Mapper Configuration 出现以上问题是没有在src/main/resources下创建jdbc.prop…...
【面试题】==与equals区别、Hashcode作用、hashcode相同equals()也一定为true吗?泛型特点与好处
文章目录1. 和 equals 的区别是什么?2.Hashcode的作用3. 两个对象的hashCode() 相同, 那么equals()也一定为 true吗?4.泛型常用特点5.使用泛型的好处?1. 和 equals 的区别是什么? “” 对于基本类型和引用类型 的作…...
Flex布局中的flex属性
1.flex-grow,flex-shrink,flex-basis取值含义 flex-grow: 延申性描述。在满足“延申条件”时,flex容器中的项目会按照设置的flex-grow值的比例来延申,占满容器剩余空间。 取值情况: 取负值无效。取0值表示不…...
SpringBoot + Ant Design Pro Vue实现动态路由和菜单的前后端分离框架
Ant Design Pro Vue默认路由和菜单配置是采用中心化的方式,在 router.config.js统一配置和管理,同时也提供了动态获取路由和菜单的解决方案,并将在2.0.3版本中提供,因到目前为止,官方发布的版本为2.0.2,所以…...

robotframework自动化测试环境搭建
环境说明 win10 python版本:3.8.3rc1 安装清单 安装配置 selenium安装 首先检查pip命令是否安装: C:\Users\name>pipUsage:pip <command> [options]Commands:install Install packages.download Do…...

尚硅谷《Redis7》(小白篇)
尚硅谷《Redis7 》(小白篇) 02 redis 是什么 官方网站: https://redis.io/ 作者 Git Hub https://github.com/antirez 03 04 05 能做什么 06 去哪下 Download https://redis.io/download/ redis中文文档 https://www.redis.com.cn/docu…...
并非从0开始的c++ day6
并非从0开始的c day6二级指针练习-文件读写位运算位逻辑运算符按位取反 ~位于(AND):&位或(OR): |位异或: ^移位运算符左移<<右移>>多维数组一维数组数组名一维数组名传入到函数参数中数组指…...
PMP考前冲刺2.22 | 2023新征程,一举拿证
承载2023新一年的好运让我们迈向PMP终点一起冲刺!一起拿证!每日5道PMP习题助大家上岸PMP!!!题目1-2:1.在新产品开发过程中,项目经理关注到行业排名第一的公司刚刚发布同类型的产品。相比竞品&am…...
RxJava的订阅过程
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展 implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0首先从最基本的Observable的创建到订阅开始分析 Observable.create(new Observable.OnSubscribe<S…...
【2.22】MySQL、Redis、动态规划
认识Redis Redis是一种基于内存的数据库,对数据的读写操作都是在内存中完成的,因此读写速度非常快,常用于缓存,消息队列,分布式锁等场景。 Redis提供了多种数据类型来支持不同的业务场景,比如String(字符串…...

2年手动测试,裸辞后找不到工作怎么办?
我们可以从以下几个方面来具体分析下,想通了,理解透了,才能更好的利用资源提升自己。一、我会什么?先说第一个我会什么?第一反应:我只会功能测试,在之前的4年的中我只做了功能测试。内心存在一种…...

Leetcode6. N字形变换
一、题目描述: 将一个给定字符串 s 根据给定的行数 numRows ,以从上往下、从左到右进行 Z 字形排列。 比如输入字符串为 “PAYPALISHIRING” 行数为 3 时,排列如下: 之后,你的输出需要从左往右逐行读取,产…...
将Nginx 核心知识点扒了个底朝天(十)
ngx_http_upstream_module的作用是什么? ngx_http_upstream_module用于定义可通过fastcgi传递、proxy传递、uwsgi传递、memcached传递和scgi传递指令来引用的服务器组。 什么是C10K问题? C10K问题是指无法同时处理大量客户端(10,000)的网络套接字。 Nginx是否支持将请求压…...
GPU显卡环境配置安装
前言 最近公司购买了一张RTX3090的显卡和一台新的服务器,然后对机器的GPU环境进行了安装和配置,然后简单记录一下 环境版本 操作系统:Centos7.8 显卡型号:RTX3090 Python版本:3.7.6 Tensorflow版本:2…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

【从零开始学习JVM | 第四篇】类加载器和双亲委派机制(高频面试题)
前言: 双亲委派机制对于面试这块来说非常重要,在实际开发中也是经常遇见需要打破双亲委派的需求,今天我们一起来探索一下什么是双亲委派机制,在此之前我们先介绍一下类的加载器。 目录 编辑 前言: 类加载器 1. …...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...

DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态
前言 在人工智能技术飞速发展的今天,深度学习与大模型技术已成为推动行业变革的核心驱动力,而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心,系统性地呈现了两部深度技术著作的精华:…...