用于处理异步任务编排的一个类。是Future的功能增强版,减少阻塞和轮询,可以传入
1 Future接口理论知识
Future接口(FutureTask实现类)定义操作异步任务执行
的一些方法. 如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。简单讲:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
2 Future 接口常用实现类FutureTask异步任务
FutureTask future = new FutureTask(new Callable(){public String call(){return "OJBK";}
});
Thread t1 = new Thread(future);
t1.start();
// Executor threadPool = new ThreadPoolExecutor(3,5,1,TimeUnit.SECONDS,new LinkedBlockingDeque<>());
-
优点
- Future+线程池异步多线程任务配合,能显著提高程序的执行效率
threadPool.execute(callableInterfaceImpl);
- Future+线程池异步多线程任务配合,能显著提高程序的执行效率
-
缺点:
- 直接调用Future接口的get方法获取线程返回值可能导致主线线程阻塞
future.get()
- 通过Future接口的
isDone()
方法轮询, 但是轮询会导致CPU空转,浪费计算资源。
- 直接调用Future接口的get方法获取线程返回值可能导致主线线程阻塞
3 CompletableFuture对Future的改进
- CompletableFuture为什么会出现
为了解决Future接口的缺点,阻塞和轮询资源浪费问题,提供了一种类似观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
CompletionState接口代表异步计算过程中的某一个阶段。一个阶段完成之后可能会触发另一个阶段。
- 核心的4个静态方法,来创建一个异步的任务
官方不推荐通过new
创建CompletableFuture对象,而是使用静态的方法进行创建。
// runAsync无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(ASYNC_POOL, runnable); // 如果没有指定Executor,直接使用默认的`ForkJoinPool.commonPool()`线程池。}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}// =====================================//
// supplyAsync 有返回值 1
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(ASYNC_POOL, supplier); // 如果没有指定Executor,直接使用默认的`ForkJoinPool.commonPool()`线程池。}
// SupplyAsync 有返回值 2
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}
如果没有指定Executor,直接使用默认的ForkJoinPool.commonPool()
线程池。
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务supplyAsync");return "ok";}, threadPool);
如何解决阻塞问题的呢????
默认情况下,开启协程,线程池中的线程作为协程,主线程结束,协程直接终止执行。
将返回结果通知给主线程或者其他线程
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(), new ThreadPoolExecutor.AbortPolicy()); // 工作中尽量使用自己创建的线程池
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "异步任务supplyAsync");return "ok";
}, threadPool).whenCompleteAsync((v,e)->{ // 异步任务成功结束回调方法System.out.println( Thread.currentThread().getName() + "对返回值进行处理:" + v);}
}, threadPool).exceptionallyAsync(e ->{// 异步任务出现异常回调方法System.out.println(Thread.currentThread().getName() + "异常处理:" + e.getMessage());return null;
}, threadPool);
System.out.println("主线程结束======");
CompletableFuture优点
-
异步任务结束时,会自动回调某个自定义的回调方法(
whenComplete()
) -
异步任务出错时,会自动回调某个定义异常处理方法(
exceptionally()
) -
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
whenComplete().exceptionally()
,
4 案例-电商网站的比价需求开发
大厂业务需求说明:
- 切记: 先是功能实现,再说性能提升
- 方法论: 需求分析 -> 功能实现-> 性能提升
5 CompletableFuture的常用方法
(1) 获取结果和触发规则
- complete(T value): 获取结果,默认值为value
- get():等待获取,需要处理结果异常
- join(): 获取完成后的结果,无需强制捕获异常,代码更简洁
- getNow(T valueIfAbsent): 获取当前运行结果,当前还未完成则返回默认值
(2) 对计算结果进行处理
-
计算结果存在依赖关系,这两个线程串行化
-
thenApply(f->{}): 根据前述返回结果进一步处理,出现异常不走下一步
-
handle(f->{}):类似thenApply, 出现异常仍然走下一步。
(3)对计算结果进行消费
-
接受任务的处理结果,并消费处理,无返回结果
-
thenAccept() :无返回值
-
对比:
- thenRun(Runnable r):既没有输入参数,也没有返回结果
- thenAccept(Consumer action): 只有输入参数,没有返回结果
- thenApply(Function f): 有输入参数,也有返回值
-
CompletableFuture和线程池:
- thenRun和thenRunAsync区别:
(4)对计算速度进行选用
- 谁快用谁
- a.applyToEither(b,f->{}); a和b谁快用谁的结果
(5)对计算结果进行合并
- 两个CompletionStage任务都完成后, 最终把两个任务的结果一起交给thenCombine来处理
- a.thenCombine(b, (x,y) -> {}): 将两个任务的运行结果进行合并处理
(5)等待多个任务完成
- CompletableFuture.allOf(a,b,c,d).join();