Implementation of IO, Stream, Fiber using go1.18 generics

This library is an attempt to fill the gap of a decent generics streaming library in Go. The existing alternatives do not yet use Go 1.18 generics to their full potential.

The design is inspired by the excellent Scala libraries cats-effect and fs2.

Functions

This package provides a few general functions that are sometimes useful.

There are also basic data structures - Unit, Pair and Either.

For debugging purposes it's useful to convert arbitrary data to strings.

For compatibility with interface {}:

Is there a way to obtain a value of an arbitrary type?

Predicates

Predicate is a function with a boolean result type.

type Predicate[A any] func(A) bool

Option

A convenient data structure Option[A] that provides safe mechanisms to work with a potentially empty value.

Either

A data structure that models a sum type. It can contain either A or B.

For Either there are a few helper functions:

IO

IO encapsulates a calculation and provides a mechanism to compose a few calculations (flat map or bind).

Error handling

An arbitrary calculation may either return a good result that one might expect, or fail with an error, or even panic. In Go the recommended pattern is to represent failure explicitly in the function signature by returning both a result and an error. By convention, when the error is not nil, the result should not be used.

While the requirement to deal with errors explicitly helps a lot in implementing robust systems, it is often very verbose and encourages the questionable practice of explicit control flow via return:

a, err = foo()
if err != nil {
	return
}

Here a single semantically important action (foo) requires 4 lines of code, a branch and a return statement.

This style is also not very friendly to function composition. If you need to pass the result of foo further to bar, you first have to go through the error-handling ceremony.

The composition of two consecutive calculations is fundamental to programming. There is even a mathematical model (the monad) that studies the properties of composing calculations.

From error handling perspective IO[A] provides the following features:

Interaction with the outside world vs simple (pure) functions/calculations.

From the compiler's perspective, things that happen in a program are either ordinary pure computations or modifications of some state outside of the function. Pure computation is special because it has the following benefits:

The ability to understand and reason about programs is crucial for building complex ones.

Unfortunately, all these nice and desirable properties break in the presence of so-called "side effects" - changes of state, interaction with the outside world, and so on - anything that makes a computation produce a different effect (and possibly return different results) even when called with the same arguments.

IO[A] provides a mechanism to arrange these side-effectful computations in such a way that it's easier to predict what is happening in the program. The main feature is delaying the actual execution of effects until the latest possible moment. A typical IO-based program does not perform any action until it is executed. It's often possible to construct the whole computation for a complex program and only then execute it.

Construction

To construct an IO one may use the following functions:

Manipulation

The following functions could be used to manipulate computations:

To and from GoResult - allows handling both a good value and an error:

Sometimes there is a need to perform a side-effectful operation on a value. This can be achieved with Consumer[A].

// Consumer can receive an instance of A and perform some operation on it.
type Consumer[A any] func(A) IOUnit

Execution

To finally run all constructed computations one may use UnsafeRunSync or ForEach:

Auxiliary functions

Implementation details

IO might be implemented in various ways. Here IO is implemented using continuations. A single step of a constructed IO program either completes (returning a result or an error), or returns a continuation: another computation of the same kind. To obtain the result we execute the returned function. Continuations help avoid deeply nested stack traces; this is a universal way to do "trampolining".

Resources

Resource is a value that can only be used inside brackets - acquire/release.

type Resource[A any]

The only allowed way to use the resource is through Use:

ClosableIO is a simple resource that implements Close method:

type ClosableIO interface {
	Close() io.IOUnit
}

Transaction-like resources

Parallel computing

Go routine is represented using the Fiber[A] interface:

type Fiber[A any] interface {
	// Join waits for results of the fiber.
	// When fiber completes, this IO will complete and return the result.
	// After this fiber is closed, all join IOs fail immediately.
	Join() IO[A]
	// Closes the fiber and stops sending callbacks.
	// After closing, the respective go routine may complete
	// This is not Cancel, it does not send any signals to the fiber.
	// The work will still be done.
	Close() IO[fun.Unit]
	// Cancel sends cancellation signal to the Fiber.
	// If the fiber respects the signal, it'll stop.
	// Yet to be implemented.
	// Cancel() IO[Unit]
}

Execution contexts

Execution context is a low-level resource for configuring how much processing power should be used for certain tasks. Executions are represented by the Runnable type, which is just a function without inputs or outputs. All interaction should be encapsulated inside it.

type ExecutionContext interface {
	// Start returns an IO which will return immediately when executed.
	// It'll place the runnable into this execution context.
	Start(neverFailingTask Runnable) IOUnit
	// Shutdown stops receiving new tasks. Subsequent start invocations will fail.
	Shutdown() IOUnit
}

