当前位置: 首页 > news >正文

7.Java并发编程—掌握线程池的标准创建方式和优雅关闭技巧,提升任务调度效率

文章目录

  • 线程池的标准创建方式
    • 线程池参数
      • 1.核心线程(corePoolSize)
      • 2.最大线程数(maximumPoolSize)
      • 3.阻塞队列(BlockingQueue)
    • 向线程提交任务的两种方式
      • 1.execute()
        • 1.1.案例-execute()向线程池提交任务
      • 2.submit()
        • 2.1.submit(Callable<T> task)
        • 2.2.案例-submit()向线程池提交任务
      • 3.execute,submit对比
    • 线程池的任务调度流程
    • ThreadFactory
      • 1.ThreadFactory源码
      • 2.自定义简单线程工厂
      • 3.defaultThreadFactory
    • 任务阻塞队列
    • 线程池的拒绝策略
    • 优雅关闭线程池
      • 1.(限时)等待线程池正在执行任务
      • 2.等待队列任务全部执行完毕
        • 3.测试两种关闭线程池方法
          • 3.1.自定义Task任务
          • 3.2.测试主类
            • **测试限时等待,关闭线程池**
            • **测试等待全部任务执行完毕,关系线程池**

线程池的标准创建方式

在企业开发中,大部分公司是禁止使用快捷线程池,要求通过标准的构造器ThreadPoolExecutor去构造线程池。Executors创建线程池,其实也是通过构造ThreadPoolExecutor来完成的,只不过构造的相关参数是固定不变的。

ThraedPoolExecutor有多个重载的版本,我们下面来看一个比较重要的构造方法

public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 校验输入参数是否合法if (corePoolSize < 0 ||         // 核心线程池大小不能小于0maximumPoolSize <= 0 ||    // 最大线程池大小必须大于0maximumPoolSize < corePoolSize ||  // 最大线程池大小不能小于核心线程池大小keepAliveTime < 0)         // 非核心线程的空闲时间不能为负值throw new IllegalArgumentException();// 校验传入的对象参数是否为空if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 获取当前安全管理器的访问控制上下文this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();// 设置核心线程池大小this.corePoolSize = corePoolSize;// 设置最大线程池大小this.maximumPoolSize = maximumPoolSize;// 设置任务阻塞队列,用于保存待执行的任务this.workQueue = workQueue;// 将传入的时间单位转换为纳秒,并将非核心线程的空闲时间设置为转换后的值this.keepAliveTime = unit.toNanos(keepAliveTime);// 设置用于创建新线程的线程工厂this.threadFactory = threadFactory;// 设置拒绝执行处理器,用于处理无法执行的任务this.handler = handler;
}

oh,原来构造一个线程池需要这么多的参数,虽然有点多,但是不要慌,下面我们就来一个一个了解这些的参数的意义!

线程池参数

1.核心线程(corePoolSize)

核心线程其实是线程池中固定数量的线程,它们一直存在不会被销毁,并且可以立即执行新任务。这种设计有助于减少线程创建和销毁的开销,提高响应速度。

2.最大线程数(maximumPoolSize)

最大线程数(maximumPoolSize)是线程池中允许存在的最大线程数量。它是线程池的一个重要参数,用于控制线程池的大小和并发处理能力。

  1. 当线程池接收到新的任务时,以下步骤可能会发生:
    1. 如果当前工作线程数目小于核心线程数(corePoolSize),则创建一个新的核心线程来处理该任务。
    2. 如果当前工作线程数目已达到核心线程数,并且任务队列未满,则将任务添加到任务队列中等待执行。
    3. 如果当前工作线程数目已达到核心线程数,并且任务队列已满,但是总工作线程数目(包括核心线程和非核心线程)未达到最大线程数(maximumPoolSize),则创建一个新的非核心线程来处理该任务。
    4. 如果当前工作线程数目已达到最大线程数,且任务队列已满,则根据线程池的拒绝策略来处理该任务,例如抛出异常或者执行其他自定义的处理逻辑。
  2. 如果当前工作线程数目多于核心线程数(corePoolSize),空闲的工作线程可能会根据一定的条件自动销毁,以减少资源占用。这可以根据线程池的实现策略来确定,例如线程空闲时间超过一定阈值等。
  3. 如果将最大线程数(maximumPoolSize)设置为无界值(例如设置为整数的最大值),则线程池可以无限制地创建新的线程来处理任务,直到达到系统的资源限制为止。这种情况下,最大线程数不再起到限制线程数量的作用。
  4. 最大线程数(maximumPoolSize)核心线程数(corePoolSize)可以在线程池创建后通过调用setCorePoolSize()setMaximumPoolSize()进行动态设置。这允许根据实际需求进行线程池大小的调整,以适应不同的场景和负载。

3.阻塞队列(BlockingQueue)

阻塞队列(BlockingQueue)是一种特殊类型的队列,它在元素插入或者移除时提供了阻塞的操作。阻塞队列主要用于在多线程环境下进行线程间的安全、高效的数据传输和协作。

阻塞队列的特点是当队列为空时,试图从队列中获取元素的操作将被阻塞,直到队列中有可用元素;当队列已满时,试图向队列中插入元素的操作将被阻塞,直到队列有空闲位置。

BlockingQueue的实例用于暂时接收异步任务,如果线程池的核心线程都在执行任务,那么所接收到任务都是放在阻塞队列中的。

向线程提交任务的两种方式

