一止长渊

JUC异步编排——CompletableFuture

N 人看过
字数:3.1k字 | 预计阅读时长:14分钟

1、业务背景

截屏2021-05-04 16.36.46.png
业务场景:查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

截屏2021-05-04 16.58.34.png
假如商品详情页的每个查询,需要如下标注的时间才能完成
那么,用户需要 6.5s 后才能看到商品详情页的内容。很显然是不能接受的。
如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

使用异步编排原因:商品详情页涉及到很多表查询,例如基本信息、销售属性、图片信息等,
如果按照顺序查询,则查询时间则是所有查询时间的总和,如果使用异步编排,将可以并行的任务一起执行,有顺序的任务在前一个任务完成就执行,就可以简短查询时间,提高页面的响应效率。
同时如果业务上线,大并发量的话,为了提高响应效率使用线程池来进行,减少线程的创建时间和销毁时间的消耗也可以防止恶意查询造成系统吞吐量降低。

    public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
        SkuItemVo skuItemVo = new SkuItemVo();
        // 1、sku 基本信息 pms_sku_info
        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
            SkuInfoEntity info = getById(skuId);
            skuItemVo.setInfo(info);
            return info;
        }, executor);

        // 2、sku的图片信息 pms_sku_images,与infoFuture并行
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
            List<SkuImagesEntity> skuImagesEntities = skuImagesService.selectImages(skuId);
            skuItemVo.setImages(skuImagesEntities);
        }, executor);


        // 3、获取sku所属spu下所有sku的销售属性组合,在infoFuture之后
        CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((info) -> {
            List<SkuItemVo.SkuItemSaleAttrVo> saleAttrs = skuSaleAttrValueService.getSaleAttrsBySpuId(info.getSpuId());
            skuItemVo.setSaleAttrs(saleAttrs);
        }, executor);


        // 4、获取spu的介绍 pms_spu_info_desc,在infoFuture之后
        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((info) -> {
            if (info.getSpuId() != null && info.getSpuId() > 0) {
                SpuInfoDescEntity spuDesc = spuInfoDescService.getById(info.getSpuId());
                skuItemVo.setDesp(spuDesc);
            }
        }, executor);


        // 5、获取spu的规格参数信息,在infoFuture之后
        CompletableFuture<Void> baseFuture = infoFuture.thenAcceptAsync((info) -> {
            if (info.getSpuId() != null && info.getSpuId() > 0) {
                List<SkuItemVo.SpuItemAttrGroupVo> attrGroupWithAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(info.getCatelogId(), info.getSpuId());
                skuItemVo.setGroupAttrs(attrGroupWithAttrs);
            }
        }, executor);

        // 阻塞等待以上任务全部完成才返回
        CompletableFuture.allOf(infoFuture, imageFuture, saleAttrFuture, descFuture, baseFuture).get();

        return skuItemVo;
    }

可以看见 CompletableFuture 是实现了 Future 接口,而之前线程池中使用 FutureTask 也是实现了 Future 接口,Future 接口最大的特点就是可以获取异步线程的运行结果。CompletableFuture 类似于 js 中的 Promise 可以通过 then + lambda 表达式定义在某一个操作之后接着干什么,抛出异常干什么

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

以上可以分为两种runAsync 无返回值,所以泛型中为 Void,而supplyAsync 可以返回值
此外 Executor 可以为线程池,可以传入自定义的线程池,利用线程池中的线程来运行任务;否则使用默认的线程池

public class CompletableFutureTest {
    // 自定义线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 不接收返回值
        System.out.println("main...start...");
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }, executor);
        System.out.println("main...end...");

        // 接受返回值
        System.out.println("main...start...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor);
        Integer integer = future1.get();
        System.out.println("main...end..." + integer);
    }
}

2、计算完成时回调方法

类似于 JS 中 Promise 中的用法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况,但是没有返回值
whenComplete 和 whenCompleteAsync 区别:

  • whenComplete 是执行当前任务的线程继续执行 whenComplete 的任务
  • whenCompleteAsync 是将这个任务继续提交给线程池来进行

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 **
**执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main...start...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).whenComplete((result, exception) -> {
            // 虽然能得到异常信息,但是没法修改返回数据
            System.out.println("异步任务成功了...结果是:" + result + ";异常是:" + exception);
        }).exceptionally(throwable -> {
            // 可以感知异常,出现异常指定默认返回值
            return 10;
        });
        Integer integer = future.get();
        System.out.println("main...end..." + integer);
    }

