rscylla

A reflex stream client for a ScyllaDB CDC log table. It provides an API for consuming ScyllaDB CDC logs with at-least-once semantics*. It borrows heavily from scylla-cdc-go.

Usage

// Assumes imports of context, fmt, log, time, gocql, fate, reflex, rsql and rscylla.

// Define your consumer business logic.
fn := func(ctx context.Context, f fate.Fate, e *reflex.Event) error {
  fmt.Println("Consuming scylla CDC event", e)
  return f.Tempt() // Fate injects application errors at runtime, enforcing idempotent logic.
}

// Define some more variables.
var keyspace, table, scyllaAddr, consumerName string

// Connect to scylla.
cluster := gocql.NewCluster(scyllaAddr)
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc"))
session, err := cluster.CreateSession()
if err != nil {
  log.Fatalf("create scylla session: %v", err)
}

// Set up rscylla and reflex.
stream := rscylla.NewStream(session, keyspace, table)
consumer := reflex.NewConsumer(consumerName, fn)
cstore := rsql.NewCursorStore() // TODO(corver): Add rscylla cursor store
spec := reflex.NewSpec(stream.Stream, cstore, consumer)

// Stream forever!
// Progress is stored in the cursor store, so after a restart or error, streaming resumes where it left off.
for {
  err := reflex.Run(context.Background(), spec)
  if err != nil { // Note: Run always returns a non-nil error.
    log.Printf("stream error: %v", err)
  }
  time.Sleep(time.Second) // Back off briefly so a persistent failure doesn't spin the loop.
}

ScyllaDB CDC Overview

ScyllaDB CDC log tables are implemented as ordinary ScyllaDB tables. A CDC log table contains an append-only log of all changes applied to its base table.

As in Cassandra and other wide-column databases, the CDC log data is partitioned by the partition key StreamID (column cdc$stream_id) and sorted within each partition by the clustering keys Time (cdc$time) and BatchSeqNo (cdc$batch_seq_no).

The CDC log therefore consists of multiple streams, each identified by a StreamID. Ordering is only guaranteed within a single stream.
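To make the "ordered only within a stream" property concrete, here is a minimal in-memory sketch. The cdcRow type and groupByStream helper are hypothetical illustrations, not part of rscylla, and Time is an int64 stand-in for the cdc$time timeuuid. Rows are grouped by StreamID and sorted by (Time, BatchSeqNo); no ordering is defined between different streams.

```go
package main

import (
	"fmt"
	"sort"
)

// cdcRow is a hypothetical, simplified model of a CDC log row's primary key:
// partition key StreamID plus clustering keys Time and BatchSeqNo.
type cdcRow struct {
	StreamID   string
	Time       int64 // stand-in for the cdc$time timeuuid
	BatchSeqNo int
}

// groupByStream splits rows into per-stream slices, each sorted by
// (Time, BatchSeqNo). Ordering exists only inside a stream, never across streams.
func groupByStream(rows []cdcRow) map[string][]cdcRow {
	streams := make(map[string][]cdcRow)
	for _, r := range rows {
		streams[r.StreamID] = append(streams[r.StreamID], r)
	}
	for _, s := range streams {
		// Sorting s in place also sorts the slice stored in the map,
		// because both share the same backing array.
		sort.Slice(s, func(i, j int) bool {
			if s[i].Time != s[j].Time {
				return s[i].Time < s[j].Time
			}
			return s[i].BatchSeqNo < s[j].BatchSeqNo
		})
	}
	return streams
}

func main() {
	rows := []cdcRow{
		{"b", 2, 0}, {"a", 3, 0}, {"a", 1, 1}, {"a", 1, 0}, {"b", 1, 0},
	}
	streams := groupByStream(rows)
	for _, id := range []string{"a", "b"} {
		fmt.Println(id, streams[id])
	}
}
```

This is also why the Kafka comparison below holds: each StreamID behaves like a partition with its own offset order.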

Note that when the ScyllaDB cluster topology changes (a node joins or leaves), a new set of streams is created. Each such set is called a "stream generation". Each generation has its own streams and StreamIDs, and generations follow one another sequentially in time.

In Kafka terms a CDC log table is a topic and the streams are partitions.

Mapping to Reflex

Reflex was designed for single, totally ordered streams, such as MySQL event tables or Firehose S3 buckets. A reflex EventID normally identifies a position in that stream and can therefore be used as a cursor.
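For contrast with the multi-stream CDC log, this is why a single ordered stream makes cursoring trivial: the last consumed event's ID is enough to resume. The event type and eventsAfter helper are hypothetical, assuming integer IDs that increase monotonically, as in a MySQL event table with an auto-increment key.

```go
package main

import "fmt"

// event is a hypothetical event in a single, totally ordered stream,
// identified by a monotonically increasing ID.
type event struct {
	ID   int64
	Data string
}

// eventsAfter returns the events strictly after the cursor. Because the stream
// is totally ordered, the last consumed event's ID is a complete cursor.
func eventsAfter(stream []event, cursor int64) []event {
	var out []event
	for _, e := range stream {
		if e.ID > cursor {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	stream := []event{{1, "a"}, {2, "b"}, {3, "c"}}
	// Resume after event 2 was consumed.
	fmt.Println(eventsAfter(stream, 2))
}
```

With many independently ordered CDC streams there is no single "last ID" like this, which is what the window-based cursor addresses.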

Mapping multiple streams onto a single reflex stream therefore requires a different approach.

rscylla slices the CDC streams into fixed time windows that are streamed one window at a time. The start of the time window is used as the cursor, not the events themselves. Once all events in the window have been streamed, the cursor is advanced to the next window and the cycle repeats.

An rscylla EventID consists of two parts: the cursor of the window it was streamed in, and the CDC log row's primary key:

// eventID is json encoded as the reflex.EventID.
// It points both to the CDC log row itself and the
// time window it was streamed in.
type eventID struct {
  // Cursor of streamed window
  CursorGen  time.Time `json:"cursor_gen"`
  CursorTime time.Time `json:"cursor_time"`

  // CDC log row primary key
  StreamID   string `json:"stream_id"`
  StreamTime string `json:"stream_time"`
  StreamSeq  int    `json:"stream_seq"`
}

Limitations

Features

TODO