Home

Awesome

go-liftbridge CircleCI GoDoc

Go client for Liftbridge, a system that provides lightweight, fault-tolerant message streams for NATS.

Liftbridge provides the following high-level features:

Installation

$ go get github.com/liftbridge-io/go-liftbridge/v2

Basic Usage

package main

import (
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
	"golang.org/x/net/context"
)

func main() {
	// Create Liftbridge client.
	addrs := []string{"localhost:9292", "localhost:9293", "localhost:9294"}
	client, err := lift.Connect(addrs)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Create a stream attached to the NATS subject "foo".
	var (
        	subject = "foo"
        	name    = "foo-stream"
	)
	if err := client.CreateStream(context.Background(), subject, name); err != nil {
		if err != lift.ErrStreamExists {
			panic(err)
		}
	}
	
	// Publish a message to "foo".
	if _, err := client.Publish(context.Background(), name, []byte("hello")); err != nil {
		panic(err)
	}

	// Subscribe to the stream starting from the beginning.
	ctx := context.Background()
	if err := client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Offset(), string(msg.Value()))
	}, lift.StartAtEarliestReceived()); err != nil {
		panic(err)
	}

	<-ctx.Done()
}

Create Stream

Streams are a durable message log attached to a NATS subject. They record messages published to the subject for consumption.

Streams have a few key properties: a subject, which is the corresponding NATS subject, a name, which is a human-readable identifier for the stream, and a replication factor, which is the number of nodes the stream should be replicated to for redundancy. Optionally, there is a group which is the name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages will be balanced among them.

// Create a stream attached to the NATS subject "foo.*" that is replicated to
// all the brokers in the cluster. ErrStreamExists is returned if a stream with
// the given name already exists.
client.CreateStream(context.Background(), "foo.*", "my-stream", lift.MaxReplication())

We can also configure different properties of the stream with options, which
allow overriding the server settings.

// Create a stream and set the rentention to 134217728 bytes (128MB) and enable
// stream compaction.
client.CreateStream(context.Background(), subject, name,
    lift.RetentionMaxBytes(134217728), lift.CompactEnabled(true))

Also, the client has the possibility to create a stream with enforced Optimistic Concurrency Control. This will create a stream and enable Optimistic Concurrency Control on that specific stream.

// Create a stream with Optimistic Concurrency Control enabled
err = client.CreateStream(context.Background(), "foo", stream, lift.OptimisticConcurrencyControl(true))

Subscription Start/Replay Options

Subscriptions are how Liftbridge streams are consumed. Clients can choose where to start consuming messages from in a stream. This is controlled using options passed to Subscribe. Client can also choose to subscribe from partition's leader (by default) or from a random ISR replica

// Subscribe starting with new messages only.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
})

// Subscribe starting with the most recently published value.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtLatestReceived())

// Subscribe starting with the oldest published value.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtEarliestReceived())

// Subscribe starting at a specific offset.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtOffset(42))

// Subscribe starting at a specific time.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtTime(time.Now()))

// Subscribe starting at a specific amount of time in the past.
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.StartAtTimeDelta(time.Minute))

// Subscribe to a random ISR replica
// this helps reduce the work load
// for partition leader
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.ReadISRReplica())

Publishing

There are two publish APIs provided to make it easy to write messages to streams, Publish and PublishToSubject. These include a number of options for decorating messages with metadata like a message key and headers as well as configuring acking behavior from the server.

Publish sends a message to a Liftbridge stream. The stream partition that gets published to is determined by the provided partition or Partitioner strategy passed through MessageOptions, if any. If a partition or Partitioner is not provided, it will publish to the base partition (partition 0). This partition determines the underlying NATS subject that gets published to. To publish directly to a specific NATS subject, use the low-level PublishToSubject API described below.

Keys are used by Liftbridge's log compaction. When enabled, Liftbridge streams will retain only the last message for a given key.

