<div align="center"><a href="#dummy"><img src="https://github.com/foldright/cffu/assets/1063891/124658cd-025f-471e-8da1-7eea0e482915" alt="🦝 CompletableFuture-Fu(CF-Fu)"></a></div>

<p align="center"> <a href="https://github.com/foldright/cffu/actions/workflows/fast_ci.yaml"><img src="https://img.shields.io/github/actions/workflow/status/foldright/cffu/fast_ci.yaml?branch=main&logo=github&logoColor=white&label=fast ci" alt="Github Workflow Build Status"></a> <a href="https://github.com/foldright/cffu/actions/workflows/ci.yaml"><img src="https://img.shields.io/github/actions/workflow/status/foldright/cffu/ci.yaml?branch=main&logo=github&logoColor=white&label=strong ci" alt="Github Workflow Build Status"></a> <a href="https://app.codecov.io/gh/foldright/cffu/tree/main"><img src="https://img.shields.io/codecov/c/github/foldright/cffu/main?logo=codecov&logoColor=white" alt="Codecov"></a> <a href="https://openjdk.java.net/"><img src="https://img.shields.io/badge/Java-8+-339933?logo=openjdk&logoColor=white" alt="Java support"></a> <a href="https://kotlinlang.org"><img src="https://img.shields.io/badge/Kotlin-1.6+-7F52FF.svg?logo=kotlin&logoColor=white" alt="Kotlin"></a> <a href="https://www.apache.org/licenses/LICENSE-2.0.html"><img src="https://img.shields.io/github/license/foldright/cffu?color=4D7A97&logo=apache" alt="License"></a> <a href="https://foldright.io/api-docs/cffu/"><img src="https://img.shields.io/github/release/foldright/cffu?label=javadoc&color=339933&logo=read-the-docs&logoColor=white" alt="Javadocs"></a> <a href="https://foldright.io/api-docs/cffu-kotlin/"><img src="https://img.shields.io/github/release/foldright/cffu?label=dokka&color=339933&logo=kotlin&logoColor=white" alt="dokka"></a> <a href="https://central.sonatype.com/artifact/io.foldright/cffu/0.9.0/versions"><img src="https://img.shields.io/maven-central/v/io.foldright/cffu?logo=apache-maven&logoColor=white" alt="Maven Central"></a> <a href="https://github.com/foldright/cffu/releases"><img src="https://img.shields.io/github/release/foldright/cffu.svg" alt="GitHub Releases"></a> <a href="https://github.com/foldright/cffu/stargazers"><img 
src="https://img.shields.io/github/stars/foldright/cffu?style=flat" alt="GitHub Stars"></a> <a href="https://github.com/foldright/cffu/fork"><img src="https://img.shields.io/github/forks/foldright/cffu?style=flat" alt="GitHub Forks"></a> <a href="https://github.com/foldright/cffu/issues"><img src="https://img.shields.io/github/issues/foldright/cffu" alt="GitHub Issues"></a> <a href="https://github.com/foldright/cffu/graphs/contributors"><img src="https://img.shields.io/github/contributors/foldright/cffu" alt="GitHub Contributors"></a> <a href="https://github.com/foldright/cffu"><img src="https://img.shields.io/github/repo-size/foldright/cffu" alt="GitHub repo size"></a> <a href="https://gitpod.io/#https://github.com/foldright/cffu"><img src="https://img.shields.io/badge/Gitpod-ready to code-339933?label=gitpod&logo=gitpod&logoColor=white" alt="gitpod: Ready to Code"></a> </p>

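👉 cffu (CompletableFuture-Fu 🦝) is a tiny helper and enhancement library for CompletableFuture (CF). It improves the experience of using CF and reduces misuse, so that CF can be used more conveniently, efficiently, and safely in business code.

Welcome 👏 💖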

<a href="#dummy"><img src="https://user-images.githubusercontent.com/1063891/230850403-87ff74de-1acb-4aff-b9b4-632e4e51e225.png" width="23%" align="right" alt="shifu" /></a>


<!-- START doctoc generated TOC please keep comment here to allow auto update --> <!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE --> <!-- END doctoc generated TOC please keep comment here to allow auto update -->

🔧 Features

The features provided, along with more ways to use cffu, are described in the User Guide below.

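About CompletableFuture

Managing concurrent execution is a complex and error-prone problem, and the industry offers a large number of tools and frameworks to choose from.

For a broad overview of concurrency tools and frameworks, see books such as Seven Concurrency Models in Seven Weeks, Programming Concurrency on the JVM, and Learning Concurrent Programming in Scala (2nd Edition); for more books on concurrency, see the book list.

Among them, CompletableFuture (CF) has its own advantages. Like other concurrency tools and frameworks, CompletableFuture is used to manage concurrent execution, and it is worth understanding and applying in more depth. 💕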

👥 User Guide

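1. Three ways to use cffu

cffu supports three ways of use: the Cffu class, the CompletableFutureUtils utility class, and Kotlin extension methods.

Before introducing the features, take a look at examples of the different ways to use cffu. 🎪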

1) Cffu

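import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// NOTE: sleep(...) is a small helper from the full demo, assumed to wrap Thread.sleep
public class CffuDemo {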
  private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
  // Create a CffuFactory with configuration of the customized thread pool
  private static final CffuFactory cffuFactory = CffuFactory.builder(myBizThreadPool).build();

  public static void main(String[] args) throws Exception {
    final Cffu<Integer> cf42 = cffuFactory
        .supplyAsync(() -> 21)  // Run in myBizThreadPool
        .thenApply(n -> n * 2);

    // Below tasks all run in myBizThreadPool
    final Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
      sleep(1001);
      return n / 2;
    });
    final Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
      sleep(1002);
      return n / 2;
    });
    final Cffu<Integer> longTaskC = cf42.thenApplyAsync(n -> {
      sleep(100);
      return n * 2;
    });
    final Cffu<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
      sleep(1000);
      throw new RuntimeException("Bang!");
    });

    final Cffu<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum)
        .orTimeout(1500, TimeUnit.MILLISECONDS);
    System.out.println("combined result: " + combined.get());

    final Cffu<Integer> anySuccess = cffuFactory.anySuccessOf(longTaskC, longFailedTask);
    System.out.println("any success result: " + anySuccess.get());
  }
}

# For the complete runnable demo code, see CffuDemo.java

2) The CompletableFutureUtils utility class

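import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static io.foldright.cffu.CompletableFutureUtils.anySuccessOf;
import static io.foldright.cffu.CompletableFutureUtils.orTimeout;

// NOTE: sleep(...) is a small helper from the full demo, assumed to wrap Thread.sleep
public class CompletableFutureUtilsDemo {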
  private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();

  public static void main(String[] args) throws Exception {
    final CompletableFuture<Integer> cf42 = CompletableFuture
        .supplyAsync(() -> 21, myBizThreadPool)  // Run in myBizThreadPool
        .thenApply(n -> n * 2);

    final CompletableFuture<Integer> longTaskA = cf42.thenApplyAsync(n -> {
      sleep(1001);
      return n / 2;
    }, myBizThreadPool);
    final CompletableFuture<Integer> longTaskB = cf42.thenApplyAsync(n -> {
      sleep(1002);
      return n / 2;
    }, myBizThreadPool);
    final CompletableFuture<Integer> longTaskC = cf42.thenApplyAsync(n -> {
      sleep(100);
      return n * 2;
    }, myBizThreadPool);
    final CompletableFuture<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
      sleep(1000);
      throw new RuntimeException("Bang!");
    }, myBizThreadPool);

    final CompletableFuture<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum);
    final CompletableFuture<Integer> combinedWithTimeout =
        orTimeout(combined, 1500, TimeUnit.MILLISECONDS);
    System.out.println("combined result: " + combinedWithTimeout.get());

    final CompletableFuture<Integer> anySuccess = anySuccessOf(longTaskC, longFailedTask);
    System.out.println("any success result: " + anySuccess.get());
  }
}

# For the complete runnable demo code, see CompletableFutureUtilsDemo.java

3) Kotlin extension methods

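import io.foldright.cffu.Cffu
import io.foldright.cffu.CffuFactory
import io.foldright.cffu.kotlin.anySuccessOfCffu
import java.lang.Thread.sleep
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

private val myBizThreadPool: ExecutorService = Executors.newCachedThreadPool()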

// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = CffuFactory.builder(myBizThreadPool).build()

fun main() {
  val cf42 = cffuFactory
    .supplyAsync { 21 }   // Run in myBizThreadPool
    .thenApply { it * 2 }

  // Below tasks all run in myBizThreadPool
  val longTaskA = cf42.thenApplyAsync { n: Int ->
    sleep(1001)
    n / 2
  }
  val longTaskB = cf42.thenApplyAsync { n: Int ->
    sleep(1002)
    n / 2
  }
  val longTaskC = cf42.thenApplyAsync { n: Int ->
    sleep(100)
    n * 2
  }
  val longFailedTask = cf42.thenApplyAsync<Int> { _ ->
    sleep(1000)
    throw RuntimeException("Bang!")
  }

  val combined = longTaskA.thenCombine(longTaskB, Integer::sum)
    .orTimeout(1500, TimeUnit.MILLISECONDS)
  println("combined result: ${combined.get()}")

  val anySuccess: Cffu<Int> = listOf(longTaskC, longFailedTask).anySuccessOfCffu()
  println("any success result: ${anySuccess.get()}")
}

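# For the complete runnable demo code, see CffuDemo.kt

2. cffu features

2.1 Returning the results of multiple running CFs

The allOf method of CompletableFuture does not return any results; it only returns Void. This makes it inconvenient to get the results of the multiple CFs that were run.

The allResultsFastFailOf/allResultsOf methods of cffu return the results of multiple CFs, so the overall result can be obtained directly through the library.

Sample code: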

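import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import static io.foldright.cffu.CompletableFutureUtils.allResultsOf;

public class AllResultsOfDemo {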
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) throws Exception {
    //////////////////////////////////////////////////
    // CffuFactory#allResultsOf
    //////////////////////////////////////////////////
    Cffu<Integer> cffu1 = cffuFactory.completedFuture(21);
    Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);

    Cffu<Void> all = cffuFactory.allOf(cffu1, cffu2);
    // Result type is Void!
    //
    // the results can be read from the input arguments (e.g. `cffu1.get()`), but that is cumbersome,
    // which is why projects often grow their own util methods that add results to `allOf`.

    Cffu<List<Integer>> allResults = cffuFactory.allResultsOf(cffu1, cffu2);
    System.out.println(allResults.get());

    //////////////////////////////////////////////////
    // or CompletableFutureUtils#allResultsOf
    //////////////////////////////////////////////////
    CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
    CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);

    CompletableFuture<Void> all2 = CompletableFuture.allOf(cf1, cf2);
    // Result type is Void!

    CompletableFuture<List<Integer>> allResults2 = allResultsOf(cf1, cf2);
    System.out.println(allResults2.get());
  }
}

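# For the complete runnable demo code, see AllResultsOfDemo.java

The methods above handle multiple CFs with the same result type. cffu also provides methods that return the results of multiple CFs with different types: allTupleFastFailOf/allTupleOf.

Sample code: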
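
To make the idea concrete, here is a minimal sketch of how a results-returning variant of allOf could be built on the plain JDK. The class name AllResultsSketch and its allResultsOf helper are hypothetical names chosen for illustration; this is not cffu's implementation, only one way the same effect might be achieved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not part of cffu
final class AllResultsSketch {
  /** Waits for all inputs, then collects their results in input order. */
  @SafeVarargs
  static <T> CompletableFuture<List<T>> allResultsOf(CompletableFuture<? extends T>... cfs) {
    return CompletableFuture.allOf(cfs).thenApply(unused -> {
      List<T> results = new ArrayList<>(cfs.length);
      for (CompletableFuture<? extends T> cf : cfs) {
        // join() does not block here: allOf only completes normally
        // after every input has completed normally
        results.add(cf.join());
      }
      return results;
    });
  }

  public static void main(String[] args) {
    CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> 42);
    System.out.println(allResultsOf(cf1, cf2).join()); // prints [21, 42]
  }
}
```

If any input fails, allOf completes exceptionally and the collecting step is skipped, so the returned future fails as well.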

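import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;
import io.foldright.cffu.tuple.Tuple2;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import static io.foldright.cffu.CompletableFutureUtils.allTupleFastFailOf;

public class AllTupleOfDemo {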
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) throws Exception {
    //////////////////////////////////////////////////
    // allTupleFastFailOf / allTupleOf
    //////////////////////////////////////////////////
    Cffu<String> cffu1 = cffuFactory.completedFuture("21");
    Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);

    Cffu<Tuple2<String, Integer>> allTuple = cffuFactory.allTupleFastFailOf(cffu1, cffu2);
    System.out.println(allTuple.get());

    //////////////////////////////////////////////////
    // or CompletableFutureUtils.allTupleFastFailOf / allTupleOf
    //////////////////////////////////////////////////
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture("21");
    CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);

    CompletableFuture<Tuple2<String, Integer>> allTuple2 = allTupleFastFailOf(cf1, cf2);
    System.out.println(allTuple2.get());
  }
}

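# For the complete runnable demo code, see AllTupleOfDemo.java

2.2 Setting a default business thread pool that is encapsulated and carried along

The *Async methods of CompletableFuture fall back to ForkJoinPool.commonPool() when no executor is passed, and that pool is shared across the whole JVM. As a result, business code has to pass its own thread pool to almost every *Async call, which is tedious and easy to get wrong.

Sample code: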
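
For comparison, the plain JDK can combine two differently typed CFs with thenCombine. The sketch below is only an illustration under that assumption: AllTupleSketch and allTupleOf are hypothetical names, and Map.Entry stands in for cffu's Tuple2.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not part of cffu
final class AllTupleSketch {
  /** Pairs the results of two CFs that have different result types. */
  static <A, B> CompletableFuture<Map.Entry<A, B>> allTupleOf(
      CompletableFuture<A> cfA, CompletableFuture<B> cfB) {
    // Map.Entry is used as a stand-in pair type; cffu uses its own Tuple2
    return cfA.thenCombine(cfB, (a, b) -> new AbstractMap.SimpleImmutableEntry<>(a, b));
  }

  public static void main(String[] args) {
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture("21");
    CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
    Map.Entry<String, Integer> pair = allTupleOf(cf1, cf2).join();
    System.out.println(pair.getKey() + ", " + pair.getValue()); // prints 21, 42
  }
}
```

Each element keeps its static type, so no cast is needed when reading the pair.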

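import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class NoDefaultExecutorSettingForCompletableFuture {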
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    CompletableFuture<Void> cf1 = CompletableFuture.runAsync(
        () -> System.out.println("doing a long time work!"),
        myBizExecutor);

    CompletableFuture<Void> cf2 = CompletableFuture
        .supplyAsync(
            () -> {
              System.out.println("doing another long time work!");
              return 42;
            },
            myBizExecutor)
        .thenAcceptAsync(
            i -> System.out.println("doing third long time work!"),
            myBizExecutor);

    CompletableFuture.allOf(cf1, cf2).join();
  }
}

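# For the complete runnable demo code, see NoDefaultExecutorSettingForCompletableFuture.java

Cffu supports setting a default business thread pool, which avoids the tedium and danger described above. Sample code: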

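import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class DefaultExecutorSettingForCffu {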
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) {
    Cffu<Void> cf1 = cffuFactory.runAsync(() -> System.out.println("doing a long time work!"));

    Cffu<Void> cf2 = cffuFactory.supplyAsync(() -> {
      System.out.println("doing another long time work!");
      return 42;
    }).thenAcceptAsync(i -> System.out.println("doing third long time work!"));

    cffuFactory.allOf(cf1, cf2).join();
  }
}

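# For the complete runnable demo code, see DefaultExecutorSettingForCffu.java

2.3 Efficient and flexible concurrency strategies (AllFastFail/AnySuccess/AllSuccess/MostSuccess)

📔 For concurrency strategies over multiple CFs, see the JavaScript documentation on Promise concurrency; in JavaScript, Promise corresponds to CompletableFuture.

JavaScript Promise provides 4 concurrency methods: Promise.all() (fulfills when all fulfill, rejects as soon as one rejects), Promise.allSettled() (waits for all to settle, whether fulfilled or rejected), Promise.any() (fulfills as soon as one fulfills), and Promise.race() (settles as soon as one settles).

PS: The method naming of JavaScript Promise is really well thought out~ 👍

With 2 newly added methods, cffu aligns with the concurrency methods of the JavaScript Promise specification~ 👏

Sample code: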
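
The core idea of carrying a default executor can be sketched with the plain JDK as follows. BizAsyncSketch is a hypothetical class written for illustration only. It covers just the factory methods, whereas Cffu also carries the executor into later *Async calls on the returned object.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical illustration class, not part of cffu
final class BizAsyncSketch {
  private final Executor defaultExecutor;

  BizAsyncSketch(Executor defaultExecutor) {
    this.defaultExecutor = defaultExecutor;
  }

  /** Runs the supplier in the configured executor instead of the common pool. */
  <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
    return CompletableFuture.supplyAsync(supplier, defaultExecutor);
  }

  /** Runs the action in the configured executor instead of the common pool. */
  CompletableFuture<Void> runAsync(Runnable action) {
    return CompletableFuture.runAsync(action, defaultExecutor);
  }

  /** Returns the name of the thread that actually ran the task. */
  static String demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
      Thread t = new Thread(r, "my-biz-thread");
      t.setDaemon(true);
      return t;
    });
    try {
      BizAsyncSketch biz = new BizAsyncSketch(pool);
      return biz.supplyAsync(() -> Thread.currentThread().getName()).join();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints my-biz-thread
  }
}
```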

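import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static io.foldright.cffu.CompletableFutureUtils.*;

// NOTE: sleep(...) is a small helper from the full demo, assumed to wrap Thread.sleep
public class ConcurrencyStrategyDemo {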
  public static final Executor myBizExecutor = Executors.newCachedThreadPool();
  public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();

  public static void main(String[] args) throws Exception {
    ////////////////////////////////////////////////////////////////////////
    // CffuFactory#allResultsFastFailOf
    // CffuFactory#anySuccessOf
    // CffuFactory#mostSuccessResultsOf
    ////////////////////////////////////////////////////////////////////////
    final Cffu<Integer> successAfterLongTime = cffuFactory.supplyAsync(() -> {
      sleep(3000); // sleep LONG time
      return 42;
    });
    final Cffu<Integer> failed = cffuFactory.failedFuture(new RuntimeException("Bang!"));

    Cffu<List<Integer>> fastFailed = cffuFactory.allResultsFastFailOf(successAfterLongTime, failed);
    // fails fast without waiting for successAfterLongTime
    System.out.println(fastFailed.exceptionNow());

    Cffu<Integer> anySuccess = cffuFactory.anySuccessOf(successAfterLongTime, failed);
    System.out.println(anySuccess.get());

    Cffu<List<Integer>> mostSuccess = cffuFactory.mostSuccessResultsOf(
        0, 100, TimeUnit.MILLISECONDS, successAfterLongTime, failed);
    System.out.println(mostSuccess.get());

    ////////////////////////////////////////////////////////////////////////
    // or CompletableFutureUtils#allResultsFastFailOf
    //    CompletableFutureUtils#anySuccessOf
    //    CompletableFutureUtils#mostSuccessResultsOf
    ////////////////////////////////////////////////////////////////////////
    final CompletableFuture<Integer> successAfterLongTimeCf = CompletableFuture.supplyAsync(() -> {
      sleep(3000); // sleep LONG time
      return 42;
    });
    final CompletableFuture<Integer> failedCf = failedFuture(new RuntimeException("Bang!"));

    CompletableFuture<List<Integer>> fastFailed2 = allResultsFastFailOf(successAfterLongTimeCf, failedCf);
    // fails fast without waiting for successAfterLongTimeCf
    System.out.println(exceptionNow(fastFailed2));

    CompletableFuture<Integer> anySuccess2 = anySuccessOf(successAfterLongTimeCf, failedCf);
    System.out.println(anySuccess2.get());

    CompletableFuture<List<Integer>> mostSuccess2 = mostSuccessResultsOf(
        0, 100, TimeUnit.MILLISECONDS, successAfterLongTimeCf, failedCf);
    System.out.println(mostSuccess2.get());
  }
}

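# For the complete runnable demo code, see ConcurrencyStrategyDemo.java

2.4 A join method with timeout support

cf.join() waits forever without a timeout, which is dangerous in business code❗️ When an unexpectedly long wait occurs, the calling thread stays blocked with no chance to react.

The join(timeout, unit) method is a join with timeout support; it is to cf.join() what cf.get(timeout, unit) is to cf.get().

This new method is used in much the same way, so no code sample is attached.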
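
As a rough illustration of the AnySuccess strategy, the sketch below completes with the first successful input and fails only after every input has failed. AnySuccessSketch is a hypothetical class built on the plain JDK, not cffu's implementation; how it handles an empty input is an assumption made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of cffu
final class AnySuccessSketch {
  @SafeVarargs
  static <T> CompletableFuture<T> anySuccessOf(CompletableFuture<? extends T>... cfs) {
    CompletableFuture<T> result = new CompletableFuture<>();
    if (cfs.length == 0) {
      // Assumption for this sketch: no inputs means no possible success
      result.completeExceptionally(new IllegalArgumentException("no input futures"));
      return result;
    }
    AtomicInteger notFailedCount = new AtomicInteger(cfs.length);
    for (CompletableFuture<? extends T> cf : cfs) {
      cf.whenComplete((value, ex) -> {
        if (ex == null) {
          result.complete(value); // first success wins; later calls are no-ops
        } else if (notFailedCount.decrementAndGet() == 0) {
          result.completeExceptionally(ex); // every input failed
        }
      });
    }
    return result;
  }

  /** A slow success still wins over an immediate failure. */
  static int demo() {
    CompletableFuture<Integer> slowSuccess = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return 42;
    });
    CompletableFuture<Integer> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("Bang!"));
    return anySuccessOf(slowSuccess, failed).join();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 42
  }
}
```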
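
For readers who want to see the shape of the idea, here is a minimal plain-JDK sketch of a join with timeout. JoinWithTimeoutSketch is a hypothetical helper written for this illustration, and the way it wraps exceptions is an assumption that mirrors how join() reports failures; it is not cffu's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not part of cffu
final class JoinWithTimeoutSketch {
  /** Like join(), but gives up after the timeout; throws only unchecked exceptions. */
  static <T> T join(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
    try {
      return cf.get(timeout, unit);
    } catch (ExecutionException e) {
      throw new CompletionException(e.getCause()); // same wrapping style as join()
    } catch (TimeoutException e) {
      throw new CompletionException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new CompletionException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(join(CompletableFuture.completedFuture(42), 1, TimeUnit.SECONDS)); // prints 42

    CompletableFuture<Integer> neverDone = new CompletableFuture<>();
    try {
      join(neverDone, 100, TimeUnit.MILLISECONDS);
    } catch (CompletionException e) {
      System.out.println(e.getCause() instanceof TimeoutException); // prints true
    }
  }
}
```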

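2.5 Backport support for Java 8

All the new CF methods from Java 9+ are directly available on Java 8.

Important backported features include timeout control (orTimeout), failedFuture, and exceptionNow, which appear as CompletableFutureUtils static methods in the demos above.

These backported methods are existing CompletableFuture features, so no code sample is attached.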
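
To show what such a backport involves, the following sketch emulates the Java 9 orTimeout behavior using only Java 8 APIs. OrTimeoutSketch and its single shared daemon scheduler are assumptions made for this illustration, not cffu's actual code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not part of cffu
final class OrTimeoutSketch {
  // A single daemon thread that only triggers timeouts and never runs user code
  private static final ScheduledExecutorService DELAYER =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "timeout-delayer");
        t.setDaemon(true);
        return t;
      });

  /** Fails the given future with TimeoutException if it is not done in time. */
  static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
    if (cf.isDone()) return cf;
    ScheduledFuture<?> timer = DELAYER.schedule(() -> {
      cf.completeExceptionally(new TimeoutException());
    }, timeout, unit);
    // Cancel the timer once the future completes, so it does not linger
    cf.whenComplete((value, ex) -> timer.cancel(false));
    return cf;
  }

  public static void main(String[] args) {
    CompletableFuture<String> neverDone = new CompletableFuture<>();
    try {
      orTimeout(neverDone, 100, TimeUnit.MILLISECONDS).join();
    } catch (CompletionException e) {
      System.out.println("timed out: " + (e.getCause() instanceof TimeoutException)); // prints timed out: true
    }
  }
}
```

Like the Java 9 method, this sketch completes the given future itself rather than returning a copy.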

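2.6 anyOf methods that return a specific type

The CompletableFuture.anyOf method returns Object, which loses the specific type, is not type-safe, and requires an inconvenient cast at the call site.

The anySuccessOf/anyOf methods provided by cffu return the specific type T instead of Object.

This method is used in much the same way, so no code sample is attached.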
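
The difference can be seen in a small plain-JDK sketch that moves the cast into one place. TypedAnyOfSketch is a hypothetical helper for illustration, not cffu's implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not part of cffu
final class TypedAnyOfSketch {
  /** Typed wrapper around CompletableFuture.anyOf, which itself returns Object. */
  @SafeVarargs
  @SuppressWarnings("unchecked")
  static <T> CompletableFuture<T> anyOf(CompletableFuture<? extends T>... cfs) {
    // The cast is safe: every input produces a T, so whichever input
    // completes first can only yield a T
    return CompletableFuture.anyOf(cfs).thenApply(value -> (T) value);
  }

  public static void main(String[] args) {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("hello");
    String first = anyOf(cf).join(); // no cast needed at the call site
    System.out.println(first.length()); // prints 5
  }
}
```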

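2.7 allOf/anyOf methods that accept a wider input type

The CompletableFuture#allOf/anyOf methods take CompletableFuture as their parameter type rather than the wider CompletionStage type; for CompletionStage inputs, CompletionStage#toCompletableFuture must be called to convert them.

The allOf/anyOf methods provided by cffu accept the wider CompletionStage parameter type, which is more convenient.

These methods are used in much the same way, so no code sample is attached.

More feature descriptions

For more, see the API Docs.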
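
The conversion described above can be hidden behind a small wrapper, as in this plain-JDK sketch. StageAllOfSketch is a hypothetical helper for illustration, not cffu's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration class, not part of cffu
final class StageAllOfSketch {
  /** allOf that accepts any CompletionStage, converting each input internally. */
  static CompletableFuture<Void> allOf(CompletionStage<?>... stages) {
    CompletableFuture<?>[] cfs = new CompletableFuture<?>[stages.length];
    for (int i = 0; i < stages.length; i++) {
      cfs[i] = stages[i].toCompletableFuture();
    }
    return CompletableFuture.allOf(cfs);
  }

  public static void main(String[] args) {
    // Declared as CompletionStage, as an API might return it
    CompletionStage<Integer> stage = CompletableFuture.completedFuture(42);
    CompletableFuture<String> cf = CompletableFuture.completedFuture("done");
    allOf(stage, cf).join();
    System.out.println("all completed");
  }
}
```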

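3. How to migrate from using the CompletableFuture class directly to Cffu

To use the enhanced features of cffu, existing code that uses the CompletableFuture class directly can be migrated to the Cffu class. The migration consists of 2 changes: in type declarations, replace CompletableFuture with Cffu; and replace static CompletableFuture method calls with calls on a cffuFactory instance.

This migration works because, as the demos above show, the Cffu class provides the instance methods of CompletableFuture (such as thenApply, thenCombine, get, and join) and the CffuFactory class provides its static methods (such as supplyAsync, runAsync, and allOf).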

🔌 API Docs

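Code samples:

🍪 Dependency

The latest version and the list of available versions can be found at central.sonatype.com.

📚 More resources

👋 About the library name

cffu is short for CompletableFuture-Fu; it is pronounced C Fu, which sounds like Shifu (师傅, "master").

Yep, it brings to mind the lovely little raccoon Master Shifu from Kung Fu Panda~ 🦝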

<a href="#dummy"><img src="https://user-images.githubusercontent.com/1063891/230850403-87ff74de-1acb-4aff-b9b4-632e4e51e225.png" width="40%" alt="shifu" /></a>