RxGo

Join the chat at https://gitter.im/ReactiveX/RxGo

Reactive Extensions for the Go Language

ReactiveX

ReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.

ReactiveX is a new, alternative way of doing asynchronous programming, compared to callbacks, promises, and deferreds. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an Observable.

An operator is a function that defines an Observable: how and when it should emit data. The list of operators covered is available here.

RxGo

The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

Let's see a concrete example with each box being an operator:

In this example, the final items are sent in a channel, available to a consumer. There are many ways to consume or to produce data using RxGo. Publishing the results in a channel is only one of them.

Each operator is a transformation stage. By default, everything is sequential. Yet, we can leverage modern CPU architectures by defining multiple instances of the same operator, each instance being a goroutine connected to a common channel.

The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible.
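The pipeline model described above can be sketched with plain channels and goroutines. This is a minimal illustration using only the standard library, not RxGo's actual implementation; the names generate, stage, and collect are hypothetical.

```go
package main

import "fmt"

// generate is the source stage: it emits the given values and then closes
// its output channel.
func generate(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// stage is one sequential transformation stage: a single goroutine that reads
// from in, applies fn, and forwards the result on a new channel.
func stage(in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var items []int
	for v := range in {
		items = append(items, v)
	}
	return items
}

func main() {
	doubled := stage(generate(1, 2, 3), func(v int) int { return v * 2 })
	incremented := stage(doubled, func(v int) int { return v + 1 })
	fmt.Println(collect(incremented)) // [3 5 7]
}
```

Because each stage here is a single goroutine, items keep their order from one end of the pipeline to the other.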

Installation of RxGo v2

go get -u github.com/reactivex/rxgo/v2

Getting Started

Hello World

Let's create our first Observable and consume an item:

observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)

The Just operator creates an Observable from a static list of items. Of(value) creates an item from a given value. If we want to create an item from an error, we have to use Error(err). This differs from v1, which accepted a value or an error directly without wrapping it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.

By the way, the Just operator uses currying as syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.

Once the Observable is created, we can observe it using Observe(). By default, an Observable is lazy in the sense that it emits items only once a subscription is made. Observe() returns a <-chan rxgo.Item.

We consumed an item from this channel and printed its value using item.V.

An item is a wrapper on top of a value or an error. We may want to check the type first like this:

item := <-ch
if item.Error() {
    return item.E
}
fmt.Println(item.V)

item.Error() returns a boolean indicating whether an item contains an error. Then, we use either item.E to get the error or item.V to get the value.

By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g., OnError, Retry).

It is also possible to consume items using callbacks:

observable.ForEach(func(v interface{}) {
    fmt.Printf("received: %v\n", v)
}, func(err error) {
    fmt.Printf("error: %v\n", err)
}, func() {
    fmt.Println("observable is closed")
})

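In this example, we passed three functions: the first is called for each value item, the second is called when an error item is emitted, and the third is called once the Observable completes.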

ForEach is non-blocking. Yet, it returns a notification channel that will be closed once the Observable completes. Hence, to make the previous code blocking, we simply need to use <-:

<-observable.ForEach(...)
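The non-blocking behaviour and the notification channel can be illustrated with a standard-library sketch. The forEach function below is a hypothetical simplification written for this example (it has no error callback), not RxGo's ForEach.

```go
package main

import "fmt"

// forEach starts consuming in on its own goroutine and returns immediately.
// The returned channel is closed once in has been fully drained and the
// completed callback has run.
func forEach(in <-chan int, next func(int), completed func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			next(v)
		}
		completed()
	}()
	return done
}

func main() {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	sum := 0
	done := forEach(in, func(v int) { sum += v }, func() { fmt.Println("completed") })

	// Receiving from done blocks until the consumer goroutine has finished,
	// so reading sum afterwards is safe.
	<-done
	fmt.Println(sum) // 6
}
```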

Real-World Example

Let's say we want to implement a stream that consumes the following Customer structure:

type Customer struct {
	ID             int
	Name, LastName string
	Age            int
	TaxNumber      string
}

We create a producer that will emit Customers to a given chan rxgo.Item and create an Observable from it:

// Create the input channel
ch := make(chan rxgo.Item)
// Data producer
go producer(ch)

// Create an Observable
observable := rxgo.FromChannel(ch)

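Then, we need to perform the following two operations: filter out the customers who are 18 or younger, and enrich each remaining customer with a tax number.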

As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines. Yet, let's imagine that all the Customer items need to be produced sequentially, ordered by their ID.

observable.
	Filter(func(item interface{}) bool {
		// Filter operation
		customer := item.(Customer)
		return customer.Age > 18
	}).
	Map(func(_ context.Context, item interface{}) (interface{}, error) {
		// Enrich operation
		customer := item.(Customer)
		taxNumber, err := getTaxNumber(customer)
		if err != nil {
			return nil, err
		}
		customer.TaxNumber = taxNumber
		return customer, nil
	},
		// Create multiple instances of the map operator
		rxgo.WithPool(pool),
		// Serialize the items emitted by their Customer.ID
		rxgo.Serialize(func(item interface{}) int {
			customer := item.(Customer)
			return customer.ID
		}), rxgo.WithBufferedChannel(1))
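To show why a serialization step is needed after a parallel stage, here is a standard-library sketch of the idea: workers finish in arbitrary order, and a reordering step buffers early arrivals until the next expected ID shows up. This is an assumption about the general technique, not RxGo's internals; customer, enrichAll, and serialize are hypothetical names, and the sketch assumes consecutive IDs starting at a known value.

```go
package main

import (
	"fmt"
	"sync"
)

type customer struct {
	ID        int
	TaxNumber string
}

// enrichAll fans customers out to a pool of worker goroutines.
// Results may arrive on the returned channel in any order.
func enrichAll(in []customer, workers int, enrich func(customer) customer) <-chan customer {
	jobs := make(chan customer)
	out := make(chan customer)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for c := range jobs {
				out <- enrich(c)
			}
		}()
	}
	go func() {
		for _, c := range in {
			jobs <- c
		}
		close(jobs)
		wg.Wait()
		close(out)
	}()
	return out
}

// serialize returns the customers in ascending ID order, starting at first
// and assuming consecutive IDs. Early arrivals wait in the pending map.
func serialize(in <-chan customer, first int) []customer {
	pending := make(map[int]customer)
	next := first
	var ordered []customer
	for c := range in {
		pending[c.ID] = c
		for {
			ready, ok := pending[next]
			if !ok {
				break
			}
			ordered = append(ordered, ready)
			delete(pending, next)
			next++
		}
	}
	return ordered
}

func main() {
	customers := []customer{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 4}}
	enriched := enrichAll(customers, 3, func(c customer) customer {
		c.TaxNumber = fmt.Sprintf("TAX-%d", c.ID)
		return c
	})
	for _, c := range serialize(enriched, 1) {
		fmt.Println(c.ID, c.TaxNumber)
	}
}
```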

In the end, we consume the items using ForEach() or Observe() for example. Observe() returns a <-chan Item:

for item := range observable.Observe() {
	if item.Error() {
		// The error is carried by the item itself
		return item.E
	}
	fmt.Println(item.V)
}

Observable Types

Hot vs. Cold Observables

In the Rx world, there is a distinction between cold and hot Observables. When the data is produced by the Observable itself, it is a cold Observable. When the data is produced outside the Observable, it is a hot Observable. Usually, when we don't want to create a producer over and over again, we favour a hot Observable.

In RxGo, there is a similar concept.

First, let's create a hot Observable using the FromChannel operator and see the implications:

ch := make(chan rxgo.Item)
go func() {
    for i := 0; i < 3; i++ {
        ch <- rxgo.Of(i)
    }
    close(ch)
}()
observable := rxgo.FromChannel(ch)

// First Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

// Second Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

The result of this execution is:

0
1
2

It means the first Observer consumed all the items, leaving nothing for the others.
This behavior can be altered with Connectable Observables, though.
The main point here is that the items were produced by the goroutine, outside the Observable.

On the other hand, let's create a cold Observable using the Defer operator:

observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
        ch <- rxgo.Of(i)
    }
}})

// First Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

// Second Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

Now, the result is:

0
1
2
0
1
2

In the case of a cold observable, the stream was created independently for every Observer.

Again, the hot vs. cold distinction is not about how you consume items; it is about where the data is produced.
A good example of a hot Observable is price ticks from a trading exchange.
And if you teach an Observable to fetch products from a database and then yield them one by one, you create a cold Observable.
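The distinction can be reproduced with plain channels. In this standard-library sketch (hypothetical names cold, hot, and count; not RxGo code), the cold source starts a fresh producer for every observer, while the hot source has one producer that exists independently of its observers.

```go
package main

import "fmt"

// cold returns a factory: every call starts a fresh producer, so each
// observer receives its own complete sequence.
func cold(n int) func() <-chan int {
	return func() <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			for i := 0; i < n; i++ {
				out <- i
			}
		}()
		return out
	}
}

// hot starts a single producer up front and hands every observer the same
// channel, so items consumed by one observer are gone for the others.
func hot(n int) func() <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return func() <-chan int { return out }
}

// count drains a channel and returns how many items it received.
func count(in <-chan int) int {
	total := 0
	for range in {
		total++
	}
	return total
}

func main() {
	observeCold := cold(3)
	fmt.Println(count(observeCold()), count(observeCold())) // 3 3

	observeHot := hot(3)
	fmt.Println(count(observeHot()), count(observeHot())) // 3 0
}
```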

Backpressure

There is another operator called FromEventSource that creates an Observable from a channel. Unlike FromChannel, it starts to emit items as soon as the Observable is created, regardless of whether there is an Observer. Hence, the items emitted by an Observable without Observer(s) are lost (whereas they are buffered with the FromChannel operator).

A use case for the FromEventSource operator is, for example, telemetry. We may not be interested in all the data produced from the very beginning of a stream, only in the data produced since we started to observe it.

Once we start observing an Observable created with FromEventSource, we can configure the backpressure strategy. By default, it is blocking (there is a guaranteed delivery for the items emitted after we observe it). We can override this strategy this way:

observable := rxgo.FromEventSource(input, rxgo.WithBackPressureStrategy(rxgo.Drop))

The Drop strategy means that if the pipeline after FromEventSource is not ready to consume an item, this item is dropped.
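The difference between a blocking and a dropping strategy maps naturally onto Go's select statement. The sketch below uses only the standard library; forward is a hypothetical helper written for this example and is not how RxGo implements its strategies.

```go
package main

import "fmt"

// forward copies items from in to out and closes out when in is exhausted.
// With drop set, an item is discarded when out cannot accept it immediately;
// otherwise the send blocks until the consumer is ready.
func forward(in <-chan int, out chan<- int, drop bool) (dropped int) {
	for v := range in {
		if !drop {
			out <- v
			continue
		}
		select {
		case out <- v:
		default:
			dropped++
		}
	}
	close(out)
	return dropped
}

func main() {
	in := make(chan int, 5)
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(in)

	// The downstream buffer holds 2 items and nobody is reading yet,
	// so only the first 2 items fit and the rest are dropped.
	out := make(chan int, 2)
	fmt.Println("dropped:", forward(in, out, true)) // dropped: 3
	for v := range out {
		fmt.Println("delivered:", v)
	}
}
```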

By default, a channel connecting operators is non-buffered. We can override this behaviour like this:

observable.Map(transform, rxgo.WithBufferedChannel(42))

Each operator has an opts ...Option parameter that allows passing such options.

Lazy vs. Eager Observation

The default observation strategy is lazy. It means an operator processes the items emitted by an Observable once we start observing it. We can change this behaviour this way:

observable := rxgo.FromChannel(ch).Map(transform, rxgo.WithObservationStrategy(rxgo.Eager))

In this case, the Map operator is triggered whenever an item is produced, even without any Observer.
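One way to picture the two strategies is a stage whose goroutine is started either at construction time (eager) or on the first call to observe (lazy). The following standard-library sketch is an illustration under that assumption; mapped, newMapped, and observe are hypothetical names, not RxGo types.

```go
package main

import (
	"fmt"
	"sync"
)

// mapped is a transformation stage whose worker goroutine starts at most once.
type mapped struct {
	once  sync.Once
	start func()
	out   chan int
}

// newMapped builds the stage. With eager set, processing starts immediately;
// otherwise it is deferred until observe is called for the first time.
func newMapped(in <-chan int, fn func(int) int, eager bool) *mapped {
	m := &mapped{out: make(chan int, 8)}
	m.start = func() {
		go func() {
			defer close(m.out)
			for v := range in {
				m.out <- fn(v)
			}
		}()
	}
	if eager {
		m.once.Do(m.start)
	}
	return m
}

// observe triggers processing if it has not started yet and returns the output.
func (m *mapped) observe() <-chan int {
	m.once.Do(m.start)
	return m.out
}

func main() {
	newSource := func() <-chan int {
		ch := make(chan int, 3)
		ch <- 1
		ch <- 2
		ch <- 3
		close(ch)
		return ch
	}

	// Lazy: nothing is read from the source until observe is called.
	src := newSource()
	lazy := newMapped(src, func(v int) int { return v * 2 }, false)
	fmt.Println("items still waiting in source:", len(src)) // 3
	for v := range lazy.observe() {
		fmt.Println("lazy:", v)
	}

	// Eager: the transform runs even though nobody observes yet.
	ran := make(chan struct{}, 3)
	eager := newMapped(newSource(), func(v int) int {
		ran <- struct{}{}
		return v * 2
	}, true)
	for i := 0; i < 3; i++ {
		<-ran // returns before observe has been called
	}
	for v := range eager.observe() {
		fmt.Println("eager:", v)
	}
}
```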

Sequential vs. Parallel Operators

By default, each operator is sequential: one operator is one goroutine instance. We can override this using the following option:

observable.Map(transform, rxgo.WithPool(32))

In this example, we create a pool of 32 goroutines that consume items concurrently from the same channel. If the operation is CPU-bound, we can use the WithCPUPool() option that creates a pool based on the number of logical CPUs.
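A pool of goroutines reading from one shared channel can be sketched with the standard library as follows. The helpers mapPool and mapCPUPool are hypothetical names for this example; sizing the pool with runtime.NumCPU() is one plausible reading of "based on the number of logical CPUs", not a statement about RxGo's implementation.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// mapPool runs fn on `workers` goroutines that all read from the same input
// channel. Output order is not guaranteed.
func mapPool(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	// Close the output once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// mapCPUPool sizes the pool from the number of logical CPUs, which suits
// CPU-bound transformations.
func mapCPUPool(in <-chan int, fn func(int) int) <-chan int {
	return mapPool(in, runtime.NumCPU(), fn)
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 10; i++ {
			in <- i
		}
	}()

	// Summing is order-independent, so the result is stable even though
	// the squares arrive in an arbitrary order.
	sum := 0
	for v := range mapCPUPool(in, func(v int) int { return v * v }) {
		sum += v
	}
	fmt.Println(sum) // 385
}
```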

Connectable Observable

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its Connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before it begins emitting items.

Let's create a Connectable Observable using rxgo.WithPublishStrategy:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

Then, we create two Observers:

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) + 1, nil
}).DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) * 2, nil
}).DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

If observable was not a Connectable Observable, as DoOnNext creates an Observer, the source Observable would have begun emitting items. Yet, in the case of a Connectable Observable, we have to call Connect():

observable.Connect()

Once Connect() is called, the Connectable Observable begins to emit items.

There is another important difference from a regular Observable: a Connectable Observable publishes its items, meaning all the Observers receive a copy of the items.

Here is an example with a regular Observable:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
// Create a regular Observable
observable := rxgo.FromChannel(ch)

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

Output:

First observer: 1
First observer: 2
First observer: 3

Now, with a Connectable Observable:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
// Create a Connectable Observable
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

disposed, cancel := observable.Connect()
go func() {
	// Do something
	time.Sleep(time.Second)
	// Then cancel the subscription
	cancel()
}()
// Wait for the subscription to be disposed
<-disposed

Output:

Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
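The publish behaviour (hold back until connected, then deliver a copy of every item to every Observer) can be sketched with the standard library. The publisher type and its subscribe and connect methods are hypothetical names for this illustration, not RxGo's API.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher holds back its source until connect is called, then sends a copy
// of every item to each subscriber.
type publisher struct {
	source []int
	subs   []chan int
}

// subscribe registers a new observer. It must be called before connect.
func (p *publisher) subscribe() <-chan int {
	ch := make(chan int)
	p.subs = append(p.subs, ch)
	return ch
}

// connect starts emitting: each item is delivered to all subscribers, and
// every subscriber channel is closed once the source is exhausted.
func (p *publisher) connect() {
	go func() {
		for _, v := range p.source {
			for _, ch := range p.subs {
				ch <- v
			}
		}
		for _, ch := range p.subs {
			close(ch)
		}
	}()
}

func main() {
	p := &publisher{source: []int{1, 2, 3}}
	first, second := p.subscribe(), p.subscribe()

	var wg sync.WaitGroup
	observe := func(name string, ch <-chan int) {
		defer wg.Done()
		for v := range ch {
			fmt.Printf("%s observer: %d\n", name, v)
		}
	}
	wg.Add(2)
	go observe("First", first)
	go observe("Second", second)

	p.connect() // nothing is emitted before this call
	wg.Wait()
}
```

Both observers print all three items; the interleaving of their lines depends on scheduling.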

Observable, Single, and Optional Single

An Iterable is an object that can be observed using Observe(opts ...Option) <-chan Item.

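An Iterable can be an Observable (emits zero or more items), a Single (emits exactly one item), or an Optional Single (emits zero or one item).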

Documentation

Package documentation: https://pkg.go.dev/github.com/reactivex/rxgo/v2

Assert API

How to use the assert API to write unit tests while using RxGo.

Operator Options

Operator options

Creating Observables

Transforming Observables

Filtering Observables

Combining Observables

Error Handling Operators

Observable Utility Operators

Conditional and Boolean Operators

Mathematical and Aggregate Operators

Operators to Convert Observables

Contributing

All contributions are very welcome! Be sure you check out the contributing guidelines first. Newcomers can take a look at ongoing issues and check for the help needed label.

Also, if you publish a post about RxGo, please let us know. We would be glad to include it in the External Resources section.

Thanks to all the people who already contributed to RxGo!

<a href="https://github.com/ReactiveX/RxGo/graphs/contributors"> <img src="https://contrib.rocks/image?repo=ReactiveX/RxGo" /> </a>

External Resources

Special Thanks

A big thanks to JetBrains for supporting the project.