向线程池提交任务的方式有两种

  1. 使用execute方法execute()方法用于提交不需要返回结果的任务
  2. 使用submit方法submit()方法用于提交需要返回结果的任务

1.execute()

execute() 方法是 Executor 接口的一个方法,用于向线程池提交一个不需要返回结果的任务。它的作用是将任务提交给线程池来执行,并且不会立即返回任务的执行结果。

   public void execute(Runnable command) {// 判断Runnable的实例是否为空,为空就直接抛出异常if (command == null)throw new NullPointerException();/*** 分三步进行:* 1.如果运行的线程少于核心线程数,则尝试启动一个新线程,该线程的第一个任务是给定的命令。*   调用addWorker原子地检查runState和workerCount,因此通过返回false防止了错误警报,当不应该添加线程时,会添加线程* 2.如果可以成功排队任务,那么我们仍然需要再次检查是否应该添加线程(因为自上次检查以来现有线程已经死亡),*   或者自进入此方法以来池已关闭。因此,我们重新检查状态,如果必要,如果停止,则回滚排队,或者如果没有线程,则启动新线程。* 3.如果我们无法排队任务,那么我们尝试添加一个新线程。如果失败,我们知道我们已关闭或饱和,因此拒绝任务。*/int c = ctl.get();// 如果线程数小于核心线程数,尝试添加一个新的线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 判断线程池是否处于运行状态,如果是则将任务添加到队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查线程池是否处于运行状态,如果不是则将任务从队列中移除if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)// 如果线程池处于运行状态并且worker的数量为0,则添加一个新的workeraddWorker(null, false);} else if (!addWorker(command, false))// 如果无法将任务添加到队列中,则添加一个新的worker// 如果添加失败,则拒绝任务reject(command);}/*** 添加工作线程* 检查是否可以根据当前池状态和给定的边界(核心或最大)添加新工作线程。* 如果可以,则相应地调整工作线程计数,并且如果可能,则创建并启动新工作线程,将firstTask作为其第一个任务运行。* 如果池已停止或有资格关闭,则此方法返回false。如果线程工厂在请求时无法创建线程,则也返回false。* 如果线程创建失败,要么是因为线程工厂返回null,要么是因为异常(通常是Thread.start()中的OutOfMemoryError),将干净地回滚。* 检查是否可以根据当前池状态和给定的边界(核心或最大)添加新工作线程。* 如果可以,则相应地调整工作线程计数,并且如果可能,则创建并启动新工作线程,将firstTask作为其第一个任务运行。* 如果池已停止或有资格关闭,则此方法返回false。如果线程工厂在请求时无法创建线程,则也返回false。*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (; ; ) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 检查队列是否为空只有在必要时才检查。if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null &&!workQueue.isEmpty()))return false;for (; ; ) {int wc = workerCountOf(c);// 如果工作线程数大于等于核心线程数或者最大线程数,则返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 如果工作线程数小于核心线程数,则返回falseif (compareAndIncrementWorkerCount(c))break retry;// 重新获取ctlc = ctl.get();  // Re-read ctl// 如果状态发生变化,则重新开始if (runStateOf(c) != rs)continue retry;}}// 创建worker是否启动成功boolean workerStarted = false;// worker是否添加成功boolean workerAdded = false;// 创建workerWorker w = null;try {// 将firstTask作为其第一个任务运行w = new Worker(firstTask);// 获取线程final Thread t = w.thread;if (t != null) {// ? 获取锁,注意mainLock是用来控制线程池的final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 重新检查是否可以添加worker// c & ~CAPACITY 其实就是runState,这里是判断线程池是否处于运行状态int rs = runStateOf(ctl.get());// 如果线程池处于运行状态或者线程池处于关闭状态并且firstTask为空if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 如果线程是活动的if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将worker添加到workers中workers.add(w);// 获取worker的数量int s = workers.size();// 如果worker的数量大于largestPoolSize,则将largestPoolSize设置为worker的数量if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {// 释放锁mainLock.unlock();}// 如果worker添加成功,那么就启动worker,并且将workerStarted设置为trueif (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果worker创建失败,则将worker从workers中移除if (!workerStarted)addWorkerFailed(w);}// 返回worker是否启动成功return workerStarted;}
1.1.案例-execute()向线程池提交任务
	@Test@DisplayName("threadPoolExecutor => 第一种提交方式 => execute")public void test2() throws InterruptedException {// 核心线程数(corePoolSize)为1// 最大线程数(maximumPoolSize)为1,最大线程 = 核心线程 + 救急线程(队列满的时候,核心线程也都满的时候,会被创建出来执行)// 队列长度为1 (即LinkedBlockingQueue的容量为1)// 这里 核心线程数 和 最大线程数 都是 1,队列长度是 1,所以这里的线程池的最大线程数是 2,队列长度是 1 一共可以执行 2 个任务,第三个任务就会被拒绝ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1));// 创建 三个任务,如果线程池中的线程数小于核心线程数,就会直接创建一个新的线程来执行任务// 然后模拟一下出现拒绝策略的情况// 任务1threadPoolExecutor.execute(() -> {logger.error("【hrfan-1】 开始执行啦~~~~~~~~~~");logger.error("【hrfan-1】 执行结束啦~~~~~~~~~~");});// 任务2threadPoolExecutor.execute(() -> {logger.error("【hrfan-2】 开始执行啦~~~~~~~~~~");logger.error("【hrfan-2】 执行结束啦~~~~~~~~~~");});// 任务3threadPoolExecutor.execute(() -> {logger.error("【hrfan-3】 开始执行啦~~~~~~~~~~");logger.error("【hrfan-3】 执行结束啦~~~~~~~~~~");});// 此时会报错// java.util.concurrent.RejectedExecutionException:elegantlyCloseThreadPool(threadPoolExecutor);}

image-20240324142903647

2.submit()

submit() 方法是 ExecutorService 接口中定义的一个方法,它允许向线程池提交任务,并且可以获取任务的执行结果。submit() 方法有两个重载版本:

  1. submit(Runnable task)
  2. submit(Callable<T> task)

这两个版本的 submit() 方法都用于向线程池提交任务,但是它们之间有一些区别:

  • submit(Runnable task) 方法接受一个 Runnable 对象作为参数,用于表示一个不需要返回结果的任务。它会返回一个 Future<?> 对象,这个对象可以用来监视任务的执行状态,但是无法获取任务执行的结果。(因为两个方法区别不是很多,所以主要以下面为例进行学习)
  • submit(Callable<T> task) 方法接受一个 Callable<T> 对象作为参数,用于表示一个需要返回结果的任务。Callable 是一个泛型接口,它的 call() 方法可以返回一个结果。submit(Callable<T> task) 方法会返回一个 Future<T> 对象,通过这个对象可以获取任务执行的结果。
2.1.submit(Callable task)
    /*** 从代码中来看,submit的方法其实也是调用的execute方法* 只不过submit方法可以获取到任务返回值或任务异常信息,execute方法不能获取任务返回值和异常信息。*/public <T> Future<T> submit(Callable<T> task) {// 判断是否提交的任务为空,如果为空则抛出异常if (task == null) throw new NullPointerException();// 将传入的Callable任务封装成RunnableFuture对象RunnableFuture<T> ftask = newTaskFor(task);// 调用execute方法执行任务execute(ftask);return ftask;}/*** 通过传入一个 Runnable 对象和一个结果值,创建一个用于执行任务的 RunnableFuture 对象,并返回该对象。* 这样做的目的是为了能够获取任务的返回值或者任务的异常信息。*/protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}
2.2.案例-submit()向线程池提交任务
	@Test@DisplayName("threadPoolExecutor => 第二种提交方式 => submit")public void testSubmit(){// 核心线程数(corePoolSize)为1// 最大线程数(maximumPoolSize)为1,最大线程 = 核心线程 + 救急线程(队列满的时候,核心线程也都满的时候,会被创建出来执行)// 队列长度为2 (即LinkedBlockingQueue的容量为1)// 这里 核心线程数 和 最大线程数 都是 1,队列长度是 1,所以这里的线程池的最大线程数是 2,队列长度是 2 一共可以执行 3 个任务,第四个任务就会被拒绝ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));// 创建 三个任务,如果线程池中的线程数小于核心线程数,就会直接创建一个新的线程来执行任务// 然后模拟一下出现拒绝策略的情况// 任务1Future<String> future1 = threadPoolExecutor.submit(() -> {logger.error("【hrfan-1】 开始执行啦~~~~~~~~~~");logger.error("【hrfan-1】 执行结束啦~~~~~~~~~~");return "hrfan-1-execute-success";});// 任务2Future<String> future2 = threadPoolExecutor.submit(() -> {logger.error("【hrfan-2】 开始执行啦~~~~~~~~~~");logger.error("【hrfan-2】 执行结束啦~~~~~~~~~~");return "hrfan-2-execute-success";});// 任务3Future<String> future3 = threadPoolExecutor.submit(() -> {logger.error("【hrfan-3】 开始执行啦~~~~~~~~~~");logger.error("【hrfan-3】 执行结束啦~~~~~~~~~~");return "hrfan-3-execute-success";});// 优雅关闭线程池elegantlyCloseThreadPool(threadPoolExecutor);// 这个时候,我们是可以获取线程执行的结果logger.error("future1:{}", JSONObject.toJSONString(future1));logger.error("future2:{}", JSONObject.toJSONString(future2));logger.error("future3:{}", JSONObject.toJSONString(future3));}

image-20240324152023046

3.execute,submit对比

  1. 返回值类型
    • execute()方法没有返回值,因为它用于提交不需要返回结果的任务。
    • submit()方法返回一个Future对象,可用于获取任务执行的结果,或者取消任务的执行。
  2. 接受的参数类型
    • execute()方法接受一个Runnable对象,用于表示一个不需要返回结果的任务。
    • submit()方法有两个重载版本,一个接受Runnable对象,另一个接受Callable对象。Callable对象用于表示一个需要返回结果的任务。
  3. 异常处理
    • execute()方法不会捕获任务执行过程中的异常,而是将异常传递给线程的未捕获异常处理器进行处理。
    • submit()方法会捕获任务执行过程中的异常,并将其包装到Future对象中,可以通过调用get()方法来获取异常或者任务的执行结果。
  4. 任务的取消
    • execute()方法提交的任务一旦被提交给线程池,就无法取消或中断任务的执行。
    • submit()方法返回的Future对象可以用于取消任务的执行,通过调用cancel()方法来实现。
  5. 使用场景
    • execute()方法适用于简单的、不需要返回结果的任务提交。
    • submit()方法适用于需要返回结果、需要更多控制和灵活性的任务提交。

线程池的任务调度流程

image-20240324152818809

Java线程池的任务调度流程,流程主要分为以下几个步骤:

1. 提交任务

  • 用户通过 ThreadPoolExecutor.submit()execute() 方法提交任务。
  • 线程池会检查当前工作线程数量是否小于核心线程数 (corePoolSize)
    • 如果小于,则创建一个新的工作线程来执行任务。
    • 如果大于或等于,则将任务放入任务队列 (workQueue) 中等待执行。

2. 获取任务

  • 空闲的工作线程会从任务队列中获取任务执行。
    • 如果任务队列为空,则线程会进入阻塞状态,等待任务入队。
    • 如果任务队列非空,则线程会获取队首任务并开始执行。

3. 执行任务

  • 工作线程执行任务。
  • 任务执行完成后,工作线程会释放资源,并回到空闲状态。

4. 拒绝策略

  • 当线程池无法为新任务创建工作线程时,会触发拒绝策略。
  • Java线程池提供了四种拒绝策略:
    • AbortPolicy:直接抛出 RejectedExecutionException 异常。
    • CallerRunsPolicy:由调用者所在的线程执行任务。
    • DiscardOldestPolicy:丢弃任务队列中最旧的任务,然后尝试再次执行新任务。
    • DiscardPolicy:直接丢弃新任务。

addWoker源代码在上面

ThreadFactory

ThreadFactory 是一个接口,用于创建线程的工厂。它提供了一种方式来自定义线程创建过程,允许您在创建线程时指定一些特定的配置,比如线程名称、优先级、是否为守护线程等。在 Java 中,通常是通过 Executors 工厂类的静态方法来创建线程池,而这些方法通常会接受一个 ThreadFactory 对象作为参数,用来创建线程。

1.ThreadFactory源码

这里我们先来看一下 Java为我们提供的线程工厂源代码,通过实现这个工厂,我们可以创建自定义的线程工厂。

public interface ThreadFactory {/*** ThreadFactory接口是一个线程工厂接口,用于创建新线程。* 使用线程工厂可以避免直接调用Thread的构造方法,从而使应用程序可以使用特殊的线程子类、优先级等。* 其中newThread方法负责接收一个Runnable对象,将其封装成一个Thread对象并返回。* 这样,应用程序就可以使用特殊的线程子类、优先级等。*/Thread newThread(Runnable r);
}

2.自定义简单线程工厂

通过自定义线程工程,我们来对工厂中线程的名称 和优先级进行统一更改

public class ThreadPoolFactoryDemo {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());@Test@DisplayName("threadFactory => 线程工厂")public void testThreadPoolFactory(){ExecutorService executorService = Executors.newSingleThreadExecutor( new MyThreadFactory());Future<String> future = executorService.submit(()->{// 这里即使发生了异常,也不会报错,因为线程池会捕获异常,不会抛出// 只有通过future.get()方法获取结果时才会抛出异常,这个时候需要我们自己捕获异常// int i = 1/0;logger.error("开始执行任务了!");return "ok";});// 程序运行期间 并不会报错,即使发生了异常,这个时候需要通过 get去获取时 进行捕获异常try {String s = future.get();logger.error("获取结果 :{}",s);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}static class MyThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {// 这里我们从新设置线程的名称 ,这里一定要传入Runnable的实现类 不然不会报错,但是线程不会执行// 因为线程池会通过这个Runnable实现类来创建线程Thread thread = new Thread(r,"my-thread-" + r.hashCode());// 还可以自定义线程的其他属性,比如优先级等thread.setPriority(Thread.MAX_PRIORITY);return thread;}}
}

image-20240324165307734

3.defaultThreadFactory

如果我们在创建线程池,没有指定自定义工厂,那么就会使用JDK提供的自定义工厂

默认情况下,ThreadPoolExecutor 使用的是 Executors.defaultThreadFactory() 方法提供的线程工厂。这个默认线程工厂创建的线程将具有如下特性:

  1. 线程名称: 默认情况下,线程的名称将以 “pool-” 为前缀,后跟递增的数字。
  2. 线程优先级: 默认情况下,线程的优先级将与创建它的线程相同。
  3. 是否为守护线程: 默认情况下,创建的线程将不是守护线程(daemon thread)。

源码如下:

如果不传递参数那么就会使用默认线程工厂

	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();}
    /*** The default thread factory* 默认线程工厂*/static class DefaultThreadFactory implements ThreadFactory {// 线程池编号private static final AtomicInteger poolNumber = new AtomicInteger(1);// 线程组private final ThreadGroup group;// 线程编号private final AtomicInteger threadNumber = new AtomicInteger(1);// 线程名称前缀private final String namePrefix;DefaultThreadFactory() {// 这里首先获取了当前线程的安全管理器,如果安全管理器不为空,则获取当前线程的线程组,否则获取当前线程的线程组SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();// 重新设置线程名称前缀namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {// 创建线程Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);// 判断是否是守护线程if (t.isDaemon())// 设置为非守护线程t.setDaemon(false);// 设置线程优先级(这里设置为普通优先级)if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}

当我们需要创建具有统一特性的线程时,使用线程池和工厂方法是一种更优雅、更有效的方式,使用线程工厂创建线程有以下优势

1. 代码简洁:

  • 无需在每个线程类中重复定义相同的属性和方法。
  • 只需在工厂方法中定义一次即可。

2. 代码复用:

  • 可以将创建线程的逻辑封装在工厂方法中,方便复用。
  • 可以根据不同的需求创建不同的线程池和工厂方法。

3. 易于管理:

  • 可以通过线程池统一管理线程的创建、销毁和生命周期。
  • 可以方便地监控线程池的运行状态。

4. 提高性能:

  • 可以通过线程池复用线程,减少线程创建和销毁的开销。
  • 可以通过线程池优化线程调度,提高线程的执行效率。

任务阻塞队列

关于常见的阻塞队列,后面会有专门的文章去详细比较,这里这是单纯列举一下

队列类别描述重点信息
ArrayBlockingQueue基于数组的有界阻塞队列,按照 FIFO(先进先出)原则操作。有界阻塞队列,容量固定,当队列满时插入操作将被阻塞。
LinkedBlockingQueue基于链表的可选有界或无界阻塞队列,按照 FIFO(先进先出)原则操作。默认为无界队列,但可指定容量创建有界队列。插入操作可能会阻塞,取决于队列的大小限制。
PriorityBlockingQueue基于优先级堆的无界阻塞队列,按照元素的优先级顺序操作,可自定义元素比较器。无界阻塞队列,支持优先级排序,使用 Comparator 或元素自然顺序。
DelayQueue存储实现了 Delayed 接口的元素,需要等待一段时间后才能被取出,常用于任务调度。存储延迟元素,只有延迟期满后才能被取出。
SynchronousQueue不存储元素的阻塞队列,每个插入操作必须等待一个对应的移除操作,反之亦然。每个插入操作必须等待一个对应的移除操作,是一种直接传递的队列。
LinkedTransferQueue无界阻塞队列,支持生产者和消费者的对等交换。提供了 transfer 方法以支持等待插入或移除。支持生产者和消费者的对等交换,可用于实现直接的请求/应答模式。
LinkedBlockingDeque基于链表的双端阻塞队列,支持在两端进行插入和移除操作,可作为栈或双端队列使用。双端队列,支持在两端进行插入和移除操作,可用于栈或双端队列。

线程池的拒绝策略

线程池的拒绝策略定义了当任务无法被提交到线程池执行时应该采取的行为。当线程池的工作队列已满且无法再接受新任务时,就会触发拒绝策略。Java中的ThreadPoolExecutor类允许开发者指定拒绝策略

这里我们也是简单列举一下,后面章节会详细介绍

拒绝策略描述
ThreadPoolExecutor.AbortPolicy默认策略,抛出RejectedExecutionException异常来拒绝新任务的提交。
ThreadPoolExecutor.CallerRunsPolicy在调用者线程上执行被拒绝的任务。如果线程池已关闭,则任务会被丢弃。
ThreadPoolExecutor.DiscardPolicy直接丢弃被拒绝的任务,不提供任何反馈。
ThreadPoolExecutor.DiscardOldestPolicy丢弃工作队列中等待时间最长的任务,然后尝试重新提交当前任务。
自定义拒绝策略开发者可以实现RejectedExecutionHandler接口来定义自己的拒绝策略,以满足特定需求。

优雅关闭线程池

1.(限时)等待线程池正在执行任务

(限时内)等待线程池正在执行的任务结束后,立即关闭线程池(超时任务全部执行完毕)

  1. 首先,定义了一个shutdownHook线程,它会在JVM关闭时被调用。这个线程的目的是在JVM关闭之前执行一系列操作,包括关闭线程池和等待任务执行完毕。

  2. shutdownHook线程的执行体中,定义了一个重试次数retry,初始值为2。这表示在关闭线程池之前最多会进行两次重试。

  3. 调用threadPoolExecutor.shutdown()方法来关闭线程池。需要注意的是,这里并不会等待队列中的任务执行完毕,因为队列中的任务可能还没有开始执行,直接关闭线程池即可。

  4. 使用threadPoolExecutor.awaitTermination(3, TimeUnit.MINUTES)方法等待线程池中的任务执行完毕,等待时间为3分钟。如果超过等待时间仍有任务未执行完毕,并且重试次数大于0(retry-- > 0),则执行下一步。

  5. 在超时且仍有任务未执行完毕的情况下,调用threadPoolExecutor.shutdownNow()方法取消正在执行的任务,并强制关闭线程池。

  6. 如果任务执行完毕或超时后线程池成功关闭,程序继续执行。如果任务未正常执行结束,会打印错误日志。

  7. 最后,通过Runtime.getRuntime().addShutdownHook(shutdownHook)方法注册shutdownHook线程,使其在JVM关闭时被调用。

/*** 优雅关闭线程池* @param threadPoolExecutor 线程池 实例对象*/
private static void elegantlyCloseThreadPool(ThreadPoolExecutor threadPoolExecutor) {// 优雅关闭线程池// 一个线程池,提交了五个任务去执行,执行完得需要一段时间。// 增加一个JVM的钩子,这个钩子可以简单理解为监听器,注册后,JVM在关闭的时候就会调用这个方法,调用完才会正式关闭JVM。// 优雅关闭线程池 ,这里是注册一个JVM的钩子,当JVM关闭的时候,会调用这个方法,这个方法里面是关闭线程池,等待线程池中的任务执行完毕Thread shutdownHook = new Thread(() -> {// 设定最大重试次数int retry = 2;// 关闭线程池 这里如果超时 不会等待队列中的任务执行完毕,因为队列中的任务还没有开始执行,所以直接关闭// 如果没超时,还会继续执行队列中的任务logger.error("剩余【{}】次尝试关闭线程池!",retry);threadPoolExecutor.shutdown();try {// 这个方法是等待线程池中的任务执行完毕,如果超时了,就直接关闭  retry-- > 0 是为了防止死循环if(!threadPoolExecutor.awaitTermination(3, TimeUnit.MINUTES) && retry-- > 0){// 调用shutdownNow()取消正在执行的任务,SHUTDOWN->STOP是被允许的threadPoolExecutor.shutdownNow();}else {logger.error("线程池任务未正常执行结束");}} catch (InterruptedException e) {// 等待超时logger.error("等待超时,直接关闭:{}",e.getMessage());// 立即关闭线程池threadPoolExecutor.shutdownNow();}});// 注册JVM钩子Runtime.getRuntime().addShutdownHook(shutdownHook);
}
  1. shutdown()方法:调用该方法后,线程池进入SHUTDOWN状态,不再接受新的任务提交,但会执行队列中已有的任务和在执行的任务。
  2. awaitTermination()方法:该方法会等待线程池中的任务执行完毕,或者等待超时,由于设置了3分钟的等待时间,如果在规定时间内线程池中的任务都执行完毕了,那么线程池就会被正常关闭。
  3. 如果线程池在规定时间内仍然有未完成的任务,则会调用shutdownNow()方法来尝试取消正在执行的任务,并立即关闭线程池。

2.等待队列任务全部执行完毕

要实现等待所有任务执行完毕再关闭线程池,可以按照以下步骤进行操作:

  1. 首先,创建一个线程池对象,可以使用Executors类提供的工厂方法之一创建线程池。

    1. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 30, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
      
  2. 在向线程池提交任务之前,创建一个计数器,用于跟踪任务的完成情况。可以使用CountDownLatch类来实现计数器。

    1. // 总任务数
      public static final int totalTasks = 8;
      // 使用CountDownLatch来等待线程执行完毕,CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。
      public static final CountDownLatch latch = new CountDownLatch(totalTasks);
      
  3. 提交任务到线程池执行,并在每个任务的最后调用计数器的countDown()方法,表示任务已完成。

    1. for (int i = 0; i < totalTasks; i++) {executor.submit(() -> {// 执行任务的代码   // ............// 任务执行完毕后调用计数器的countDown()方法ThreadPoolExecutorProductDemo.latch.countDown();
      });
      
  4. 在所有任务提交完成后,调用计数器的await()方法,阻塞当前线程,直到所有任务完成。

    1. // 优雅关闭线程池
      // 关闭线程池前 等待所有线程执行结束
      try {latch.await(); // 等待所有任务完成logger.error("所有任务执行完毕,关闭线程池");// 关闭线程池threadPoolExecutor.shutdown();
      } catch (InterruptedException e) {// 处理中断异常logger.error("等待线程池中的任务执行结束时发生异常:{}",e.getMessage());threadPoolExecutor.shutdownNow();}
      
  5. 当所有任务完成后,可以关闭线程池。调用线程池对象的shutdown()方法来优雅地关闭线程池。

    1. // 这里其实在4中就已经体现了
      threadPoolExecutor.shutdown();
      
3.测试两种关闭线程池方法
3.1.自定义Task任务
/*** 自定义task任务(有返回值)*/
class AsyncTask implements Callable<String> {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());private String name;public AsyncTask(String name){this.name = name;}@Overridepublic String call() throws Exception {logger.error("【{}】 开始执行啦~~~~~~~~~~",name);// 随机睡眠int sleepTime = new Random().nextInt(10000);Thread.sleep(sleepTime);// 任务执行完毕后调用计数器的countDown()方法ThreadPoolExecutorProductDemo.latch.countDown();logger.error("【{}】 执行结束啦~~~~~~~~~~",name);return "task" + name + "completed successfully! current task sleep :"+sleepTime;}
}/*** 自定义Task无返回值*/
class AsyncRunnable implements Runnable{private String name;AsyncRunnable(String name){this.name = name;}private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());@Overridepublic void run() {logger.error("【{}】 开始执行啦~~~~~~~~~~",name);// 随机睡眠int sleepTime = new Random().nextInt(10000);try {Thread.sleep(sleepTime);} catch (InterruptedException e) {throw new RuntimeException(e);}// 任务执行完毕后调用计数器的countDown()方法ThreadPoolExecutorProductDemo.latch.countDown();logger.error("【{}】 执行结束啦~~~~~~~~~~",name);}
}
3.2.测试主类
public class ThreadPoolExecutorProductDemo {// 总任务数public static final int totalTasks = 8;// 使用CountDownLatch来等待线程执行完毕,CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。public static final CountDownLatch latch = new CountDownLatch(totalTasks);private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());@Testpublic void test() throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 30, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100)) {// beforeExecute: 线程执行之前调用@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);logger.error("beforeExecute,线程名称:{}", t.getName());}// afterExecute: 线程执行之后调用@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);}// terminated: 线程池关闭之后调用@Overrideprotected void terminated() {super.terminated();logger.error("terminated");}};// 创建任务AsyncTask task1 = new AsyncTask("hrfan-1");AsyncTask task2 = new AsyncTask("hrfan-2");AsyncTask task3 = new AsyncTask("hrfan-3");AsyncTask task4 = new AsyncTask("hrfan-4");AsyncTask task5 = new AsyncTask("hrfan-5");// 提交任务threadPoolExecutor.submit(task1);threadPoolExecutor.submit(task2);threadPoolExecutor.submit(task3);threadPoolExecutor.submit(task4);threadPoolExecutor.submit(task5);AsyncRunnable asyncRunnable1 = new AsyncRunnable("hrfan-run-6");AsyncRunnable asyncRunnable2 = new AsyncRunnable("hrfan-run-7");AsyncRunnable asyncRunnable3 = new AsyncRunnable("hrfan-run-8");// 创建任务threadPoolExecutor.execute(asyncRunnable1);threadPoolExecutor.execute(asyncRunnable2);threadPoolExecutor.execute(asyncRunnable3);//【submit】 和 【execute】的区别// 基本没有区别,在submit方法中仍然是调用的execute方法进行任务的执行或进入等待队列或拒绝。// submit方法比execute方法多的只是将提交的任务(不管是runnable类型还是callable类型)包装成RunnableFuture然后传递给execute方法执行。// submit方法和execute方法最大的不同点在于submit方法可以获取到任务返回值或任务异常信息,execute方法不能获取任务返回值和异常信息。// 需要等待阻塞队列中线程 全部执行完毕 再关闭线程池// 等待正在执行的线程全部执行结束,然后就关闭线程池// elegantlyCloseThreadPool(threadPoolExecutor);// 等待队列中的任务都执行完毕,在去关闭线程池waitingQueueExecuteClose(threadPoolExecutor);}
}
测试限时等待,关闭线程池

image-20240324200914343

测试等待全部任务执行完毕,关系线程池

image-20240324201244753

相关文章:

7.Java并发编程—掌握线程池的标准创建方式和优雅关闭技巧,提升任务调度效率

文章目录 线程池的标准创建方式线程池参数1.核心线程(corePoolSize)2.最大线程数(maximumPoolSize)3.阻塞队列(BlockingQueue) 向线程提交任务的两种方式1.execute()1.1.案例-execute()向线程池提交任务 2.submit()2.1.submit(Callable<T> task)2.2.案例-submit()向线程池…...

从边缘设备丰富你的 Elasticsearch 文档

作者&#xff1a;David Pilato 我们在之前的文章中已经了解了如何丰富 Elasticsearch 本身和 Logstash 中的数据。 但如果我们可以从边缘设备中做到这一点呢&#xff1f; 这将减少 Elasticsearch 要做的工作。 让我们看看如何从具有代理处理器的 Elastic 代理中执行此操作。 E…...

day29|leetcode|C++|491. 非递减子序列|46. 全排列|47. 全排列 II

Leetcode 491. 非递减子序列 链接&#xff1a;491. 非递减子序列 thought: 设 stack 中最后一个值的位置为 last。如果 stack 为空&#xff0c;则 last -1。 设当前正在处理的位置为 pos。如果在 nums 的子区间 [last1, pos) 中&#xff0c;存在和 nums[pos] 相同的值&…...

[Java、Android面试]_12_java访问修饰符、抽象类和接口

文章目录 1. java访问修饰符2. 抽象类和接口2.1 抽象类2.2 接口2.3 抽象类和接口的区别 本人今年参加了很多面试&#xff0c;也有幸拿到了一些大厂的offer&#xff0c;整理了众多面试资料&#xff0c;后续还会分享众多面试资料。 整理成了面试系列&#xff0c;由于时间有限&…...

Linux:Prometheus的源码包安装及操作(2)

环境介绍 三台centos 7系统&#xff0c;运行内存都2G 1.prometheus监控服务器&#xff1a;192.168.6.1 主机名&#xff1a;pm 2.grafana展示服务器:192.168.6.2 主机名&#xff1a;gr 3.被监控服务器&#xff1a;192.168.6.3 …...

MongoDB聚合运算符:$integral

文章目录 语法使用举例 $integral聚合运算符只能用在$setWindowField阶段&#xff0c;返回曲线下面积的近似值&#xff0c;该曲线是使用梯形规则计算的&#xff0c;其中每组相邻文档使用以下公式形成一个梯形&#xff1a; $setWindowFields阶段中用于积分间隔的sortBy字段值$i…...

手撕算法-买卖股票的最佳时机 II(买卖多次)

描述 分析 使用动态规划。dp[i][0] 代表 第i天没有股票的最大利润dp[i][1] 代表 第i天持有股票的最大利润 状态转移方程为&#xff1a;dp[i][0] max(dp[i-1][0], dp[i-1][1] prices[i]); // 前一天没有股票&#xff0c;和前一天有股票今天卖掉的最大值dp[i][1] max(dp[i-1…...

技术创新与产业升级

在政府工作报告中,新兴技术如云计算、大数据、人工智能等被多次提及,这反映了政府高度重视新一代信息技术在推动经济社会发展中的重要作用。对于计算机行业而言,抓住这些新兴技术的发展机遇,推动技术创新和产业升级,将是未来发展的关键所在。 云计算作为一种新兴的计算模式,正…...

透视未来工厂:山海鲸可视化打造数字孪生新篇章

在信息化浪潮的推动下&#xff0c;数字孪生工厂项目正成为工业制造领域的新宠。作为一名山海鲸可视化的资深用户&#xff0c;我深感其强大的数据可视化能力和数字孪生技术在工厂管理中的应用价值&#xff0c;同时我们公司之前也和山海鲸可视化合作制作了一个智慧工厂项目&#…...

三.寄存器(内存访问)

1.内存中字的存储 2.并不是所有cpu都支持将数据段送入段寄存器&#xff0c;所以有时候用个别的寄存器先把数据段存储起来&#xff0c;再把该寄存器mov到段寄存器。 3.字的传送 4.栈 5.栈机制 举例说明 6.栈顶超界问题 push超界 pop超界 7.栈段...

Day31 贪心算法

Day31 贪心算法 455.分发饼干 我的思路&#xff1a; 小孩数组g指针一直前移&#xff0c;只有饼干数组s满足条件时&#xff0c;才前移&#xff0c;并且更新num 解答&#xff1a; class Solution {public int findContentChildren(int[] g, int[] s) {Arrays.sort(g);Arrays.…...

【WEEK4】 【DAY5】AJAX - Part Two【English Version】

2024.3.22 Friday Following the previous article 【WEEK4】 【DAY4】AJAX - Part One【English Version】 Contents 8.4. Ajax Asynchronous Data Loading8.4.1. Create User.java8.4.2. Add lombok and jackson support in pom.xml8.4.3. Change Tomcat Settings8.4.4. Mo…...

力扣100热题[哈希]:最长连续序列

原题&#xff1a;128. 最长连续序列 题解&#xff1a; 官方题解&#xff1a;. - 力扣&#xff08;LeetCode&#xff09;题解&#xff0c;最长连续序列 &#xff1a;哈希表 官方解题思路是先去重&#xff0c;然后判断模板长度的数值是否存在&#xff0c;存在就刷新&#xff0c…...

python笔记基础--文件和存储数据(7)

目录 1.从文件中读取数据 2.写入文件 3.存储数据 3.1使用json.dump()和json.load() 3.2保存和读取用户生成的数据 3.3重构 1.从文件中读取数据 读取整个文件 with open(data.txt) as file_object: contents file_object.read()print(contents)print(contents.rstrip…...

Vue黑马笔记(最新)

VUE vue是一个用于构建用户界面的渐进式框架 创建一个VUE实例 核心步骤&#xff1a; 准备容器引包&#xff08;官网&#xff09;-开发版本/生产版本创建一个vue实例 new vue()指定配置项->渲染数据 el指定挂载点&#xff08;选择器&#xff09;,指定管理的是哪个容器。dat…...

安全工具介绍 SCNR/Arachni

关于SCNR 原来叫Arachni 是开源的&#xff0c;现在是SCNR&#xff0c;商用工具了 可试用一个月 Arachni Web Application Security Scanner Framework 看名字就知道了&#xff0c;针对web app 的安全工具&#xff0c;DASTIAST吧 安装 安装之前先 sudo apt-get update sudo…...

赋能数据收集:从机票网站提取特价优惠的JavaScript技巧

背景介绍 在这个信息时代&#xff0c;数据的收集和分析对于旅游行业至关重要。在竞争激烈的市场中&#xff0c;实时获取最新的机票特价信息能够为旅行者和旅游企业带来巨大的优势。 随着机票价格的频繁波动&#xff0c;以及航空公司和旅行网站不断推出的限时特价优惠&#xff…...

【大模型】在VS Code(Visual Studio Code)上安装中文汉化版插件

文章目录 一、下载安装二、配置显示语言&#xff08;一&#xff09;调出即将输入命令的搜索模式&#xff08;二&#xff09;在大于号后面输入&#xff1a;Configure Display Language&#xff08;三&#xff09;重启 三、总结 【运行系统】win 11 【本文解决的问题】 1、英文不…...

自定义WordPress顶部的菜单的方法

要自定义WordPress顶部的菜单&#xff0c;你需要使用WordPress的菜单系统。首先&#xff0c;你需要创建自定义菜单&#xff0c;然后将其设置为顶部导航菜单。 以下是创建自定义菜单并设置其为顶部导航菜单的步骤&#xff1a; 登录到WordPress管理界面。转到“外观”>“菜单…...

独孤思维:流量暴涨,却惨遭违规

最近独孤操作虚拟资料短视频&#xff0c;有个很深的感悟。 每天发10条短视频&#xff0c;积累到20天左右&#xff0c;播放量和粉丝数开始暴涨。 虽然很多牛比的比我数据好&#xff0c;但是对于刚做短视频的独孤来说&#xff0c;我已经满足了。 但是又发了10来天&#xff0c;…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

Springboot社区养老保险系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;社区养老保险系统小程序被用户普遍使用&#xff0c;为方…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

SQL Server 触发器调用存储过程实现发送 HTTP 请求

文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...

Modbus RTU与Modbus TCP详解指南

目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...

鸿蒙HarmonyOS 5军旗小游戏实现指南

1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;采用DevEco Studio实现&#xff0c;包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...

归并排序:分治思想的高效排序

目录 基本原理 流程图解 实现方法 递归实现 非递归实现 演示过程 时间复杂度 基本原理 归并排序(Merge Sort)是一种基于分治思想的排序算法&#xff0c;由约翰冯诺伊曼在1945年提出。其核心思想包括&#xff1a; 分割(Divide)&#xff1a;将待排序数组递归地分成两个子…...

EasyRTC音视频实时通话功能在WebRTC与智能硬件整合中的应用与优势

一、WebRTC与智能硬件整合趋势​ 随着物联网和实时通信需求的爆发式增长&#xff0c;WebRTC作为开源实时通信技术&#xff0c;为浏览器与移动应用提供免插件的音视频通信能力&#xff0c;在智能硬件领域的融合应用已成必然趋势。智能硬件不再局限于单一功能&#xff0c;对实时…...

【Pandas】pandas DataFrame dropna

Pandas2.2 DataFrame Missing data handling 方法描述DataFrame.fillna([value, method, axis, …])用于填充 DataFrame 中的缺失值&#xff08;NaN&#xff09;DataFrame.backfill(*[, axis, inplace, …])用于**使用后向填充&#xff08;即“下一个有效观测值”&#xff09…...