Awesome
🚦 semaphore
Channel-based implementation of the semaphore pattern with timeout support for lock/unlock operations.
Usage
Quick start
limiter := semaphore.New(1000)
http.HandleFunc("/", func(rw http.ResponseWriter, _ *http.Request) {
if _, err := limiter.Acquire(semaphore.WithTimeout(time.Minute)); err != nil {
http.Error(rw, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
return
}
defer limiter.Release()
// handle request
})
log.Fatal(http.ListenAndServe(":80", nil))
Console tool for command execution in parallel
This example shows how to execute many console commands in parallel.
$ semaphore create 2
$ semaphore add -- docker build
$ semaphore add -- vagrant up
$ semaphore add -- ansible-playbook
$ semaphore wait --timeout=1m --notify
See more details here.
HTTP response time limitation
This example shows how to keep the response time within an SLA.
sla := 100 * time.Millisecond
sem := semaphore.New(1000)

http.HandleFunc("/do-with-timeout", func(rw http.ResponseWriter, req *http.Request) {
	finished := make(chan struct{})
	timeout := semaphore.WithTimeout(sla)

	// The work runs in its own goroutine so the handler can stop waiting
	// for it as soon as the SLA has elapsed.
	go func() {
		release, err := sem.Acquire(timeout)
		if err != nil {
			return
		}
		// Deferred calls run in reverse order: finished is closed first,
		// then the semaphore place is released.
		defer release()
		defer close(finished)

		// do some heavy work
	}()

	// wait for whichever happens first: completion of the work or the timeout
	select {
	case <-finished:
		// send success response
	case <-timeout:
		http.Error(rw, "operation timeout", http.StatusGatewayTimeout)
	}
})
HTTP request throughput limitation
This example shows how to limit request throughput.
// limiter wraps handler with a semaphore of the given limit. A request that
// cannot acquire a place within timeout is answered with Too Many Requests
// and the acquisition error text; otherwise the place is released once the
// wrapped handler has returned.
limiter := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
	sem := semaphore.New(limit)

	return func(rw http.ResponseWriter, req *http.Request) {
		release, err := sem.Acquire(semaphore.WithTimeout(timeout))
		if err != nil {
			http.Error(rw, err.Error(), http.StatusTooManyRequests)
			return
		}
		defer release()

		handler(rw, req)
	}
}

http.HandleFunc("/do-with-limit", limiter(1000, time.Minute, func(rw http.ResponseWriter, req *http.Request) {
	// do some limited work
}))
HTTP personal rate limitation
This example shows how to create a user-specific rate limiter.
// LimiterForUser returns the semaphore stored for user in the limiters map.
// The lookup is done under the read lock of mx; when no entry is found, the
// write lock is taken around a placeholder where the missing limiter is meant
// to be handled.
//
// NOTE(review): as written, the placeholder never assigns limiter, so the zero
// value is returned for an unknown user. The real implementation presumably
// creates a semaphore from cnf and stores it in limiters; it should re-check
// the map after mx.Lock(), because another goroutine may have added the entry
// between mx.RUnlock() and mx.Lock().
func LimiterForUser(user User, cnf Config) semaphore.Semaphore {
mx.RLock()
limiter, ok := limiters[user]
mx.RUnlock()
if !ok {
mx.Lock()
// handle negative case
mx.Unlock()
}
return limiter
}
// RateLimiter wraps handler so that every request occupies a place in the
// user's semaphore for a fixed period, which bounds the user's request rate:
// rate limit = semaphore capacity / rate limit time, e.g. 10 requests per second.
//
// A request that cannot acquire a place within cnf.SLA is answered with
// Gateway Timeout and the acquisition error text. The hold period is
// cnf.RateLimit[user], falling back to cnf.DefaultRateLimit when the user has
// no entry of their own.
func RateLimiter(cnf Config, handler http.HandlerFunc) http.HandlerFunc {
	return func(rw http.ResponseWriter, req *http.Request) {
		user, ok := // get user from request context

		limiter := LimiterForUser(user, cnf)
		release, err := limiter.Acquire(semaphore.WithTimeout(cnf.SLA))
		if err != nil {
			http.Error(rw, err.Error(), http.StatusGatewayTimeout)
			return
		}

		rl, ok := cnf.RateLimit[user]
		if !ok {
			rl = cnf.DefaultRateLimit
		}

		// Hold the place for the required time in a separate goroutine, so
		// the response is not delayed by the rate-limit period. The goroutine
		// ends on its own after rl.
		go func() {
			time.Sleep(rl)
			release()
		}()

		// Serve in the request goroutine: a ResponseWriter must not be used
		// after the enclosing handler has returned, which a detached handler
		// goroutine could not guarantee.
		handler.ServeHTTP(rw, req)
	}
}
// Register the rate-limited endpoint; cnf carries the SLA and the rate-limit
// periods that RateLimiter reads.
http.HandleFunc("/do-with-rate-limit", RateLimiter(cnf, func(rw http.ResponseWriter, req *http.Request) {
// do some rate limited work
}))
Use context for cancellation
This example shows how to use context and semaphore together.
// deadliner wraps handler with a semaphore of the given limit and a context
// built from the request context and a timeout. The context's Done channel
// bounds the wait for a place; on failure the client receives Gateway Timeout
// with the acquisition error text. On success the wrapped handler is called
// with a request carrying that context.
deadliner := func(limit int, timeout time.Duration, handler http.HandlerFunc) http.HandlerFunc {
	sem := semaphore.New(limit)

	return func(rw http.ResponseWriter, req *http.Request) {
		ctx := semaphore.WithContext(req.Context(), semaphore.WithTimeout(timeout))

		release, err := sem.Acquire(ctx.Done())
		if err != nil {
			http.Error(rw, err.Error(), http.StatusGatewayTimeout)
			return
		}
		defer release()

		req = req.WithContext(ctx)
		handler(rw, req)
	}
}

