Home

Awesome

RxKotlin

Maven Central

Kotlin Extensions for RxJava

RxKotlin is a lightweight library that adds convenient extension functions to RxJava. You can use RxJava with Kotlin out-of-the-box, but Kotlin has language features (such as extension functions) that can streamline usage of RxJava even more. RxKotlin aims to conservatively collect these conveniences in one centralized library, and standardize conventions for using RxJava with Kotlin.

import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.kotlin.toObservable

fun main() {

    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    list.toObservable() // extension function for Iterables
            .filter { it.length >= 5 }
            .subscribeBy(  // named arguments for lambda Subscribers
                    onNext = { println(it) },
                    onError =  { it.printStackTrace() },
                    onComplete = { println("Done!") }
            )

}

Contributing

:warning: (16/10/2023) We are currently not accepting contributions due to lack of developers capable of handling them in a reasonable manner.

Since Kotlin makes it easy to implement extensions for anything and everything, this project has to be conservative in what features are in scope. Intentions to create syntactic sugar can quickly regress into syntactic saccharin, and such personal preferences belong in one's internal domain rather than an OSS library.

Here are some basic guidelines to determine whether your contribution might be in scope for RxKotlin:

Kotlin Slack Channel

Join us on the #rx channel in Kotlin Slack!

https://kotlinlang.slack.com/messages/rx

Support for RxJava 3.x, RxJava 2.x and RxJava 1.x

Use RxKotlin 3.x versions to target RxJava 3.x.

Use RxKotlin 2.x versions to target RxJava 2.x.

Use RxKotlin 1.x versions to target RxJava 1.x.

The maintainers do not update the RxJava dependency version for every minor or patch RxJava release, so you should explicitly add the desired RxJava dependency version to your pom.xml or build.gradle(.kts).

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

RxKotlin 3.x Build Status

Example for Maven:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>3.x.y</version>
</dependency>

Example for Gradle:

implementation("io.reactivex.rxjava3:rxkotlin:3.x.y")

RxKotlin 2.x Build Status

Example for Maven:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.x.y</version>
</dependency>

Example for Gradle:

implementation("io.reactivex.rxjava2:rxkotlin:2.x.y")

RxKotlin 1.x

Example for Maven:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>1.x.y</version>
</dependency>

Example for Gradle:

implementation("io.reactivex:rxkotlin:1.x.y")

Building with JitPack

You can also use Gradle or Maven with JitPack to build directly off a snapshot, branch, or commit of this repository.

For example, to build off the 3.x branch, use this setup for Gradle:

repositories {
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation 'com.github.ReactiveX:RxKotlin:3.x-SNAPSHOT'
}

Use this setup for Maven:

	<repositories>
		<repository>
		    <id>jitpack.io</id>
		    <url>https://jitpack.io</url>
		</repository>
	</repositories>

    <dependency>
	    <groupId>com.github.ReactiveX</groupId>
	    <artifactId>RxKotlin</artifactId>
	    <version>3.x-SNAPSHOT</version>
	</dependency>

Learn more about building this project with JitPack here.

Extensions

