当前位置: 首页 > news >正文

FutureCompletableFuture实战

1. Callable&Future&FutureTask介绍

直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此java1.5就提供了Callable接口来实现这一场景,而Future和FutureTask就可以和Callable接口配合起来使用。

@FunctionalInterface
public interface Runnable {public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

Runnable 的缺陷:

  • 不能返回一个返回值
  • 不能抛出 checked Exception

Callable的call方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。

new Thread(new Runnable() {@Overridepublic void run() {System.out.println("通过Runnable方式执行任务");}
}).start();FutureTask task = new FutureTask(new Callable() {@Overridepublic Object call() throws Exception {System.out.println("通过Callable方式执行任务");Thread.sleep(3000);return "返回任务结果";}
});
new Thread(task).start();
System.out.println(task.get());

1.1 Future 的API

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

  • boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
  • boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
  • boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
  • V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
  • V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException

1.2 FutureTask 使用

Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。

FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。

把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。

public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Task task = new Task();//构建futureTaskFutureTask<Integer> futureTask = new FutureTask<>(task);//作为Runnable入参new Thread(futureTask).start();System.out.println("task运行结果:"+futureTask.get());}static class Task implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("子线程正在计算");int sum = 0;for (int i = 0; i < 100; i++) {sum += i;}return sum;}}
}

使用案例:促销活动中商品信息查询

在维护促销活动时需要查询商品信息(包括商品基本信息、商品价格、商品库存、商品图片、商品销售状态等)。这些信息分布在不同的业务中心,由不同的系统提供服务。如果采用同步方式,假设一个接口需要50ms,那么一个商品查询下来就需要200ms-300ms,这对于我们来说是不满意的。如果使用Future改造则需要的就是最长耗时服务的接口,也就是50ms左右。

public class FutureTaskDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> ft1 = new FutureTask<>(new T1Task());FutureTask<String> ft2 = new FutureTask<>(new T2Task());FutureTask<String> ft3 = new FutureTask<>(new T3Task());FutureTask<String> ft4 = new FutureTask<>(new T4Task());FutureTask<String> ft5 = new FutureTask<>(new T5Task());//构建线程池ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.submit(ft1);executorService.submit(ft2);executorService.submit(ft3);executorService.submit(ft4);executorService.submit(ft5);//获取执行结果System.out.println(ft1.get());System.out.println(ft2.get());System.out.println(ft3.get());System.out.println(ft4.get());System.out.println(ft5.get());executorService.shutdown();}static class T1Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T1:查询商品基本信息...");TimeUnit.MILLISECONDS.sleep(50);return "商品基本信息查询成功";}}static class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:查询商品价格...");TimeUnit.MILLISECONDS.sleep(50);return "商品价格查询成功";}}static class T3Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T3:查询商品库存...");TimeUnit.MILLISECONDS.sleep(50);return "商品库存查询成功";}}static class T4Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T4:查询商品图片...");TimeUnit.MILLISECONDS.sleep(50);return "商品图片查询成功";}}static class T5Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T5:查询商品销售状态...");TimeUnit.MILLISECONDS.sleep(50);return "商品销售状态查询成功";}}}

1.3 Future的局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

2. CompletableFuture使用详解

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Future 实现,是非常麻烦的。

CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

2.1 应用场景

描述依赖关系:

  1. thenApply() 把前面异步任务的结果,交给后面的Function
  2. thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回

描述and聚合关系:

  1. thenCombine:任务合并,有返回值
  2. thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
  3. runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。

描述or聚合关系:

  1. applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
  2. acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
  3. runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。

并行执行:

CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行

2.2 创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 

这四个方法区别在于:

  1. runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞
  2. 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
  3. 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

runAsync&supplyAsync

Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");CompletableFuture.runAsync(runnable);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";});String result = future.get();System.out.println(result);

执行无返回结果的异步任务

执行有返回值的异步任务

2.3 获取结果

join&get

join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)

2.4 结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
  • Action的类型是BiConsumer,它可以处理正常的计算结果,或者异常情况。
  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
  • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常

