Java5中,Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却不是很方便,只能通过get()方法阻塞住调用线程直至计算完成返回结果或者isDone()方法轮询的方式得到任务结果,也可以用cancel方法来停止任务的执行,阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源,而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法:
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener((channelFuture) -> { if (channelFuture.isSuccess()) { // SUCCESS } else { // FAILURE } });
guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程。
作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?
Java8里面新增加了一个包含50个方法左右的类:CompletableFuture。
CompletableFuture类实现了CompletionStage和Future接口。提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。
CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。
public T get()public T get(long timeout, TimeUnit unit)public T getNow(T valueIfAbsent)public T join()
其中的getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值。 join返回计算的结果或抛出一个uncheckd异常。
CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。
进行变换
public CompletionStage thenApply(Function fn);public CompletionStage thenApplyAsync(Function fn);public CompletionStage thenApplyAsync(Function fn,Executor executor);
以Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行,下文中将会有好多类似的,都不详细解释了。关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。
例如:
@Test public void thenApply() { String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join(); System.out.println(result); // hello world }
进行消耗
public CompletionStagethenAccept(Consumer action);public CompletionStage thenAcceptAsync(Consumer action);public CompletionStage thenAcceptAsync(Consumer action,Executor executor);
thenAccept是针对结果进行消耗,因为他的入参是Consumer,有入参无返回值。
例如:
@Testpublic void thenAccept(){ CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world"));}
对上一步的计算结果不关心,执行下一个操作
public CompletionStagethenRun(Runnable action);public CompletionStage thenRunAsync(Runnable action);public CompletionStage thenRunAsync(Runnable action,Executor executor);
thenRun它的入参是一个Runnable的实例,表示当得到上一步的结果时的操作。
例如:
@Test public void thenRun(){ CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenRun(() -> System.out.println("hello world")); // hello world while (true){} }
结合两个CompletionStage的结果,进行转化后返回
public CompletionStagethenCombine(CompletionStage other,BiFunction fn);public CompletionStage thenCombineAsync(CompletionStage other,BiFunction fn);public CompletionStage thenCombineAsync(CompletionStage other,BiFunction fn,Executor executor);
它需要原来的处理返回值,并且other代表的CompletionStage也要返回值之后,利用这两个返回值,进行转换后返回指定类型的值。 例如:
@Test public void thenCombine() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenCombine(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> s1 + " " + s2).join(); System.out.println(result); // hello world }
结合两个CompletionStage的结果,进行消耗
public CompletionStagethenAcceptBoth(CompletionStage other,BiConsumer action);public CompletionStage thenAcceptBothAsync(CompletionStage other,BiConsumer action);public CompletionStage thenAcceptBothAsync(CompletionStage other,BiConsumer action, Executor executor);
它需要原来的处理返回值,并且other代表的CompletionStage也要返回值之后,利用这两个返回值,进行消耗。
例如:
@Test public void thenAcceptBoth() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> System.out.println(s1 + " " + s2)); // hello world while (true){} }
在两个CompletionStage都运行完执行
public CompletionStagerunAfterBoth(CompletionStage other,Runnable action);public CompletionStage runAfterBothAsync(CompletionStage other,Runnable action);public CompletionStage runAfterBothAsync(CompletionStage other,Runnable action,Executor executor);
不关心这两个CompletionStage的结果,只关心这两个CompletionStage执行完毕,之后在进行操作(Runnable)。
例如:
@Test public void runAfterBoth(){ CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); // hello world while (true){} }
两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的转化操作
public CompletionStage applyToEither(CompletionStage other,Function fn);public CompletionStage applyToEitherAsync(CompletionStage other,Function fn);public CompletionStage applyToEitherAsync(CompletionStage other,Function fn,Executor executor);
我们现实开发场景中,总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。
例如:
@Test public void applyToEither() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), s -> s).join(); System.out.println(result); // hello world }
两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的消耗操作
public CompletionStageacceptEither(CompletionStage other,Consumer action);public CompletionStage acceptEitherAsync(CompletionStage other,Consumer action);public CompletionStage acceptEitherAsync(CompletionStage other,Consumer action,Executor executor);
例如:
@Test public void acceptEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).acceptEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), System.out::println); // hello world while (true){} }
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
public CompletionStagerunAfterEither(CompletionStage other,Runnable action);public CompletionStage runAfterEitherAsync(CompletionStage other,Runnable action);public CompletionStage runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor);
例如:
@Test public void runAfterEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); // hello world while (true) { } }
当运行时出现了异常,可以通过exceptionally进行补偿
public CompletionStageexceptionally(Function fn);
例如:
@Test public void exceptionally() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("测试一下异常情况"); } return "s1"; }).exceptionally(e -> { System.out.println(e.getMessage()); // java.lang.RuntimeException: 测试一下异常情况 return "hello world"; }).join(); System.out.println(result); // hello world }
当运行完成时,对结果的记录。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断。这里为什么要说成记录,因为这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。所以不会对结果产生任何的作用
public CompletionStagewhenComplete(BiConsumer action);public CompletionStage whenCompleteAsync(BiConsumer action);public CompletionStage whenCompleteAsync(BiConsumer action,Executor executor);
例如:
@Test public void whenComplete() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("测试一下异常情况"); } return "s1"; }).whenComplete((s, t) -> { System.out.println(s); System.out.println(t.getMessage()); }).exceptionally(e -> { System.out.println(e.getMessage()); return "hello world"; }).join(); System.out.println(result); }
结果:
nulljava.lang.RuntimeException: 测试一下异常情况java.lang.RuntimeException: 测试一下异常情况hello world
运行完成时,对结果的处理。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断
public CompletionStage handle(BiFunction fn);public CompletionStage handleAsync(BiFunction fn);public CompletionStage handleAsync(BiFunction fn,Executor executor);
例如: 出现异常时
@Test public void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //出现异常 if (1 == 1) { throw new RuntimeException("测试一下异常情况"); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); // hello world }
未出现异常时
@Test public void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); // s1 }
上面就是CompletionStage接口中方法的使用实例,CompletableFuture同样也同样实现了Future,所以也同样可以使用get进行阻塞获取值,总的来说,CompletableFuture使用起来还是比较爽的,看起来也比较优雅一点。