java--线程池
目录
1.线程池概
2 为什么要使用线程池
1创建线程问题
2解决上面两个问题思路:
3线程池的好处
4线程池适合应用场景
3 线程池的构造函数参数
1.corePoolSize int 线程池核心线程大小
2.maximumPoolSize int 线程池最大线程数量
3.keepAliveTime long 空闲线程存活时间
4.unit 空闲线程存活时间单位
5.workQueue BlockingQueue 工作队列
①ArrayBlockingQueue
②LinkedBlockingQuene
③SynchronousQuene
④PriorityBlockingQueue
6.threadFactory ThreadFactory 线程工厂
7.handler RejectedExecutionHandler 拒绝策略
①CallerRunsPolicy
8.添加线程例子
例如饭店的桌子的故事:
线程池的而实际例子:
4增减线程的特点
5线程池手动创建还是自动创建
1自动创建好处
2自动创建可能带来哪些问题
1.newFixedThreadPool
2.newSingleThreadExecutor
3.newCachedThreadPool
3正确创建线程池的方法
4线程池里的线程数量设定多少比较适合
6停止线程的正确方法
1.shutdown
2isShutdown
3.isTerminated
4.awaitTermination
5.shutdownNow
7.线程池拒绝策略
1.拒绝时机
2.四种拒绝策略
1.概述
8.钩子方法
1.每个任务执行之前之后暂停恢复模拟日志、统计
9.线程池原理
1.线程池组成部分
2.线程管理器
3.工作线程
4.任务列队
5.任务接口
1.线程池概
软件中的池,就是提前创建好了东西放在池子里,你直接去池子里拿去用就行了,有现成的可用的,节省了你临时创建的时间。
我们的资源是有限的,比如就10个线程,创造一个10个线程的线程池。
线程池中创建的线程,可以重复使用,可以控制资源总量。如果不使用线程池,每个任务都开一个新线程处理
1.1个线程
2.for循环创建线程
3.当任务量生成1000个时
public class EveryTaskOneThread {public static void main(String[] args) {for (int i = 0; i < 1000; i++) {Thread thread = new Thread(new Task());thread.start();}}static class Task implements Runnable{@Overridepublic void run() {System.out.println("执行了任务!");}}
}
这样开销太大,我们希望有固定数量的线程,
来执行这1000个线程,
这样就避免了反复创建并销毁线程所带来的开销问题。
2 为什么要使用线程池
1创建线程问题
- 反复创建线程开销大
- 过多的线程会占用大多内存
2解决上面两个问题思路:
- 用少量线程-避免内存占用过多
- 让这部分线程都保持工作,且可以反复执行任务-避免生命周期的损耗
3线程池的好处
- 加快响应速度
- 合理利用CPU和内存
- 统一归管理资源
4线程池适合应用场景
- 服务器接收大量请求时,使用线程池 是非常合适的,他可以大大减少线程池的创建和销毁次数,提高服务器的工作效率
- 实际开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理
3 线程池的构造函数参数
1.corePoolSize int 线程池核心线程大小
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize,如果没有达到的话,则会创建一个新线程来处理这个任务。
是否需要添加线程规则
2.maximumPoolSize int 线程池最大线程数量
当前线程数达到corePoolSize后,如果继续有任务被提交到线程池,会将任务缓存到工作队列(后面会介绍)中。如果队列也已满,则会去创建一个新线程来出来这个处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
3.keepAliveTime long 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定
4.unit 空闲线程存活时间单位
keepAliveTime的计量单位
5.workQueue BlockingQueue 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而基本不会去创建新线程直到maxPoolSize(很难达到Interger.MAX这个数),因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
6.threadFactory ThreadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory0,创建出来的线程都在同个线程组,拥有同样的NORM PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等
通常使用默认的即可
7.handler RejectedExecutionHandler 拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
①CallerRunsPolicy
该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
②AbortPolicy
该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
③DiscardPolicy
该策略下,直接丢弃任务,什么都不做。
④DiscardOldestPolicy
该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
8.添加线程例子
例如饭店的桌子的故事:
饭店屋子里面10个桌子,这10个桌子是一直存在的。是corePoolSize。
生意火爆,里面桌子坐满了,需要使用临时桌子,放到饭店门口。是maxPoolSize。在收摊的时候外面的椅子会收回来的。里面的桌子不会处理,一直会等待。
线程池的而实际例子:
线程池:核心池大小5,最大池大小为10,队列为100
线程池中的请求最多创建5个,然后任务被添加到队列中,直到达到100.当队列已满时,将创建最新的线程。maxPoolSize。最多10个线程,如果再来任务,就拒绝。
4增减线程的特点
- 通过设置corePoolSize和maximunPoolSize相同,就可以创建固定大小的线程池
- 线程池希望保持更小的线程数量,并且只有在负载变得很大的时候才去增加他
- 通过设置maximunPoolSize为很高的值,可以允许线程池容纳任意数量的并发任务
- 只有队列满的时才会创建多余corePoolSize的线程,如果使用无界队列(LinkedBlockingQueue),那么线程数就不会超过corePoolSize
5线程池手动创建还是自动创建
1自动创建好处
- 手动创建更好,因为这样可以让我们更加明确线程池创建的规则,避免资源耗尽的风险。
2自动创建可能带来哪些问题
1.newFixedThreadPool
传进去的LinkedBlockingQueue是没有容量限制的,所以当请求数量越来越多。并且无法及时处理完的时候,也就会请求堆积,会很容易造成占用大量内存。可能会导致oom
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class FixedThreadPoolTest {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4);for (int i = 0; i < 100; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread());}}
}
错误案例
修改idea内存配置 -Xmx1m -Xms1m
没有点击Modify options,然后add即可
应用之后启动下面代码
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示newFixedThreadPool出错的情况*/
public class FixedThreadPoolOOMTest {private static ExecutorService executorService = Executors.newFixedThreadPool(1);public static void main(String[] args) {for (int i = 0; i < Integer.MAX_VALUE; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread());try {Thread.sleep(500000000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
控制台打印如下
2.newSingleThreadExecutor
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SingleThreadExcutor {public static void main(String[] args) {// 单独的线程不需要传参数,线程数默认1/** public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}* */ExecutorService executorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 100; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread());}}
}
3.newCachedThreadPool
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CachedThreadExcutor {public static void main(String[] args) {// 无界线程池,具有自动回收对于线程的功能ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < 100; i++) {executorService.execute(new Task());}}static class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread());}}
}
3正确创建线程池的方法
- 根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等等。
4线程池里的线程数量设定多少比较适合
创建多少线程合适,要看多线程具体的应用场景。我们的程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。
下面我们对这两个场景分别说明。
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型的计算场景,比如前面我们的例子中,如果 CPU 计算和 I/O 操作的耗时是 1:1,那么 2 个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那多少个线程合适呢?是 3 个线程,如下图所示:CPU 在 A、B、C 三个线程之间切换,对于线程 A,当 CPU 从 B、C 切换回来时,线程 A 正好执行完 I/O 操作。这样 CPU 和 I/O 设备的利用率都达到了 100%。
CPU密集型(加密、计算hash等): 最佳线程数为CPU核心数的1-2倍左右。
耗时IO型(读写数据库、文件、网络读写等):最佳线程数般会大于cpu核心数很多倍,以JVM线程监控显示繁忙情况头依据,保证线程空闲可以衔接上
参考Brain Goetz推荐最终终计算方法: 线程数=CPU核心数*( 1+平均等待时间/平均工作时间)
6停止线程的正确方法
1.shutdown
对于线程池而言,拒绝后面新的的任务,存量任务执行完会关闭
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示关闭线程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 创建10个固定的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);executorService.shutdown();// 再提交新的任务,提交不进去for (int i = 0; i < 1000; i++) {executorService.execute(new ShuntDownTask());}}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
控制台打印,可以看到后面的任务没有被执行
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cn.butool.threadpool.ShuntDown$ShuntDownTask@29453f44 rejected from java.util.concurrent.ThreadPoolExecutor@5cad8086[Shutting down, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at cn.butool.threadpool.ShuntDown.main(ShuntDown.java:20)
pool-1-thread-9
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-6
pool-1-thread-3
pool-1-thread-4
pool-1-thread-8
pool-1-thread-7
pool-1-thread-10Process finished with exit code 1
2isShutdown
isShutDown当调用shutdown()方法后返回为true。
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示关闭线程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 创建10个固定的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);System.out.println(executorService.isShutdown());executorService.shutdown();System.out.println(executorService.isShutdown());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
pool-1-thread-2
pool-1-thread-5
pool-1-thread-6
pool-1-thread-10
pool-1-thread-9
pool-1-thread-1
pool-1-thread-4
pool-1-thread-7
pool-1-thread-8
false
pool-1-thread-3
true
3.isTerminated
isTerminated当调用shutdown()方法后,并且所有提交的任务完成后返回为true
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示关闭线程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 创建10个固定的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);System.out.println(executorService.isShutdown());executorService.shutdown();System.out.println(executorService.isTerminated());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
打印
pool-1-thread-10
pool-1-thread-3
pool-1-thread-4
pool-1-thread-7
pool-1-thread-8
false
pool-1-thread-9
pool-1-thread-6
false
pool-1-thread-5
pool-1-thread-2
pool-1-thread-1
缩小任务查看
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示关闭线程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 创建10个固定的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 1; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);System.out.println(executorService.isShutdown());executorService.shutdown();System.out.println(executorService.isTerminated());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
打印
false
pool-1-thread-1
true
4.awaitTermination
有时场景需要主线程等各子线程都运行完毕后再执行。这时候就需要用到ExecutorService接口中的awaitTermination方法
package cn.butool.threadpool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** 演示关闭线程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 创建10个固定的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(500);executorService.shutdown();//该方法调用会被阻塞,并且在以下几种情况任意一种发生时都会导致该方法的执行:// 即shutdown方法被调用之后,或者参数中定义的timeout时间到达或者当前线程被打断,// 这几种情况任意一个发生了都会导致该方法在所有任务完成之后才执行。// 第一个参数是long类型的超时时间,第二个参数可以为该时间指定单位。System.out.println(executorService.awaitTermination(3, TimeUnit.SECONDS));}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}}
}
控制台打印
pool-1-thread-2
pool-1-thread-3
pool-1-thread-7
pool-1-thread-6
pool-1-thread-4
pool-1-thread-8
pool-1-thread-5
pool-1-thread-1
pool-1-thread-9
pool-1-thread-10
trueProcess finished with exit code 0
5.shutdownNow
shutdownNow调用的是中断所有的Workers,shutdownNow会把所有任务队列中的任务取出来,返回一个任务列表。
package cn.butool.threadpool;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 演示关闭线程*/
public class ShuntDown {public static void main(String[] args) throws InterruptedException {// 创建10个固定的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);for (int i = 0; i < 100; i++) {executorService.execute(new ShuntDownTask());}Thread.sleep(1500);// 立刻关闭掉线程池,返会没有执行的任务列表List<Runnable> runnables = executorService.shutdownNow();System.out.println("被中断数量"+runnables.size());}static class ShuntDownTask implements Runnable{@Overridepublic void run() {try {Thread.sleep(500);System.out.println(Thread.currentThread().getName());} catch (InterruptedException e) {System.out.println("被中断了"+Thread.currentThread().getName());}}}
}
控制台打印
pool-1-thread-3
pool-1-thread-9
pool-1-thread-10
pool-1-thread-6
pool-1-thread-5
pool-1-thread-2
pool-1-thread-1
pool-1-thread-8
pool-1-thread-7
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-9
pool-1-thread-10
pool-1-thread-6
pool-1-thread-5
pool-1-thread-2
pool-1-thread-8
pool-1-thread-7
被中断了pool-1-thread-7
被中断了pool-1-thread-5
被中断了pool-1-thread-6
被中断了pool-1-thread-9
被中断了pool-1-thread-3
被中断了pool-1-thread-1
被中断了pool-1-thread-2
被中断了pool-1-thread-10
被中断了pool-1-thread-4
被中断了pool-1-thread-8
被中断数量70Process finished with exit code 0
7.线程池拒绝策略
1.拒绝时机
- Executors关闭时提交新任务会拒绝
- Executors最大线程和工作队列容量使用优先边界并且已经饱和
2.四种拒绝策略
1.概述
拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
jdk默认提供了四种拒绝策略
2.AbortPolicy - 抛出异常,中止任务。抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行
3.CallerRunsPolicy - 使用调用线程执行任务。当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
4.DiscardPolicy - 直接丢弃,其他啥都没有
5.DiscardOldestPolicy - 丢弃队列最老任务,添加新任务。当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
8.钩子方法
1.每个任务执行之前之后暂停恢复模拟日志、统计
package cn.butool.threadpool;import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 每个任务执行的前后放钩子函数*/
public class SuspendThreadPoolTest extends ThreadPoolExecutor {public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);}public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);}public SuspendThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}/*** 重写方法* @param t* @param r*/@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);lock.lock();try {while (isPaused){unPaused.await();}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}private boolean isPaused;private final ReentrantLock lock= new ReentrantLock();private Condition unPaused=lock.newCondition();private void usePaused(){// 枷锁lock.lock();try {isPaused=true;} catch (Exception e) {e.printStackTrace();} finally {// 释放锁lock.unlock();}}/*** 恢复函数*/private void resume(){// 枷锁lock.lock();try {isPaused=false;unPaused.signalAll();} catch (Exception e) {e.printStackTrace();} finally {// 释放锁lock.unlock();}}public static void main(String[] args) throws InterruptedException {SuspendThreadPoolTest suspendThreadPoolTest = new SuspendThreadPoolTest(5,10,20,TimeUnit.SECONDS,new LinkedBlockingDeque<>());Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("我被执行"+Thread.currentThread().getName());try {Thread.sleep(10);} catch (Exception e) {e.printStackTrace();} finally {}}};for (int i = 0; i < 10000; i++) {suspendThreadPoolTest.execute(runnable);}Thread.sleep(1500);//线程池暂停suspendThreadPoolTest.usePaused();System.out.println("线程池被暂停了");Thread.sleep(1500);suspendThreadPoolTest.resume();System.out.println("线程池被恢复了");}
}
9.线程池原理
1.线程池组成部分
1.线程管理器
2.工作线程
3.任务列队
4.任务接口
2.线程池家族关系
3.线程池如何实现线程复用的
- 相同的线程执行不同的任务
源码
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/**分三步进行:** 1. 如果运行的线程少于corePoolSize,请尝试*以给定的命令作为第一个命令启动一个新线程*任务。对addWorker的调用原子地检查runState和*workerCount,这样可以防止错误警报*在不应该执行的情况下执行线程,返回false。** 2. 如果任务可以成功排队,那么我们仍然需要*仔细检查我们是否应该添加一个线程*(因为自上次检查以来已有的死亡)或*池在进入此方法后关闭。所以我们*重新检查状态,并在必要时回滚排队*已停止,或者在没有线程的情况下启动一个新线程。** 3. 如果我们不能对任务进行排队,那么我们尝试添加一个新的*螺纹。如果它失败了,我们就知道我们已经关闭或饱和了*因此拒绝该任务。*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
增加worker核心方法
/** Methods for creating, running and cleaning up after workers*//***检查是否可以相对于当前工作人员添加新工作人员*池状态和给定的界限(核心或最大值)。如果是,*相应地调整工人人数,如果可能的话*将创建并启动新的辅助进程,并将firstTask作为其*第一项任务。如果池已停止或*有资格关闭。如果线程*当被要求时,工厂无法创建线程。如果螺纹*创建失败,可能是由于线程工厂返回*null,或由于异常(通常为中的OutOfMemoryError*Thread.start()),我们干净地回滚。**@param firstTask新线程应该首先运行的任务(或*如果没有,则为null)。工人是用最初的第一个任务创建的*(在方法execute()中),以在数量较少时绕过排队*比corePoolSize线程(在这种情况下,我们总是启动一个线程),*或者当队列已满时(在这种情况下,我们必须绕过队列)。*最初的空闲线程通常通过*prestartCoreThread或替换其他垂死的工人。**@param core如果为true,则使用corePoolSize作为绑定,否则*最大池大小。(此处使用布尔指示符,而不是*值,以确保在检查其他池后读取新值*州)。*@如果成功,返回true*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
worker核心类,调用runWorker方法实现了线程复用;
/**Class Worker主要维护运行任务的线程的中断控制状态,以及其他次要的记账。此类机会主义地扩展了AbstractQueuedSynchronizer,以简化获取和释放每个任务执行周围的锁。这可以防止旨在唤醒等待任务的工作线程而不是中断正在运行的任务的中断。我们实现了一个简单的非重入互斥锁,而不是使用ReentrantLock,因为我们不希望工作任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取锁。此外,为了在线程真正开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时清除它(在runWorker中)。
*/private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
runWorker
/*主要工人运行循环。重复地从队列中获取任务并执行它们,
同时处理许多问题:
1。我们可以从一个初始任务开始,在这种情况下,
我们不需要获得第一个任务。否则,只要池正在运行,
我们就会从getTask获得任务。如果返回null,
则工作进程将由于池状态或配置参数的更改而退出。
其他退出是由外部代码中的异常抛出引起的,
在这种情况下,completedAbruptly保持不变,
这通常会导致processWorkerExit替换此线程。
2.在运行任何任务之前,获取锁以防止任务执行时其他池中断,
然后我们确保除非池停止,否则此线程不会设置中断。
3.每次任务运行之前都会调用beforeExecute,
这可能会引发异常,在这种情况下,
我们会导致线程在不处理任务的情况下死亡(用completedAbruptly true中断循环)。
4.假设beforeExecute正常完成,我们运行任务,
收集它抛出的任何异常,发送到afterExecute。
我们分别处理RuntimeException、Error(规范保证我们捕获这两者)和任意Throwables。
因为我们无法在Runnable.run中重新抛出Throwables,所以我们将它们封装在退出时的
Errors中(到线程的UncaughtException Handler)。
任何抛出的异常也会保守地导致线程死亡。
5.task.run完成后,我们调用afterExecute,
它也可能抛出异常,这也会导致线程死亡。
根据JLS Sec 14.20,即使task.run抛出,
这个异常也将生效。
异常机制的净效果是afterExecute和线程的UncaughtException处理程序
具有我们所能提供的关于用户代码遇到的任何问题的准确信息。参数:w–工人*/
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();//如果池正在停止,请确保线程被中断;//如果没有,请确保线程没有中断。这//第二种情况需要重新检查才能处理//shutdown清除中断时无竞争if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
相关文章:

