Heng Lin

  • 主頁
  • 所有文章
搜尋文章 關於我

Heng Lin

  • 主頁
  • 所有文章

關於 Java CompletableFuture 的用法

2020-04-19

已經好久沒有進行更新
最近都在熟悉新工作,與過往不同的 domain knowledge ,融入貫通的過程中學習到滿多關於硬體的知識,目前已經進入狀況

剛好最近都在使用 CompleteableFuture 處理 thread 非同步的事件
來寫篇文章記錄一下用法,方便之後查詢使用

CompletableFuture 個人覺得很像 JS 的 Promise
JS Promise 串接async callback function
而 CompletableFuture 用於串連不同的 thread 接點,某種程度將一連串執行流程中 thread 之間的前後相依性解耦
某個 thread 結果要串給另一個 thread 當作輸入條件,使用 CompletableFuture 可輕鬆綁定
所以在設計時只需定義好每個事件個自的 input 和 output ,不需要將關聯寫死

首先是最簡單的 runAsync 和 supplyAsync 的基礎用法,差別在於是否需要 return result
它們都可以搭配 thread pool 使用 ( 在實際應用場景都會根據實際需求自建 thread pool 使用,若沒設置則預設是使用 JVM 本身的 thread pool )

1
2
3
4
5
6
7
// basic
CompletableFuture<Void> runAsync(Runnable runnable)
CompletableFuture<T> supplyAsync(Supplier<T> supplier)

// use your own thread pool
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
CompletableFuture<T> supplyAsync(Supplier<T> supplier, Executor executor)

提供自建的 thread pool 給 CompletableFuture 使用

1
2
3
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("runAsync"), threadPool);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "supplyAsync", threadPool);


CompletableFuture 也允許在事件執行完成時 callback 取得結果,方便進行下一步的處理,而且在執行過程中發生 exception ,也可以攔截 exception 後執行特定的 action

1
2
3
4
5
6
7
8
// basic
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// catch exception
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

// use your own thread pool
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

將剛剛的 supplyAsync 例子改寫為強制拋出 exception ,可在 completeAsync 或 exceptionally 處理異常,非常的方便

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
throw new CompletionException(new Exception("throw exception"));
}, threadPool);

supplyAsync.whenCompleteAsync((result, ex) -> {
System.out.println("result: " + result);
System.out.println("exception: " + ex);
}, threadPool).exceptionally(ex -> {
System.out.println("exceptionally: " + ex.getMessage());
return ex.getMessage();
}).join();

輸出

1
2
3
result: null
exception: java.util.concurrent.CompletionException: java.lang.Exception: throw exception
exceptionally: java.lang.Exception: throw exception


除此之外 CompletableFuture 也提供 handle 來處理 exception ,差別在於 handle 是允許有返回值,用於串接過程中處理 exception
handle 也用於做進一步的資料轉換

1
2
3
4
5
6
// basic
CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

// use your own thread pool
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

再次改寫 supplyAsync ,改為使用 handle 處理 exception ,此例子將 exception 轉換成字串當作一般的結果往後傳遞

1
2
3
4
5
6
7
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
throw new CompletionException(new Exception("throw exception"));
}, threadPool);

String ans = supplyAsync.handle((result, ex) -> (null != ex) ? ex.getMessage() : result).join();
System.out.println("ans: " + ans);

輸出

1
ans: java.lang.Exception: throw exception


若是沒有處理 exception 的需求則可以使用 thenApply 進行資料的轉換,類似於 stream map 的效果

1
2
3
4
5
6
// basic
CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)

// use your own thread pool
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)

還有對映 flatMap 的 thenCompose

1
2
3
4
CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)

CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

不需要返回值的 thenAccept

1
2
3
4
5
6
// basic
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)

// use your own thread pool
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)


使用 thenCombine 還可以將兩個獨立的 CompletableFuture 執行結果進行整合,非常的強大

1
2
3
4
5
6
// basic
CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)

// use your own thread pool
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)

現在將 supplyAsync1 和 supplyAsync2 結果整合

1
2
3
4
5
6
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);

String ans = supplyAsync1.thenCombine(supplyAsync2, (result1, result2) -> result1 + ", " + result2).join();
System.out.println("and: " + ans);

輸出

1
and: supplyAsync 1, supplyAsync 2

不需要返回值的 thenAcceptBoth

1
2
3
4
5
6
7
// basic
CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

// use your own thread pool
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

使用方式與 thenCombine 相同

1
2
3
4
5
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);

supplyAsync1.thenAcceptBothAsync(supplyAsync2, (result1, result2) -> System.out.println(result1 + ", " + result2), threadPool).join();

輸出

1
supplyAsync 1, supplyAsync 2


前面提到的都是兩個獨立 CompletableFuture 的例子,若大於兩個的時候會使用 allOf
allOf 可以等待全部的 CompletableFuture 執行完成後執行