Target TypeMethodReturn TypeDescription
BooleanArraytoObservable()Observable<Boolean>Turns a Boolean array into an Observable
ByteArraytoObservable()Observable<Byte>Turns a Byte array into an Observable
ShortArraytoObservable()Observable<Short>Turns a Short array into an Observable
IntArraytoObservable()Observable<Int>Turns an Int array into an Observable
LongArraytoObservable()Observable<Long>Turns a Long array into an Observable
FloatArraytoObservable()Observable<Float>Turns a Float array into an Observable
DoubleArraytoObservable()Observable<Double>Turns a Double array into an Observable
Array<T>toObservable()Observable<Double>Turns a T array into an Observable
IntProgressiontoObservable()Observable<Int>Turns an IntProgression into an Observable
Iterable<T>toObservable()Observable<T>Turns an Iterable<T> into an Observable
Iterator<T>toObservable()Observable<T>Turns an Iterator<T> into an Observable
Observable<T>flatMapSequence()Observable<R>Flat maps each T emission to a Sequence<R>
Observable<Pair<A,B>>toMap()Single<Map<A,B>>Collects Pair<A,B> emissions into a Map<A,B>
Observable<Pair<A,B>>toMultimap()Single<Map<A, List<B>>Collects Pair<A,B> emissions into a Map<A,List<B>>
Observable<Observable<T>>mergeAll()Observable<T>Merges all Observables emitted from an Observable
Observable<Observable<T>>concatAll()Observable<T>Concatenates all Observables emitted from an Observable
Observable<Observable<T>>switchLatest()Observable<T>Emits from the last emitted Observable
Observable<*>cast()Observable<R>Casts all emissions to the reified type
Observable<*>ofType()Observable<R>Filters all emissions to only the reified type
Iterable<Observable<out T>>merge()Observable<T>Merges an Iterable of Observables into a single Observable
Iterable<Observable<out T>>mergeDelayError()Observable<T>Merges an Iterable of Observables into a single Observable, but delays any error
BooleanArraytoFlowable()Flowable<Boolean>Turns a Boolean array into an Flowable
ByteArraytoFlowable()Flowable<Byte>Turns a Byte array into an Flowable
ShortArraytoFlowable()Flowable<Short>Turns a Short array into an Flowable
IntArraytoFlowable()Flowable<Int>Turns an Int array into an Flowable
LongArraytoFlowable()Flowable<Long>Turns a Long array into an Flowable
FloatArraytoFlowable()Flowable<Float>Turns a Float array into an Flowable
DoubleArraytoFlowable()Flowable<Double>Turns a Double array into an Flowable
Array<T>toFlowable()Flowable<Double>Turns a T array into an Flowable
IntProgressiontoFlowable()Flowable<Int>Turns an IntProgression into an Flowable
Iterable<T>toFlowable()Flowable<T>Turns an Iterable<T> into an Flowable
Iterator<T>toFlowable()Flowable<T>Turns an Iterator<T> into an Flowable
Flowable<T>flatMapSequence()Flowable<R>Flat maps each T emission to a Sequence<R>
Flowable<Pair<A,B>>toMap()Single<Map<A,B>>Collects Pair<A,B> emissions into a Map<A,B>
Flowable<Pair<A,B>>toMultimap()Single<Map<A, List<B>>>Collects Pair<A,B> emissions into a Map<A,List<B>>
Flowable<Flowable<T>>mergeAll()Flowable<T>Merges all Flowables emitted from an Flowable
Flowable<Flowable<T>>concatAll()Flowable<T>Concatenates all Flowables emitted from an Flowable
Flowable<Flowable<T>>switchLatest()Flowable<T>Emits from the last emitted Flowable
Flowable<Any>cast()Flowable<R>Casts all emissions to the reified type
Flowable<Any>ofType()Flowable<R>Filters all emissions to only the reified type
Iterable<Flowable<out T>>merge()Flowable<T>Merges an Iterable of Flowables into a single Flowable
Iterable<Flowable<out T>>mergeDelayError()Flowable<T>Merges an Iterable of Flowables into a single Flowable, but delays any error
Single<Any>cast()Single<R>Casts all emissions to the reified type
Observable<Single<T>>mergeAllSingles()Observable<R>Merges all Singles emitted from an Observable
Flowable<Single<T>>mergeAllSingles()Flowable<R>Merges all Singles emitted from a Flowable
Maybe<Any>cast()Maybe<R>Casts any emissions to the reified type
Maybe<Any>ofType()Maybe<R>Filters any emission that is the reified type
Observable<Maybe<T>>mergeAllMaybes()Observable<T>Merges all emitted Maybes
Flowable<Maybe<T>>mergeAllMaybes()Flowable<T>Merges all emitted Maybes
ActiontoCompletable()CompletableTurns an Action into a Completable
Callable<out Any>toCompletable()CompletableTurns a Callable into a Completable
Future<out Any>toCompletable()CompletableTurns a Future into a Completable
(() -> Any)toCompletable()CompletableTurns a (() -> Any) into a Completable
Observable<Completable>mergeAllCompletables()Completable>Merges all emitted Completables
Flowable<Completable>mergeAllCompletables()CompletableMerges all emitted Completables
Observable<T>subscribeBy()DisposableAllows named arguments to construct an Observer
Flowable<T>subscribeBy()DisposableAllows named arguments to construct a Subscriber
Single<T>subscribeBy()DisposableAllows named arguments to construct a SingleObserver
Maybe<T>subscribeBy()DisposableAllows named arguments to construct a MaybeObserver
CompletablesubscribeBy()DisposableAllows named arguments to construct a CompletableObserver
Observable<T>blockingSubscribeBy()UnitAllows named arguments to construct a blocking Observer
Flowable<T>blockingSubscribeBy()UnitAllows named arguments to construct a blocking Subscriber
Single<T>blockingSubscribeBy()UnitAllows named arguments to construct a blocking SingleObserver
Maybe<T>blockingSubscribeBy()UnitAllows named arguments to construct a blocking MaybeObserver
Completable<T>blockingSubscribeBy()UnitAllows named arguments to construct a blocking CompletableObserver
DisposableaddTo()DisposableAdds a Disposable to the specified CompositeDisposable
CompositeDisposableplusAssign()DisposableOperator function to add a Disposable to thisCompositeDisposable

SAM Helpers (made obsolete since Kotlin 1.4)

These methods have been made obsolete with new type inference algorithm in Kotlin 1.4. They will be removed in some future RxKotlin version.

To help cope with the SAM ambiguity issue when using RxJava with Kotlin, there are a number of helper factories and extension functions to workaround the affected operators.

Observables.zip()
Observables.combineLatest()
Observable#zipWith()
Observable#withLatestFrom()
Flowables.zip()
Flowables.combineLatest()
Flowable#zipWith()
Flowable#withLatestFrom()
Singles.zip()
Single#zipWith()
Maybes.zip()

Usage with Other Rx Libraries

RxKotlin can be used in conjunction with other Rx and Kotlin libraries, such as RxAndroid, RxBinding, and TornadoFX/RxKotlinFX. These libraries and RxKotlin are modular, and RxKotlin is merely a set of extension functions to RxJava that can be used with these other libraries. There should be no overlap or dependency issues.

Other Resources

Learning RxJava Packt Book

Chapter 12 of Learning RxJava covers RxKotlin and Kotlin idioms with RxJava.

Reactive Programming in Kotlin Packt Book

The book Reactive Programming in Kotlin mainly focuses on RxKotlin, as well as learning reactive programming with Kotlin.