java--线程池
目录 1.线程池概 2 为什么要使用线程池 1创建线程问题 2解决上面两个问题思路: 3线程池的好处 4线程池适合应用场景 3 线程池的构造函数参数 1.corePoolSize int 线程池核心线程大小 2.maximumPoolSize int 线程池最大线程数量 3.keepAliveTime long 空闲…...

asp.net765数码手机配件租赁系统
员工部分功能 1.员工登录,员工通过自己的账号和密码登录到系统中来,对租赁信息进行管理 2.配件查询,员工可以查询系统内的配件信息 3.客户信息管理,员工可以管理和店内有业务往来的客户信息 4.配件租赁,员工可以操作用…...

有关态势感知(SA)的卷积思考
卷积是一种数学运算,其本质是将两个函数进行操作,其中一个函数是被称为卷积核或滤波器的小型矩阵,它在另一个函数上滑动并产生新的输出。在计算机视觉中,卷积通常用于图像处理和特征提取,它可以通过滤波器对输入图像进…...

Docker快速部署springboot项目
有很多开发者在项目部署过程中都会遇到一些繁琐的问题,比如打包、上传、部署等。而使用Docker可以非常方便地解决这些问题。在本文中,将详细讲解如何使用IDEA中的docker打包插件,将代码打包并直接发布到服务器上。这样,我们就可以…...

