
Asynqro

Asynqro is a small library whose purpose is to make C++ programming easier by giving developers a rich monadic Future API (mostly inspired by the Future API of the Scala language). This library is another implementation of the ideas in https://github.com/opensoft/proofseed (which has since moved to using asynqro; for historical purposes check tags before 02/25/19), but it has a much cleaner API, refined task scheduling logic and is not tied to any framework.

Dependencies

Asynqro has two main parts: Future/Promise and task scheduling. Both are described below.

Future/Promise

There are already a lot of implementations of the Future mechanism in C++ (std::future, boost::future, QFuture and others).

So why not create another one?

The Future-related part of asynqro contains Future, Promise, CancelableFuture, WithFailure, Trampoline and the repeat helpers, each described below.

All classes are reentrant and thread-safe.

All higher-order methods are exception-safe. If any exception happens inside a function passed to such a method, the Future will fail (or the task will gracefully stop in the case of runAndForget).

It is possible to use Future with move-only classes (except for sequence()). In this case resultRef() should be used instead of result().
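
For instance, a minimal sketch of the move-only case (the Data type here is hypothetical):

Future<std::unique_ptr<Data>, std::string> f = /*...*/;
// result() would have to copy the value, which a move-only type forbids,
// so the value is accessed by const reference instead:
const std::unique_ptr<Data> &value = f.resultRef();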

Asynqro is intended to be used by including the asynqro/asynqro header, which includes asynqro/future.h and asynqro/tasks.h. It is also possible to include only asynqro/future.h if task scheduling is not needed. simplefuture.h provides a simple wrapper with std::any as the failure type. All other headers except these three are considered implementation details and should not be included directly.

A good example of customizing Future to specific needs can be found in https://github.com/opensoft/proofseed/blob/develop/include/proofseed/asynqro_extra.h .

Promise

There are not many methods in this class, and its main usage is to generate a Future object which will later be filled by this Promise at most once. All subsequent fills will be ignored.
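
A minimal sketch of that workflow (the future()/success()/failure() method names are assumptions based on the description above, not verbatim from this document):

Promise<int, std::string> promise;
Future<int, std::string> future = promise.future();
// ... hand the future out to consumers ...
promise.success(42);          // fills the Future with a result
promise.failure("too late");  // ignored: the Promise was already filled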

Future

This class shouldn't be instantiated directly in user code; it is obtained either from a Promise or from the task scheduling part. A new Future object is also returned by all Future transformation methods. It complies with the functor and monad laws from FP and provides all operators required by them (successful/failed, map, flatMap).

Future is also a sort of left-biased EitherT with the result type as the left side and the failure type as the right side (to provide a failure reason). The sides were chosen in a non-canonical way (a typical Either has the right side as the result and the left as the error) for compatibility purposes: the std::expected type in C++ is sided the same way.

Almost all Future methods return a Future object, so they can be effectively chained. Futures are almost immutable: almost, because they change their state at most once, when they are filled. Adding callbacks doesn't change the behavior of other already applied callbacks (i.e. it will not change the state of the Future and/or its value).

If a higher-order method is called on an already filled Future, it is invoked (in case of matching status) immediately in the same thread. If the Future is not yet filled, the callback is put in a queue that is processed (in unspecified order) when the Future is filled, in the thread that filled it.

Future API
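
A short sketch of typical chaining (asyncCalculation here is the same hypothetical Future-returning function as in the WithFailure example below):

Future<int, std::string> f = asyncCalculation(0);
f.map([](int x) { return x * 2; })                    // called immediately or on filling, as described above
 .flatMap([](int x) { return asyncCalculation(x); })  // chain another asynchronous step
 .recover([](const std::string &) { return -1; })     // turn a failure back into a value
 .onSuccess([](int x) { /* consume the result */ });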

CancelableFuture

The API of this class is the same as the Future API plus a cancel method, which immediately fills this Future. A CancelableFuture can be created only from a Promise, so it is up to the providing side to decide whether the return value should be cancelable. Returning a CancelableFuture, however, doesn't oblige the provider to treat cancelation as an order; it can be considered a hint. For example, a network API can return CancelableFuture and provide cancelation only for requests that are still in the queue.

The providing side can check whether the Future was canceled by checking whether the Promise was already filled.

All CancelableFuture methods return a plain Future to prevent possible cancelation of the original Promise somewhere downstream.
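
A minimal sketch of both sides (constructing a CancelableFuture directly from a Promise and the isFilled() check are assumptions based on the description above):

// providing side
Promise<int, std::string> promise;
CancelableFuture<int, std::string> f(promise);
// ... later, honor the hint if desired:
if (!promise.isFilled())
    promise.success(42);

// consuming side
f.cancel(); // immediately fills the Future; for the provider it is a hint, not an order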

WithFailure

It is possible to fail any transformation by using the WithFailure helper struct.

Future<int, std::string> f = /*...*/;
f.flatMap([](int x) -> Future<int, std::string> {
  if (shouldNotPass(x))
    return WithFailure<std::string>("You shall not pass!");
  else
    return asyncCalculation(x);
})
.map([](int x) -> int {
  if (mayItPass(x))
    return 42;
  return WithFailure<std::string>("You shall not pass!");
})
.recover([](const std::string &reason) -> int {
  if (reason.empty())
    return -1;
  return WithFailure<std::string>("You shall not pass, I said.");
});

This structure SHOULD NOT be stored anywhere and should be used only as a helper to return a failure. The implicit cast operator will move from the stored failure.

Trampoline

Using map() and other blocking transformations is a case where we can expect the stack to overflow, because we know each transformation is called immediately after the previous one.

For flatMap() or andThen(), however, an overflow is definitely not something one would expect, due to their pseudo-asynchronous nature. But in the case of lots of flatMaps, the stack will still overflow on backward filling, when the last Future is filled.

To avoid such behavior, the Trampoline struct can be used anywhere a Future return is expected. It wraps a Future with an extra transformation that makes sure the stack is reset, by moving execution to another thread from the Intensive thread pool.

Future<int, std::string> f = /*...*/;
f.flatMap([](int x) -> Future<int, std::string> {
    return Trampoline(asyncCalculation(x));
});

Repeat Helpers

The asynqro/repeat.h header contains the asynqro::repeat() function, which allows while-loop-styled calls to a user function that can return either data or a future with this data. The user function should return either Continue with a new set of arguments or Finish with the final result.

The typical use case for repeat() is when something needs to be processed in a serial manner, but using the Future mechanism.

The possible repeat() signatures are listed in asynqro/repeat.h; see also the Repeat examples at the end of this document.

When there is a container of data that needs to be passed to the function one element at a time in a serial manner, it is better to use repeatForSequence(). It accepts a container, an initial value and a (Data, T) -> Future<T, FailureT> function, where the first argument is an element from the container and the second is the previous result (or the initial value for the first element). The repeatForSequence() function returns a Future<T, FailureT> with either the final result or the first occurred failure (and it will not proceed with the container values after the failed one).

Tasks scheduling

The same as with futures, there are lots of implementations of task scheduling (boost::asio, QtConcurrent, thread-pool-cpp, Intel TBB and others, some of which are used in the benchmarks below).

Asynqro's task scheduling provides, among other things, the two scheduling modes used in the benchmarks below, Intensive (tasks are spread across a shared thread pool) and ThreadBound (tasks are bound to a particular thread), and two entry points: run(), which returns a Future, and runAndForget(), which doesn't.
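
A minimal sketch of those two entry points (exact signatures and the parameters selecting the scheduling mode are omitted; heavyComputation and writeLogs are hypothetical):

auto f = asynqro::tasks::run([] { return heavyComputation(); }); // returns a Future with the task result
asynqro::tasks::runAndForget([] { writeLogs(); }); // no Future; an exception inside gracefully stops the task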

Limitations:

Task scheduling performance

A task scheduling engine should not only be rich in its API but should also have good performance in the scheduling itself. The benchmarks directory contains 4 synthetic benchmarks that can give at least some hints about how big Asynqro's overhead is.

Tests were run a few times for each solution on an i7 with 4 cores + HT (MacBook Pro 15 mid 2014, i7 2.2GHz). The smallest value was chosen for each case.

Intensive and ThreadBound indicate which type of scheduling was used in the suite. In ThreadBound, tasks were bound to a number of threads not bigger than the number of logical cores.

If an asynqro benchmark is marked with +F, it uses the run function (which returns a Future); if it isn't marked so, it uses runAndForget. The +F mark can indirectly show how much overhead Future usage adds. Keep in mind that this overhead covers not only the pure Future versus nothing, but also the run() logic related to Future filling.

For the Intel TBB repost benchmarks there are two different modes: enqueue and spawn (see the tables below).

The idea behind what exactly each benchmark measures is given in its section below.

These benchmarks are synthetic, and it is not easy to properly benchmark something like task scheduling, especially due to non-exclusive ownership of the CPU, the non-deterministic nature of spinlocks and other factors, but they can at least be used to approximate how big the overhead is going to be under different amounts of load.

The benchmarks listed below were collected with version 0.7.0.

empty-avalanche

A big for loop sends a lot of tasks without any payload (except recording the current execution time) to the thread pool. It produces only one result: how much time it took to go through the whole list.
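
A rough sketch of the idea (not the actual benchmark code; jobsCount and the waiting/timing details are omitted assumptions):

std::atomic<long long> done{0};
for (long long i = 0; i < jobsCount; ++i)
    asynqro::tasks::runAndForget([&done] { ++done; }); // no payload, just mark completion
// measure the time from the first post until done == jobsCount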

| System/Jobs | 10000 | 100000 | 1000000 | 10000000 |
| --- | --- | --- | --- | --- |
| asynqro (idle=1000, Intensive) | 4.44095 | 45.4319 | 455.2 | 4666.12 |
| asynqro (idle=1000, Intensive, +F) | 9.05857 | 94.1342 | 926.153 | 9508.67 |
| asynqro (idle=1000, ThreadBound) | 2.98527 | 27.4865 | 270.969 | 2714.79 |
| asynqro (idle=1000, ThreadBound, +F) | 8.98741 | 92.0745 | 908.951 | 9383.99 |
| boostasio | 33.7501 | 318.911 | 2955.63 | 30074.4 |
| Intel TBB | 3.79893 | 26.0585 | 252.715 | 2545.12 |
| qtconcurrent | 131.674 | 1339.33 | 13335.3 | 133160 |
| threadpoolcpp | 1.2125 | 4.50206 | 47.2289 | 472.346 |

timed-avalanche

The same as empty-avalanche, but in this case tasks carry a payload that tracks time. Each task is ~0.1ms of payload. The result of this benchmark is the difference between the total time and the sum of the payload times divided by the number of cores.

| System/Jobs | 10000 | 100000 | 1000000 |
| --- | --- | --- | --- |
| asynqro (idle=1000, Intensive) | 6.30689 | 54.6901 | 1701.55 |
| asynqro (idle=1000, Intensive, +F) | 5.72308 | 229.756 | 8162.24 |
| asynqro (idle=1000, ThreadBound) | 0.849481 | 7.88768 | 44.8928 |
| asynqro (idle=1000, ThreadBound, +F) | 3.1938 | 23.9305 | 159.716 |
| boostasio | 0.920996 | 9.22965 | 105.14 |
| Intel TBB | 19.8788 | 185.326 | 1841.44 |
| qtconcurrent | 5.66463 | 102.161 | 2437.86 |
| threadpoolcpp | 2.7514 | 7.54758 | 18.915 |

empty-repost

This benchmark was originally taken from thread-pool-cpp and adapted to qtconcurrent and asynqro usage. It starts C tasks; each of them counts how many times it has been posted and, if not enough yet (1 million), posts itself again; otherwise it reports the time spent. It produces C different results. For each run we take the highest one as the result (which effectively means how much time it took to run all of them).

| System/Concurrency | 1 | 2 | 4 | 6 | 8 | 16 | 32 |
| --- | --- | --- | --- | --- | --- | --- | --- |
| asynqro (idle=1, Intensive) | 3902.78 | 4310 | 8734.67 | 7076.88 | 10073.9 | 22368.9 | 56975 |
| asynqro (idle=1000, Intensive) | 600.418 | 778.71 | 2284.93 | 8127.4 | 12763.2 | 26450.1 | 47548.6 |
| asynqro (idle=1000, Intensive, +F) | 1379.67 | 1623.07 | 2103.98 | 8135.33 | 12011.9 | 25870.5 | 58762.1 |
| asynqro (idle=100000, Intensive) | 546.062 | 758.235 | 2262.83 | 7868.74 | 12336.4 | 26092.8 | 58722.1 |
| asynqro (idle=1, ThreadBound) | 200.017 | 402.391 | 1225.84 | 2107.63 | 3095.3 | 6473.62 | 12505.2 |
| asynqro (idle=1000, ThreadBound) | 201.293 | 390.617 | 1132.82 | 2293.56 | 2616.4 | 6108.04 | 12549.6 |
| asynqro (idle=1000, ThreadBound, +F) | 483.751 | 650.649 | 978.401 | 1436.61 | 2235.9 | 4357.5 | 8627.51 |
| asynqro (idle=100000, ThreadBound) | 202.246 | 409.661 | 1302.81 | 2239.01 | 2623.15 | 6250.3 | 11650.1 |
| boostasio | 1493.45 | 1890.09 | 1874.66 | 1809.04 | 2166.56 | 4754.33 | 9756.77 |
| Intel TBB (enqueue) | 309.177 | 526.463 | 715.759 | 876.114 | 1062.48 | 1811 | 3339.66 |
| Intel TBB (spawn) | 109.763 | 137.671 | 148.276 | 153.028 | 262.128 | 427.955 | 773.134 |
| qtconcurrent | 8233.54 | 26872.4 | 48353.2 | 54523.5 | 59111.9 | 118219 | 237817 |
| threadpoolcpp | 32.8009 | 33.2034 | 34.945 | 46.963 | 56.2666 | 110.815 | 221.312 |

timed-repost

Almost the same as empty-repost, but tasks are filled with payload (the same way as in timed-avalanche). The number of runs for each task is reduced to 100k. The result of the benchmark is again the difference between the total time and the sum of the payload times divided by the number of cores.

ThreadBound asynqro and threadpoolcpp behave poorly on this benchmark at 10, 12 and 14 jobs (i.e. more than the number of cores and not a multiple of it) due to the nature of these schedulers. Intensive asynqro, boostasio and qtconcurrent show behavior similar to their 8 and 16 results. These odd results are not included in the table for brevity, but it is something the reader should be aware of.

payload of ~0.1ms

| System/Concurrency | 1 | 2 | 4 | 6 | 8 | 16 | 32 |
| --- | --- | --- | --- | --- | --- | --- | --- |
| asynqro (idle=1, Intensive) | 393.207 | 412.942 | 913.603 | 655.622 | 121.654 | 206.67 | 385.578 |
| asynqro (idle=1000, Intensive) | 237.078 | 231.968 | 189.993 | 177.917 | 109.911 | 204.463 | 407.792 |
| asynqro (idle=1000, Intensive, +F) | 299.721 | 282.969 | 252.367 | 255.991 | 182.743 | 359.916 | 687.301 |
| asynqro (idle=100000, Intensive) | 77.2099 | 82.6826 | 83.3019 | 99.53 | 117.448 | 220.647 | 408.673 |
| asynqro (idle=1, ThreadBound) | 27.8577 | 38.1681 | 41.3093 | 45.9538 | 53.8854 | 134.18 | 221.157 |
| asynqro (idle=1000, ThreadBound) | 27.4433 | 40.0168 | 37.6035 | 46.5768 | 77.7791 | 137.169 | 239.959 |
| asynqro (idle=1000, ThreadBound, +F) | 60.2763 | 71.8109 | 77.6684 | 102.253 | 122.571 | 250.267 | 489.204 |
| asynqro (idle=100000, ThreadBound) | 27.829 | 41.3547 | 37.5749 | 45.292 | 66.3068 | 117.178 | 239.459 |
| boostasio | 178.826 | 195.186 | 216.133 | 225.054 | 40.7238 | 89.3711 | 187.989 |
| Intel TBB (enqueue) | 168.437 | 122.945 | 105.613 | 71.8165 | 1494.42 | 983.285 | 961.23 |
| Intel TBB (spawn) | 159.379 | 100.803 | 65.9654 | 48.5725 | 10189.7 | 10167.9 | 10176.9 |
| qtconcurrent | 327.731 | 345.655 | 392.61 | 526.27 | 271.911 | 482.723 | 1131.03 |
| threadpoolcpp | 10.3491 | 11.3457 | 11.9767 | 13.9743 | 23.2465 | 35.1778 | 59.5406 |

payload of ~1ms

| System/Concurrency | 1 | 2 | 4 | 6 | 8 | 16 | 32 |
| --- | --- | --- | --- | --- | --- | --- | --- |
| asynqro (idle=1, Intensive) | 345.436 | 480.884 | 1032.17 | 700.264 | 193.722 | 231.541 | 442.502 |
| asynqro (idle=1000, Intensive) | 333.487 | 366.489 | 500.979 | 460.211 | 187.748 | 225.557 | 454.981 |
| asynqro (idle=1000, Intensive, +F) | 371.643 | 404.025 | 582.341 | 508.068 | 275.083 | 398.329 | 760.355 |
| asynqro (idle=100000, Intensive) | 153.605 | 135.368 | 119.951 | 186.767 | 181.475 | 239.224 | 460.824 |
| asynqro (idle=1, ThreadBound) | 33.8516 | 46.8522 | 45.2664 | 60.2888 | 132.012 | 228.833 | 337.351 |
| asynqro (idle=1000, ThreadBound) | 33.3664 | 45.2354 | 45.1481 | 65.5007 | 159.857 | 249.508 | 368.766 |
| asynqro (idle=1000, ThreadBound, +F) | 68.4836 | 77.18 | 90.0423 | 117.263 | 222.851 | 346.748 | 672.402 |
| asynqro (idle=100000, ThreadBound) | 33.675 | 46.0016 | 45.582 | 59.6052 | 128.506 | 195.254 | 330.591 |
| boostasio | 151.232 | 165.312 | 213.826 | 234.668 | 66.3717 | 75.0968 | 137.803 |
| Intel TBB (enqueue) | 193.201 | 136.318 | 128.363 | 111.218 | 14432.8 | 28828.8 | 57611.2 |
| Intel TBB (spawn) | 183.936 | 121.877 | 101.83 | 87.967 | 100227 | 100253 | 100354 |
| qtconcurrent | 275.124 | 295.076 | 382.866 | 464.971 | 249.251 | 476.755 | 926.302 |
| threadpoolcpp | 17.4312 | 18.4445 | 20.1527 | 19.9876 | 47.1506 | 57.6971 | 88.9992 |

Examples

Student in library

Let's say we need to authenticate a student in a library system and after that fetch the list of books she has loaned, with extra info about each of them. We also need to fetch personalized suggestions and show them alongside the list of books to return. However, we know there is a bug in suggestions: sometimes it returns a book with an age restriction higher than the user's age, so we need to filter those out.

We already have the library system API designed as a class that returns a Future for each request.

We need to emit a loanedBooksFetched signal with the loaned books list and a suggestionsFetched signal with the suggestions list. We can't, however, send a list of Book objects directly to QML; we need to transform it into a QVariantList using a static Book method.

We need to return the resulting Future<bool> so the caller knows when everything is loaded or whether any error occurred.

Future<bool, MyFailure> Worker::fetchData(QString username, QString password)
{
  return api->authenticate(username, password).flatMap([this](const User &userInfo) {
    auto taken = api->fetchTakenBooks().flatMap([this](const QVector<QString> &bookIds) {
      QVector<Future<Book, MyFailure>> result;
      result.reserve(bookIds.count());
      for (const QString &id : bookIds)
        result << api->fetchBook(id);
      return Future<Book, MyFailure>::sequence(result);
    })
    .map([this](const QVector<Book> &books) { return Book::qmled(books); })
    .onSuccess([this](const QVariantList &books) { emit loanedBooksFetched(books); });

    auto suggestions = api->fetchSuggestions().innerFilter([userInfo](const Book &book) {
      return book.ageRestriction < userInfo.age;
    })
    .map([](const QVector<Book> &books) { return Book::qmled(books); })
    .onSuccess([this](const QVariantList &books) { emit suggestionsFetched(books); });

    return taken.zip(suggestions).andThenValue(true);
  });
}

Repeat

We have some data that we need to send to our API, and we want to do a few retries before deciding that it is not possible to send it now.

using DataSendRepeater = RepeaterFutureResult<bool, std::string, int>;
Future<bool, std::string> Worker::sendData(Data data, int retries)
{
  return repeat<bool, std::string>([this, data](int retriesLeft) -> DataSendRepeater {
    if (retriesLeft < 0)
      return Future<bool, std::string>::failed("Too many retries");
    return api->sendData(data)
      .map([](bool result) -> DataSendRepeater::Value { return Finish(result); })
      .recover([retriesLeft](const auto &) -> DataSendRepeater::Value { return TrampolinedContinue(retriesLeft - 1); });
  }, retries);
}

Repeat for known sequence

We have an input sequence that we need to process serially in a determined order. If an error occurs during calculation, it fails fast.

Future<double, std::string> Worker::blackBox(int value, double accumulator);

Future<double, std::string> Worker::calculate(std::vector<int> data)
{
  return repeatForSequence(data, 0.0, [this](int x, double previous) -> Future<double, std::string> {
    return blackBox(x, previous) >> [](double result) { return result < 0.0 ? 0.0 : result; };
  });
}
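
The operator>> used in the last transformation is asynqro's chaining shorthand: depending on the callback's return type it behaves like map() or flatMap() (here it behaves like map(), clamping negative intermediate results to 0.0).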