// Publish a message with a key and header set.
client.Publish(context.Background(), "foo-stream", []byte("hello"),
	lift.Key([]byte("key"),
	lift.Header("foo", []byte("bar")),
)

An AckPolicy tells the server when to send an ack. If a deadline is provided to Publish, it will block up to this amount of time waiting for the ack.

ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
client.Publish(ctx, "foo-stream", []byte("hello"),
	lift.AckPolicyAll(), // Wait for all stream replicas to get the message
)

ctx, _ = context.WithTimeout(context.Background(), 2*time.Second)
client.Publish(ctx, "foo-stream", []byte("hello"),
	lift.AckPolicyLeader(), // Wait for just the stream leader to get the message
)

client.Publish(context.Background(), "foo-stream", []byte("hello"),
	lift.AckPolicyNone(), // Don't send an ack
)

Also, on specific streams where Optimistic Concurrency Control is enabled, the client has to publish a message with ExpectedOffset. The ExpectedOffset is the offset that should be on the stream's partition if the message is published successfully.

Note: as the error of concurrency is piggybacked via the Ack, so AckPolicy has to be set to use this feature.

Note: by default, Liftbridge concurrency control considers the ExpectedOffset value -1 to be the signil to indicate default behavior: next offset, i.e: simply skip the concurrency control verification. By default, to provide compatibility, the client should make sure that this value is always set to -1 when concurrency control is not used.

	// Publish Async with expected offset
	err = client.PublishAsync(context.Background(), "foo", []byte("hello"),
		func(ack *lift.Ack, err error) {
			errorC <- err
		},
		lift.AckPolicyLeader(),
		// Correct offset is 0 (first message)
		lift.SetExpectedOffset(0),
	)
	require.NoError(t, err)

PublishToSubject sends a message directly to the provided NATS subject. Note that because this publishes directly to a subject, there may be multiple (or no) streams that receive the message. As a result, MessageOptions related to partitioning will be ignored. To publish at the stream/partition level, use the high-level Publish API described above.

// Publish a message directly to a NATS subject.
client.PublishToSubject(context.Background(), "foo.bar", []byte("hello"))

Alternatively, messages can be published directly to NATS using a NATS client and Liftbridge helpers detailed below.

Publishing Directly with NATS

Since Liftbridge is an extension of NATS, a NATS client can also be used to publish messages. This means existing NATS publishers do not need any changes for messages to be consumed in Liftbridge.

package main

import "github.com/nats-io/go-nats"

func main() {
	// Connect to NATS.
	nc, _ := nats.Connect(nats.DefaultURL)

	// Publish a message.
	nc.Publish("foo.bar", []byte("Hello, world!")) 
	nc.Flush()
}

As shown with the publish APIs above, Liftbridge allows publishers to add metadata to messages, including a key, ack inbox, correlation ID, and ack policy. The message key can be used for stream compaction in Liftbridge. Acks are used to guarantee Liftbridge has recorded a message to ensure at-least-once delivery. The ack inbox determines a NATS subject to publish an acknowledgement to once Liftbridge has committed the message. The correlation id is used to correlate an ack back to the original message. The ack policy determines when Liftbridge acknowledges the message: when the stream leader has stored the message, when all replicas have stored it, or no ack at all.

This additional metadata is sent using a message envelope which is a protobuf. The publish APIs handle this for you, but this client library also provides helper APIs to make it easy to create envelopes and deal with acks yourself using a NATS client directly.

var (
	ackInbox = "foo.acks"
	cid      = "some-random-id"
)

// Create a message envelope to publish.
msg := lift.NewMessage([]byte("Hello, world!"),
	lift.Key([]byte("foo")), // Key to set on the message
	lift.AckInbox(ackInbox), // Send ack to this NATS subject
	lift.AckPolicyAll(),     // Send ack once message is fully replicated
	lift.CorrelationID(cid), // Set the ID which will be sent on the ack
)

// Setup a NATS subscription for acks.
sub, _ := nc.SubscribeSync(ackInbox)

// Publish the message.
nc.Publish("foo.bar", msg)

// Wait for ack from Liftbridge.
resp, _ := sub.NextMsg(5*time.Second)
ack, _ := lift.UnmarshalAck(resp.Data)
if ack.CorrelationID() == cid {
	fmt.Println("message acked!")
}

Partitioning

Liftbridge streams are partitioned to allow for increased parallelism. By default, a stream consists of a single partition, but the number of partitions can be configured when the stream is created.

Creating Partitioned Streams

// Create stream with three partitions.
client.CreateStream(context.Background(), "bar", "bar-stream", lift.Partitions(3))

Each partition maps to a NATS subject derived from the base stream subject. For example, the partitions for a stream with three partitions attached to the subject "bar" map to the NATS subjects "bar", "bar.1", and "bar.2", respectively.

Publishing to Stream Partitions

By default, clients will publish to the base partition, but this can be configured by providing a Partitioner.

// Publish to partition based on message key hash.
client.Publish(context.Background(), "bar-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.PartitionByKey(),
)

// Publish to partitions in a round-robin fashion.
client.Publish(context.Background(), "bar-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.PartitionByRoundRobin(),
)

// Publish to a specific partition.
client.Publish(context.Background(), "bar-stream", []byte("hello"),
	lift.Key([]byte("key")),
	lift.ToPartition(1),
)

// Publish directly to a partition NATS subject.
client.PublishToSubject(context.Background(), "bar.1", []byte("hello"),
	lift.Key([]byte("key")))

A custom Partitioner implementation can also be provided to Publish. PublishToSubject will ignore any partition-related MessageOption.

Subscribing to Stream Partitions

Like publishing, clients will subscribe to the base partition by default. However, a specific partition to consume from can be specified at subscribe time.

// Subscribe to a specific partition.
client.Subscribe(ctx, "bar-stream", func(msg *lift.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.Partition(1))