手写一个简单的线程池
手写一个简单的线程池
项目仓库:https://gitee.com/bossDuy/hand-tearing-thread-pool
基于一个b站up的课程:https://www.bilibili.com/video/BV1cJf2YXEw3/?spm_id_from=333.788.videopod.sections&vd_source=4cda4baec795c32b16ddd661bb9ce865
理解线程池的原理
线程池就是为了减少频繁的创建和销毁线程带来的性能损耗,工作原理:
在这里插入图片描述
简单的说:线程池就是有一个存放线程的集合和一个存放任务的阻塞队列。当提交一个任务的时候,判断核心线程是否满了,没满就会创建一个核心线程加入线程池并且执行任务,核心线程是不会被销毁的即使没有任务执行;满了就会放入任务队列等待;如果队列满了的话就会创建非核心线程进行执行任务,这些非核心线程在不执行任务的时候就会等一段时间销毁(配置的过期时间),如果创建的线程达到了最大线程数,那么就会执行拒绝策略。
可以简要整理如下:
提交任务 -> 核心任务是否已满为满,创建核心线程并执行任务已满,则加入任务队列队列未满 -> 等待执行队列已满 -> 创建非核心线程达到线程最大数量 -> 拒绝策略未达到最大数量 -> 执行任务
自己实现简单的线程池
第一步:实现了一个线程复用的线程池
package com.yb0os1;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class MyThreadPool {//1、线程什么时候创建?/**核心线程中我们要保证线程是可以复用的,那么就不可以直接new Thread(task).start(); 这样执行完task线程就会被销毁了我们将接收到的任务对象放到队列中,然后线程从队列中取出任务,通过任务的run方法进行调用,这样就是在该线程上调用任务,并且调用完后不会销毁线程*///2、我们一开始使用 while (true) if(!tasks.isEmpty()) Runnable task = tasks.remove(0);/**这样如果任务队列一直为空就会一循环,消耗cpu资源。此时就是阻塞队列出现了,当为空阻塞等待 非空执行*/BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(1024);Thread thread = new Thread(()->{while (true){if(!taskQueue.isEmpty()){try {Runnable task = taskQueue.take();task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}},"唯一线程");{thread.start();//启动线程}public void execute(Runnable task){taskQueue.offer(task);//向队列添加元素 尽量是否offer 满则返回false add满则排除异常}
}
package com.yb0os1;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool();for (int i = 0; i < 5; i++) {myThreadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {//InterruptedException这个是线程中断异常,// 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的,// sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行完毕");});}System.out.println("主线程没有被阻塞");}
}
测试结果:
第二步:实现多个线程复用的线程池
package com.yb0os1;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class MyThreadPool {//任务队列private final BlockingQueue<Runnable> taskQueue;//核心线程的数量private final int corePoolSize;//最大线程的数量private final int maxPoolSize;private final int keepAliveTime;private final TimeUnit unit;public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> taskQueue) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.taskQueue = taskQueue;}//核心线程List<Thread> coreList = new ArrayList<>();//非核心线程List<Thread> supportList = new ArrayList<>();//添加元素和判断长度不是原子的,所以存在线程安全问题 可以加锁 CAS等解决public void execute(Runnable command) {//目前线程列表中线程数量小于核心线程的数量,则创建线程if (coreList.size() < corePoolSize) {Thread thread = new CoreThread();coreList.add(thread);thread.start();
// return;}//成功添加到阻塞队列if (taskQueue.offer(command)) {return;}//任务队列也满了 需要创建非核心线程//核心线程满 任务队列满 但是非核心线程没有满才可以添加if (coreList.size() + supportList.size() < maxPoolSize) {Thread thread = new SupportThread();supportList.add(thread);thread.start();return;}//我们创建完线程之后 并没有处理刚才的command 不能确定是否队列真的满了if (!taskQueue.offer(command)) {//真的满了 抛出异常throw new RuntimeException("线程池已满");}}class CoreThread extends Thread {@Overridepublic void run() {while (true) {try {Runnable task = taskQueue.take();task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread {@Overridepublic void run() {while (true) {try {Runnable command = taskQueue.poll(keepAliveTime, unit);//等待一秒没有获取就会返回nullif (command == null) {//线程结束break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println(Thread.currentThread().getName()+"非核心线程结束");supportList.remove(Thread.currentThread());System.out.println("当前非核心线程数量为:" + supportList.size());}}
}
package com.yb0os1;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2));for (int i = 0; i < 4; i++) {myThreadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {//InterruptedException这个是线程中断异常,// 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的,// sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行完毕");});}System.out.println("主线程没有被阻塞");}
}
存在问题,任务没有被正确的执行:
b站评论区指出的:if (blockingQueue.offer(command)) { return; }
这里如果任务成功放入队列,方法就直接 return 了。 但在 创建 SupportThread 的逻辑中,没有保证这个任务会被执行,因为 offer() 失败后你才创建新线程。 但 command 并没有交给这个新线程,而是再次尝试 offer(),如果失败就直接走拒绝策略了。 这样的话,可能 SupportThread 已经启动,但任务却没被执行。
理解:如果队列满了,我们创建非核心线程,但是并没有将这任务直接交给我们创建的新线程,而是再次尝试加入队列中,这就导致了一个不确定的状态:
- 如果此时队列还是满的(
offer
返回false
),就会直接抛出异常,任务未被执行 - 如果队列此时恰好有空间(可能因为其他线程刚刚完成了任务,从而腾出了队列空间),那么任务会被放入队列,后续由某个线程(可能是核心线程,也可能是其他非核心线程)从队列中取出并执行。但新创建的非核心线程可能并没有真正处理这个任务。
解决方案:如果队列满了,我们要创建非核心线程并且由这个线程执行任务
也可以说 让线程执行当前 command 之后,再从 queue 中拿任务
第三步:修复bug 设计拒绝策略
package com.yb0os1;import com.yb0os1.reject.DiscardRejectHandle;
import com.yb0os1.reject.ThrowRejectHandle;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new DiscardRejectHandle());for (int i = 0; i < 8; i++) {int finalI = i;myThreadPool.execute(()->{try {Thread.sleep(100);} catch (InterruptedException e) {//InterruptedException这个是线程中断异常,// 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的,// sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行完毕---"+ finalI);});}System.out.println("主线程没有被阻塞");}
}
package com.yb0os1;import com.yb0os1.reject.RejectHandle;import java.sql.SQLOutput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class MyThreadPool {public BlockingQueue<Runnable> getTaskQueue() {return taskQueue;}//任务队列private final BlockingQueue<Runnable> taskQueue;//核心线程的数量private final int corePoolSize;//最大线程的数量private final int maxPoolSize;private final int keepAliveTime;private final TimeUnit unit;//拒绝策略private final RejectHandle rejectHandle;public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> taskQueue, RejectHandle rejectHandle) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.taskQueue = taskQueue;this.rejectHandle = rejectHandle;}//核心线程List<Thread> coreList = new ArrayList<>();//非核心线程List<Thread> supportList = new ArrayList<>();//添加元素和判断长度不是原子的,所以存在线程安全问题 可以加锁 CAS等解决public void execute(Runnable command) {//目前线程列表中线程数量小于核心线程的数量,则创建线程if (coreList.size() < corePoolSize) {Thread thread = new CoreThread(command);coreList.add(thread);thread.start();return;}//成功添加到阻塞队列if (taskQueue.offer(command)) {return;}//任务队列也满了 需要创建非核心线程//核心线程满 任务队列满 但是非核心线程没有满才可以添加if (coreList.size() + supportList.size() < maxPoolSize) {Thread thread = new SupportThread(command);supportList.add(thread);thread.start();return;}//我们创建完线程之后 并没有处理刚才的command 不能确定是否队列真的满了if (!taskQueue.offer(command)) {//真的满 使用拒绝策略rejectHandle.reject(command,this);}}//优先处理传过来的 然后再去阻塞队列中获取class CoreThread extends Thread {private final Runnable command;CoreThread(Runnable command) {this.command = command;}@Overridepublic void run() {command.run();while (true) {try {Runnable task = taskQueue.take();System.out.println("核心线程"+Thread.currentThread().getName()+"正在执行任务");task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread {private final Runnable command;SupportThread(Runnable command) {this.command = command;}@Overridepublic void run() {command.run();while (true) {try {Runnable command = taskQueue.poll(keepAliveTime, unit);//等待一秒没有获取就会返回nullif (command == null) {//线程结束break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println(Thread.currentThread().getName()+"非核心线程结束");supportList.remove(Thread.currentThread());
// System.out.println("当前非核心线程数量为:" + supportList.size());}}
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public interface RejectHandle {void reject(Runnable command, MyThreadPool myThreadPool);
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public class DiscardRejectHandle implements RejectHandle{@Overridepublic void reject(Runnable command, MyThreadPool myThreadPool) {myThreadPool.getTaskQueue().poll();System.out.println("任务被丢弃");}
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public class ThrowRejectHandle implements RejectHandle{@Overridepublic void reject(Runnable command, MyThreadPool myThreadPool) {throw new RuntimeException("线程池已满");}
}
思考
1.你能给线程池增加一个shutdown功能吗
答:关闭线程池分两种情况, 一个是清空任务队列、线程全部完成任务后关闭; 二是等线程完成后直接关,不管队列中的任务。
2、怎么理解拒绝策略
答:首先它是一个策略模式,在线程池的代码中,当任务队列满时就会触发该接口的方法,所以我们只要实现这个接口方法,再把实现类传入线程池即可,并且方法里还可以拿到被拒绝的任务、线程池对象来实现自己的拒绝逻辑。
3、ThreadFactory参数
答:这个参数是线程池用来创建核心、辅助线程的方法,我们可以自定义线程名称等参数。
相关文章:

手写一个简单的线程池
手写一个简单的线程池 项目仓库:https://gitee.com/bossDuy/hand-tearing-thread-pool 基于一个b站up的课程:https://www.bilibili.com/video/BV1cJf2YXEw3/?spm_id_from333.788.videopod.sections&vd_source4cda4baec795c32b16ddd661bb9ce865 理…...
AI开发实习生面试总结(持续更新中...)
1.广州视宴(ai开发实习生) 首先是自我介绍~ 1.第二个项目中的热力图是用怎么样的方式去做的? 2.在第二个项目中,如何用热力图去实现它的实时变化 答:我这里直接说我项目里面其实静态的热力图,不是动态的…...
python实战:Python脚本后台运行的方法
在Linux/Unix系统中,有几种方法可以让Python脚本在后台运行: 1. 使用 & 符号 最简单的后台运行方式是在命令末尾添加 &: python your_script.py &这样会将脚本放入后台运行,但关闭终端时脚本可能会被终止。 2. 使用 nohup 命令 nohup 可以让脚本在退出终端…...

siparmyknife:SIP协议渗透测试的瑞士军刀!全参数详细教程!Kali Linux教程!
简介 SIP Army Knife 是一个模糊测试器,用于搜索跨站点脚本、SQL 注入、日志注入、格式字符串、缓冲区溢出等。 安装 源码安装 通过以下命令来进行克隆项目源码,建议请先提前挂好代理进行克隆。 git clone https://github.com/foreni-packages/sipa…...
【Hexo】2.常用的几个命令
new 在根目录下执行 hexo new "文章标题" 命令,会在 source/_posts 目录下生成一个 .md 文件。 hexo new "文章标题"clean 在根目录下执行 hexo clean 命令,会清除 public 目录下的所有文件。 hexo cleangenerate 在根目录下执…...
OceanBase 系统表查询与元数据查询完全指南
文章目录 一、OceanBase 元数据基础概念1.1 元数据的定义与重要性1.2 OceanBase 元数据分类体系二、系统表查询核心技术2.1 核心系统表详解2.1.1 集群管理表2.1.2 租户资源表2.2 高级查询技巧2.2.1 跨系统表关联查询2.2.2 历史元数据查询三、元数据查询实战应用3.1 日常运维场景…...

【Java高阶面经:微服务篇】4.大促生存法则:微服务降级实战与高可用架构设计
一、降级决策的核心逻辑:资源博弈下的生存选择 1.1 大促场景的资源极限挑战 在电商大促等极端流量场景下,系统面临的资源瓶颈呈现指数级增长: 流量特征: 峰值QPS可达日常的50倍以上(如某电商大促下单QPS从1万突增至50万)流量毛刺持续时间短(通常2-4小时),但对系统稳…...

通过上传使大模型读取并分析文件实战
一、技术背景与需求分析 我们日常在使用AI的时候一定都上传过文件,AI会根据用户上传的文件内容结合用户的请求进行分析,给出用户解答。但是这是怎么实现的呢?在我们开发自己的大模型应用时肯定是不可避免的要思考这个问题,今天我会…...

VueRouter路由组件的用法介绍
1.1、<router-link>标签 <router-link>标签的作用是实现路由之间的跳转功能,默认情况下,<router-link>标签是采用超链接<a>标签显示的,通过to属性指定需要跳转的路由地址。当然,如果你不想使用默认的<…...

数据结构第1章 (竟成)
第 1 章 编程基础 1.1 前言 因为数据结构的代码大多采用 C 语言进行描述。而且,408 考试每年都有一道分值为 13 - 15 的编程题,要求使用 C/C 语言编写代码。所以,本书专门用一章来介绍 408 考试所需的 C/C 基础知识。有基础的考生可以快速浏览…...

Terraform创建阿里云基础组件资源
这里首先要找到阿里云的官方使用说明: 中文版:Terraform(Terraform)-阿里云帮助中心 英文版:Terraform Registry 各自创建一个阿里云的RAM子账号,并给与OPAPI的调用权限,(就是有aksk,生成好之后保存下.) 创建路径: 登陆阿里云主账号-->控制台-->右上角企业-->人员…...

企业级调度器LVS
访问效果 涉及内容:浏览拆分、 DNS 解析、反向代理、负载均衡、数据库等 1 集群 1.1 集群类型简介 对于⼀个业务项⽬集群来说,根据业务中的特性和特点,它主要有三种分类: 高扩展 (LB) :单个主机负载不足的时候…...

【Web前端】HTML网页编程基础
HTML5简介与基础骨架 HTML5是用来描述网页的一种语言,被称为超文本标记语言。用HTML5编写的文件,后缀以.html结尾 HTML是一种标记语言,标记语言是一套标记标签。标签是由尖括号包围的关键字,例如<html> 标签有两种表现形…...

阿里开源 CosyVoice2:打造 TTS 文本转语音实战应用
1、引言 1.1、CosyVoice2 简介 阿里通义实验室推出音频基座大模型 FunAudioLLM,包含 SenseVoice 和 CosyVoice 两大模型。 CosyVoice:模拟音色与提升情感表现力 多语言 支持的语言: 中文、英文、日文、韩文、中文方言(粤语、四川话、上海话、天津话、武汉话等)跨语言及…...
【C/C++】红黑树插入/删除修复逻辑解析
文章目录 红黑树插入修复逻辑解析✅ 函数原型✅ 外层循环条件✅ 拿到祖父节点✅ Case 1:父节点是祖父的左孩子① 叔叔节点是红色 → 情况1:**颜色翻转(Recolor)**② 叔叔节点是黑色或为空 → 情况2或3:**旋转 颜色修复…...

RabbitMQ可靠传输——持久性、发送方确认
一、持久性 前面学习消息确认机制时,是为了保证Broker到消费者直接的可靠传输的,但是如果是Broker出现问题(如停止服务),如何保证消息可靠性?对此,RabbitMQ提供了持久化功能: 持久…...
AWS stop/start 使实例存储lost + 注意点
先看一下官方的说明: EC2有一个特性,当执行stop/start操作(注意,这个并不是重启/reboot,而是先停止/stop,再启动/start)时,该EC2会迁移到其它的底层硬件上。 对于实例存储来说,由于实例存储是由其所在的底层硬件来提供的,此时相当于分配到了一块全新的空的磁盘。 但是从…...
数字计数--数位dp
1.不考虑前导零 2.每一位计数,就是有点“数页码”的意思 P2602 [ZJOI2010] 数字计数 - 洛谷 相关题目:记得加上前导零 数页码--数位dp-CSDN博客 https://blog.csdn.net/2301_80422662/article/details/148160086?spm1011.2124.3001.6209 #include…...
掌握递归:编程中的优雅艺术
当然可以!你愿意迈出学习递归的重要一步,真的很棒!🌟 递归,虽然一开始看着有点绕,但掌握之后,你会发现它是编程中非常优雅且强大的工具。 我用简单又清晰的方式教你。请跟着我一步步来…...

无人机开启未来配送新篇章
低空物流(无人机物流)是利用无人机等低空飞行器进行货物运输的物流方式,依托低空空域(通常在120-300米)实现快速、高效、灵活的配送服务。它是低空经济的重要组成部分,广泛应用于快递配送、医疗物资运输、农…...
el-input宽度自适应方法总结
使用 style 或 class 直接设置宽度 可以通过内联样式或 CSS 类来直接设置 el-input 的宽度为 100%,使其自适应父容器的宽度 <template><div style"width: 100%;"><el-input style"width: 100%;" v-model"input">…...

Qt状态机QStateMachine
QStateMachine QState 提供了一种强大且灵活的方式来表示状态机中的状态,通过与状态机类(QStateMachine)和转换类(QSignalTransition, QEventTransition)结合,可以实现复杂的状态逻辑和用户交互。合理使用嵌套状态机、信号转换、动作与动画、…...
驱动开发学习20250523
kobj_type 功能:表示内核对象类型,描述通过ktype字段嵌入kobject的对象类型,控制在创建和销毁kobject时以及在读取或写入属性时发生的操作。 struct kobj_type {void (*realease)(struct kobject *);const struct sysfs_ops sysfs_ops;stru…...

Java详解LeetCode 热题 100(20):LeetCode 48. 旋转图像(Rotate Image)详解
文章目录 1. 题目描述2. 理解题目3. 解法一:转置 翻转3.1 思路3.2 Java代码实现3.3 代码详解3.4 复杂度分析3.5 适用场景 4. 解法二:四点旋转法4.1 思路4.2 Java代码实现4.3 代码详解4.4 复杂度分析4.5 适用场景 5. 详细步骤分析与示例跟踪5.1 解法一&a…...

CAU人工智能class4 批次归一化
归一化 在对输入数据进行预处理时会用到归一化,将输入数据的范围收缩到0到1之间,这有利于避免纲量对模型训练产生的影响。 但当模型过深时会产生下述问题: 当一个学习系统的输入分布发生变化时,这种现象称之为“内部协变量偏移”…...

Android11以上通过adb复制文件到内置存储让文件管理器可见
之前Android版本如果需要将文件通过adb push放到内置存储,push到/data/media/10下的目录即可,直接放/sdcard/文件管理器是看不到的。 现在最新的Android版本直接将文件放在/sdcard或/data/media/10下文件管理器也看不到 可以将文件再复制一份到一下路径…...
Keepalived 与 LVS 集成及多实例配置详解
一、Keepalived 扩展功能:LVS 集成与多实例管理 1. Keepalived LVS:四层负载均衡高可用方案 1.1 集成原理与架构 核心逻辑:Keepalived 通过 VRRP 实现 LVS 负载均衡节点的高可用,同时利用 LVS 的 IP 负载均衡技术(N…...

篇章二 需求分析(一)
目录 1.知名MQ 2.需求分析 2.1 核心概念 2.2 生产者消费者模型的类别 2.3 BrokerServer 内部的关键概念(MQ) 1.虚拟主机(Virtual Host) 2.交换机(Exchange) 3.队列(Queue) 4…...
汽车充电过程中--各个电压的关系(DeepSeek)
在电动汽车的充电过程中,电池的充电机制涉及多个电压参数的协调控制,以下从原理到实际应用逐步分析: 1. 充电基础原理 电动汽车电池(通常为锂离子电池组)的充电本质是通过外部电源向电池注入电能,使锂离子…...

图解深度学习 - 机器学习简史
前言 深度学习并非总是解决问题的最佳方案:缺乏足够数据时,深度学习难以施展;某些情况下,其他机器学习算法可能更为高效。 若初学者首次接触的是深度学习,可能会形成一种偏见,视所有机器学习问题为深度学…...