PD: Pubsub service based on a disk queue in Golang

What is Pubsub

Pubsub is a proof-of-concept implementation of the Redis "Pub/Sub" messaging feature. SUBSCRIBE, UNSUBSCRIBE and PUBLISH implement the Publish/Subscribe messaging paradigm where (citing Wikipedia) senders (publishers) are not programmed to send their messages to specific receivers (subscribers). (Quoted from the Redis Pub/Sub documentation.)

What is Disk Queue

Disk Queue is a data structure that comes from NSQ. It is a message queue that uses a file on disk as its storage medium.
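To make the idea concrete, here is a minimal sketch of a file-backed FIFO queue. It is not NSQ's implementation: the `fileQueue` type, its methods, and the length-prefixed record layout are assumptions made for illustration only.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// fileQueue is a hypothetical, minimal disk-backed FIFO queue.
// Each record is a 4-byte big-endian length followed by the payload.
type fileQueue struct {
	f       *os.File
	readPos int64 // file offset of the next unread record
}

func openFileQueue(path string) (*fileQueue, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600)
	if err != nil {
		return nil, err
	}
	return &fileQueue{f: f}, nil
}

// Put appends one message to the end of the file.
func (q *fileQueue) Put(msg []byte) error {
	if _, err := q.f.Seek(0, io.SeekEnd); err != nil {
		return err
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(msg)))
	if _, err := q.f.Write(hdr[:]); err != nil {
		return err
	}
	_, err := q.f.Write(msg)
	return err
}

// Get returns the oldest unread message, or io.EOF when the queue is empty.
func (q *fileQueue) Get() ([]byte, error) {
	var hdr [4]byte
	if _, err := q.f.ReadAt(hdr[:], q.readPos); err != nil {
		return nil, err
	}
	msg := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := q.f.ReadAt(msg, q.readPos+4); err != nil {
		return nil, err
	}
	q.readPos += 4 + int64(len(msg))
	return msg, nil
}

// drainDemo writes two messages to a temporary queue file and reads them back.
func drainDemo() ([]string, error) {
	dir, err := os.MkdirTemp("", "fq")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	q, err := openFileQueue(filepath.Join(dir, "queue.dat"))
	if err != nil {
		return nil, err
	}
	defer q.f.Close()
	for _, m := range []string{"test1", "test2"} {
		if err := q.Put([]byte(m)); err != nil {
			return nil, err
		}
	}
	var out []string
	for {
		msg, err := q.Get()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, string(msg))
	}
}

func main() {
	fmt.Println(drainDemo())
}
```

Messages come back in the order they were written. A production disk queue would also need to persist the read position, rotate files, and guard against concurrent access, none of which this sketch attempts.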

How they work together

There is no API change from the Pub/Sub mechanism; only its underlying concurrency handling changes.
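One way to picture keeping the API while changing the concurrency underneath: a publisher only enqueues a message, and a background goroutine per topic drains the queue and fans messages out to subscribers. The sketch below is a hypothetical illustration, not this package's code. The `topic` type and its methods are invented names, and a buffered channel stands in for the Disk Queue.

```go
package main

import (
	"fmt"
	"sync"
)

// topic is a hypothetical illustration, not the package's actual type.
type topic struct {
	mu    sync.RWMutex
	subs  []chan string
	queue chan string // stand-in for the Disk Queue
}

func newTopic() *topic {
	tp := &topic{queue: make(chan string, 16)}
	go tp.pump()
	return tp
}

// pump drains the queue and delivers each message to every subscriber.
func (tp *topic) pump() {
	for msg := range tp.queue {
		tp.mu.RLock()
		for _, c := range tp.subs {
			c <- msg
		}
		tp.mu.RUnlock()
	}
}

// subscribe registers a new buffered channel and returns its receive side.
func (tp *topic) subscribe() <-chan string {
	c := make(chan string, 16)
	tp.mu.Lock()
	tp.subs = append(tp.subs, c)
	tp.mu.Unlock()
	return c
}

// publish only enqueues; delivery happens in the pump goroutine.
func (tp *topic) publish(msg string) {
	tp.queue <- msg
}

func main() {
	tp := newTopic()
	c1 := tp.subscribe()
	c2 := tp.subscribe()
	tp.publish("test1")
	fmt.Println(<-c1, <-c2)
}
```

Because callers only ever see `subscribe` and `publish`, the queue behind them could be swapped for a disk-backed one without touching calling code.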

A Topic is a separate object that handles topic-related information together with a Disk Queue. Each Topic contains two kinds of information:

This modification brings the following benefits:

Installation and Usage

Install

go get github.com/kkdai/pd

Usage

package main

import (
	"fmt"
	"time"

	. "github.com/kkdai/pubsub"
)

func main() {
	ser := NewPubsub(1)
	c1 := ser.Subscribe("topic1")
	c2 := ser.Subscribe("topic2")
	ser.Publish("test1", "topic1")
	ser.Publish("test2", "topic2")
	fmt.Println(<-c1)
	//Got "test1"
	fmt.Println(<-c2)
	//Got "test2"

	// Add subscription "topic2" for c1.
	ser.AddSubscription(c1, "topic2")

	// Publish new content in topic2
	ser.Publish("test3", "topic2")

	fmt.Println(<-c1)
	//Got "test3"
	fmt.Println(<-c2)
	//Got "test3"

	// Remove subscription "topic2" in c1
	ser.RemoveSubscription(c1, "topic2")

	// Publish new content in topic2
	ser.Publish("test4", "topic2")

	select {
	case val := <-c1:
		// Unexpected: c1 is no longer subscribed to topic2.
		fmt.Printf("Should not get %v notify on removed topic\n", val)
	case <-time.After(time.Second):
		// Expected path: the topic2 subscription was removed from c1.
		fmt.Println("Did not receive any message from topic2, timeout.")
	}
}

Benchmark

Benchmark (original in-memory implementation):

BenchmarkAddSub-4             500    2906467 ns/op
BenchmarkRemoveSub-4        10000     232910 ns/op
BenchmarkBasicFunction-4  5000000        232 ns/op

Benchmark (using Disk Queue):

BenchmarkAddSub-4          300000     125628 ns/op
BenchmarkRemoveSub-4       200000     144854 ns/op
BenchmarkBasicFunction-4     2000     906076 ns/op
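To read the two result sets side by side, the short program below computes the ratio of the in-memory timing to the Disk Queue timing for each benchmark. The `speedup` helper is a name introduced here for illustration; the numbers are copied from the results above.

```go
package main

import "fmt"

// speedup returns oldNs/newNs: values above 1 mean the new timing is faster,
// values below 1 mean it is slower.
func speedup(oldNs, newNs float64) float64 {
	return oldNs / newNs
}

func main() {
	results := []struct {
		name         string
		memNs, dqNs float64 // ns/op: in-memory vs. Disk Queue
	}{
		{"AddSub", 2906467, 125628},
		{"RemoveSub", 232910, 144854},
		{"BasicFunction", 232, 906076},
	}
	for _, r := range results {
		fmt.Printf("%-14s in-memory/disk-queue = %.4f\n", r.name, speedup(r.memNs, r.dqNs))
	}
}
```

By these figures, AddSub is roughly 23 times faster and RemoveSub roughly 1.6 times faster with the Disk Queue, while BasicFunction is roughly 3,900 times slower per operation.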

Inspired By

Project52

This is one of my Project 52 projects.

License

This package is licensed under the MIT license. See LICENSE for details.