Skip to content
forked from foldright/cffu

🦝 Java CompletableFuture Fu, aka. CF-Fu, pronounced "Shifu"; include best practice/traps guide and a tiny sidekick library to improve user experience and reduce misuse.

License

Notifications You must be signed in to change notification settings

SYSU-Coder/cffu

 
 

Repository files navigation

🦝 CompletableFuture Fu (CF-Fu)

🚧 项目还在开发中,发布了v0.x版本: Maven Central

工作项列表及其进展,参见 issue 6


Github Workflow Build Status Github Workflow Build Status Codecov Java support Kotlin License Javadocs dokka Maven Central GitHub Releases GitHub Stars GitHub Forks GitHub Issues GitHub Contributors GitHub repo size gitpod: Ready to Code

shifu

如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。

并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单

其中CompletableFuture (CF)有其优点:

  • Java标准库内置
    • 无需额外依赖,几乎总是可用
    • 相信有极高的实现质量
  • 广为人知广泛使用,有一流的群众基础
    • CompletableFuture在2014年发布的Java 8提供,有~10年了
    • CompletableFuture的父接口Future早在2004年发布的Java 5中提供,有~20年了
    • 虽然Future接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java开发者熟悉了Future这个典型的概念与工具
  • 功能强大、但不会非常庞大复杂
    • 足以应对日常的业务需求开发
    • 其它的大型并发框架(比如AkkaRxJava)在使用上需要理解的内容要多很多
    • 当然基本的并发关注方面及其复杂性,与具体使用哪个工具无关,都是要理解与注意的
  • 高层抽象
    • 或说 以业务流程的形式表达技术的并发流程
    • 可以不使用繁琐易错的基础并发协调工具,如CountDownLatch、锁(Lock)、信号量(Semaphore

和其它并发工具、框架一样,CompletableFuture 用于

  • 并发执行业务逻辑,或说编排并发的处理流程/处理任务
  • 利用多核并行处理
  • 提升业务响应性

值得更深入了解和应用。 💕



🎯 〇、目标

  • 作为文档库(即CompletableFuture Guide):
    • 完备说明CompletableFuture的使用方式
    • 给出 最佳实践建议 与 使用陷阱注意
    • 期望在业务中,更有效安全地使用CompletableFuture
  • 作为代码库(即cffu库):
    • 补齐在业务使用中CompletableFuture所缺失的功能
    • 期望在业务中,更方便自然地使用CompletableFuture

🦮 一、CompletableFuture Guide

为了阅读的简洁方便,后文CompletableFuture会简写成CF

🔠 CF并发执行的描述及其用语

cf-graph

基本概念与术语:

  • 任务(Task)/ 计算(Computation
    • 任务逻辑(Task Logic)/ 业务逻辑(Biz Logic
    • 执行(Execute)任务
  • 状态(State
    • 运行中(Running〚1〛
    • 取消(Cancelled〚2〛
    • 完成(Completed / Done
      • 成功(Success / Successful)/ 正常完成(Completed Normally)/ 成功完成(Completed Successfully
      • 失败(Failed / Fail)/ 异常完成(Completed Exceptionally
  • 状态转变(Transition
    • 事件(Event)、触发(Trigger
  • 业务流程(Biz Flow)、CF链(Chain
    • 流程图(Flow Graph)、有向无环图 / DAG
      • 为什么构建的CF链一定是DAG
    • 流程编排(Flow Choreography
  • 前驱(Predecessor)/ 后继(Successor
    • 上游任务 / 前驱任务 / Dependency Task(我依赖的任务)
    • 下游任务 / 后继任务 / Dependent Task(依赖我的任务)

注:上面用/隔开的多个词是,在表述CF同一个概念时,会碰到的多个术语;在不影响理解的情况下,后文会尽量统一用第一个词来表达。

task stauts transition

更多说明:

  • 〚1〛 任务状态有且有只有 运行中(Running)、取消(Cancelled)、完成(Completed)这3种状态。
    • 对于「完成」状态,进一步可以分成 成功(Success)、失败(Failed)2种状态。
  • 所以也可以说,任务状态有且只有 运行中、取消、成功、失败 这4种状态。
    • 右图是任务的状态及其转变图。
    • 在概念上CF的状态转变只能是单次单向的,这很简单可靠、也容易理解并和使用直觉一致。
    • 注:虽然下文提到的obtrudeValue()/obtrudeException方法可以突破CF概念上的约定,但这2个后门方法在正常设计实现中不应该会用到,尤其在业务使用应该完全忽略;带来的问题也由使用者自己了解清楚并注意。

  • 〚2〛 关于「取消」状态:
  • 对于「取消」状态,或说设置了「CancellationException」失败异常的CompletableFuture cf,相比其它异常失败 / 设置了其它失败异常 的情况,不一样的地方:
    • 调用cf.get() / cf.get(timeout, unit)方法
      • 会抛出CancellationException异常
      • 其它异常失败时,这2个方法抛出的是包了一层的ExecutionExceptioncause是实际的失败异常
    • 调用cf.join() / cf.getNow(valueIfAbsent)方法
      • 会抛出CancellationException异常
      • 其它异常失败时,这2个方法抛出的是包了一层的CompletionExceptioncause是实际的失败异常
    • 调用cf.exceptionNow()方法
      • 会抛出IllegalStateException,而不是返回cf所设置的CancellationException异常
      • 其它异常失败时,exceptionNow()返回设置的异常
    • 调用cf.isCancelled()方法
      • 返回true
      • 其它异常失败时,isCancelled()返回false
  • 其它地方,CancellationException异常与其它异常是一样处理的。比如:
    • 调用cf.resultNow()方法
      都是抛出IllegalStateException异常
    • 调用cf.isDone()cf.isCompletedExceptionally()
      都是返回true
    • CompletionStage接口方法对异常的处理,如
      cf.exceptionally()的方法参数Function<Throwable, T>所处理的都是直接设置的异常对象没有包装过

🕹️ CF并发执行的关注方面

CF任务执行/流程编排,即执行提交的代码逻辑/计算/任务,涉及下面4个方面:

  • 任务的输入输出
    • CF所关联任务的输入参数/返回结果(及其数据类型)
  • 任务的调度,即在哪个线程来执行任务。可以是
    • 在触发的线程中就地连续执行任务
    • 在指定Executor(的线程)中执行任务
  • 任务的错误处理(任务运行出错)
  • 任务的超时控制
    • 超时控制是并发的基础关注方面之一
    • 到了Java 9提供了内置支持,新增了completeOnTimeout(...)/orTimeout(...)方法

本节「并发关注方面」,会举例上一些CF方法名,以说明CF方法的命名模式;
可以先不用关心方法的具体功能,在「CF的功能介绍」中会分类展开说明CF方法及其功能。

1. 输入输出

对应下面4种情况:

  • 无输入无返回(00)
    • 对应Runnable接口(包含单个run方法)
  • 无输入有返回(01)
    • 对应Supplier<O>接口(包含单个supply方法)
  • 有输入无返回(10)
    • 对应Consumer<I>接口(包含单个accept方法)
  • 有输入有返回(11)

注:

  • 对于有输入或返回的接口(即除了Runnable接口)
    • 都是泛型的,所以可以支持不同的具体数据类型
    • 都是处理单个输入数据
    • 如果要处理两个输入数据,即有两个上游CF的返回,会涉及下面的变体接口
  • 对于有输入接口,有两个输入参数的变体接口:

CF通过其方法名中包含的用词来体现:

  • run:无输入无返回(00)
    • 即是Runnable接口包含的run方法名
    • 相应的CF方法名的一些例子:
      • runAsync(Runnable runnable)
      • thenRun(Runnable action)
      • runAfterBoth(CompletionStage<?> other, Runnable action)
      • runAfterEitherAsync(CompletionStage<?> other, Runnable action)
  • supply:无输入有返回(01)
    • 即是Supplier接口包含的supply方法名
    • 相应的CF方法名的一些例子:
      • supplyAsync(Supplier<U> supplier)
      • supplyAsync(Supplier<U> supplier, Executor executor)
  • accept:有输入无返回(10)
    • 即是Consumer接口包含的accept方法名
    • 相应的CF方法名的一些例子:
      • thenAccept(Consumer<T> action)
      • thenAcceptAsync(Consumer<T> action)
      • thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action)
      • acceptEitherAsync(CompletionStage<T> other, Consumer<T> action)
  • apply:有输入有返回(11)
    • 即是Function接口包含的apply方法名。CF的方法如
    • 相应的CF方法名的一些例子:
      • thenApply(Function<T, U> fn)
      • thenApplyAsync(Function<T, U> fn)
      • applyToEither(CompletionStage<T> other, Function<T, U> fn)

2. 调度

任务调度是指,任务在哪个线程执行。有2种方式:

  • 在触发的线程中就地连续执行任务
  • 在指定Executor(的线程)中执行任务

CF通过方法名后缀Async来体现调度方式:

  • 有方法名后缀Async
    • 在触发CF后,任务在指定Executor执行
      • 如果不指定executor参数,缺省是ForkJoinPool.commonPool()
    • 相应的CF方法名的一些例子:
      • runAsync(Runnable runnable)
      • thenAcceptAsync(Consumer<T> action, Executor executor)
      • runAfterBothAsync(CompletionStage<?> other, Runnable action)
  • 无方法名后缀Async
    • 任务在触发线程就地连续执行
    • 相应的CF方法名的一些例子:
      • thenAccept(Consumer<T> action)
      • thenApply(Function<T, U> fn)
      • applyToEither(CompletionStage<T> other, Function<T, U> fn)

3. 错误处理

提交给CF的任务可以运行出错(抛出异常),即状态是失败(Failed)或取消(Cancelled)。

对于直接读取结果的方法:

  • 读取 成功结果的方法,如 cf.get()cf.join()会抛出异常(包装的异常)来反馈
  • 读取 失败结果的方法,如 cf.exceptionNow()会返回结果异常或是抛出异常来反馈

对于CompletionStage接口中编排执行的方法,会根据方法的功能 是只处理成功结果或失败结果一者,或是同时处理成功失败结果二者。如

  • exceptionally(...)只处理 失败结果
  • whenComplete(...)/handle(...)同时处理 成功与失败结果;
    • 这2个方法的参数lamdbaBiConsumer/BiFunction)同时输入成功失败结果2个参数:valueexception
  • 其它多数的方法只处理 成功结果
  • 对于不处理的结果,效果上就好像
    没有调用这个CompletionStage方法一样,即短路bypass了 👏

4. 任务执行的超时控制

超时控制是并发的基础关注方面之一。

到了Java 9提供了内置支持,新增了completeOnTimeout(...)/orTimeout(...)方法。

CF的超时控制,在实现上其实可以看成是CF的使用方式,并不是CF要实现基础能力;即可以通过其它已有的CF功能,在CF外围实现。

🔧 CF的功能介绍 | 💪 CF方法分类说明

见子文档页 cf-functions-intro.md

CF的方法个数比较多,所以介绍内容有些多,内容继续完善中… 💪 💕

📐 CF的设计模式 | 🐻 最佳实践与使用陷阱

见子文档页 cf-design-patterns.md

还没有什么内容,收集思考展开中… 💪 💕

📦 二、cffu

🔧 功能

新功能

  • 支持设置缺省的业务线程池
    • CompletableFuture的缺省线程池是ForkJoinPool.commonPool(),这个线程池差不多CPU个线程,合适执行CPU密集的任务。
    • 对于业务逻辑往往有很多等待操作(如网络IO、阻塞等待),并不是CPU密集的;使用这个缺省线程池ForkJoinPool.commonPool()很危险❗️
      所以每次调用CompletableFuture*async方法时,都传入业务线程池,很繁琐易错 🤯
    • Cffu支持设置缺省的业务线程池,规避上面的繁琐与危险
  • 一等公民支持Kotlin 🍩
  • cffuAllOf方法
    • 运行多个CompletableFuture并返回结果的allOf方法
  • cffuAnyOf方法
    • 返回具体类型的anyOf方法
  • cffuCombine(...)方法
    • 运行多个(2 ~ 5个)不同类型的CompletableFuture,返回结果元组
  • cffuJoin(timeout, unit)方法
    • 支持超时的join的方法;就像cf.get(timeout, unit) 之于 cf.get()
    • CompletableFuture缺少这个功能,cf.join()会「不超时永远等待」很危险❗️

Backport支持Java 8

BackportJava 9+高版本的所有CompletableFuture新功能,在Java 8可以直接使用。

其中重要的Backport功能有:

  • 超时控制:orTimeout(...)/completeOnTimeout(...)方法
  • 延迟执行:delayedExecutor(...)方法
  • 工厂方法:failedFuture(...)/completedStage(...)/failedStage(...)

🌿 业务使用中CompletableFuture所缺失的功能介绍

  • 运行多个CompletableFuture并返回结果的allOf方法:
    • resultAllOf方法,运行多个相同结果类型的CompletableFuture
      • CompletableFuture<List<T>> resultAllOf(CompletableFuture<T>... cfs)
      • CompletableFuture<List<T>> resultAllOf(List<? extends CompletableFuture<T>> cfs)
    • resultOf方法,运行多个不同结果类型的CompletableFuture
      • CompletableFuture<Pair<T1, T2>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2)
      • CompletableFuture<Triple<T1, T2, T3>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2, CompletableFuture<T3> cf3)
  • 具体类型的anyOf方法:
    • 提供的方法:
      • CompletableFuture<T> anyOf(CompletableFuture<T>... cfs)
      • CompletableFuture<T> anyOf(List<? extends CompletableFuture<T>> cfs)
    • CF返回的类型是Object,丢失具体类型:
      • CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

实现所在的类:

🎪 使用示例

Java

public class CffuDemo {
  private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();

  // Create a CffuFactory with configuration of the customized thread pool
  private static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizThreadPool).build();

  public static void main(String[] args) throws Exception {
    final Cffu<Integer> cf42 = cffuFactory
        .supplyAsync(() -> 21)  // Run in myBizThreadPool
        .thenApply(n -> n * 2);

    // Below tasks all run in myBizThreadPool
    final Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
      sleep(1001);
      return n / 2;
    });
    final Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
      sleep(1002);
      return n / 2;
    });
    final Cffu<Integer> longTaskC = cf42.thenApplyAsync(n -> {
      sleep(100);
      return n * 2;
    });
    final Cffu<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
      sleep(1_000);
      throw new RuntimeException("Bang!");
    });

    final Cffu<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum)
        .orTimeout(1500, TimeUnit.MILLISECONDS);
    System.out.println("combined result: " + combined.get());
    final Cffu<Integer> anyOfSuccess = cffuFactory.cffuAnyOfSuccess(longTaskC, longFailedTask);
    System.out.println("anyOfSuccess result: " + anyOfSuccess.get());

    ////////////////////////////////////////
    // cleanup
    ////////////////////////////////////////
    myBizThreadPool.shutdown();
  }
}

# 完整可运行的Demo代码参见CffuDemo.java

Kotlin

private val myBizThreadPool: ExecutorService = Executors.newCachedThreadPool()

// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = newCffuFactoryBuilder(myBizThreadPool).build()

fun main() {
  val cf42 = cffuFactory
    .supplyAsync { 21 }   // Run in myBizThreadPool
    .thenApply { it * 2 }

  // Below tasks all run in myBizThreadPool
  val longTaskA = cf42.thenApplyAsync { n: Int ->
    sleep(1001)
    n / 2
  }
  val longTaskB = cf42.thenApplyAsync { n: Int ->
    sleep(1002)
    n / 2
  }
  val longTaskC = cf42.thenApplyAsync { n: Int ->
    sleep(100)
    n * 2
  }
  val longFailedTask = cf42.thenApplyAsync<Int> { _ ->
    sleep(1000)
    throw RuntimeException("Bang!")
  }

  val combined = longTaskA.thenCombine(longTaskB, Integer::sum)
    .orTimeout(1500, TimeUnit.MILLISECONDS)
  println("combined result: ${combined.get()}")
  val anyOfSuccess: Cffu<Int> = listOf(longTaskC, longFailedTask).anyOfSuccessCffu()
  println("anyOfSuccess result: ${anyOfSuccess.get()}")

  ////////////////////////////////////////
  // cleanup
  ////////////////////////////////////////
  myBizThreadPool.shutdown()
}

# 完整可运行的Demo代码参见CffuDemo.kt

🔌 API Docs

🍪依赖

可以在 central.sonatype.com 查看最新版本与可用版本列表。

  • cffu库(包含Java CompletableFuture的增强CompletableFutureUtils):
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu</artifactId>
        <version>0.9.7</version>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      implementation("io.foldright:cffu:0.9.7")
      // Gradle Groovy DSL
      implementation 'io.foldright:cffu:0.9.7'
  • cffu Kotlin支持库:
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu-kotlin</artifactId>
        <version>0.9.7</version>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      implementation("io.foldright:cffu-kotlin:0.9.7")
      // Gradle Groovy DSL
      implementation 'io.foldright:cffu-kotlin:0.9.7'
  • cffu bom:
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu-bom</artifactId>
        <version>0.9.7</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      implementation(platform("io.foldright:cffu-bom:0.9.7"))
      // Gradle Groovy DSL
      implementation platform('io.foldright:cffu-bom:0.9.7')
  • 📌 TransmittableThreadLocal(TTL)cffu executor wrapper SPI实现
    • For Maven projects:

      <dependency>
        <groupId>io.foldright</groupId>
        <artifactId>cffu-ttl-executor-wrapper</artifactId>
        <version>0.9.7</version>
        <scope>runtime</scope>
      </dependency>
    • For Gradle projects:

      // Gradle Kotlin DSL
      runtimeOnly("io.foldright:cffu-ttl-executor-wrapper:0.9.7")
      // Gradle Groovy DSL
      runtimeOnly 'io.foldright:cffu-ttl-executor-wrapper:0.9.7'

👋 ∞、关于库名

cffuCompletableFuture-Fu的缩写;读作C Fu,谐音Shifu/师傅

嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝

shifu

About

🦝 Java CompletableFuture Fu, aka. CF-Fu, pronounced "Shifu"; include best practice/traps guide and a tiny sidekick library to improve user experience and reduce misuse.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Java 66.6%
  • Kotlin 32.2%
  • Shell 1.2%