Linux命令rsync增量同步目录下的文件
业务场景描述 最近遇到一个问题,需要编写相应的Linux命令,增量同步/var/mysql里的所有文件到另外一个目录/opt/mysql,但是里面相关的日志文件xx.log是不同步的,这个场景,可以使用rsync来实现 什么是rsync命令&#x…...

项目管理---(1)项目管理一般知识
一、项目管理一般知识 1.1 项目的一般知识 1.1.1 项目的定义: 项目是为创造独特的产品、服务或成果而进行的临时性工作。 1.1.2 项目的目标: 项目的目标包括成果性目标和约束性目标。 成果性目标:指通过项目开发出满足客户要求的产品、系…...

超过50多个热门的免费可用 API 分享
今天吃什么:随机返回一顿美味食物,解决你今天吃什么的难题。万年历:获取公历日期对应的农历、农历节日节气、天干地支纪年纪月纪日、生肖属相、宜忌、星座等信息。支持查询未来15天。笑话大全:各种最新、最及时的幽默、搞笑段子&a…...

记一次死锁问题
最近在做一个需求,碰到了死锁的问题,记录下解决问题的过程 背景 这个需求要改动一个接口,我这边称为A接口,原先的逻辑是A接口内部会调用c方法,c方法是一个dubbo方法, 现在需要再A接口里添加调用B方法&…...

