Batcher

This content supports the Rate Limiting Cloud Design Pattern found in the Azure Architecture Center.

Overview

Batcher is a datastore-agnostic batching and rate-limiting implementation for Go.

Datastores have performance limits and the work that is executed against them has costs in terms of memory, CPU, disk, network, and so on (whether you have quantified those costs or not).

Batcher not only allows you to enqueue operations that are then handed back to you in batches, but it also provides an easy way for applications to consume all available resources on a datastore without exceeding its performance limits.

Use Cases

Here are the most common use cases for Batcher:

Rate limiting on datastores

Consider this scenario:

Given that we have capacity of 20K RU per second, we know that we could complete the work in 100 seconds if we could spread it out.

However, each process might try to send its own 100K records in parallel, with no knowledge of the others. This would cause Cosmos to start issuing 429 TooManyRequests error messages, and given the volume it might even cut you off with 503 ServiceUnavailable error messages.

Batcher solves this problem by allowing you to share the capacity across multiple replicas and by controlling the flow of traffic so you don't exceed the 20K RU per second.

Cost savings - Reserved vs Shared Capacity

Consider this scenario:

Using Batcher, you might still reserve capacity per instance, but it can be a small amount. You can then share additional capacity across the instances. For example, you might reserve 2K RU for each of the 4 instances and share an additional 18K RU, allowing each instance to use between 2K and 20K RU. This would require provisioning 26K RU in Cosmos instead of 80K RU.

Cost control

Consider this scenario:

Batcher will ensure that you don't exceed this capacity by lengthening the time it takes for your operations to complete. Therefore, if you find that operations take too long to complete, you can increase the capacity; if you want to save money, you can decrease it.

Rate limiting on other resource targets

Batcher use cases are not limited to datastores. Consider a scenario where you want to limit the number of messages you are allowed to send through a mail API. Batcher can provide the same rate-limiting capability.

Batcher Components

(Figure: topology)

Terminology

Some other terms will be used throughout...

Features

Workflow

  1. A Batcher is created for a datastore. It may be assigned a rate limiter.

  2. A user creates a Watcher, then enqueues Operations into the Batcher.

  3. In the Batcher processing loop, on each CapacityInterval, Batcher asks the rate limiter for the capacity needed to process the enqueued Operations.

  4. In the Batcher processing loop, on each FlushInterval, Batcher organizes Operations into batches and raises them back to the Watchers.

  5. The user performs the queries as batches are raised.

Usage

This code sample shows the general usage...

  1. If you are going to use rate limiting...
    1. Create a rate limiter for each Batcher via New() methods
    2. Set capacity for the rate limiters and/or attach LeaseManagers if appropriate
    3. Start() those rate limiters
  2. Create one or more Batchers via New() methods
  3. Start() those Batchers
  4. As you need to process data...
    1. Create a Watcher
    2. Enqueue Operations into the appropriate Batcher

package main

import (
    "context"
    "errors"
    "net/http"

    gobatcher "github.com/plasne/go-batcher"
    "github.com/rs/zerolog/log"
)

// placeholder values; in a real application these would come from
// environment variables or configuration
var (
    AZBLOB_ACCOUNT   = "account-name"
    AZBLOB_CONTAINER = "container-name"
    AZBLOB_KEY       = "storage-key"
    CAPACITY         = 20000
    total            = 1000
)

func main() {
    ctx := context.Background()

    // start getting shared resource capacity
    leaseMgr := gobatcher.NewAzureBlobLeaseManager(AZBLOB_ACCOUNT, AZBLOB_CONTAINER, AZBLOB_KEY)
    azresource := gobatcher.NewSharedResource().
        WithSharedCapacity(uint32(CAPACITY), leaseMgr).
        WithFactor(1000)
    resourceListener := azresource.AddListener(func(event string, val int, msg string, metadata interface{}) {
        switch event {
        case gobatcher.ErrorsEvent:
            log.Err(errors.New(msg)).Msgf("SharedResource raised the following error...")
        }
    })
    defer azresource.RemoveListener(resourceListener)
    if err := azresource.Provision(ctx); err != nil {
        panic(err)
    }
    if err := azresource.Start(ctx); err != nil {
        panic(err)
    }

    // start the batcher
    batcher := gobatcher.NewBatcher().
        WithRateLimiter(azresource)
    batcherListener := batcher.AddListener(func(event string, val int, msg string, metadata interface{}) {
        switch event {
        case gobatcher.PauseEvent:
            log.Debug().Msgf("batcher paused for %v ms to alleviate pressure on the datastore.", val)
        case gobatcher.AuditFailEvent:
            log.Debug().Msgf("batcher audit-fail: %v", msg)
        }
    })
    defer batcher.RemoveListener(batcherListener)
    if err := batcher.Start(ctx); err != nil {
        panic(err)
    }

    // example of an HTTP handler that enqueues operations
    http.HandleFunc("/ingest", func(res http.ResponseWriter, req *http.Request) {

        // create a batch watcher
        watcher := gobatcher.NewWatcher(func(batch []gobatcher.Operation) {
            // process the batch
        }).WithMaxAttempts(3)

        // enqueue operations
        for i := 0; i < total; i++ {
            payload := struct{}{}
            op := gobatcher.NewOperation(watcher, 10, payload).AllowBatch()
            if errorOnEnqueue := batcher.Enqueue(op); errorOnEnqueue != nil {
                panic(errorOnEnqueue)
            }
        }
    })

    // serve HTTP for the life of the process
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

:information_source: A full code sample that demonstrates usage is available in the sample folder.

There are some additional commands that can be executed on Batcher, including...

Batcher Configuration

Batcher can be configured depending on your use case and requirements. For example, creating a new Batcher with some configuration items might look like this...

batcher := gobatcher.NewBatcherWithBuffer(buffer).
    WithRateLimiter(rateLimiter)
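
As a hedged sketch, the CapacityInterval and FlushInterval described in the Workflow section can be tuned in the same builder style (the two With* methods here are assumed to follow that style, and the durations are illustrative, not recommendations):

batcher := gobatcher.NewBatcherWithBuffer(10000).
    WithRateLimiter(rateLimiter).
    WithFlushInterval(100 * time.Millisecond).
    WithCapacityInterval(100 * time.Millisecond)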

All configuration options are documented in the Batcher Configuration docs.

Events

Events are raised with a "name" (string), "val" (int), "msg" (string), and "metadata" (interface{}). Some of the events that can be raised by Batcher are shutdown or pause, while the rate limiters can raise events like capacity to indicate capacity changes. The complete list of events is documented in the Batcher Events docs.
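
For example, a minimal listener sketch (ShutdownEvent is assumed here as one of the documented event constants, alongside the PauseEvent used in the Usage sample above):

listener := batcher.AddListener(func(event string, val int, msg string, metadata interface{}) {
    switch event {
    case gobatcher.ShutdownEvent:
        // the Batcher has stopped; clean up anything that depends on it
    case gobatcher.PauseEvent:
        // val is the length of the pause in milliseconds
    }
})
defer batcher.RemoveListener(listener)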

Unit Testing

There are several ways to facilitate unit testing when using Batcher. Detailed documentation is available in the Unit Testing docs.

Rate Limiting

The SharedResource rate limiter works like this...

(Figure: lease)

When there is only ReservedCapacity and no SharedCapacity is set, there is no requirement to use a LeaseManager.
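
For example, a reserved-only rate limiter might look like this minimal sketch (WithReservedCapacity is assumed here as the counterpart to the WithSharedCapacity builder method shown earlier):

// no LeaseManager is needed when only reserved capacity is used
limiter := gobatcher.NewSharedResource().
    WithReservedCapacity(2000)
if err := limiter.Start(ctx); err != nil {
    panic(err)
}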

Scenarios

There are a couple of scenarios I want to call attention to...

Cost Savings

Traditionally if you want to run multiple instances of a service, you might provision capacity in your datastore times the number of instances. For example, in Azure Cosmos, if you need 20K RU and have 4 instances, you might provision 80K RU to ensure that any node could operate at maximum capacity.

Using SharedResource, you might still reserve capacity per instance, but it can be a small amount. You can then share capacity across the instances. For example, in the same scenario, you might reserve 2K RU for each of the 4 instances and (minimally) share an additional 18K RU.

(Figure: reserved-shared-capacity)

To give a cost comparison with retail pricing in the East US region with 1 TB of capacity:

Cost Increase

When using an AzureBlobLeaseManager and default settings, each instance of the SharedResource will make a single storage transaction roughly every 250 milliseconds when it needs additional capacity. Therefore, we can determine the maximum cost for 4 instances on an Azure Storage Account GPv2 (or Blob) as...

(4 processes) x (4 lease operations per second) x (60 seconds per minute) x (60 minutes per hour) x (730 hours per month) / (10,000 operations per billing unit) x ($0.004 per billing unit) = ~$17 per month

However, this is a maximum cost - actual costs in many cases will be much lower as there are only storage operations when additional capacity is needed.
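
To double-check the arithmetic, a quick sketch:

// a quick check of the maximum-cost arithmetic above
package main

import "fmt"

func main() {
    processes := 4.0
    opsPerSecond := 4.0 // one lease operation roughly every 250 ms
    opsPerMonth := processes * opsPerSecond * 60 * 60 * 730
    costPerMonth := opsPerMonth / 10000 * 0.004 // $0.004 per 10,000 operations
    fmt.Printf("maximum cost: $%.2f per month\n", costPerMonth) // prints ~$16.82
}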

Changing Capacity

The SharedResource rate limiter supports changing capacity after Start(). You can call SetSharedCapacity(newcap) and SetReservedCapacity(newcap). If you raise SharedCapacity above any value previously seen, realize that the rate limiter will need to provision partitions again. For AzureBlobLeaseManager, this means provisioning blobs (SharedCapacity divided by Factor) before it can return to its normal cycle of procuring partitions for capacity.
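
A minimal sketch of changing capacity at runtime, using the setters named above (the values are illustrative):

// raise the shared pool; with Factor=1000, AzureBlobLeaseManager must
// provision additional blobs before resuming its normal lease cycle
azresource.SetSharedCapacity(28000)

// reserve more capacity for this instance
azresource.SetReservedCapacity(4000)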

Determining Cost

A Batcher with a rate limiter depends on each operation having a cost. The following documents can help you determine what values to use for cost.
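
As one hedged illustration, you might size each operation's cost from its payload (estimateCost is a hypothetical helper, and the 5 RU per KB figure is only a placeholder; measure the actual charges for your workload):

// hypothetical helper: estimate an operation's cost from payload size;
// the 5 RU per KB figure is a placeholder, not a measured value
func estimateCost(doc []byte) uint32 {
    kb := (len(doc) + 1023) / 1024 // round up to whole kilobytes
    cost := kb * 5                 // placeholder: ~5 RU per KB written
    if cost < 1 {
        cost = 1
    }
    return uint32(cost)
}

// usage: op := gobatcher.NewOperation(watcher, estimateCost(doc), doc).AllowBatch()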

Context

When you Start() a Batcher or a RateLimiter, you must provide a context. If the context is ever "done" (cancelled, deadlined, etc.), the Batcher or RateLimiter is shut down. Once it is shut down, it cannot be restarted.

Generally, you want Batcher (and any associated RateLimiter) to run for the duration of your process. If that is true, you can simply use context.Background().

However, if you wanted Batcher (and any associated RateLimiter) to explicitly shutdown when some work was done, you could do something like:

func DoWork(workitems chan WorkItem) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // when DoWork is done (ie. workitems is closed), cancel the context
    batcher := gobatcher.NewBatcher()
    watcher := gobatcher.NewWatcher(func(batch []gobatcher.Operation) {
        // process the workitems in the batch
    })
    if err := batcher.Start(ctx); err != nil {
        return err
    }
    for workitem := range workitems {
        op := gobatcher.NewOperation(watcher, 100, workitem).AllowBatch()
        if err := batcher.Enqueue(op); err != nil {
            return err
        }
    }
    return nil
}

Opportunities for improvement

Contributions

Please see our contributor guide.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Microsoft Patterns & Practices, Azure Architecture Center.