当前位置: 首页 > news >正文

聊聊httpclient的CPool

本文主要研究一下httpclient的CPool

ConnPool

org/apache/http/pool/ConnPool.java

public interface ConnPool<T, E> {/*** Attempts to lease a connection for the given route and with the given* state from the pool.** @param route route of the connection.* @param state arbitrary object that represents a particular state*  (usually a security principal or a unique token identifying*  the user whose credentials have been used while establishing the connection).*  May be {@code null}.* @param callback operation completion callback.** @return future for a leased pool entry.*/Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);/*** Releases the pool entry back to the pool.** @param entry pool entry leased from the pool* @param reusable flag indicating whether or not the released connection*   is in a consistent state and is safe for further use.*/void release(E entry, boolean reusable);}

ConnPool定义了lease及release方法,其中定义了两个泛型,T表示route,E表示poolEntry

ConnPoolControl

public interface ConnPoolControl<T> {void setMaxTotal(int max);int getMaxTotal();void setDefaultMaxPerRoute(int max);int getDefaultMaxPerRoute();void setMaxPerRoute(final T route, int max);int getMaxPerRoute(final T route);PoolStats getTotalStats();PoolStats getStats(final T route);}

ConnPoolControl接口定义了设置和访问maxTotal、defaultMaxPerRoute及PoolStats的方法

AbstractConnPool

org/apache/http/pool/AbstractConnPool.java

