聊聊AsyncHttpClient的ChannelPool
序
本文主要研究一下AsyncHttpClient的ChannelPool
ChannelPool
org/asynchttpclient/channel/ChannelPool.java
public interface ChannelPool {/*** Add a channel to the pool** @param channel an I/O channel* @param partitionKey a key used to retrieve the cached channel* @return true if added.*/boolean offer(Channel channel, Object partitionKey);/*** Remove the channel associated with the uri.** @param partitionKey the partition used when invoking offer* @return the channel associated with the uri*/Channel poll(Object partitionKey);/*** Remove all channels from the cache. A channel might have been associated* with several uri.** @param channel a channel* @return the true if the channel has been removed*/boolean removeAll(Channel channel);/*** Return true if a channel can be cached. A implementation can decide based* on some rules to allow caching Calling this method is equivalent of* checking the returned value of {@link ChannelPool#offer(Channel, Object)}** @return true if a channel can be cached.*/boolean isOpen();/*** Destroy all channels that has been cached by this instance.*/void destroy();/*** Flush partitions based on a predicate** @param predicate the predicate*/void flushPartitions(Predicate<Object> predicate);/*** @return The number of idle channels per host.*/Map<String, Long> getIdleChannelCountPerHost();
}
ChannelPool定义了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有两个实现类,分别是NoopChannelPool及DefaultChannelPool
NoopChannelPool
org/asynchttpclient/channel/NoopChannelPool.java
public enum NoopChannelPool implements ChannelPool {INSTANCE;@Overridepublic boolean offer(Channel channel, Object partitionKey) {return false;}@Overridepublic Channel poll(Object partitionKey) {return null;}@Overridepublic boolean removeAll(Channel channel) {return false;}@Overridepublic boolean isOpen() {return true;}@Overridepublic void destroy() {}@Overridepublic void flushPartitions(Predicate<Object> predicate) {}@Overridepublic Map<String, Long> getIdleChannelCountPerHost() {return Collections.emptyMap();}
}
NoopChannelPool是个枚举,用枚举实现了单例,其方法默认为空操作
DefaultChannelPool
/*** A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}*/
public final class DefaultChannelPool implements ChannelPool {private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;private final AtomicBoolean isClosed = new AtomicBoolean(false);private final Timer nettyTimer;private final int connectionTtl;private final boolean connectionTtlEnabled;private final int maxIdleTime;private final boolean maxIdleTimeEnabled;private final long cleanerPeriod;private final PoolLeaseStrategy poolLeaseStrategy;public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) {this(config.getPooledConnectionIdleTimeout(),config.getConnectionTtl(),hashedWheelTimer,config.getConnectionPoolCleanerPeriod());}public DefaultChannelPool(int maxIdleTime,int connectionTtl,Timer nettyTimer,int cleanerPeriod) {this(maxIdleTime,connectionTtl,PoolLeaseStrategy.LIFO,nettyTimer,cleanerPeriod);}public DefaultChannelPool(int maxIdleTime,int connectionTtl,PoolLeaseStrategy poolLeaseStrategy,Timer nettyTimer,int cleanerPeriod) {this.maxIdleTime = maxIdleTime;this.connectionTtl = connectionTtl;connectionTtlEnabled = connectionTtl > 0;channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;this.nettyTimer = nettyTimer;maxIdleTimeEnabled = maxIdleTime > 0;this.poolLeaseStrategy = poolLeaseStrategy;this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE));if (connectionTtlEnabled || maxIdleTimeEnabled)scheduleNewIdleChannelDetector(new IdleChannelDetector());}//......
}
DefaultChannelPool基于ConcurrentHashMap实现了ChannelPool接口,主要的参数为connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod会取connectionTtl、maxIdleTime、传入的cleanerPeriod的最小值;开启connectionTtl或者maxIdleTime的话,会往nettyTimer添加IdleChannelDetector,延后cleanerPeriod时间执行
offer
public boolean offer(Channel channel, Object partitionKey) {if (isClosed.get())return false;long now = unpreciseMillisTime();if (isTtlExpired(channel, now))return false;boolean offered = offer0(channel, partitionKey, now);if (connectionTtlEnabled && offered) {registerChannelCreation(channel, partitionKey, now);}return offered;}private boolean isTtlExpired(Channel channel, long now) {if (!connectionTtlEnabled)return false;ChannelCreation creation = channelId2Creation.get(channel.id());return creation != null && now - creation.creationTime >= connectionTtl;} private boolean offer0(Channel channel, Object partitionKey, long now) {ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);if (partition == null) {partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>());}return partition.offerFirst(new IdleChannel(channel, now));} private void registerChannelCreation(Channel channel, Object partitionKey, long now) {ChannelId id = channel.id();if (!channelId2Creation.containsKey(id)) {channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey));}}
offer接口先判断isTtlExpired,如果channel的存活时间超过connectionTtl则返回false,否则执行offer0,往ConcurrentLinkedDeque添加,若添加成功且connectionTtlEnabled则执行registerChannelCreation,维护创建时间
poll
/*** {@inheritDoc}*/public Channel poll(Object partitionKey) {IdleChannel idleChannel = null;ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);if (partition != null) {while (idleChannel == null) {idleChannel = poolLeaseStrategy.lease(partition);if (idleChannel == null)// pool is emptybreak;else if (!Channels.isChannelActive(idleChannel.channel)) {idleChannel = null;LOGGER.trace("Channel is inactive, probably remotely closed!");} else if (!idleChannel.takeOwnership()) {idleChannel = null;LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!");}}}return idleChannel != null ? idleChannel.channel : null;}
poll方法是根据partitionKey找到对应的ConcurrentLinkedDeque,然后循环执行poolLeaseStrategy.lease(partition),若idleChannel为null直接break,若isChannelActive为false则重置为null继续循环,若idleChannel.takeOwnership()为false也重置为null继续循环
removeAll
/*** {@inheritDoc}*/public boolean removeAll(Channel channel) {ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null;return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE));}
removeAll方法会将指定的channel从channelId2Creation及ConcurrentLinkedDeque中移除
isOpen
/*** {@inheritDoc}*/public boolean isOpen() {return !isClosed.get();}
isOpen则取的isClosed变量
destroy
/*** {@inheritDoc}*/public void destroy() {if (isClosed.getAndSet(true))return;partitions.clear();if (connectionTtlEnabled) {channelId2Creation.clear();}}
destroy会设置isClosed为true,然后清空partitions及channelId2Creation
flushPartitions
public void flushPartitions(Predicate<Object> predicate) {for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {Object partitionKey = partitionsEntry.getKey();if (predicate.test(partitionKey))flushPartition(partitionKey, partitionsEntry.getValue());}}private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) {if (partition != null) {partitions.remove(partitionKey);for (IdleChannel idleChannel : partition)close(idleChannel.channel);}}private void close(Channel channel) {// FIXME pity to have to do this hereChannels.setDiscard(channel);if (connectionTtlEnabled) {channelId2Creation.remove(channel.id());}Channels.silentlyCloseChannel(channel);}
flushPartitions会遍历partitions,然后执行predicate.test,为true则执行flushPartition,它将从partitions移除指定的partitionKey,然后遍历idleChannels挨个执行close
getIdleChannelCountPerHost
public Map<String, Long> getIdleChannelCountPerHost() {return partitions.values().stream().flatMap(ConcurrentLinkedDeque::stream).map(idle -> idle.getChannel().remoteAddress()).filter(a -> a.getClass() == InetSocketAddress.class).map(a -> (InetSocketAddress) a).map(InetSocketAddress::getHostName).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));}
getIdleChannelCountPerHost则遍历partitions,然后map出remoteAddress获取hostName,然后进行groupBy
PoolLeaseStrategy
public enum PoolLeaseStrategy {LIFO {public <E> E lease(Deque<E> d) {return d.pollFirst();}},FIFO {public <E> E lease(Deque<E> d) {return d.pollLast();}};abstract <E> E lease(Deque<E> d);}
PoolLeaseStrategy是个枚举,定义了LIFO及FIFO两个枚举,LIFO则是对Deque执行pollFirst,FIFO则是对Deque执行pollLast
IdleChannelDetector
private final class IdleChannelDetector implements TimerTask {private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime;}private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {// lazy createList<IdleChannel> idleTimeoutChannels = null;for (IdleChannel idleChannel : partition) {boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now);boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel);boolean isTtlExpired = isTtlExpired(idleChannel.channel, now);if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) {LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired);if (idleTimeoutChannels == null)idleTimeoutChannels = new ArrayList<>(1);idleTimeoutChannels.add(idleChannel);}}return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList();}private List<IdleChannel> closeChannels(List<IdleChannel> candidates) {// lazy create, only if we hit a non-closeable channelList<IdleChannel> closedChannels = null;for (int i = 0; i < candidates.size(); i++) {// We call takeOwnership here to avoid closing a channel that has just been taken out// of the pool, otherwise we risk closing an active connection.IdleChannel idleChannel = candidates.get(i);if (idleChannel.takeOwnership()) {LOGGER.debug("Closing Idle Channel {}", idleChannel.channel);close(idleChannel.channel);if (closedChannels != null) {closedChannels.add(idleChannel);}} else if (closedChannels == null) {// first non closeable to be skipped, copy all// previously skipped closeable channelsclosedChannels = new ArrayList<>(candidates.size());for (int j = 0; j < i; j++)closedChannels.add(candidates.get(j));}}return closedChannels != null ? closedChannels : candidates;}public void run(Timeout timeout) {if (isClosed.get())return;if (LOGGER.isDebugEnabled())for (Object key : partitions.keySet()) {int size = partitions.get(key).size();if (size > 0) {LOGGER.debug("Entry count for : {} : {}", key, size);}}long start = unpreciseMillisTime();int closedCount = 0;int totalCount = 0;for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {// store in intermediate unsynchronized lists to minimize// the impact on the ConcurrentLinkedDequeif (LOGGER.isDebugEnabled())totalCount += partition.size();List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));if (!closedChannels.isEmpty()) {if (connectionTtlEnabled) {for (IdleChannel closedChannel : closedChannels)channelId2Creation.remove(closedChannel.channel.id());}partition.removeAll(closedChannels);closedCount += closedChannels.size();}}if (LOGGER.isDebugEnabled()) {long duration = unpreciseMillisTime() - start;if (closedCount > 0) {LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration);}}scheduleNewIdleChannelDetector(timeout.task());}}
IdleChannelDetector实现了netty的TimerTask接口,其run方法主要是遍历partitions,通过expiredChannels取出过期的IdleChannel,这里isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都算在内,然后挨个执行takeOwnership及close,再从channelId2Creation及partition中移除,最后再次调度一下IdleChannelDetector
小结
AsyncHttpClient的ChannelPool定义了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有两个实现类,分别是NoopChannelPool及DefaultChannelPool;DefaultChannelPool基于ConcurrentHashMap实现了ChannelPool接口,主要的参数为connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod会取connectionTtl、maxIdleTime、传入的cleanerPeriod的最小值;开启connectionTtl或者maxIdleTime的话,会往nettyTimer添加IdleChannelDetector,延后cleanerPeriod时间执行。
poll方法会判断是active,不是的话继续循环lease,而IdleChannelDetector则会定期检查,isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都会被close,offer的时候还会判断isTtlExpired,这样子来保证连接的活性。
相关文章:
聊聊AsyncHttpClient的ChannelPool
序 本文主要研究一下AsyncHttpClient的ChannelPool ChannelPool org/asynchttpclient/channel/ChannelPool.java public interface ChannelPool {/*** Add a channel to the pool** param channel an I/O channel* param partitionKey a key used to retrieve the cac…...

[MySQL] MySQL复合查询(多表查询、子查询)
前面我们学习了MySQL简单的单表查询。但是我们发现,在很多情况下单表查询并不能很好的满足我们的查询需求。本篇文章会重点讲解MySQL中的多表查询、子查询和一些复杂查询。希望本篇文章会对你有所帮助。 文章目录 一、基本查询回顾 二、多表查询 2、1 笛卡尔积 2、2…...

[架构之路-256]:目标系统 - 设计方法 - 软件工程 - 软件设计 - 架构设计 - 软件系统不同层次的复用与软件系统向越来越复杂的方向聚合
目录 前言: 一、CPU寄存器级的复用:CPU寄存器 二、指令级复用:二进制指令 三、过程级复用:汇编语言 四、函数级复用:C语言 五、对象级复用:C, Java, Python 六、组件级复用 七、服务级复用 八、微…...
C++初学教程三
目录 一、运算符 一、自增自减运算符 二、位运算符 三、关系运算符...
雷达点云数据.pcd格式转.bin格式
雷达点云数据.pcd格式转.bin格式 注意,方法1原则上可行,但是本人没整好pypcd的环境 方法2是绝对可以的。 方法1 1 源码如下: def pcb2bin1(): # save as bin formatimport os# import pypcdfrom pypcd import pypcdimport numpy as np…...
Fiddler抓包测试
模拟弱网测试 操作:一、Rules - Customize Rules (快捷键CtrlR)弹出编辑器 二、接着CtrlF查找m_SimulateModem标志位 三、默认上传300ms,下载150ms 四、更改后,继续Rules - Performances - Simulate Modem Speeds勾上 …...

视频处理关键知识
1 引言 视频技术发展到现在已经有100多年的历史,虽然比照相技术历史时间短,但在过去很长一段时间之内都是最重要的媒体。由于互联网在新世纪的崛起,使得传统的媒体技术有了更好的发展平台,应运而生了新的多媒体技术。而多媒体技术…...
LeetCode435. Non-overlapping Intervals
文章目录 一、题目二、题解 一、题目 Given an array of intervals intervals where intervals[i] [starti, endi], return the minimum number of intervals you need to remove to make the rest of the intervals non-overlapping. Example 1: Input: intervals [[1,2]…...
ffmpeg 实现多视频轨录制到同一个文件
引言 在视频录制中,有时会碰到这样一个需求,将不同摄像头的画面写入到一个视频文件,这个叫法很多,有的厂家叫合流模式,有的叫多画面多流模式。无论如何,它们的实质都是在一个视频文件上实现多路不同分辨率视…...

vue3中子组件调用父组件的方法
<script lang"ts" setup>前提 父组件: 子组件: const emit defineEmits([closeson]) 在子组件的方法中使用: emit(closeson)...
使用OkHttp上传本地图片及参数
下面以一个例子来讲解在项目中如何使用OKHttp来对本地图片做个最简单的上传功能,基本上无封装,只需要简单调用便可(对于OKHttp的引入不再单独做介绍)。 1:构建上传图片附带的参数(params) Map…...

无公网IP环境如何SSH远程连接Deepin操作系统
文章目录 前言1. 开启SSH服务2. Deppin安装Cpolar3. 配置ssh公网地址4. 公网远程SSH连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 前言 Deepin操作系统是一个基于Debian的Linux操作系统,专注于使用者对日常办公、学习、生活和娱乐的操作体验的极致࿰…...

不会代码(零基础)学语音开发(语音控制板载双继电器)
继电器的用途可广了,这个语音控制用处也特别广。继电器,它实际上是一种“自动开关”,用小电流去控制大电流运作,在电路中起着自动调节、安全保护、转换电路等作用。 在日常生活中,你插入汽车钥匙,车辆可以…...

在imx6ull中加入ov5640模块
本来觉得是一件很简单的事情但是走了很多的弯路,记录一下调试过程。 先使用正点原子提供的出厂内核把摄像头影像调试出来,然后cat /dev/video1,看一下video1牵扯到哪些模块,可以看到需要ov5640_camera.ko和 mx6s_capture.ko这两个…...

Kafka中的auto-offset-reset配置
Kafka这个服务在启动时会依赖于Zookeeper,Kafka相关的部分数据也会存储在Zookeeper中。如果kafka或者Zookeeper中存在脏数据的话(即错误数据),这个时候虽然生产者可以正常生产消息,但是消费者会出现无法正常消费消息的…...

TCP/IP_整理起因
先分享一个初级的问题;有个客户现场,终端设备使用客户网络更新很慢,使用手机热点更新速度符合预期;网络部署情况如下: 前期花费了很大的精力进行问题排查对比,怀疑是客户网络问题(其他的客户现…...

CG-0A 电子水尺水导电测量原理应用于道路积水监测
CG-0A 电子水尺水导电测量原理应用于道路积水监测产品概述 本产品是一种采用微处理器芯片为控制器,内置通讯电路的数字式水位传感器,具备高的可靠性及抗干扰性能。适用于江、河、湖、水库及蓄水池、水渠等处的水位测量使用。 本产品采用了生产工艺技术…...

openEuler JDK21 部署 Zookeeper 集群
zookeeper-jdk21 操作系统:openEuler JDK:21 主机名IP地址spark01192.168.171.101spark02192.168.171.102spark03192.168.171.103 安装 1. 升级内核和软件 yum -y update2. 安装常用软件 yum -y install gcc gcc-c autoconf automake cmake make \zl…...

前端——html拖拽原理
文章目录 ⭐前言⭐draggable属性💖 api💖 单向拖动示例💖 双向拖动示例 ⭐总结⭐结束 ⭐前言 大家好,我是yma16,本文分享关于 前端——html拖拽原理。 vue3系列相关文章: vue3 fastapi 实现选择目录所有文…...

JVM 执行引擎篇
机器码、指令、汇编语言 机器码 各种用二进制编码方式表示的指令,叫做机器指令码。开始,人们就用它采编写程序,这就是机器语言。机器语言虽然能够被计算机理解和接受,但和人们的语言差别太大,不易被人们理解和记忆&a…...

手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
C#中的CLR属性、依赖属性与附加属性
CLR属性的主要特征 封装性: 隐藏字段的实现细节 提供对字段的受控访问 访问控制: 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性: 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑: 可以…...