3、接着处理结果

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
        System.out.println("main...start...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).handle((result, throwable) -> {
            if(result != null){
                return result * 2;
            }
            if(throwable != null){
                return 0;
            }
            return 0;
        });
        Integer integer = future.get();
        System.out.println("main...end..." + integer);

和 complete 一样,但是可以返回值,也就是能在上一步的结果再进一步处理,可以改变返回值
handle 可以处理异常,可以接收上一步的处理结果,可以改变返回值,属于全能
同理 handle 是使用前一个调用方法的线程来执行,而 handleAsync 则是

4、线程串行化方法

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务的返回结果,并返回当前任务的返回值
thenAccept 方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun 方法,不可以接收上一步的返回值,也无返回值。只是处理完任务后,执行 thenRun 的后续操作
带有 Async 默认都是异步执行的,同之前;以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

5、两任务组合——都要完成

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)

以上方法都是在前两个任务完成后,才开始第三个任务
**使用模式:A.xxx(B,C)**, C 任务在 A、B 任务都完成之后开始

  • thenCombineXXX 第三个任务 C 可以接收到 A T 和 B U 的返回值,自己也有返回值 V BiFunction<? super T,? super U,? extends V> fn
  • thenAcceptBothXXX 第三个任务 C 可以接收到 A 和 B 的返回值,自己没有返回值
  • runAfterBothXXX 第三个任务不可以接收到 A 和 B 的返回值,自己也没有返回值
        System.out.println("main...start...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务一结束:");
            return i;
        }, executor);

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程:" + Thread.currentThread().getId());
            System.out.println("任务二结束:");
            return "hello";
        }, executor);

        future1.runAfterBothAsync(future2, () -> {
            System.out.println("任务3开始...");
        }, executor);

        future1.thenAcceptBothAsync(future2, (result1, result2) -> {
            System.out.println("任务3开始...任务一结果:" + result1 + ",任务二结果:" + result2);
        },executor);
        System.out.println("main...end...");

        CompletableFuture<String> future3 = future1.thenCombineAsync(future2, (result1, result2) -> {
            return result1 + result2 + "->" + "world";
        }, executor);

        System.out.println("main...end..." + future3.get());

6、两个任务组合——其中一个完成触发第三个任务

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

runAfterEitherXXX 都是二个任务完成之一触发第三个任务,不可接收值,也无返回值
acceptEitherXXX:接收二个任务之一先完成的返回值,无返回值
applyToEitherXXX:接收二个任务之一先完成的返回值,有返回值
以上要求任务一和任务二返回值相同,可以用 Object 代替当返回值不同

        System.out.println("main...start...");
        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务一结束:");
            return i;
        }, executor);

        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程:" + Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
                System.out.println("任务二结束:");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, executor);

        future1.runAfterEitherAsync(future2, () -> {
            System.out.println("任务3开始...之前的结果");
        }, executor);

        future1.acceptEitherAsync(future2, (result) -> {
            System.out.println("任务3开始...之前的结果..." + result);
        }, executor);

        CompletableFuture<String> future3 = future1.applyToEitherAsync(future2, (result) -> {
            System.out.println("任务3开始...之前的结果" + result.toString());
            return result.toString() + "哈哈";
        }, executor);

        System.out.println("main...end..." + future3.get());

7、多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

当任务时,要求很多任务都完成才正式结束,或者其中任意一个任务完成即可结束

        CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品的图片信息");
            return "hello.jpg";
        }, executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品的属性");
            return "黑色+256G";
        }, executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("查询商品的介绍");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为";
        }, executor);

        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
        allOf.get(); // 等待所有结果完成
        System.out.println("main...end..." + futureImg.get() + "=>" + futureAttr.get() + "=>" + futureDesc.get());

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
        anyOf.get();
        System.out.println("main...end..." + anyOf.get());

本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。