http.HandleFunc("/do-with-deadline", deadliner(1000, time.Minute, func(rw http.ResponseWriter, req *http.Request) {
	// do some limited work
}))
A pool of workers
This example shows how to create a pool of workers based on semaphore.
// Pool is a pool of workers whose number is bounded by a semaphore.
type Pool struct {
// sem is the semaphore from which Schedule obtains a release function
// before starting a new worker.
sem semaphore.Semaphore
// work hands tasks over to workers that are already running.
work chan func()
}
// Schedule runs task on the pool. It blocks until either a running worker
// accepts the task from the work channel or the semaphore yields a release
// function, in which case a new worker goroutine is started for the task.
func (p *Pool) Schedule(task func()) {
	select {
	case p.work <- task:
		// delay the task to already running workers
	case release, ok := <-p.sem.Signal(nil):
		// ok is always true in this case
		if !ok {
			return
		}
		go p.worker(task, release)
	}
}
// worker executes task and then keeps executing tasks received from the work
// channel until that channel is closed. release is called when the worker
// exits.
func (p *Pool) worker(task func(), release semaphore.ReleaseFunc) {
	defer release()

	task()
	for next := range p.work {
		next()
	}
}
// New returns a Pool backed by a semaphore created with the given size and an
// unbuffered work channel.
func New(size int) *Pool {
	pool := new(Pool)
	pool.sem = semaphore.New(size)
	pool.work = make(chan func())
	return pool
}
// main creates a pool with New(2) and schedules tasks on it.
func main() {
pool := New(2)
pool.Schedule(func() { /* do some work */ })
// the ellipsis below stands for further Schedule calls omitted from the example
...
pool.Schedule(func() { /* do some work */ })
}
Interrupt execution
// Combine a one-second timeout and the os.Interrupt signal (Ctrl+C) into a
// single deadline; presumably it fires when either of them does.
interrupter := semaphore.Multiplex(
	semaphore.WithTimeout(time.Second),
	semaphore.WithSignal(os.Interrupt),
)
sem := semaphore.New(runtime.GOMAXPROCS(0))

// NOTE(review): nothing else holds this semaphore, so Acquire presumably
// succeeds immediately and the panic below is reached — confirm the intended
// setup for demonstrating an interruption.
if _, err := sem.Acquire(interrupter); err == nil {
	panic("press Ctrl+C")
}
// successful interruption
Installation
$ go get github.com/kamilsk/semaphore
$ # or use mirror
$ egg bitbucket.org/kamilsk/semaphore
Update
This library uses SemVer for versioning, and it is not BC-safe
(backward compatibility is not guaranteed). Therefore, do not use `go get -u`
to update it; use dep, glide, or a similar tool instead.
<sup id="egg">1</sup> The egg project is still at the prototyping stage. ↩
made with ❤️ by OctoLab