Dendrite

Dendrite is a Go package that implements a distributed hash table (DHT) based on the Chord protocol. The included sub-package 'dtable' is built on top of dendrite and implements a distributed in-memory key/value database with replication and failover support, plus a query interface to Get() or Set() items with different consistency levels.

For better key distribution, dendrite allows a configurable number of virtual nodes (vnodes) per instance. The number of replicas in dtable is also configurable.
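
To illustrate why vnodes help with key distribution, here is a minimal standalone sketch of a hash ring with several virtual nodes per host. It does not use dendrite's API: the `vnode` type, the `hashKey`, `buildRing` and `successor` helpers, the `host#index` naming scheme and the truncated SHA-1 hash are all assumptions made for this example, and dendrite's own vnode identifiers may be derived differently.

```go
package main

import (
	"crypto/sha1"
	"encoding/binary"
	"fmt"
	"sort"
)

// vnode is one virtual node: a point on the ring owned by a physical host.
// (Hypothetical type for this sketch, not dendrite's own.)
type vnode struct {
	id   uint64 // ring position (SHA-1 truncated to 64 bits for brevity)
	host string // physical instance that owns this point
}

// hashKey maps arbitrary bytes to a ring position.
func hashKey(b []byte) uint64 {
	sum := sha1.Sum(b)
	return binary.BigEndian.Uint64(sum[:8])
}

// buildRing creates vnodesPerHost points for every host and sorts them by id.
func buildRing(hosts []string, vnodesPerHost int) []vnode {
	ring := make([]vnode, 0, len(hosts)*vnodesPerHost)
	for _, h := range hosts {
		for i := 0; i < vnodesPerHost; i++ {
			id := hashKey([]byte(fmt.Sprintf("%s#%d", h, i)))
			ring = append(ring, vnode{id: id, host: h})
		}
	}
	sort.Slice(ring, func(a, b int) bool { return ring[a].id < ring[b].id })
	return ring
}

// successor returns the first vnode whose id is >= the key's hash,
// wrapping around to the start of the ring.
func successor(ring []vnode, key []byte) vnode {
	h := hashKey(key)
	i := sort.Search(len(ring), func(i int) bool { return ring[i].id >= h })
	if i == len(ring) {
		i = 0
	}
	return ring[i]
}

func main() {
	hosts := []string{"10.0.0.1:5000", "10.0.0.2:5000", "10.0.0.3:5000"}
	ring := buildRing(hosts, 8)

	// Count how many of 3000 sample keys land on each physical host.
	counts := map[string]int{}
	for i := 0; i < 3000; i++ {
		key := []byte(fmt.Sprintf("key-%d", i))
		counts[successor(ring, key).host]++
	}
	for _, h := range hosts {
		fmt.Printf("%s owns %d keys\n", h, counts[h])
	}
}
```

Raising the number of vnodes per host spreads each host's points more evenly around the ring, which tends to even out the per-host key counts.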

The calling application can bootstrap a cluster, or join an existing one by connecting to any of its nodes (the address must be specified manually). Node discovery is not part of the implementation; use Consul (consul.io) or a similar tool for that purpose.

The Chord protocol defines ring stabilization. In dendrite, the stabilization period is configurable.

Node-to-node (network) communication is built on top of ZeroMQ sockets over TCP for speed, clustering and reliability. Dendrite starts a configurable number of goroutines (default: 10) for load-balanced serving of remote requests, and scales that number up and down depending on the load (a prefork-style model).
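
The worker model described above can be sketched with a fixed pool of goroutines draining a shared channel. This is only an illustration under stated assumptions: the `serve` function and its `handle` callback are hypothetical names, plain integers stand in for remote requests, no ZeroMQ sockets are involved, and the dynamic scaling that dendrite performs is left out.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// serve starts `workers` goroutines that drain `requests`, calls `handle`
// for each one, and returns the number of requests handled once the
// channel has been closed and emptied.
func serve(workers int, requests <-chan int, handle func(int)) int {
	var wg sync.WaitGroup
	var handled int64
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Whichever worker is idle picks up the next request,
			// which gives simple load balancing across the pool.
			for req := range requests {
				handle(req)
				atomic.AddInt64(&handled, 1)
			}
		}()
	}
	wg.Wait()
	return int(handled)
}

func main() {
	requests := make(chan int)
	go func() {
		for i := 0; i < 100; i++ {
			requests <- i
		}
		close(requests)
	}()

	// 10 mirrors the default worker count mentioned above.
	n := serve(10, requests, func(id int) {
		// A real handler would decode and dispatch the message here.
	})
	fmt.Println("handled", n, "requests")
}
```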

All messages sent through dendrite are encapsulated in a ChordMsg structure, where the first byte indicates the message type and the actual data follows. The data part is serialized with protocol buffers.
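
The "type byte followed by data" layout can be sketched as follows. This is a simplified stand-in rather than dendrite's ChordMsg: the `frame` type, the `msgPing`/`msgData` constants and their values, and the `encode`/`decode` helpers are hypothetical, and the payload is kept as raw bytes instead of a protocol buffers message.

```go
package main

import (
	"errors"
	"fmt"
)

// msgType is a hypothetical one-byte message tag; the values are illustrative.
type msgType byte

const (
	msgPing msgType = 0x01
	msgData msgType = 0x02
)

// frame is a stand-in for ChordMsg in this sketch.
type frame struct {
	Type    msgType
	Payload []byte // would be protobuf-encoded in dendrite; raw bytes here
}

// encode writes the type byte first, then the payload.
func encode(f frame) []byte {
	out := make([]byte, 0, 1+len(f.Payload))
	out = append(out, byte(f.Type))
	return append(out, f.Payload...)
}

// decode splits a raw message back into its type byte and payload.
func decode(raw []byte) (frame, error) {
	if len(raw) < 1 {
		return frame{}, errors.New("empty message: missing type byte")
	}
	return frame{Type: msgType(raw[0]), Payload: raw[1:]}, nil
}

func main() {
	raw := encode(frame{Type: msgData, Payload: []byte("hello")})
	f, err := decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("type=0x%02x payload=%q\n", byte(f.Type), f.Payload)
}
```

Keeping the type in a fixed leading byte lets a receiver pick the right decoder before it touches the payload, which is presumably what makes additional message types pluggable.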

Dendrite can be extended through two interfaces:

TransportHook allows other packages to provide additional message types, decoders and handlers, while DelegateHook can be used to capture the Chord events that dendrite emits.

Documentation

Usage

import "time"
import "github.com/fastfn/dendrite"
import "github.com/fastfn/dendrite/dtable"
...
// Initialize ZMQTransport with timeout set to 5 seconds
transport, err := dendrite.InitZMQTransport("127.0.0.1:5000", 5*time.Second, nil)
if err != nil {
	panic(err)
}
// Build the default configuration for this node's address
config := dendrite.DefaultConfig("127.0.0.1:5000")

Bootstrap the cluster (first node)

// Start a new cluster
ring, err := dendrite.CreateRing(config, transport)
if err != nil {
	panic(err)
}
// Initialize dtable on top of the ring
table := dtable.Init(ring, transport, dtable.LogInfo)

Joining the cluster

// Join the cluster by providing the address of one of its existing nodes.
ring, err := dendrite.JoinRing(config, transport, "192.168.0.50:5000")
if err != nil {
	panic(err)
}
// Initialize dtable on top of the ring
table := dtable.Init(ring, transport, dtable.LogInfo)

DTable Query examples

Set()

query := table.NewQuery()
err := query.Set([]byte("testkey"), []byte("testvalue"))
if err != nil {
	panic(err)
}

Set() with consistency

Consistency() is called before Set() to request a minimum number of writes before the operation returns success. If dtable runs with 2 replicas, the user may request 2 writes (primary + 1 replica) and let dtable handle the final write in the background. If the requested value is larger than the number of configured dendrite replicas, it is reset to the default, which is 1.

query := table.NewQuery()
err := query.Consistency(2).Set([]byte("testkey"), []byte("testvalue"))
if err != nil {
	panic(err)
}
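
The reset rule described above can be written out as a small standalone function. This is a sketch of the rule as stated in this README, not dtable's actual code: the `effectiveWrites` name and the `defaultWrites` constant are hypothetical, and treating values below 1 as the default is an added assumption.

```go
package main

import "fmt"

// defaultWrites is the default consistency level stated in this README.
const defaultWrites = 1

// effectiveWrites returns the number of writes that would be awaited
// for a requested consistency level and a configured replica count.
func effectiveWrites(requested, replicas int) int {
	// Larger than the configured replicas: reset to the default.
	// Values below 1 are also reset here (assumption for this sketch).
	if requested < 1 || requested > replicas {
		return defaultWrites
	}
	return requested
}

func main() {
	for _, requested := range []int{1, 2, 3} {
		fmt.Printf("requested=%d replicas=2 -> waits for %d write(s)\n",
			requested, effectiveWrites(requested, 2))
	}
}
```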

Get()

query := table.NewQuery()
item, err := query.Get([]byte("testkey"))
if err != nil {
	log.Println("Got error in table Get: ", err)
} else if item == nil {
	log.Printf("item not found")
} else {
	log.Printf("Value is: %s\n", string(item.Val))
}

GetLocalKeys()

GetLocalKeys() returns the list of all keys stored on the local node.

query := table.NewQuery()
for _, key := range query.GetLocalKeys() {
	log.Printf("Key: %s\n", string(key))
}

Todo