<a title="Build status" target="_blank" href="https://github.com/alitto/pond/actions"><img alt="Build status" src="https://github.com/alitto/pond/actions/workflows/main.yml/badge.svg?branch=master&event=push"/></a> <a title="Codecov" target="_blank" href="https://codecov.io/gh/alitto/pond"><img src="https://codecov.io/gh/alitto/pond/branch/master/graph/badge.svg"/></a> <a title="Release" target="_blank" href="https://github.com/alitto/pond/releases"><img src="https://img.shields.io/github/v/release/alitto/pond"/></a> <a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/alitto/pond"><img src="https://goreportcard.com/badge/github.com/alitto/pond"/></a>

pond

Minimalistic and High-performance goroutine worker pool written in Go

Motivation

This library is meant to provide a simple way to limit concurrency when executing some function over a limited resource or service.

Some common scenarios include:

Features:

How to install

go get -u github.com/alitto/pond

How to use

Worker pool with dynamic size

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a buffered (non-blocking) pool that can scale up to 100 workers
	// and has a buffer capacity of 1000 tasks
	pool := pond.New(100, 1000)

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}

Worker pool with fixed size

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create an unbuffered (blocking) pool with a fixed 
	// number of workers
	pool := pond.New(10, 0, pond.MinWorkers(10))

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}

Submitting a group of tasks

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a pool
	pool := pond.New(10, 1000)
	defer pool.StopAndWait()

	// Create a task group
	group := pool.Group()

	// Submit a group of tasks
	for i := 0; i < 20; i++ {
		n := i
		group.Submit(func() {
			fmt.Printf("Running group task #%d\n", n)
		})
	}

	// Wait for all tasks in the group to complete
	group.Wait()
}

Submitting a group of tasks associated to a context (since v1.8.0)

This feature provides synchronization, error propagation, and context cancellation for subtasks of a common task. It is similar to errgroup.Group from the golang.org/x/sync/errgroup package, but with concurrency bounded by the worker pool.

package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/alitto/pond"
)

func main() {

	// Create a worker pool
	pool := pond.New(10, 1000)
	defer pool.StopAndWait()

	// Create a task group associated to a context
	group, ctx := pool.GroupContext(context.Background())

	var urls = []string{
		"https://www.golang.org/",
		"https://www.google.com/",
		"https://www.github.com/",
	}

	// Submit tasks to fetch each URL
	for _, url := range urls {
		url := url
		group.Submit(func() error {
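			// Build a request bound to the group's context so it is
			// cancelled as soon as any other task in the group fails
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
			if err != nil {
				return err
			}
			resp, err := http.DefaultClient.Do(req)
			if err == nil {
				resp.Body.Close()
			}
			return err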
		})
	}

	// Wait for all HTTP requests to complete.
	err := group.Wait()
	if err != nil {
		fmt.Printf("Failed to fetch URLs: %v\n", err)
	} else {
		fmt.Println("Successfully fetched all URLs")
	}
}

Pool Configuration Options

MinWorkers

Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example:

// This will create a pool with 5 running worker goroutines 
pool := pond.New(10, 1000, pond.MinWorkers(5))

IdleTimeout

Defines how long to wait before removing idle worker goroutines from the pool. The default value is 5 seconds. Example:

// This will create a pool that will remove workers 100ms after they become idle 
pool := pond.New(10, 1000, pond.IdleTimeout(100 * time.Millisecond))

PanicHandler

Configures a custom function to handle panics raised by tasks submitted to the pool. The default handler writes a message to standard output using fmt.Printf with the following contents: Worker exits from a panic: [panic] \n Stack trace: [stack trace]. Example:

// Custom panic handler function
panicHandler := func(p interface{}) {
	fmt.Printf("Task panicked: %v", p)
}

// This will create a pool that will handle panics using a custom panic handler
pool := pond.New(10, 1000, pond.PanicHandler(panicHandler))
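
To make the idea of a panic handler more concrete, here is a minimal, standard-library-only sketch of the general recover-and-delegate pattern. It is an illustration under assumptions, not pond's actual implementation: runTask is a hypothetical helper that does not exist in the library.

```go
package main

import "fmt"

// runTask executes task and forwards any panic to handler instead of letting
// it crash the program. It reports whether the task panicked.
// Hypothetical helper for illustration only; not part of pond.
func runTask(task func(), handler func(p interface{})) (panicked bool) {
	defer func() {
		if p := recover(); p != nil {
			panicked = true
			handler(p)
		}
	}()
	task()
	return false
}

func main() {
	handler := func(p interface{}) {
		fmt.Printf("Task panicked: %v\n", p)
	}

	// The first task panics and is routed to the handler;
	// the second task completes normally.
	runTask(func() { panic("boom") }, handler)
	runTask(func() { fmt.Println("task finished") }, handler)
}
```

Because the panic is recovered inside the function that runs the task, one failing task does not prevent later tasks from running.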

Strategy

Configures the strategy used to resize the pool when backpressure is detected. You can create a custom strategy by implementing the pond.ResizingStrategy interface or choose one of the 3 presets: pond.Eager(), pond.Balanced(), or pond.Lazy().

// Example: create pools with different resizing strategies
eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))

Context

Configures a parent context on this pool to stop all workers when it is cancelled. The default value is context.Background(). Example:

// This creates a pool that is stopped when myCtx is cancelled 
pool := pond.New(10, 1000, pond.Context(myCtx))

Resizing strategies

The following chart illustrates the behaviour of the different pool resizing strategies as the number of submitted tasks increases. Each line represents the number of worker goroutines in the pool (pool size) and the x-axis reflects the number of submitted tasks (cumulative).

Figure: Pool resizing strategies behaviour

As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idle workers, which causes the pool to grow almost linearly with the number of submitted tasks. At the other extreme, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs (GOMAXPROCS). The "Balanced" strategy is a middle ground between the two: it creates a worker every N/2 submitted tasks.
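
The growth rates described above can be approximated with a deliberately simplified model. The sketch below is not pond's resizing code: workersAfter is a hypothetical function that assumes every worker stays busy (so none is ever idle) and that the pool is capped at an assumed maximum of 100 workers.

```go
package main

import (
	"fmt"
	"runtime"
)

// workersAfter returns how many workers a pool would have after `submitted`
// tasks if it created one worker every `interval` submissions and never
// exceeded maxWorkers. Simplified model: it assumes no worker ever goes idle.
func workersAfter(submitted, interval, maxWorkers int) int {
	if interval < 1 {
		interval = 1 // e.g. N/2 when N == 1
	}
	workers := submitted / interval
	if workers > maxWorkers {
		workers = maxWorkers
	}
	return workers
}

func main() {
	n := runtime.GOMAXPROCS(0) // N in the description above
	const maxWorkers = 100     // assumed pool limit for this demo

	for _, submitted := range []int{10, 50, 200} {
		fmt.Printf("submitted=%d eager=%d balanced=%d lazy=%d\n",
			submitted,
			workersAfter(submitted, 1, maxWorkers),   // Eager: one worker per task
			workersAfter(submitted, n/2, maxWorkers), // Balanced: one per N/2 tasks
			workersAfter(submitted, n, maxWorkers))   // Lazy: one per N tasks
	}
}
```

The exact numbers depend on GOMAXPROCS on the machine running it, but under this model the eager count is always at least the balanced count, which in turn is at least the lazy count.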

Stopping a pool

There are 3 methods available to stop a pool and release associated resources:

Metrics & monitoring

Each worker pool instance exposes useful metrics that can be queried through the following methods:

In our Prometheus example we showcase how to configure collectors for these metrics and expose them to Prometheus.

Examples

API Reference

Full API reference is available at https://pkg.go.dev/github.com/alitto/pond

Benchmarks

See Benchmarks.

Resources

Here are some of the resources which have served as inspiration when writing this library:

Contribution & Support

Feel free to send a pull request if you think something can be improved. Also, please open an issue if you run into a problem when using this library or just have a question.