1
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

等待 supplyAsync1, supplyAsync2, supplyAsync3 執行結束後取得最終結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);
CompletableFuture<String> supplyAsync3 = CompletableFuture.supplyAsync(() -> "supplyAsync 3", threadPool);

CompletableFuture.allOf(supplyAsync1, supplyAsync2, supplyAsync3).thenRun(() -> {
try {
StringBuffer ans = new StringBuffer();
ans.append(supplyAsync1.get()).append(", ")
.append(supplyAsync2.get()).append(", ")
.append(supplyAsync3.get());
System.out.println("ans: " + ans.toString());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}).join();

輸出

1
ans: supplyAsync 1, supplyAsync 2, supplyAsync 3

在 allOf 後面串接一個 thenRun
它的用法很簡單,使用方式與 thenAccept 一樣,差別在於不需要傳入上一個 CompletableFuture 的執行結果

1
2
3
4
5
6
// basic
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)

// use your own thread pool
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)


上述提到的都是全部完成的例子,反過來說我只想要其中一個有完成就往下執行
CompletableFuture 也可以做到這點

1
2
3
4
5
6
// basic
CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)

// use your own thread pool
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

沒有返回值的 acceptEither

1
2
3
4
5
6
// basic
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)

// use your own thread pool
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

用於多個 CompletableFuture 的 anyOf ,在眾多 CompletableFuture 只需一個完成就可往下執行

1
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

  • java
  • thread

扫一扫,分享到微信

微信分享二维码
Java Unit Test 使用 Mockito + Powermock (1)
Java NIO ByteBuffer 基本操作圖
© 2020 Heng Lin
Hexo Theme Yilia by Litten
  • 搜尋文章
  • 關於我

tag:

  • hadoop
  • big data
  • conference
  • java
  • nio
  • thread
  • unit test
  • shell
  • gradle
  • docker
  • spring cloud
  • android
  • memory leak
  • math
  • hexo
  • Blog
  • probability
  • statistics
  • clean code
  • illustrator
  • js

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • JCConf 2020

    2020-11-28

    #conference

  • Java Unit Test 使用 Mockito + Powermock (3)

    2020-08-29

    #unit test

  • Java Unit Test 使用 Mockito + Powermock (2)

    2020-06-14

    #unit test

  • Java Unit Test 使用 Mockito + Powermock (1)

    2020-04-25

    #unit test

  • 關於 Java CompletableFuture 的用法

    2020-04-19

    #java#thread

  • Java NIO ByteBuffer 基本操作圖

    2020-01-13

    #java#nio

  • 對於 Java Fork Join framework 的理解

    2020-01-07

    #java#thread

  • Java BlockingQueue 適合應用在哪些場景

    2020-01-06

    #java#thread

  • 對於 Java ThreadPoolExecutor 的理解

    2020-01-05

    #java#thread

  • Java ReentrantLock 和 ReentrantReadWriteLock 的使用時機

    2020-01-04

    #java#thread

  • Java volatile 的使用時機

    2020-01-03

    #java#thread

  • spring cloud 使用 gradle 打包 docker image

    2019-11-11

    #gradle#docker#spring cloud

  • Hadoop RecordReader

    2019-11-06

    #hadoop#big data

  • Hadoop Writable

    2019-11-03

    #hadoop#big data

  • JSDC 2019

    2019-10-26

    #conference

  • 簡易版shell script作自動化處理

    2019-10-22

    #shell

  • JCConf 2019

    2019-10-04

    #conference

  • JCConf 2018

    2018-10-21

    #conference

  • 用illustrator幫自己畫一個新頭像

    2018-07-15

    #illustrator

  • 使用視覺化來解釋數學原理的youtube - 3Blue1Brown

    2018-06-24

    #math

  • 使用 Gradle 自動初始化 Spock

    2018-05-20

    #unit test#gradle

  • 提升Socket傳送Large file的速度

    2018-05-13

    #java

  • 你真的了解Java中的Thread運作嗎? - 容易讓人誤解的synchronized method

    2018-04-29

    #java#thread

  • 從無到有DIY chart (二)

    2018-04-14

    #js

  • 從無到有DIY chart (一)

    2018-04-01

    #js

  • 標準常態分佈的機率密度函數

    2018-03-24

    #math#probability#statistics

  • Linux 如何避免重複執行特定jar

    2018-03-17

    #shell

  • 使用 WeakReference 解決 Android 發生 memory leak 問題

    2018-03-10

    #java#android#memory leak

  • 比較好的nested map寫法

    2018-03-03

    #java#clean code

  • 更換hexo themes

    2018-02-20

    #hexo

  • 第一篇Blog,從SVN到Git

    2018-02-16

    #Blog

全端工程師

60% Java
40% Javascript