There are two kinds of execution contexts - UnboundedExecutionContext and BoundedExecutionContext. The unbounded one is recommended for IO-bound operations, while the bounded one is for CPU-intensive tasks.

Using channels with IO and parallel computations

Running things in parallel

Working with time

Simple async operations

type Callback[A any] func(A, error) - is used as a notification mechanism for asynchronous communications.

Stream

Stream represents a potentially infinite source of values. Internally a stream is a state machine that receives some input, updates its internal state and produces some output. The implementation is immutable, so we have to maintain the updated stream along the way.

Construction

The following functions could be used to create a new stream:

Simple streams (for test and experimental purposes):

Manipulation

Typical manipulations of a stream include Map, FlatMap, Filter and some other helper functions.

Important functions that allow implementing stateful stream transformations:

Functions to explicitly deal with failures:

Functions to explicitly deal with failures and stream completion:

// Fields should be checked in order - If Error == nil, If !IsFinished, then Value
type StreamEvent[A any] struct {
	Error      error
	IsFinished bool // true when stream has completed
	Value      A
}

Execution

After constructing the desired pipeline, the stream needs to be executed.

Channels

Provides a few utilities for working with channels:

Pipes and sinks

Pipe is as simple as a function that takes one stream and returns another.

Sink is a Pipe that returns a stream of units. That stream could be drained afterwards.

Length manipulation

A few functions that can produce an infinite stream (Repeat), cut the stream at a known position (Take) or skip a few elements at the beginning (Drop).

Mangling

We sometimes want to intersperse the stream with some separators.

Parallel computing in streams

There is io.Parallel that allows running a slice of IOs in parallel. It's not very convenient when we have a lot of incoming requests that we wish to execute with a certain concurrency level (so as not to exceed the receiver's capacity). In this case we can represent the tasks as ordinary IOs and have a stream of tasks Stream[IO[A]]. The evaluation results can be represented as GoResult[A]. We may wish to execute these tasks using a pool of workers of a given size. A pool is a pipe that takes some computations and returns their results (possibly failures): Pipe[io.IO[A], io.GoResult[A]].

Text processing

Reading and writing large text files line-by-line.

Slice utilities

Some utilities that are convenient when working with slices.

We can convert a slice to a set:

Set utilities

The Set type is defined as follows:

And we can perform some operations with sets:

Slices of numbers

Numbers support numerical operations. With generics this requires defining an interface (in the fun package):

// Number is a generic number interface that covers all Go number types.
type Number interface {
	int | int8 | int16 | int32 | int64 | 
	uint | uint8 | uint16 | uint32 | uint64 | 
	float32 | float64 |
	complex64 | complex128
}

Having this definition, we can now aggregate slices of numbers:

Maps utilities

Some helper functions to deal with map[K]V.

Performance considerations

There is a small benchmark of stream sum that can give some idea of what performance one might expect.

In all benchmarks the same computation (the sum of [1, 10000]) is performed using 3 different mechanisms:

Here is the result of a run on a computer:

✗ go test -benchmem -run=^$ -bench ^Benchmark ./stream
goos: linux
goarch: amd64
pkg: github.com/primetalk/goio/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkStreamSum-12                 94          13969686 ns/op        10241737 B/op     310057 allocs/op
BenchmarkSliceSum-12              305767              3806 ns/op               8 B/op          1 allocs/op
BenchmarkForSum-12                375842              3145 ns/op               8 B/op          1 allocs/op
PASS
ok      github.com/primetalk/goio/stream        5.224s

The following conclusions could be inferred:

  1. There are certain tasks that might benefit from lower-level implementation ;).
  2. The slice operation is ~20% slower than the plain for loop.
  3. Handling a single stream element takes ~1.4 µs. There are ~31 allocations per stream element, and the memory overhead is ~1024 bytes per element.

Hence, it seems easy to decide whether a stream-based approach will fit a particular application's needs. If the size of a single stream element is greater than 1K and its processing takes more than 1.4 µs, then the stream-based approach won't hinder performance much.

For example, if each element is a JSON structure of size 10K that is received via a 1G internet connection, its transmission would take ~10 µs. So stream processing would add ~10% overhead to these numbers. These numbers might be a good boundary for consideration. If the element size is greater and the processing is more complex, then the stream overhead becomes negligible.

As a reminder, here are some benefits of the stream processing:

  1. Zero boilerplate error-handling.
  2. Composable and reusable functions/modules.
  3. Zero debug effort (when following the best practices of functional programming - immutability, var-free code).
  4. Constant memory usage (despite allocations, which are short-lived and easily collected by the GC).