博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java8的CompletableFuture详解
阅读量:7218 次
发布时间:2019-06-29

本文共 12299 字,大约阅读时间需要 40 分钟。

hot3.png

Java5中,Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却不是很方便,只能通过get()方法阻塞住调用线程直至计算完成返回结果或者isDone()方法轮询的方式得到任务结果,也可以用cancel方法来停止任务的执行,阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源,而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法:

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));        future.addListener((channelFuture) -> {            if (channelFuture.isSuccess()) {                // SUCCESS            } else {                // FAILURE            }        });

guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程。

作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?

Java8里面新增加了一个包含50个方法左右的类:CompletableFuture。

CompletableFuture类实现了CompletionStage和Future接口。提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。

CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。

public T get()public T get(long timeout, TimeUnit unit)public T getNow(T valueIfAbsent)public T join()

其中的getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值。 join返回计算的结果或抛出一个uncheckd异常。

CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。

进行变换

public  CompletionStage thenApply(Function
fn);public CompletionStage thenApplyAsync(Function
fn);public CompletionStage thenApplyAsync(Function
fn,Executor executor);

以Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行,下文中将会有好多类似的,都不详细解释了。关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。

例如:

@Test    public void thenApply() {        String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();        System.out.println(result); // hello world    }

进行消耗

public CompletionStage
thenAccept(Consumer
action);public CompletionStage
thenAcceptAsync(Consumer
action);public CompletionStage
thenAcceptAsync(Consumer
action,Executor executor);

thenAccept是针对结果进行消耗,因为他的入参是Consumer,有入参无返回值。

例如:

@Testpublic void thenAccept(){           CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world"));}

对上一步的计算结果不关心,执行下一个操作

public CompletionStage
thenRun(Runnable action);public CompletionStage
thenRunAsync(Runnable action);public CompletionStage
thenRunAsync(Runnable action,Executor executor);

thenRun它的入参是一个Runnable的实例,表示当得到上一步的结果时的操作。

例如:

@Test    public void thenRun(){        CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "hello";        }).thenRun(() -> System.out.println("hello world")); // hello world        while (true){}    }

结合两个CompletionStage的结果,进行转化后返回

public 
CompletionStage
thenCombine(CompletionStage
other,BiFunction
fn);public
CompletionStage
thenCombineAsync(CompletionStage
other,BiFunction
fn);public
CompletionStage
thenCombineAsync(CompletionStage
other,BiFunction
fn,Executor executor);

它需要原来的处理返回值,并且other代表的CompletionStage也要返回值之后,利用这两个返回值,进行转换后返回指定类型的值。 例如:

@Test    public void thenCombine() {        String result = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "hello";        }).thenCombine(CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "world";        }), (s1, s2) -> s1 + " " + s2).join();        System.out.println(result); // hello world    }

结合两个CompletionStage的结果,进行消耗

public  CompletionStage
thenAcceptBoth(CompletionStage
other,BiConsumer
action);public
CompletionStage
thenAcceptBothAsync(CompletionStage
other,BiConsumer
action);public
CompletionStage
thenAcceptBothAsync(CompletionStage
other,BiConsumer
action, Executor executor);

它需要原来的处理返回值,并且other代表的CompletionStage也要返回值之后,利用这两个返回值,进行消耗。

例如:

@Test    public void thenAcceptBoth() {        CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "hello";        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "world";        }), (s1, s2) -> System.out.println(s1 + " " + s2)); // hello world        while (true){}    }

在两个CompletionStage都运行完执行

public CompletionStage
runAfterBoth(CompletionStage
other,Runnable action);public CompletionStage
runAfterBothAsync(CompletionStage
other,Runnable action);public CompletionStage
runAfterBothAsync(CompletionStage
other,Runnable action,Executor executor);

不关心这两个CompletionStage的结果,只关心这两个CompletionStage执行完毕,之后在进行操作(Runnable)。

例如:

@Test    public void runAfterBoth(){        CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s1";        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s2";        }), () -> System.out.println("hello world")); // hello world        while (true){}    }

两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的转化操作

public  CompletionStage applyToEither(CompletionStage
other,Function
fn);public CompletionStage applyToEitherAsync(CompletionStage
other,Function
fn);public CompletionStage applyToEitherAsync(CompletionStage
other,Function
fn,Executor executor);

我们现实开发场景中,总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。

例如:

@Test    public void applyToEither() {        String result = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s1";        }).applyToEither(CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "hello world";        }), s -> s).join();        System.out.println(result); // hello world    }

两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的消耗操作

public CompletionStage
acceptEither(CompletionStage
other,Consumer
action);public CompletionStage
acceptEitherAsync(CompletionStage
other,Consumer
action);public CompletionStage
acceptEitherAsync(CompletionStage
other,Consumer
action,Executor executor);

例如:

@Test    public void acceptEither() {        CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s1";        }).acceptEither(CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "hello world";        }), System.out::println); // hello world        while (true){}    }

两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)

public CompletionStage
runAfterEither(CompletionStage
other,Runnable action);public CompletionStage
runAfterEitherAsync(CompletionStage
other,Runnable action);public CompletionStage
runAfterEitherAsync(CompletionStage
other,Runnable action,Executor executor);

例如:

@Test    public void runAfterEither() {        CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s1";        }).runAfterEither(CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s2";        }), () -> System.out.println("hello world")); // hello world        while (true) {        }    }

当运行时出现了异常,可以通过exceptionally进行补偿

public CompletionStage
exceptionally(Function
fn);

例如:

@Test    public void exceptionally() {        String result = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            if (1 == 1) {                throw new RuntimeException("测试一下异常情况");            }            return "s1";        }).exceptionally(e -> {            System.out.println(e.getMessage()); // java.lang.RuntimeException: 测试一下异常情况            return "hello world";        }).join();        System.out.println(result); // hello world    }

当运行完成时,对结果的记录。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断。这里为什么要说成记录,因为这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。所以不会对结果产生任何的作用

public CompletionStage
whenComplete(BiConsumer
action);public CompletionStage
whenCompleteAsync(BiConsumer
action);public CompletionStage
whenCompleteAsync(BiConsumer
action,Executor executor);

例如:

@Test    public void whenComplete() {        String result = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            if (1 == 1) {                throw new RuntimeException("测试一下异常情况");            }            return "s1";        }).whenComplete((s, t) -> {            System.out.println(s);            System.out.println(t.getMessage());        }).exceptionally(e -> {            System.out.println(e.getMessage());            return "hello world";        }).join();        System.out.println(result);    }

结果:

nulljava.lang.RuntimeException: 测试一下异常情况java.lang.RuntimeException: 测试一下异常情况hello world

运行完成时,对结果的处理。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断

public  CompletionStage handle(BiFunction
fn);public CompletionStage handleAsync(BiFunction
fn);public CompletionStage handleAsync(BiFunction
fn,Executor executor);

例如: 出现异常时

@Test    public void handle() {        String result = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            //出现异常            if (1 == 1) {                throw new RuntimeException("测试一下异常情况");            }            return "s1";        }).handle((s, t) -> {            if (t != null) {                return "hello world";            }            return s;        }).join();        System.out.println(result); // hello world    }

未出现异常时

@Test    public void handle() {        String result = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "s1";        }).handle((s, t) -> {            if (t != null) {                return "hello world";            }            return s;        }).join();        System.out.println(result); // s1    }

上面就是CompletionStage接口中方法的使用实例,CompletableFuture同样也同样实现了Future,所以也同样可以使用get进行阻塞获取值,总的来说,CompletableFuture使用起来还是比较爽的,看起来也比较优雅一点。

转载于:https://my.oschina.net/hensemlee/blog/3006987

你可能感兴趣的文章
shell的详细介绍和编程(上)
查看>>
软件开发性能优化经验总结
查看>>
面试题编程题05-python 有一个无序数组,如何获取第K 大的数,说下思路,实现后的时间复杂度?...
查看>>
kendo grid序号显示
查看>>
Spring 教程(二) 体系结构
查看>>
Indexes
查看>>
2.Web中使用iReport 整合----------创建html格式的
查看>>
异常备忘:java.lang.UnsupportedClassVersionError: Bad version number in .class file
查看>>
最全三大框架整合(使用映射)——applicationContext.xml里面的配置
查看>>
初步理解Java的三大特性——封装、继承和多态
查看>>
知识点积累(一)
查看>>
iphone-common-codes-ccteam源代码 CCFile.m
查看>>
python:浅析python 中__name__ = '__main__' 的作用
查看>>
修改tomcat端口后不能IP访问问题
查看>>
review board
查看>>
URAL 1495 One-two, One-two 2
查看>>
牛客国庆集训派对Day3 G Stones
查看>>
虚函数简单总结
查看>>
插入排序--算法导论
查看>>
NoSQL -- Redis使用
查看>>