@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>implements ConnPool<T, E>, ConnPoolControl<T> {private final Lock lock;private final Condition condition;private final ConnFactory<T, C> connFactory;private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;private final Set<E> leased;private final LinkedList<E> available;private final LinkedList<Future<E>> pending;private final Map<T, Integer> maxPerRoute;private volatile boolean isShutDown;private volatile int defaultMaxPerRoute;private volatile int maxTotal;private volatile int validateAfterInactivity;public AbstractConnPool(final ConnFactory<T, C> connFactory,final int defaultMaxPerRoute,final int maxTotal) {super();this.connFactory = Args.notNull(connFactory, "Connection factory");this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");this.maxTotal = Args.positive(maxTotal, "Max total value");this.lock = new ReentrantLock();this.condition = this.lock.newCondition();this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();this.leased = new HashSet<E>();this.available = new LinkedList<E>();this.pending = new LinkedList<Future<E>>();this.maxPerRoute = new HashMap<T, Integer>();}/*** Creates a new entry for the given connection with the given route.*/protected abstract E createEntry(T route, C conn);//......}   

AbstractConnPool声明实现ConnPool、ConnPoolControl接口,它定义E必须继承PoolEntry,同时定义了泛型C,表示connectionType

shutdown

    public void shutdown() throws IOException {if (this.isShutDown) {return ;}this.isShutDown = true;this.lock.lock();try {for (final E entry: this.available) {entry.close();}for (final E entry: this.leased) {entry.close();}for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {pool.shutdown();}this.routeToPool.clear();this.leased.clear();this.available.clear();} finally {this.lock.unlock();}}

shutdown方法会遍历available、leased挨个执行close,然后遍历routeToPool挨个执行shutdown

lease方法

    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {Args.notNull(route, "Route");Asserts.check(!this.isShutDown, "Connection pool shut down");return new Future<E>() {private final AtomicBoolean cancelled = new AtomicBoolean(false);private final AtomicBoolean done = new AtomicBoolean(false);private final AtomicReference<E> entryRef = new AtomicReference<E>(null);@Overridepublic boolean cancel(final boolean mayInterruptIfRunning) {if (done.compareAndSet(false, true)) {cancelled.set(true);lock.lock();try {condition.signalAll();} finally {lock.unlock();}if (callback != null) {callback.cancelled();}return true;}return false;}@Overridepublic boolean isCancelled() {return cancelled.get();}@Overridepublic boolean isDone() {return done.get();}@Overridepublic E get() throws InterruptedException, ExecutionException {try {return get(0L, TimeUnit.MILLISECONDS);} catch (final TimeoutException ex) {throw new ExecutionException(ex);}}@Overridepublic E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {for (;;) {synchronized (this) {try {final E entry = entryRef.get();if (entry != null) {return entry;}if (done.get()) {throw new ExecutionException(operationAborted());}final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);if (validateAfterInactivity > 0)  {if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {if (!validate(leasedEntry)) {leasedEntry.close();release(leasedEntry, false);continue;}}}if (done.compareAndSet(false, true)) {entryRef.set(leasedEntry);done.set(true);onLease(leasedEntry);if (callback != null) {callback.completed(leasedEntry);}return leasedEntry;} else {release(leasedEntry, true);throw new ExecutionException(operationAborted());}} catch (final IOException ex) {if (done.compareAndSet(false, true)) {if (callback != null) {callback.failed(ex);}}throw new ExecutionException(ex);}}}}};}

lease方法主要是get及cancel,其中get方法主要是执行getPoolEntryBlocking,对于validateAfterInactivity大于0的则根据判断是否需要validate,若需要且validate失败则执行leasedEntry.close()及release方法

getPoolEntryBlocking

org/apache/http/pool/AbstractConnPool.java

    private E getPoolEntryBlocking(final T route, final Object state,final long timeout, final TimeUnit timeUnit,final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {Date deadline = null;if (timeout > 0) {deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));}this.lock.lock();try {final RouteSpecificPool<T, C, E> pool = getPool(route);E entry;for (;;) {Asserts.check(!this.isShutDown, "Connection pool shut down");if (future.isCancelled()) {throw new ExecutionException(operationAborted());}for (;;) {entry = pool.getFree(state);if (entry == null) {break;}if (entry.isExpired(System.currentTimeMillis())) {entry.close();}if (entry.isClosed()) {this.available.remove(entry);pool.free(entry, false);} else {break;}}if (entry != null) {this.available.remove(entry);this.leased.add(entry);onReuse(entry);return entry;}// New connection is neededfinal int maxPerRoute = getMax(route);// Shrink the pool prior to allocating a new connectionfinal int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);if (excess > 0) {for (int i = 0; i < excess; i++) {final E lastUsed = pool.getLastUsed();if (lastUsed == null) {break;}lastUsed.close();this.available.remove(lastUsed);pool.remove(lastUsed);}}if (pool.getAllocatedCount() < maxPerRoute) {final int totalUsed = this.leased.size();final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);if (freeCapacity > 0) {final int totalAvailable = this.available.size();if (totalAvailable > freeCapacity - 1) {if (!this.available.isEmpty()) {final E lastUsed = this.available.removeLast();lastUsed.close();final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());otherpool.remove(lastUsed);}}final C conn = this.connFactory.create(route);entry = pool.add(conn);this.leased.add(entry);return entry;}}boolean success = false;try {pool.queue(future);this.pending.add(future);if (deadline != null) {success = this.condition.awaitUntil(deadline);} else {this.condition.await();success = true;}if (future.isCancelled()) {throw new ExecutionException(operationAborted());}} finally {// In case of 'success', we were woken up by the// connection pool and should now have a connection// waiting for us, or else we're shutting down.// Just continue in the loop, both cases are checked.pool.unqueue(future);this.pending.remove(future);}// check for spurious wakeup vs. timeoutif (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {break;}}throw new TimeoutException("Timeout waiting for connection");} finally {this.lock.unlock();}}

getPoolEntryBlocking先根据route从routeToPool取出对应的RouteSpecificPool,然后pool.getFree(state),之后判断是否过期,是否关闭,没问题则从available移除,添加到leased中,然后执行onReuse回调,如果entry为null则通过connFactory.create(route)来创建

release

    @Overridepublic void release(final E entry, final boolean reusable) {this.lock.lock();try {if (this.leased.remove(entry)) {final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());pool.free(entry, reusable);if (reusable && !this.isShutDown) {this.available.addFirst(entry);} else {entry.close();}onRelease(entry);Future<E> future = pool.nextPending();if (future != null) {this.pending.remove(future);} else {future = this.pending.poll();}if (future != null) {this.condition.signalAll();}}} finally {this.lock.unlock();}}

release方法先获取RouteSpecificPool,然后执行pool.free(entry, reusable)

CPool

org/apache/http/impl/conn/CPool.java

@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {private static final AtomicLong COUNTER = new AtomicLong();private final Log log = LogFactory.getLog(CPool.class);private final long timeToLive;private final TimeUnit timeUnit;public CPool(final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,final int defaultMaxPerRoute, final int maxTotal,final long timeToLive, final TimeUnit timeUnit) {super(connFactory, defaultMaxPerRoute, maxTotal);this.timeToLive = timeToLive;this.timeUnit = timeUnit;}@Overrideprotected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {final String id = Long.toString(COUNTER.getAndIncrement());return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);}@Overrideprotected boolean validate(final CPoolEntry entry) {return !entry.getConnection().isStale();}@Overrideprotected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {super.enumAvailable(callback);}@Overrideprotected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {super.enumLeased(callback);}}

CPool继承了AbstractConnPool,其T为HttpRoute,C为ManagedHttpClientConnection,E为CPoolEntry;其createEntry方法创建CPoolEntry,validate则判断connect是不是stale

小结

ConnPool定义了lease及release方法,其中定义了两个泛型,T表示route,E表示poolEntry;AbstractConnPool声明实现ConnPool、ConnPoolControl接口,它定义E必须继承PoolEntry,同时定义了泛型C,表示connectionType;CPool继承了AbstractConnPool,其T为HttpRoute,C为ManagedHttpClientConnection,E为CPoolEntry。

AbstractConnPool的lease方法主要是get及cancel,其中get方法主要是执行getPoolEntryBlocking,对于validateAfterInactivity大于0的则根据判断是否需要validate,若需要且validate失败则执行leasedEntry.close()及release方法;release方法先获取RouteSpecificPool,然后执行pool.free(entry, reusable)

相关文章:

聊聊httpclient的CPool

序 本文主要研究一下httpclient的CPool ConnPool org/apache/http/pool/ConnPool.java public interface ConnPool<T, E> {/*** Attempts to lease a connection for the given route and with the given* state from the pool.** param route route of the connecti…...

B2主题优化:WordPress文章每次访问随机增加访问量

老站长都知道&#xff0c;一个新站刚开始创建&#xff0c;内容也不多的时候&#xff0c;用户进来看到文章浏览量要么是0&#xff0c;要么是 个位数&#xff0c;非常影响体验&#xff0c;就会有一种“这个网站没人气&#xff0c;看来不行”的感觉。 即使你的内容做的很好&#x…...

大模型部署手记(1)ChatGLM2+Windows GPU

1.简介&#xff1a; 组织机构&#xff1a;智谱/清华 代码仓&#xff1a;https://github.com/THUDM/ChatGLM2-6B 模型&#xff1a;THUDM/chatglm2-6b 下载&#xff1a;https://huggingface.co/THUDM/chatglm2-6b 镜像下载&#xff1a;https://aliendao.cn/models/THUDM/chat…...

Rust Rocket: 构建Restful服务项目实战

前言 这几天我的笔记系统开发工作进入了搬砖期&#xff0c;前端基于Yew&#xff0c;后端基于Rocket。关于Rocket搭建Restful服务&#xff0c;官方也有介绍&#xff0c;感觉很多细节不到位。因此我打算花2到3天的时间来整理一下&#xff0c;也算是对自己的一个交代。 对于有一…...

苹果签名有多少种类之TF签名(TestFlight签名)是什么?优势是什么?什么场合需要应用到?

&#xff08;一&#xff09;TestFlight 能够让您&#xff1a;邀请内部和外部的测试人员为应用程序提供反馈。 跟踪应用程序在测试过程中发现的 bug 和用户体验问题。 收集 Crash 报告&#xff0c;了解应用程序在真实设备上的运行状况。 要使用 TestFlight&#xff0c;您可以按照…...

如何将图片存到数据库(以mysql为例), 使用ORM Bee更加简单

如何将图片存到数据库 1. 创建数据库: 2. 生成Javabean public class ImageExam implements Serializable {private static final long serialVersionUID 1596686274309L;private Integer id;private String name; // private Blob image;private InputStream image; //将In…...

【“栈、队列”的应用】408数据结构代码

王道数据结构强化课——【“栈、队列”的应用】代码&#xff0c;持续更新 链式存储栈&#xff08;单链表实现&#xff09;&#xff0c;并基于上述定义&#xff0c;栈顶在链头&#xff0c;实现“出栈、入栈、判空、判满”四个基本操作 #include <stdio.h> #include <…...

es的nested查询

一、一层嵌套 mapping: PUT /nested_example {"mappings": {"properties": {"name": {"type": "text"},"books": {"type": "nested","properties": {"title": {"t…...

<一>Qt斗地主游戏开发:开发环境搭建--VS2019+Qt5.15.2

1. 开发环境概述 对于Qt的开发环境来说&#xff0c;主流编码IDE界面一般有两种&#xff1a;Qt Creator或VSQt。为了简单起见&#xff0c;这里的操作系统限定为windows&#xff0c;编译器也通用VS了。Qt版本的话自己选择就可以了&#xff0c;当然VS的版本也是依据Qt版本来选定的…...

python:进度条的使用(tqdm)

摘要&#xff1a;为python程序进度条&#xff0c;可以知道程序运行进度。 python中&#xff0c;常用的进度条模块是tqdm&#xff0c;将介绍tqdm的安装和使用 1、安装tqdm: pip install tqdm2、tqdm的使用&#xff1a; &#xff08;1&#xff09;在for循环中的使用&#xff1…...

Java类型转换和类型提升

目录 一、类型转换 1.1 自动类型转换&#xff08;隐式&#xff09; 1.1.1 int 与 long 之间 1.1.2 float 与 double 之间 1.1.3 int 与 byte 之间 1.2 强制类型转换&#xff08;显示&#xff09; 1.2.1 int 与 long 之间 1.2.2 float 与 double 之间 1.2.3 int 与 d…...

C# 读取 Excel xlsx 文件,显示在 DataGridView 中

编写 read_excel.cs 如下 using System; using System.Collections.Generic; using System.ComponentModel; using System.IO; using System.Data; using System.Linq; using System.Text; using System.Data.OleDb;namespace ReadExcel {public partial class Program{static…...

Docker02基本管理

目录 1、Docker 网络 1.1 Docker 网络实现原理 1.2 Docker 的网络模式 1.3 网络模式详解 1.4 资源控制 1.5 进行CPU压力测试 1.6 清理docker占用的磁盘空间 1.7 生产扩展 1、Docker 网络 1.1 Docker 网络实现原理 Docker使用Linux桥接&#xff0c;在宿主机虚拟一个Docke…...

Scala第十章

Scala第十章 章节目标 1.数组 2.元组 3.列表 4.集 5.映射 6.迭代器 7.函数式编程 8.案例&#xff1a;学生成绩单 scala总目录 文档资料下载...

10.4 校招 实习 内推 面经

绿泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | 集度2024届秋招正式启动&#xff08;内推&#xff09; 校招 | 集度2024届秋招正式启动&#xff08;内推&#xff09; 2、校招 | 道通科技2024秋季校园招聘正式启动啦&#xff01; …...

从0开始深入理解并发、线程与等待通知机制(中)

一&#xff0c;深入学习 Java 的线程 线程的状态/生命周期 Java 中线程的状态分为 6 种&#xff1a; 1. 初始(NEW)&#xff1a;新创建了一个线程对象&#xff0c;但还没有调用 start()方法。 2. 运行(RUNNABLE)&#xff1a;Java 线程中将就绪&#xff08;ready&#xff09;和…...

UE5报错及解决办法

1、编译报错&#xff0c;内容如下&#xff1a; Unable to build while Live Coding is active. Exit the editor and game, or press CtrlAltF11 if iterating on code in the editor or game 解决办法 取消Enable Live Coding勾选...

怎么通过docker/portainer部署vue项目

这篇文章分享一下如何通过docker将vue项目打包成镜像文件&#xff0c;并使用打包的镜像在docker/portainer上部署运行&#xff0c;写这篇文章参考了vue-cli和docker的官方文档。 首先&#xff0c;阅读vue-cli关于docker部署的说明&#xff0c;上面提供了关键的几个步骤。 从上面…...

【面试经典150 | 矩阵】旋转图像

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a;原地旋转方法二&#xff1a;翻转代替旋转 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带…...

机器人制作开源方案 | 家庭清扫拾物机器人

作者&#xff1a;罗诚、李旭洋、胡旭、符粒楷 单位&#xff1a;南昌交通学院 人工智能学院 指导老师&#xff1a;揭吁菡 在家庭中我们有时无法到一些低矮阴暗的地方进行探索&#xff0c;比如茶几下或者床底下&#xff0c;特别是在部分家庭中&#xff0c;如果没有及时对这些阴…...

Midjourney 2026将取消/imagine?不,它正悄悄部署「自然语言-图像-3D资产」三合一原生工作流(附实测对比数据)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Midjourney 2026战略转向&#xff1a;从文本生成图像到原生三维资产创作范式跃迁 Midjourney 在 2026 年正式终止对纯 2D 图像输出的默认支持&#xff0c;全面启用 v6.5 “Tesseract” 引擎&#xff0c…...

016、气压计原理与高度测量

飞控算法从入门到精通 016 气压计原理与高度测量 一、一次炸机带来的教训 去年夏天,我在一个四轴飞行器上调试定高悬停。气压计用的是MS5611,数据手册翻烂了,滤波算法也上了,地面站里高度曲线看着挺平滑。结果一上天,飞机像喝醉了酒——先是莫名其妙往下掉半米,然后猛…...

BetterGI自动化工具:每天为原神玩家节省2小时

BetterGI自动化工具&#xff1a;每天为原神玩家节省2小时 【免费下载链接】better-genshin-impact &#x1f4e6;BetterGI 更好的原神 - 自动拾取 | 自动剧情 | 全自动钓鱼(AI) | 全自动七圣召唤 | 自动伐木 | 自动刷本 | 自动采集/挖矿/锄地 | 一条龙 | 全连音游 | 自动烹饪 …...

5步解决网易云音乐NCM文件难题:ncmdumpGUI实战指南

5步解决网易云音乐NCM文件难题&#xff1a;ncmdumpGUI实战指南 【免费下载链接】ncmdumpGUI C#版本网易云音乐ncm文件格式转换&#xff0c;Windows图形界面版本 项目地址: https://gitcode.com/gh_mirrors/nc/ncmdumpGUI 你是否曾经遇到过这样的情况&#xff1a;在网易…...

芯片入门必看:CPU、MCU、SoC、GPU、TPU、NPU

本文首先介绍了芯片的基础分类&#xff0c;包括模拟/数字芯片和逻辑/计算芯片。接着&#xff0c;对8类核心芯片进行了通俗解析&#xff0c;包括CPU、MCU、SoC、GPU、TPU、NPU、FPGA和DSP&#xff0c;涵盖了它们的定义、用途、类型和代表性标的。最后&#xff0c;文章从通用性和…...

独立开发者如何借助taotoken模型广场低成本验证产品创意

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 独立开发者如何借助Taotoken模型广场低成本验证产品创意 对于资源有限的独立开发者或小型工作室而言&#xff0c;验证一个需要AI功…...

YOLO26改进| downsample |网络深层多分支互补鲁棒下采样模块

&#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 本文给大家带来的教程是将YOLO26的下采样替换为DRFD来提取特征。文章在介绍主要的原理后&#xff0c;将手把手教学如何进行模块的代码添加和修…...

Kafka高效的原因

Kafka高效的原因Kafka的高效性源于其独特的架构设计和多项优化技术&#xff0c;以下是关键因素&#xff1a;分布式架构与分区机制 Kafka采用分布式设计&#xff0c;主题&#xff08;Topic&#xff09;被划分为多个分区&#xff08;Partition&#xff09;&#xff0c;每个分区可…...

从零到一:OWASP ZAP实战渗透测试全流程解析

1. OWASP ZAP入门&#xff1a;渗透测试的瑞士军刀 第一次接触OWASP ZAP时&#xff0c;我完全被它复杂的界面吓到了。但用了三个月后&#xff0c;我发现这简直是Web安全测试的"瑞士军刀"——功能强大但需要正确打开方式。简单来说&#xff0c;ZAP就是个会自动帮你找网…...

泰拉瑞亚整合包下载灾厄大杂烩整合包2026最新版下载

1. 游戏基础介绍 《泰拉瑞亚》是一款经典的二维像素风格沙盒冒险游戏。游戏拥有极高的自由度&#xff0c;玩家可以自由探索地图、收集资源、建造房屋、打造装备、挑战BOSS。凭借自由开放的玩法、丰富的道具体系和独特的冒险氛围&#xff0c;这款游戏长久以来备受玩家喜爱。原版…...