Home

Awesome

xk6-nats

This is a k6 extension using the xk6 system, that allows to use NATS protocol.

❗ This extension isn't supported by the k6 team, and may break in the future. USE AT YOUR OWN RISK!

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

  1. Install xk6 framework for extending k6:
go install go.k6.io/xk6/cmd/xk6@latest
  1. Build the binary:
xk6 build --with github.com/ydarias/xk6-nats@latest
  1. Run a test
k6 run -e NATS_HOSTNAME=localhost test/test.js

To run JetStream test, make sure NATS JetStream is started, e.g. nats-server -js

k6 run -e NATS_HOSTNAME=localhost test/test_jetstream.js

To run publish with headers test, make sure NATS JetStream is started, e.g. nats-server -js

./k6 run -e NATS_HOSTNAME=localhost test/test_headers.js

API

Nats

A Nats instance represents the connection with the NATS server, and it is created with new Nats(configuration), where configuration attributes are:

AttributeDescription
servers(mandatory) is the list of servers where NATS is available (e.g. [nats://localhost:4222])
unsafe(optional) allows running with self-signed certificates when doing tests against a testing environment, it is a boolean value (default value is false)
token(optional) is the value of the token used to connect to the NATS server

Example:

import {Nats} from 'k6/x/nats'

const natsConfig = {
    servers: ['nats://localhost:4222'],
    unsafe: true,
}

const nats = new Nats(natsConfig)

Publishing

You can publish messages to a topic using the following functions:

FunctionDescription
publish(topic, payload)publish a new message using the topic (string) and the given payload that is a string representation that later is serialized as a byte array
publisWithHeaders(topic, payload, headers)publish a new message using the topic (string), the given payload that is a string representation that later is serialized as a byte array and the headers
publishMsg(message)publish a new message using the message (object) that has the following attributes: topic (string), data (string), raw(byte array) and headers (object)
request(topic, payload, headers)sends a request to the topic (string) and the given payload as string representation and the headers, and returns a message

Example:

const publisher = new Nats(natsConfig)

publisher.publish('topic', 'data')
publisher.publishWithHeaders('topic', 'data', { 'header1': 'value1' })
publisher.publishMsg({ topic: 'topic', data: 'string data', headers: { 'header1': 'value1' } })
publisher.publishMsg({ topic: 'topic', raw: [ 0, 1, 2, 3 ], headers: { 'header1': 'value1' } })
const message = publisher.request('topic', 'data', { 'header1': 'value1' })

Subscribing

You can subscribe to a topic using the following functions:

FunctionDescription
subscribe(topic, callback)subscribe to a topic (string) and execute the callback function when a message is received, it returns a subscription

Example:

const subscriber = new Nats(natsConfig)
const subscription = subscriber.subscribe('topic', (msg) => {
    console.log(msg.data)
})
// ...
subscription.close()

Note: the subscription model has been changed. Now when you use subscribe method, a subscription object is returned and the subscription should be closed using the close() method.

JetStream

You can use JetStream Pub/Sub in the same way as NATS Pub/Sub. The only difference is that you need to setup the stream before publishing or subscribing to it.

The configuration is the same as the one used in the nats-io's StreamConfig:

AttributeDescription
name(mandatory) is the name of the stream
description(optional) is the description of the stream
subjects(mandatory) is the list of subjects that the stream will be listening to
retention(optional) is the retention policy of the stream, it can be limits, interest, workqueue or stream
max_consumers(optional) is the maximum number of consumers that the stream will allow
max_msgs(optional) is the maximum number of messages that the stream will store
max_bytes(optional) is the maximum number of bytes that the stream will store
max_age(optional) is the maximum age of the messages that the stream will store
max_msg_size(optional) is the maximum size of the messages that the stream will store
discard(optional) is the discard policy of the stream, it can be old, new or none
storage(optional) is the type of storage that the stream will use, it can be file or memory
replicas(optional) is the number of replicas that the stream will have
no_ack(optional) is a boolean value that indicates if the stream will use acks or not

Example:

const streamConfig = {
    name: "mock",
    subjects: ["foo"],
    max_msgs_per_subject: 100,
    discard: 0,
    storage: 1
}

const publisher = new Nats(natsConfig)
publisher.jetStreamSetup(streamConfig)

JetStream operations

Once the stream is setup, you can publish and subscribe to it using the following functions:

FunctionDescription
jetStreamSetup(config)setup a stream with the given configuration
jetStreamPublish(topic, payload)publish a new message using the topic (string) and the given payload that is a string representation that later is serialized as a byte array
jetStreamPublishWithHeaders(topic, payload, headers)publish a new message using the topic (string), the given payload that is a string representation that later is serialized as a byte array and the headers
jetStreamPublishMsg(message)publish a new message using the message (object) that has the following attributes: topic (string), data (string), raw(byte array) and headers (object)
jetStreamSubscribe(topic, callback)subscribe to a topic (string) and execute the callback function when a message is received, it returns a subscription

Example:

const subscriber = new Nats(natsConfig)
publisher.jetStreamSetup(streamConfig)
const subscription = subscriber.jetStreamSubscribe('mock', (msg) => {
    console.log(msg.data)
})

const publisher = new Nats(natsConfig)
publisher.jetStreamPublish('foo', 'data')
publisher.jetStreamPublishWithHeaders('foo', 'data', { 'header1': 'value1' })
publisher.jetStreamPublishMsg({ topic: 'topic', data: 'string data', headers: { 'header1': 'value1' } })
publisher.jetStreamPublishMsg({ topic: 'topic', raw: [ 0, 1, 2, 3 ], headers: { 'header1': 'value1' } })

// ...

subscription.close()

Return values

A subscription return value has the following methods:

MethodDescription
close()closes the subscription

A message return value has the following attributes:

AttributeDescription
rawthe payload in byte array format
datathe payload in string format
topicthe topic where the message was published
headersthe headers of the message

Examples

You can find some examples in the examples folder. To run them, you need to have a NATS server running and then run the following command:

k6 run -e NATS_HOSTNAME=your_nats_server_host examples/binary.js
k6 run -e NATS_HOSTNAME=your_nats_server_host examples/complex.js
k6 run -e NATS_HOSTNAME=your_nats_server_host examples/simple.js
k6 run -e NATS_HOSTNAME=your_nats_server_host examples/withHeaders.js

Or you can check the test folder to see how to use the extension.

License

The source code of this project is released under the MIT License.