RabbitMQ Kotlin

CI License: MIT

The RabbitMQ Kotlin Coroutine Library gives Kotlin developers an efficient, coroutine-based way to interact with RabbitMQ.
It simplifies message queue operations by integrating with Kotlin's coroutines, so asynchronous messaging in Kotlin applications can be written in a coroutine-friendly style.
It supports queue and exchange manipulation, message publishing with confirmation, message consumption with acknowledgment, transactional operations, and the Remote Procedure Call (RPC) pattern.

Features

Getting Started

You need to have Java 8 installed.

Snapshots

repositories {
    mavenCentral()
    maven("https://s01.oss.sonatype.org/content/repositories/snapshots")
}

dependencies {
    implementation("io.github.viartemev:rabbitmq-kotlin:0.7.0-SNAPSHOT")
}

Examples

The full list of examples can be found here.

Asynchronous message publishing with confirmation

val connectionFactory = ConnectionFactory().apply { useNio() }
connectionFactory.newConnection().use { connection ->
    connection.confirmChannel {
        // Declare the target queue before publishing to it.
        declareQueue(QueueSpecification(PUBLISHER_QUEUE_NAME)).queue
        publish {
            // Publish TIMES messages concurrently, wait for every result, then print each one.
            (1..TIMES)
                .map { createMessage("") }
                .map { async(Dispatchers.IO) { publishWithConfirm(it) } }
                .awaitAll()
                .forEach { println(it) }
        }
    }
}

Asynchronous message consuming with acknowledgement

Consume only n messages:

val connectionFactory = ConnectionFactory().apply { useNio() }
connectionFactory.newConnection().use { connection ->
    connection.channel {
        consume(CONSUMER_QUEUE_NAME, 1) {
            // Handle CONSUME_TIMES messages concurrently and wait until all of them are processed.
            (1..CONSUME_TIMES).map { async(Dispatchers.IO) { consumeMessageWithConfirm(handler) } }.awaitAll()
        }
    }
}

Transactional publishing and consuming

RabbitMQ and AMQP itself offer rather limited support for transactions. When considering using transactions you should be aware that:

The library provides a convenient way to perform transactional publishing and receiving based on the transaction extension function. This function commits the transaction when the block completes normally and rolls it back if a RuntimeException occurs. Exceptions are always propagated to the caller. Coroutines are not used for publishing here, since no asynchronous operations are involved.

connection.txChannel {
    transaction {
        val message = createMessage(queue = oneTimeQueue, body = "Hello from tx")
        publish(message)
    }
}
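
The commit and rollback behaviour described in this section can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: TxChannel is a hypothetical stand-in that mirrors only the txSelect, txCommit and txRollback methods of com.rabbitmq.client.Channel, and inTransaction and RecordingChannel are names invented for this example.

```kotlin
// Hypothetical stand-in for the transaction methods of
// com.rabbitmq.client.Channel (txSelect, txCommit, txRollback).
interface TxChannel {
    fun txSelect()
    fun txCommit()
    fun txRollback()
}

// Assumed helper, written for illustration only: commit when the block
// returns normally, roll back on RuntimeException, and always rethrow.
fun <T> TxChannel.inTransaction(block: TxChannel.() -> T): T {
    txSelect()
    try {
        val result = block()
        txCommit()
        return result
    } catch (e: RuntimeException) {
        txRollback()
        throw e
    }
}

// In-memory fake that records calls so the order of operations is visible.
class RecordingChannel : TxChannel {
    val calls = mutableListOf<String>()
    override fun txSelect() { calls.add("select") }
    override fun txCommit() { calls.add("commit") }
    override fun txRollback() { calls.add("rollback") }
}

fun main() {
    // Normal completion: the transaction is committed.
    val ok = RecordingChannel()
    ok.inTransaction { ok.calls.add("publish") }
    println(ok.calls) // [select, publish, commit]

    // RuntimeException: the transaction is rolled back and the exception propagates.
    val failing = RecordingChannel()
    try {
        failing.inTransaction<Unit> { throw IllegalStateException("boom") }
    } catch (e: IllegalStateException) {
        println("propagated: ${e.message}") // propagated: boom
    }
    println(failing.calls) // [select, rollback]
}
```

With a real channel the same shape applies, except that the block would publish or acknowledge messages instead of recording strings.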

RPC pattern

ConnectionFactory().apply { useNio() }.newConnection().use { conn ->
    conn.channel {
        logger.info { "Asking for greeting request..." }
        // Give up and return null if no reply arrives within 1000 ms.
        val response = withTimeoutOrNull(1000) {
            async(Dispatchers.IO) {
                rpc {
                    val result = call(message)
                    logger.info { "Got a message: ${String(result.body)}" }
                    result
                }
            }.await()
        }
        if (response == null) {
            logger.info { "Timeout is exceeded" }
        } else {
            logger.info { "Result: ${String(response.body)}" }
        }
    }
}

Links