7.8 CompletableFuture
Future 接口理论知识复习
Future 接口(FutureTask 实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者先执行完,过了一会才去获取子任务的执行结果,或变更的任务状态。
总结:Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时和复杂业务。
Future 接口常用实现类 FutureTask 异步任务
Future 接口能干什么
Future 是 Java5 新加的一个接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 Future 把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过 Future 获取计算结果。
本源的 Future 接口相关架构
绿色虚线:表示实现的关系,实现一个接口
绿色实线:表示接口之间的继承
蓝色实线:表示类之间的继承
Future 编码实战和优缺点分析
编码实战
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new MyThread());Thread t1 = new Thread(futureTask);t1.start();System.out.println(futureTask.get());}
}class MyThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("------come in call()");return "hello Callable";}
}
优缺点
优点
- Future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
public class FuturePoolDemo {public static void main(String[] args) {method1();System.out.println("------------------");method2();}private static void method2() {ExecutorService executorService = Executors.newFixedThreadPool(3);long startTime = System.currentTimeMillis();FutureTask<String> futureTask1 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}return "task1 over";});executorService.submit(futureTask1);FutureTask<String> futureTask2 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task2 over";});executorService.submit(futureTask2);FutureTask<String> futureTask3 = new FutureTask<String>(() -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return "task3 over";});executorService.submit(futureTask3);long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");System.out.println(Thread.currentThread().getName() + "\t ----end");executorService.shutdown();}private static void method1() {// 3 个任务,目前只有一个线程 main 来处理long startTime = System.currentTimeMillis();// 暂停毫秒try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");System.out.println(Thread.currentThread().getName() + "\t ----end");}
}
输出:
----costTime: 1114毫秒
main ----end
------------------
----costTime: 46毫秒
main ----end
缺点
- get() 阻塞
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(futureTask.get());log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");}
}
输出
20:59:50.067 [t1] - t1 -----come in
20:59:55.075 [main] - task over
20:59:55.075 [main] - main ----忙其他任务了
主线程需等待 get 方法获取到 t1 线程的执行结果才开始执行。
建议:
- get 方法容易导致阻塞,一般建议放在程序后面,一旦调用 get 方法求结果,如果计算没有完成容易导致程序阻塞。
- 如果不希望等待过长时间,希望在等待指定时间后自动结束。可以使用
get(long timeout, TimeUnit unit)
方法
get(long timeout, TimeUnit unit)
方法演示
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");log.debug(futureTask.get(2, TimeUnit.SECONDS));}
}
输出
- isDone() 轮询容易浪费系统资源
@Slf4j(topic = "c.FutureAPIDemo")
public class FutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug(Thread.currentThread().getName() + "\t -----come in");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}return "task over";});Thread t1 = new Thread(futureTask, "t1");t1.start();log.debug(Thread.currentThread().getName() + "\t ----忙其他任务了");while(true) {if(futureTask.isDone()) {log.debug(futureTask.get());break;} else {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("正在处理中...");}}}
}
输出
22:56:26.267 [t1] - t1 -----come in
22:56:26.267 [main] - main ----忙其他任务了
22:56:26.775 [main] - 正在处理中...
22:56:27.281 [main] - 正在处理中...
22:56:27.784 [main] - 正在处理中...
22:56:28.285 [main] - 正在处理中...
22:56:28.787 [main] - 正在处理中...
22:56:29.292 [main] - 正在处理中...
22:56:29.795 [main] - 正在处理中...
22:56:30.299 [main] - 正在处理中...
22:56:30.801 [main] - 正在处理中...
22:56:31.306 [main] - 正在处理中...
22:56:31.306 [main] - task over
轮询的方式会耗费无谓的 CPU 资源,而且也不见得能及时地得到计算结果。如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。
总结
Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
想完成一些复杂的任务
对于简单的业务场景使用 Future 是完全 OK 的。但是对于回调通知,通过轮询的方式去判断任务是否完成,这样非常占用 CPU,并且代码也不优雅。
一些复杂任务:
- 将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值。
- 将这两个或多个异步计算合成一个一步计算,这几个一步计算相互独立,同时后面这个又依赖前一个处理的结果。
- 对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理
对于简单的业务场景使用Future完全OK,但想完成上述一些复杂的任务,使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。Future能干的,CompletableFuture都能干。
CompletableFuture 对 Future 的改进
CompletableFuture 为什么出现
get() 方法在 Future 计算完成之前会一直处在阻塞状态下,isDone() 方法容易耗费 CPU 资源。对于真正的异步处理,我们希望是可以通过传入回调函数,在 Future 结束时自动调用该回调函数,这样,我们就不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源。因此,JDK8 设计出 CompletableFuture。CompletableFuture 提供了一种观察者模式类似的机制,可以让人物执行完成后通知监听的一方。
CompletableFuture 和 CompletionStage 源码分别介绍
类架构说明
CompletableFuture 实现了 Future 接口和 CompletionStage 接口。
接口 CompletionStage
- CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如
stage.thenApply(x -> square(x)).thenAccept(x -> Systemm.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
类 CompletableFuture
- 在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了 Future 和 CompletionStage 接口
核心的四个静态方法
方法介绍
runAsync 无返回值:
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}
supplyAsync 有返回值:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}
上述Executor executor
参数说明:
- 没有指定
Executor
的方法,直接使用默认的ForkJoinPool.commonPool()
作为它的线程池执行异步代码。 - 如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码。
代码演示
runAsync 未设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");});System.out.println(future.get());}
}
输出:
runAsync 设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");}, executorService);System.out.println(future.get());executorService.shutdown();}
}
输出
supplyAsync 未设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");return "hello supplyAsync";});log.debug(future.get());}
}
输出
supplyAsync 设置线程池参数
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {log.debug(Thread.currentThread().getName() + "\t ------come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}log.debug("--------task is over");return "hello supplyAsync";}, executorService);log.debug(future.get());executorService.shutdown();}
}
输出
CompletableFuture 之通用异步编程
从 Java8 开始引入了 CompletableFuture,它是 Future 的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);return result;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue" + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");}
}
执行结果
没有发生异常,为什么没有执行 whenComplete 中的逻辑?
Fork/Join 线程池中的线程类似于守护线程,由于主线程执行速度过快,先执行结束,导致 Fork/Join 线程池被关闭。
解决方法
- 可以让主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立刻关闭:暂停 3 秒钟主线程。
- 使用自定义线程池
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);return result;}, pool).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出:
whenComplete 方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);
}
传入的参数是一个 BiConsumer 函数接口,BiConsumer 接口可以接收两个参数,其中第一个参数为上一步中产生的结果,第二个参数为上一步代码执行过程中产生的异常。
exceptionally 方法
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);
}
如果 CompletableFuture 在执行时发生异常,则执行 exceptionally 中的逻辑,并返回新的 CompletableFuture
代码实例如下,当产生的随机数大于 2,触发异常,执行 exceptionally 中的逻辑
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("----1秒钟后出结果:" + result);if(result > 2) {int i = 1 / 0;}return result;}, pool).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算完成,更新系统UpdateValue: " + v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出
CompletableFuture 优点
- 异步任务结束时,会自动回调某个对象的方法
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法
案例精讲 - 从电商网站的比价需求说开去
函数式接口
函数式接口名称 | 方法名称 | 参数 | 返回值 |
---|---|---|---|
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1 个参数 | 有返回值 |
Consumer | accept | 1 个参数 | 无返回值 |
Supplier | get | 无参数 | 有返回值 |
BiConsumer | accept | 2 个参数 | 无返回值 |
业务需求
业务需求说明
电商网站比价需求分析:
- 需求说明:
- 同一款产品,同时搜索出同款产品在各大电商平台的售价
- 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
- 输出返回:
- 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43
- 解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表
- step by step,按部就班,查完淘宝查京东,查完京东查天猫…
- all in,万箭齐发,一口气多线程异步任务同时查询
代码实现
public class CompletableFutureMallDemo {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));/*** 一家一家的查* @param list* @param productName* @return*/public static List<String> getPrice(List<NetMall> list, String productName) {return list.stream().map(netMall ->String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calPrice(productName))).collect(Collectors.toList());}public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {return list.stream().map(netMall ->CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calPrice(productName)))).collect(Collectors.toList()).stream().map(s -> s.join()).collect(Collectors.toList());}public static void main(String[] args) {long startTime = System.currentTimeMillis();
// List<String> list1 = getPrice(list, "mysql");List<String> list1 = getPriceByCompletableFuture(list, "mysql");for (String element : list1) {System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("----costTime: " + (endTime - startTime) + "毫秒");}
}@AllArgsConstructor
class NetMall {@Getterprivate String netMallName;public double calPrice(String productName) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}
普通方式查询,即调用 getPrice() 方法
采用 CompletableFuture 方式查询,即调用 getPriceByCompletableFuture 方法
CompletableFuture 常用方法
获得结果和触发计算
获取结果
public T get() throws InterruptedException, ExecutionException {// ...
}public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {// ...
}// 和get一样的作用,只是不需要抛出异常
public T join() {// ...
}// 计算完成就返回正常值,否则返回备胎值(传入的参数),立即获取结果不阻塞
public T getNow(T valueIfAbsent) {// ...
}
主动触发计算
// 是否打断 get 方法立即返回括号值
public boolean complete(T value)
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "abc";});try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(future.getNow("xxx"));System.out.println(future.complete("completeValue") + "\t" + future.join());}
}
输出
如果主线程中睡够 2 秒再去获取,输出结果:
对计算结果进行处理
thenApply
- 计算结果存在依赖关系,两个线程串行化
- 由于存在依赖关系(当前步骤出错,不再执行下一步),当前步骤有异常的话就停止运行。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f -> {System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");}
}
输出
如下,若是在第二步执行过程中出错,程序将退出运行。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f -> {int i = 1 / 0;System.out.println("222");return f + 2;}).thenApply(f -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出结果
handle
- 计算结果存在依赖关系,两个线程串行化
- 有异常也可以继续向下运行,根据带的异常参数可以进一步处理
如下代码中,步骤 2 中出错,步骤 3 使用 handle 进行处理
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, pool).thenApply(f-> {int i = 1 / 0;System.out.println("222");return f + 2;}).handle((f, e) -> {System.out.println("333");return f + 3;}).whenComplete((v, e) -> {if(e == null) {System.out.println("----计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();;System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");pool.shutdown();}
}
输出结果
步骤 3 可以正常执行,但是由于步骤 2 中出错,最终会执行 exceptionally 中的逻辑
对计算结果进行消费
thenAccept
接收任务的处理结果,并消费处理,无返回结果
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenAccept(r -> {System.out.println(r);});}
}
输出
对比补充
theRun(Runnable runnable)
任务 A 执行完执行任务 B,并且 B 不需要 A 的结果theAccept(Consumer action)
任务 A 执行完执行任务 B,B 需要 A 的结果,但是任务 B 无返回值thenApply(Function fn)
任务 A 执行完执行任务 B,B 需要 A 的结果,同时任务 B 有返回值
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,并且 B 不需要 A 的结果.thenRun(() -> {}).join());System.out.println("--------------------------------");System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,B 需要 A 的结果,但是任务 B 无返回值.thenAccept(r -> {System.out.println(r);}).join());System.out.println("--------------------------------");System.out.println(CompletableFuture.supplyAsync(() -> "resultA")// 任务 A 执行完执行任务 B,B 需要 A 的结果,同时任务 B 有返回值.thenApply(r -> r + "resultB").join());}
}
输出结果
CompletableFuture 和线程池说明
theRun 和 theRunAsync 有什么区别?
- 没有传入自定义线程池,都使用默认线程池 ForkJoinPool
- 传入了一个自定义线程池
- 如果执行第一个任务的时候,传入了一个自定义线程池
- 调用 theRun 方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
- 调用 theRunAsync 执行第二个任务时,则第一个人任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoinPool 线程池
- 有可能处理太快,系统优化切换原则,直接使用 main 线程处理
其他如:thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是同理。
使用 theRun 方法且未使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));}
}
输出结果
使用 theRunAsync 方法且未使用自定义线程池与使用 theRun 方法且未使用自定义线程池情况一样
使用 theRun 方法且使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}, pool).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRun(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));pool.shutdown();}
}
输出结果
使用 theRunAsync 方法且未使用自定义线程池
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("1号任务\t" + Thread.currentThread().getName());return "abcd";}, pool).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("2号任务\t" + Thread.currentThread().getName());}).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("3号任务\t" + Thread.currentThread().getName());}).thenRunAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("4号任务\t" + Thread.currentThread().getName());});System.out.println(future.get(2L, TimeUnit.SECONDS));pool.shutdown();}
}
输出结果
对计算速度进行选用
applyToEither
谁快用谁
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<String> planA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planA";});CompletableFuture<String> planB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planB";});CompletableFuture<String> result = planA.applyToEither(planB, f -> {return f + " is winer";});System.out.println(Thread.currentThread().getName() + "\t----" + result.join());}
}
输出结果
对计算结果进行合并
thenCombine
两个 CompletionStage 任务都完成后,最终把两个任务的结果一起交给 thenCombine 来处理。先完成的先等着,等待其他分支任务结束。
@Slf4j(topic = "c.CompletableFutureDemo")
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t----启动");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t----启动");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;});CompletableFuture<Integer> combine = future1.thenCombine(future2, (x, y) -> {System.out.println("-----开始两个结果的合并");return x + y;});System.out.println(combine.join());}
}
输出结果
相关文章:

7.8 CompletableFuture
Future 接口理论知识复习 Future 接口(FutureTask 实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。 比如主线程让一个子线程去执行任务,子线…...

iPad锁屏密码忘记怎么办?有什么方法可以解锁?
当我们在日常使用iPad时,偶尔可能会遇到忘记锁屏密码的尴尬情况。这时,不必过于担心,因为有多种方法可以帮助您解锁iPad。接下来,小编将为您详细介绍这些解决方案。 一、使用iCloud的“查找我的iPhone”功能 如果你曾经启用了“查…...

了解并缓解 IP 欺骗攻击
欺骗是黑客用来未经授权访问计算机或网络的一种网络攻击,IP 欺骗是其他欺骗方法中最常见的欺骗类型。通过 IP 欺骗,攻击者可以隐藏 IP 数据包的真实来源,使攻击来源难以知晓。一旦访问网络或设备/主机,网络犯罪分子通常会挖掘其中…...

java LogUtil输出日志打日志的class文件内具体方法和行号
最近琢磨怎么把日志打的更清晰,方便查找问题,又不需要在每个class内都创建Logger对象,还带上不同的颜色做区分,简直不要太爽。利用堆栈的方向顺序拿到日志的class问题。看效果,直接上代码。 1、demo test 2、输出效果…...

02. Hibernate 初体验之持久化对象
1. 前言 本节课程让我们一起体验 Hibernate 的魅力!编写第一个基于 Hibernate 的实例程序。 在本节课程中,你将学到 : Hibernate 的版本发展史;持久化对象的特点。 为了更好地讲解这个内容,这个初体验案例分上下 2…...

MySQL超详细学习教程,2023年硬核学习路线
文章目录 前言1. 数据库的相关概念1.1 数据1.2 数据库1.3 数据库管理系统1.4 数据库系统1.5 SQL 2. MySQL数据库2.1 MySQL安装2.2 MySQL配置2.2.1 添加环境变量2.2.2 新建配置文件2.2.3 初始化MySQL2.2.4 注册MySQL服务2.2.5 启动MySQL服务 2.3 MySQL登录和退出2.4 MySQL卸载2.…...

初识SpringBoot
1.Maven Maven是⼀个项⽬管理⼯具, 通过pom.xml⽂件的配置获取jar包,⽽不⽤⼿动去添加jar包 主要功能 项⽬构建管理依赖 构建Maven项目 1.1项目构建 Maven 提供了标准的,跨平台(Linux, Windows, MacOS等)的⾃动化项⽬构建⽅式 当我们开发了⼀个项⽬之后, 代…...

Qt之元对象系统
Qt的元对象系统提供了信号和槽机制(用于对象间的通信)、运行时类型信息和动态属性系统。 元对象系统基于三个要素: 1、QObject类为那些可以利用元对象系统的对象提供了一个基类。 2、在类声明中使用Q_OBJECT宏用于启用元对象特性,…...

Provider(1)- 什么是AudioBufferProvider
什么是AudioBufferProvider? 顾名思义,Audio音频数据缓冲提供,就是提供音频数据的缓冲类,而且这个AudioBufferProvider派生出许多子类,每个子类有不同的用途,至关重要;那它在Android哪个地方使…...

加密与安全_密钥体系的三个核心目标之完整性解决方案
文章目录 Pre机密性完整性1. 哈希函数(Hash Function)定义特征常见算法应用散列函数常用场景散列函数无法解决的问题 2. 消息认证码(MAC)概述定义常见算法工作原理如何使用 MACMAC 的问题 不可否认性数字签名(Digital …...

【C++】:继承[下篇](友元静态成员菱形继承菱形虚拟继承)
目录 一,继承与友元二,继承与静态成员三,复杂的菱形继承及菱形虚拟继承四,继承的总结和反思 点击跳转上一篇文章: 【C】:继承(定义&&赋值兼容转换&&作用域&&派生类的默认成员函数…...

昇思25天学习打卡营第13天|基于MindNLP+MusicGen生成自己的个性化音乐
关于MindNLP MindNLP是一个依赖昇思MindSpore向上生长的NLP(自然语言处理)框架,旨在利用MindSpore的优势特性,如函数式融合编程、动态图功能、数据处理引擎等,致力于提供高效、易用的NLP解决方案。通过全面拥抱Huggin…...

nigix的下载使用
1、官网:https://nginx.org/en/download.html 双击打开 nginx的默认端口是80 配置文件 默认访问页面 在目录下新建pages,放入图片 在浏览器中输入地址进行访问 可以在电脑中配置本地域名 Windows设置本地DNS域名解析hosts文件配置 文件地址…...

nginx+lua 实现URL重定向(根据传入的参数条件)
程序版本说明 程序版本URLnginx1.27.0https://nginx.org/download/nginx-1.27.0.tar.gzngx_devel_kitv0.3.3https://github.com/simpl/ngx_devel_kit/archive/v0.3.3.tar.gzluajitv2.1https://github.com/openresty/luajit2/archive/refs/tags/v2.1-20240626.tar.gzlua-nginx-m…...

算法学习笔记(8.4)-完全背包问题
目录 Question: 图例: 动态规划思路 2 代码实现: 3 空间优化: 代码实现: 下面是0-1背包和完全背包具体的例题: 代码实现: 图例: 空间优化代码示例 Question: 给定n个物品…...

C++catch (...)陈述
catch (...)陈述 例外处理可以有多个catch,如果catch后的小括弧里面放...,就表示不限型态种类的任何例外。 举例如下 #include <iostream>int main() {int i -1;try {if (i > 0) {throw 0;}throw 2.0;}catch (const int e) {std::cout <…...

Redis实践
Redis实践 使用复杂度高的命令 如果在使用Redis时,发现访问延迟突然增大,如何进行排查? 首先,第一步,建议你去查看一下Redis的慢日志。Redis提供了慢日志命令的统计功能,我们通过以下设置,就…...

【Lora模型推荐】Stable Diffusion创作具有玉石翡翠质感的图标设计
站长素材AI教程是站长之家旗下AI绘图教程平台 海量AI免费教程,每日更新干货内容 想要深入学习更多AI绘图教程,请访问站长素材AI教程网: AI教程_深度学习入门指南 - 站长素材 (chinaz.com) logo版权归各公司所有!本笔记仅供AIGC…...

vscode 远程开发
目录 vscode 远程连接 选择 Python 环境 vscode 远程连接 按 CtrlShiftP 打开命令面板。输入并选择 Remote-SSH: Open SSH Configuration File...。选择 ~/.ssh/config 文件(如果有多个选项)。在打开的文件中添加或修改你的 SSH 配置。 这个可以右键…...

前端Vue组件化实践:打造灵活可维护的地址管理组件
随着前端技术的不断演进,复杂度和开发难度也随之上升。传统的一体化开发模式使得每次小小的修改或功能增加都可能牵一发而动全身,严重影响了开发效率和维护成本。组件化开发作为一种解决方案,通过模块化、独立化的开发方式,实现了…...

虚幻引擎ue5游戏运行界面白茫茫一片,怎么处理
根剧下图顺序即可调节游戏运行界面光照问题: 在大纲里找到post,然后选中它,找到Exposure 把最低亮度和最高亮度的0改为1即可...

《代理选择与反爬虫策略探究:如何优化网络爬虫效率与稳定性》
代理IP如何选以及常见反爬策略 为什么需要代理? 因为有的网站会封IP,用户如果没有登录,那IP就是身份标识,如果网站发现用户行为异常就非常可能封IP 什么是代理IP 就是让一个人帮你转交请求,帮你转交的人对面不熟&a…...

Kotlin Flow 防抖 节流
防抖和节流是针对响应跟不上触发频率这类问题的两种解决方案。 一:防抖(debounce)的概念: 防抖是指当持续触发事件时,一定时间段内没有再触发事件,事件处理函数才会执行一次, 如果设定时间到来之前&#x…...

Android Studio下载与安装
Android Studio下载与安装_android studio下载安装-CSDN博客...

【LC刷题】DAY24:122 55 45 1005
122. 买卖股票的最佳时机 II class Solution { public:int maxProfit(vector<int>& prices) {int result 0;for(int i 1; i < prices.size(); i ){result max(prices[i] - prices[ i - 1], 0);}return result;} };55. 跳跃游戏 link class Solution { public…...

从零开始的python学习生活2
接上封装 class Phone:__volt0.5def __keepsinglecore(self):print("让cpu以单核运行")def if5G(self):if self.__volt>1:print("5G通话已开启")else:self.__keepsinglecore()print("电量不足,无法使用5G通话,已经设置为单…...

【并发编程】进程 线程 协程
进程(Process)、线程(Thread)和协程(Coroutine)构成了计算机科学中实现任务并发执行的三种核心抽象机制。通常,为了提高程序的执行效率,开发者会根据应用场景和性能需求,…...

Vue的生命周期函数有哪些?详细说明
Vue.js 的生命周期函数包括以下几个阶段,每个阶段都有相应的钩子函数可以用来在特定时机执行自定义的逻辑。这些生命周期钩子函数使得我们可以在组件的不同阶段进行操作,从而管理组件的状态和行为。 1. beforeCreate: - 描述:…...

大语言模型应用--AI工程化落地
文章目录 大语言模型概述什么是大语言模型什么是机器学习什么是深度学习 理解大语言模型历史沿革关键 AIGC系统AI工程化项目的落地落地的方法Prompt工程(第一阶段)RAG检索(第二阶段)训练特定功能模型(第三阶段…...

我会什么开发技能
java我会什么? 一、并发编程 1、并发编程:jdk中的courren包只能够类实现(seamplore,CountDownLaunch,Pharse,CycliBarrier,CompletableFuture),AQS的原理,线…...