whenComplete&exceptionally

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if (new Random().nextInt(10) % 2 == 0) {int i = 12 / 0;}System.out.println("执行结束!");return "test";});future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable action) {System.out.println(t+" 执行完成!");}});future.exceptionally(new Function<Throwable, String>() {@Overridepublic String apply(Throwable t) {System.out.println("执行失败:" + t.getMessage());return "异常xxxx";}}).join();

执行结束!

test 执行完成!

或者

执行失败:java.lang.ArithmeticException: / by zero

null 执行完成!

2.5 结果转换

所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

thenApply

thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一阶段:" + result);return result;}).thenApply(number -> {int result = number * 3;System.out.println("二阶段:" + result);return result;});System.out.println("最终结果:" + future.get());

一阶段:100

二阶段:300

最终结果:300

thenCompose

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一阶段:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二阶段:" + number);return number;}});}});System.out.println("最终结果: " + future.get());

第一阶段:10

第二阶段:20

最终结果:20

thenApply 和 thenCompose的区别

  • thenApply转换的是泛型中的类型,并返回一个新的封装了转换结果的

CompletableFuture实例;

  • thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> result1 = future.thenApply(param -> param + " World");CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));System.out.println(result1.get());System.out.println(result2.get());

Hello World

Hello World

2.6 结果消费

与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。

根据对结果的处理方式,结果消费函数又分为:

  • thenAccept系列:对单个结果进行消费
  • thenAcceptBoth系列:对两个结果进行消费
  • thenRun系列:不关心结果,只对结果执行Action

thenAccept

通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);


 

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenAccept(number ->System.out.println("第二阶段:" + number * 5));System.out.println("最终结果:" + future.get());

第一阶段:8

第二阶段:40

最终结果:null

thenAcceptBoth

thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最终结果:" + (x + y));}}).join();

第二阶段:1

第一阶段:2

最终结果:3

thenRun

thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。

public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenRun(() ->System.out.println("thenRun 执行"));System.out.println("最终结果:" + future.get());

第一阶段:2

thenRun 执行

最终结果:null

2.7 结果组合

thenCombine

thenCombine 方法,合并两个线程任务的结果,并进一步处理。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段:" + number);return number;}});CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});System.out.println("最终结果:" + result.get());

第一阶段:9

第二阶段:5

最终结果:14

2.8 任务交互

所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。

applyToEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段end:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段end:" + number);return number;}});future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快结果:" + number);return number * 2;}}).join();

第一阶段start:6

第二阶段start:5

第二阶段end:5

最快结果:5

acceptEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快结果:" + number);}}).join();

第二阶段:3

最快结果:3

runAfterEither

两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已经有一个任务完成了");}}).join();"); } }).join();

第一阶段:3

已经有一个任务完成了

runAfterBoth

两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:1");return 1;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:2");return 2;}});future1.runAfterBoth(future2, new Runnable() {@Overridepublic void run() {System.out.println("上面两个任务都执行完成了。");}}).get();

第一阶段:1

第二阶段:2

上面两个任务都执行完成了。

anyOf

anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Random random = new Random();CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);System.out.println(result.get());

world

allOf