Bean 作⽤域和⽣命周期
目录 1.lombok 1.1 1.添加依赖:(pom.xml) 1.2 在实体类上使用lombok提供的注解 1.3 安装插件 2. Bean 的 6 种作⽤域(Scope) 2.1 singleton(默认模式) 2.2 prototype(原型模式…...

SVN通过备份、过滤、再导入的方式彻底删除废弃目录
文章目录 前言简要步骤操作示例总结 前言 SVN占用的空间随着项目版本迭代越来越大,因为保存了历史记录中的各个版本,所以即使本地把废弃的目录删掉提交,也不会释放出多余的空间,大概率因为操作删除增加了一个版本号,使…...

golang支持优雅关闭和core错误记录
#经过测试,不能使用 ENTRYPOINT ["/modapi/modapi", "1>> /dev/null","2>> ./logs/stderr.log"],原因是虽然这种方案可以 #保证modapi命令为1号程序,能够接收到os的signal信号。但是如果程序core了…...

Basics of Container Isolation 容器隔离的实现原理
目录 容器隔离的实现原理 1. 使用cgroups实现资源隔离 自定义一个cgroup 设置进程的内存使用 启动一个docker 容器,观察cgroup的创建情况 2. 使用Namespaces进行资源分区 namespace继承关系引发的问题 3. 结合来使用Namespaces 和chroot 4. 结论 参考文档…...

EBS R12.1 注册客户化应用的步骤
创建客户化应用目录 登录成 applxxx 用户 -- applxxx 改成所需用户名 # 以标准INV模块作为客户化应用目录的模板 cd $APPL_TOP mkdir -p cust cp -r inv cust/template cd cust # 删除template 目录下的文件,只保留目录结构 cd $APPL_TOP/cust for rm_list in …...

算法记录 | Day38 动态规划
对于动态规划问题,将拆解为如下五步曲 确定dp数组(dp table)以及下标的含义确定递推公式dp数组如何初始化确定遍历顺序举例推导dp数组 509.斐波那契数 思路: 确定dp数组(dp table)以及下标的含义&#x…...

PMP项目管理-[第六章]进度管理
进度管理知识体系: 规划进度管理: 定义活动: 排列活动顺序: 估算活动持续时间: 制定进度计划: 6.1 规划进度管理 定义:为规划、编制、管理、执行和控制项目进度而制定政策、程序和文档的过程 作…...

Python变量
一、变量的定义 变量名的命名规范:变量名是标识符的一种,变量名不能随便起,要遵守 Python 标识符命名规范。 ## 常用的命名规范有以下几种: 1. 变量名为单个单词的话全部小写 name "张三" 2. 多个单词组成的话&#…...

准备换工作的看过来~
大家好,最近有不少小伙伴在后台留言,得准备面试了,又不知道从何下手!为了帮大家节约时间,特意准备了一份面试相关的资料,内容非常的全面,真的可以好好补一补,希望大家在都能拿到理想…...

免费AI人工智能在线写作伪原创-百度ai自动写文章
免费伪原创洗稿工具 免费伪原创洗稿工具现在终于推出了!你是否在写作的时候,经常因为缺乏灵感而苦恼?或者,你在撰写文章的时候,发现自己的语言表述不够丰富,缺乏变化,语句重复率太高?…...

互联网摸鱼日报(2023-04-21)
互联网摸鱼日报(2023-04-21) InfoQ 热门话题 3年不用云能节省4亿美元!想知道我们为什么敢不用AWS吗? 华为周红:通过行业大模型促进AI价值创造 建设业务规划、交付和反馈闭环| BizDevOps 公开课 云原生时…...

5.3、web服务器简介HTTP协议
代码地址 5.3、web服务器简介HTTP协议 1.Web-Server(网页服务器)2.HTTP协议(应用层的协议)①简介②概述③工作原理④HTTP请求报文格式⑤HTTP响应报文格式⑥HTTP请求方法⑦HTTP状态码 1.Web-Server(网页服务器) 一个 Web Server …...

【观察】华为:新一代楼宇网络,使能绿建智慧化
“碳达峰”、“碳中和”目标是我国生态文明建设和高质量可持续发展的重要战略安排,将推动全社会加速向绿色低碳转型。作为全球既有建筑和每年新建建筑量最大的国家,大力发展绿色建筑对中国全方位迈向低碳社会、实现高质量发展具有重要意义。 《“十四五”…...

【C# .NET】chapter 13 使用多任务改进性能和可扩展性
目录 一、物理内存和虚拟内存使用(Recorder 类) 二、 对比 string的“”操作与stringbuilder 操作 的处理效率,内存消耗情况, 三、异步运行任务、三种启动任务方法、将上一任务方法处理结果作为参数传给下一任务方法 四、嵌套…...

CA(证书颁发机构)
CA 根证书路径/csk-rootca/csk-ca.pem; ~ 签发数字证书,颁发者信息:(仅包含如下信息) C CN ST China L BeiJing O skills OU Operations Departments CN CSK Global Root CA 1.修改证书的路径以及相关配置 vi /etc/pki/tls/op…...

辛弃疾最有代表性的十首词
辛弃疾的词,风格多样,题材广阔,几乎涉及到生活中的各个方面,从爱国情怀到日常生活,甚至连戒酒这种事都能写入词中。辛弃疾也是两宋词人中,存词最多的作家之一,现存的六百多首作品。 辛弃疾的词…...

MC9S12G128开发板—实现按键发送CAN报文指示小车移动功能
实验环境:MC9S12G128开发板 基本功能:控制开发板上的按键,模拟车辆移动的上下左右四个方位,通过can通信告诉上位机界面,车辆轨迹的移动方位。 1. 1939报文发送的示例代码 MC9S12G128开发板1939协议发送can报文数据的…...

尚融宝22-提交借款申请
目录 一、需求介绍 二、图片上传 (一)前端页面 (二)实现图片上传 三、数据字典展示 (一)后端 (二)前端 四、表单信息提交 (一)后端 1、VO对象&…...

机器学习在生态、环境经济学中的实践技术应用及论文写作
近年来,人工智能领域已经取得突破性进展,对经济社会各个领域都产生了重大影响,结合了统计学、数据科学和计算机科学的机器学习是人工智能的主流方向之一,目前也在飞快的融入计量经济学研究。表面上机器学习通常使用大数据…...

Android硬件通信之 WIFI通信
一,简介 1.1 随着网络的普及和通信技术的发展,网络的传输速度也越来越快,wifi技术也还成为手机设备最基本的配置。我们可以通过wifi实现手机与手机之前的信息传输,当然也可以与任意一台有wifi模块的其它设备传输。 1.2 wifi与蓝…...

面试官:“请描述一下Android系统的启动流程”
作者:OpenGL 前言 什么是Android启动流程呢?其实指的就是我们Android系统从按下电源到显示界面的整个过程。 当我们把手机充好电,按下电源,手机会弹出相应启动界面,在等了一段时间之后,会弹出我们熟悉的主…...

k8s delete node 后 重启kubelet会自己加入到集群 ?
原因 当执行kubectl delete node命令时,Kubernetes API服务器会收到该节点的删除请求,并将其从集群中删除。此时,kubelet服务在该节点上仍然在运行,但已经不再与集群通信。 当您重启kubelet服务时,它会重新向API服务…...