allOf方法用来实现多 CompletableFuture 的同时返回。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);try {combindFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("future1: " + future1.isDone() + ",future2: " + future2.isDone());

future2完成!

future1完成!

future1: true,future2: true

2.9 使用案例:实现最优的“烧水泡茶”程序

著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。

基于Future实现

public class FutureTaskDemo3{public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft1Thread T1 = new Thread(ft1);T1.start();// 线程T2执行任务ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());}}// T1Task需要执行的任务:// 洗水壶、烧开水、泡茶class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf = ft2.get();System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}}

基于CompletableFuture实现

public class CompletableFutureDemo2 {public static void main(String[] args) {//任务1:洗水壶->烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}}

相关文章:

FutureCompletableFuture实战

1. Callable&Future&FutureTask介绍 直接继承Thread或者实现Runnable接口都可以创建线程&#xff0c;但是这两种方法都有一个问题就是&#xff1a;没有返回值&#xff0c;也就是不能获取执行完的结果。因此java1.5就提供了Callable接口来实现这一场景&#xff0c;而Fu…...

Loki 微服务模式组件介绍

目录 一、简介 二、架构图 三、组件介绍 Distributor&#xff08;分发器&#xff09; Ingester&#xff08;存储器&#xff09; Querier&#xff08;查询器&#xff09; Query Frontend&#xff08;查询前端&#xff09; Index Gateway&#xff08;索引网关&#xff09…...

peerDependencies对等依赖

在 package.json 中平时常用的有字段有 dependencies 和 devDependencies&#xff0c;但 peerDependencies 平时都没咋看到过&#xff0c;今天具体讲讲 peerDependencies 的作用 一、什么是对等依赖 peerDependencies 可以翻译为“对等依赖”或“同行依赖”。这个术语在 npm …...

贪心算法 part01

class Solution { public:int maxSubArray(vector<int>& nums) {int result INT32_MIN;int count 0;for (int i 0; i < nums.size(); i) {count nums[i];if (count > result) { // 取区间累计的最大值&#xff08;相当于不断确定最大子序终止位置&#xff…...

java开发入门学习二 - 变量

目录 一 关键字 ​编辑 二 标识符 三 变量 变量数据类型 变量注意点 四 数据类型 前置知识 - 计算机存储单位 整型数据类型 浮点数据类型 字符数据类型 布尔数据类型 五 数据类型间的计算 基本数据类型之间的计算 自动类型提升 强制类型转换 引用数据类型 Sti…...

Qt Q_ENUM enum 转 QString 枚举字符串互转; C++模板应用

Part1: Summary 项目中我们常用到命名&#xff0c;使用 enum 转成 string &#xff0c;方便简洁&#xff1b;Qt给我们提供了一个很方便的功能 Q_ENUM&#xff0c;可以实现枚举字符串互转&#xff1b; Q_ENUM宏将枚举注册到元对象系统中&#xff1b; QMetaEnum::fromType获取枚…...

0004.基于springboot+elementui的在线考试系统

适合初学同学练手项目&#xff0c;部署简单&#xff0c;代码简洁清晰&#xff1b; 愿世界和平再无bug 一、系统架构 前端&#xff1a;vue| elementui 后端&#xff1a;springboot | mybatis-plus 环境&#xff1a;jdk1.8 | mysql | maven 二、登录角色 1.管理员 2.老师 …...

基于 iAP2 协议 的指令协议,用于对安防设备的 MCU 进行操作

协议设计目标 1. 安全性&#xff1a;通过 iAP2 协议与 MCU 设备进行安全通信。 2. 通用性&#xff1a;支持对安防设备的常见功能进行操作&#xff0c;如状态查询、设备控制、参数配置等。 3. 高效性&#xff1a;数据结构简洁清晰&#xff0c;易于解析和扩展。 4. 扩展性&#x…...

02-5.python入门基础一控制流(while)

Python 控制流是指控制程序执行顺序的机制&#xff0c;它允许程序根据不同的条件和情况执行不同的代码块或重复执行某些代码。 while 循环的用法与示例 语法结构及要点 在 Python 中&#xff0c;while循环是一种基于条件判断的循环结构&#xff0c;其语法构成如下&#xff1a;…...

Go语言开发入门与实战

Go语言(简称Golang)由Google开发,是一门现代化的编程语言,因其简洁高效、并发支持友好、跨平台特性而在后端服务开发、云计算等领域大放异彩。本文将介绍Go语言的基本特点、开发环境配置,并通过一个简单的实战项目带领大家快速上手。 一、Go语言的特点 简单易学:语法简洁…...

HarmonyOS Next应用开发实战:ArkWeb组件使用介绍及使用举例

ArkWeb简介 ArkWeb&#xff08;方舟Web&#xff09;是HarmonyOS Next中提供的一个Web组件&#xff0c;主要用于在应用程序中显示Web页面内容。这个组件使得开发者可以在HarmonyOS应用中嵌入Web页面&#xff0c;从而降低开发成本&#xff0c;提升开发和运营效率。 使用场景 A…...

【已解决】在Visual Studio里将应用与Microsoft Store关联时提示网络异常

发布Windows应用时。在Visual Studio里点击"发布“&#xff0c;将应用与Microsoft Store关联时&#xff0c;一直提示网络错误。 查了一下论坛&#xff0c;发现之前也经常出现&#xff0c;但我是第一次遇到。 不能就这样一直被卡着呀&#xff0c;研究了一下&#xff0c;还…...

springcloud-gateway获取应用响应信息乱码

客户端通过springcloud gateway跳转访问tongweb上的应用&#xff0c;接口响应信息乱码。使用postman直接访问tongweb上的应用&#xff0c;响应信息显示正常。 用户gateway中自定义了实现GlobalFilter的Filter类&#xff0c;在该类中获取了上游应用接口的响应信息&#xff0c;直…...

[笔记]关于Qt的nativeEvent事件无法接收window消息的Bug

1.nativeEvent事件无法接收window消息 此处不是nativeEvent不能接收&#xff0c;是possmessage一定要写对发送的软件名称&#xff0c;这个名称在Qt中是主界面类的名称&#xff0c;就是主界面UI的名称&#xff0c;而不是rc文件中定义的名称。 所以在FindWindow函数获取目标窗口…...

LeetCode 热题 100_K 个一组翻转链表(31_25_困难_C++)(四指针法)

LeetCode 热题 100_K 个一组翻转链表&#xff08;31_25&#xff09; 题目描述&#xff1a;输入输出样例&#xff1a;题解&#xff1a;解题思路&#xff1a;思路一&#xff08;四指针法&#xff09;&#xff1a; 代码实现代码实现&#xff08;思路一&#xff08;四指针法&#x…...

Pytorch | 从零构建MobileNet对CIFAR10进行分类

Pytorch | 从零构建MobileNet对CIFAR10进行分类 CIFAR10数据集MobileNet设计理念网络结构技术优势应用领域 MobileNet结构代码详解结构代码代码详解DepthwiseSeparableConv 类初始化方法前向传播 forward 方法 MobileNet 类初始化方法前向传播 forward 方法 训练和测试训练代码…...

CSS系列(18)-- 工程化实践详解

前端技术探索系列&#xff1a;CSS 工程化实践详解 &#x1f3d7;️ 致读者&#xff1a;探索 CSS 工程化之路 &#x1f44b; 前端开发者们&#xff0c; 今天我们将深入探讨 CSS 工程化实践&#xff0c;学习如何在大型项目中管理 CSS。 工程化配置 &#x1f680; 项目结构 …...

日拱一卒(18)——leetcode学习记录:二叉树中的伪回文路径

一、题目 给你一棵二叉树&#xff0c;每个节点的值为 1 到 9 。我们称二叉树中的一条路径是 「伪回文」的&#xff0c;当它满足&#xff1a;路径经过的所有节点值的排列中&#xff0c;存在一个回文序列。 请你返回从根到叶子节点的所有路径中 伪回文 路径的数目。 二、思路 …...

hive—炸裂函数explode/posexplode

1、Explode炸裂函数 将hive某列一行中复杂的 array 或 map 结构拆分成多行&#xff08;只能输入array或map&#xff09; 语法&#xff1a; select explode(字段) as 字段命名 from 表名; 举例&#xff1a; 1&#xff09;explode(array)使得结果中将array列表里的每个元素生…...

SpringBoot 新特性

优质博文&#xff1a;IT-BLOG-CN 2.1.0新特性最低支持jdk8,支持tomcat9 对响应式编程的支持&#xff0c;spring-boot-starter-webflux starter POM可以快速开始使用Spring WebFlux&#xff0c;它由嵌入式Netty服务器支持 1.5.8 2.1.0/2.7.0/3.0.0 Configuration propertie…...

鸿蒙app封装 axios post请求失败问题

这个问题是我的一个疏忽大意&#xff0c;在这里记录一下。如果有相同问题的朋友&#xff0c;可以借鉴。 当我 ohpm install ohos/axios 后&#xff0c;进行简单post请求验证&#xff0c;可以请求成功。 然后&#xff0c;我对axios 进行了封装。对axios 添加请求拦截器/添加响…...

消息队列 Kafka 架构组件及其特性

Kafka 人们通常有时会将 Kafka 中的 Topic 比作队列&#xff1b; 在 Kafka 中&#xff0c;数据是以主题&#xff08;Topic&#xff09;的形式组织的&#xff0c;每个 Topic 可以被分为多个分区&#xff08;Partition&#xff09;。每个 Partition 是一个有序的、不可变的消息…...

网络攻击与防范

目录 选填 第一章 1、三种网络模式 2、几种创建网络拓扑结构 NAT模式 VPN模式 软路由模式1 软路由模式2 3、Linux网络配置常用指令 4、常见网络服务配置 DHCP DNS Web服务与FTP服务 FTP用户隔离 第二章 DNS信息收集&#xff08;dnsenum、dnsmap&#xff09; 路…...

文献研读|基于像素语义层面图像重建的AI生成图像检测

前言&#xff1a;本篇文章主要对基于重建的AI生成图像检测的四篇相关工作进行介绍&#xff0c;分别为基于像素层面重建的检测方法 DIRE 和 Aeroblade&#xff0c;以及基于语义层面重建的检测方法 SimGIR 和 Zerofake&#xff1b;并对相应方法进行比较。 相关文章&#xff1a;论…...

【操作系统】为什么需要架构裁剪?

为什么需要架构裁剪&#xff1f; 原因 减小核心大小提高架构初始化速度降低内存占用提高系统性能移除不需要的功能&#xff0c;增加安全性 裁剪方法 初始化配置设置功能模块化移除不需要的驱动底层 一般裁剪对象&#xff08;以操作系统为例&#xff09; 文件系统的支持网…...

LSTM长短期记忆网络

LSTM&#xff08;长短期记忆网络&#xff09;数学原理 LSTM&#xff08;Long Short-Term Memory&#xff09;是一种特殊的递归神经网络&#xff08;RNN&#xff09;&#xff0c;解决了标准RNN中存在的梯度消失&#xff08;Vanishing Gradient&#xff09; 和**梯度爆炸&#x…...

基于前端技术UniApp和后端技术Node.js的电影购票系统

文章目录 摘要Abstruct第一章 绪论1.1 研究背景与意义1.2 国内外研究现状 第二章 需求分析2.1 功能需求分析2.2 非功能性需求分析 第二章系统设计3.1 系统架构设计3.1.1 总体架构3.1.2 技术选型 3.2 功能架构 第四章 系统实现4.1 用户端系统实现4.1.1 用户认证模块实现4.1.2 电…...

数据结构与算法:稀疏数组

前言 此文以整型元素的二维数组为例&#xff0c;阐述稀疏数组的思想。其他类型或许有更适合压缩算法或者其他结构的稀疏数组&#xff0c;此文暂不扩展。 稀疏数组的定义 在一个二维数据数组里&#xff0c;由于大量的元素的值为同一个值&#xff0c;比如 0或者其他已知的默认值…...

Meta重磅发布Llama 3.3 70B:开源AI模型的新里程碑

在人工智能领域&#xff0c;Meta的最新动作再次引起了全球的关注。今天&#xff0c;我们见证了Meta发布的Llama 3.3 70B模型&#xff0c;这是一个开源的人工智能模型&#xff0c;它不仅令人印象深刻&#xff0c;而且在性能上达到了一个新的高度。 一&#xff0c;技术突破&#…...

VSCode中的Black Formatter没有生效的解决办法

说明 如果正常按照配置进行的话&#xff0c;理论上是可以生效的。 "[python]": {"editor.defaultFormatter": "ms-python.black-formatter","editor.formatOnSave": true }但我在一种情况下发现不能生效&#xff0c;应为其本身的bug…...