Batched (Dynamic Batching)

The Batched API provides a flexible and efficient way to process multiple requests in a batch, with a primary focus on dynamic batching of inference workloads. It is designed to optimize throughput while keeping latency low, which is especially useful when you need to handle a high volume of simultaneous requests. It supports both sync and async execution.

[Figure: Batch Performance]

Why Dynamic Batching?

Dynamic batching is a technique that automatically groups multiple incoming inference requests into a single batch for processing. This is particularly beneficial for inference workloads, where processing multiple inputs together can significantly improve throughput and efficiency.

In machine learning models, dynamic batching matters because it optimizes hardware utilization, especially for GPUs and specialized AI hardware designed for parallel processing. By batching requests, we can fully leverage this parallel processing, leading to higher throughput. It also reduces overhead by amortizing fixed costs (such as data transfer and model initialization) across multiple requests, improving overall efficiency. Furthermore, dynamic batching enhances real-time performance by adapting to varying request rates, maintaining low latency during quiet periods while maximizing throughput during busy times.

This makes dynamic batching a crucial technique for deploying ML models in production environments where request patterns can be unpredictable and resource optimization is key.
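The amortization argument can be made concrete with a rough back-of-the-envelope model. This is a sketch, not a measurement of Batched: it assumes each model call pays a fixed overhead plus a linear per-item cost, and the function name `throughput_per_second` and the cost values are hypothetical.

```python
def throughput_per_second(batch_size: int, fixed_ms: float, per_item_ms: float) -> float:
    """Items processed per second when every call handles `batch_size` items.

    Assumed cost model (hypothetical): one call takes
    fixed_ms + batch_size * per_item_ms milliseconds.
    """
    call_ms = fixed_ms + batch_size * per_item_ms
    return batch_size / call_ms * 1000.0


# Hypothetical costs: 10 ms fixed overhead per call, 0.5 ms per item.
for size in (1, 8, 64):
    rate = throughput_per_second(size, fixed_ms=10.0, per_item_ms=0.5)
    print(f"batch_size={size:>2}: {rate:.1f} items/s")
```

Under these assumed numbers, throughput rises from roughly 95 items/s at batch size 1 to roughly 1524 items/s at batch size 64, because the fixed cost is shared by more items. Real hardware is rarely this linear, so treat the model as an intuition aid only.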

Installation

To install Batched, use pip:

pip install batched

Usage

Basic Example

Below is a basic example of how to use the Batched API to process text data in batches.

from sentence_transformers import SentenceTransformer
import numpy as np
import batched  # added for dynamic batching

class SentenceEmbedder:
    def __init__(self, model_name='mixedbread-ai/mxbai-embed-large-v1'):
        self.model = SentenceTransformer(model_name)

    @batched.dynamically  # added: enables dynamic batching for this method
    def embed_sentences(self, sentences: list[str]) -> list[np.ndarray]:
        # Convert sentences to embeddings
        return self.model.encode(sentences)

# Create an instance of SentenceEmbedder
embedder = SentenceEmbedder()

# Embed a single sentence
single_sent = "This is a test sentence."
embedding = embedder.embed_sentences(single_sent)
# Async variant; `await` must run inside a coroutine or an async-aware REPL
awaited_embedding = await embedder.embed_sentences.acall(single_sent)

# Embed a batch of 1000 sentences
batch_sentences = [f"This is test sentence number {i}." for i in range(1000)]
batch_embeddings = embedder.embed_sentences(batch_sentences)
awaited_batch_embeddings = await embedder.embed_sentences.acall(batch_sentences)

# Check the statistics
stats = embedder.embed_sentences.stats

Advanced Usage

For more advanced usage, such as customizing the batch size and timeout, the decorators accept parameters that give fine-grained control over the batching process.

For example:

@batched.dynamically(batch_size=64, timeout_ms=5.0, small_batch_threshold=2)
def custom_batch_function(data):
    # Custom processing logic here
    pass
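One plausible reading of `batch_size` and `timeout_ms` is "flush when the batch is full, or when the timeout expires, whichever comes first". The sketch below illustrates that general pattern with the standard library only. It is not Batched's implementation, the helper `collect_batch` is hypothetical, and `small_batch_threshold` is left out because its semantics are not described here.

```python
import queue
import time


def collect_batch(q: "queue.Queue[int]", batch_size: int, timeout_ms: float) -> list[int]:
    """Block for the first item, then gather more until the batch is full
    or `timeout_ms` has elapsed since the first item arrived."""
    batch = [q.get()]  # wait indefinitely for the first request
    deadline = time.monotonic() + timeout_ms / 1000.0
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout reached: flush a partial batch
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break  # nothing else arrived in time
    return batch


pending = queue.Queue()
for i in range(5):
    pending.put(i)

first = collect_batch(pending, batch_size=3, timeout_ms=50.0)   # fills up immediately
second = collect_batch(pending, batch_size=3, timeout_ms=50.0)  # flushed by the timeout
print(first, second)
```

With five queued items and a cap of three, the first call returns a full batch and the second returns the two leftovers once the timeout passes. A larger timeout trades latency for fuller batches.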

API Reference

The API offers both thread and asyncio implementations for batching general tasks and inference tasks:

Thread Implementation

import batched


@batched.dynamically(batch_size=64)
def my_function(items: list[int]) -> list[str]:
  # Custom processing logic here
  return [f"{item * 2}" for item in items]

# Sync call with single item
my_function(2)

# Sync call with a batch of items
my_function([2, 3, 4])

# Call with asyncio
await my_function.acall(2)
await my_function.acall([2, 3, 4])

# Support stat checking
print(my_function.stats)

from batched import inference
import torch

@inference.dynamically(pad_token={"input_ids": 0})
def my_inference_function(features: dict[str, torch.Tensor]) -> torch.Tensor:
  # input_ids = features["input_ids"]
  # attention_mask = features["attention_mask"]
  # token_type_ids = features["token_type_ids"]

  logits = model(**features)
  return logits

# Sync call
my_inference_function(data)

# Call with asyncio
await my_inference_function.acall(data)

print(my_inference_function.stats)
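The `pad_token={"input_ids": 0}` argument suggests that inputs of different lengths are padded to a common length before they are combined into one batch. That reading is an assumption, not something confirmed here. The hypothetical helper `pad_and_stack` below shows the general idea with plain Python lists instead of tensors.

```python
def pad_and_stack(
    sequences: list[list[int]], pad_value: int = 0
) -> tuple[list[list[int]], list[list[int]]]:
    """Right-pad every sequence to the longest length in the batch.

    Returns the padded batch and a mask with 1 for real tokens, 0 for padding.
    """
    max_len = max(len(seq) for seq in sequences)
    padded = [seq + [pad_value] * (max_len - len(seq)) for seq in sequences]
    mask = [[1] * len(seq) + [0] * (max_len - len(seq)) for seq in sequences]
    return padded, mask


# Hypothetical token ids from three separate requests of different lengths.
batch, mask = pad_and_stack([[101, 7, 102], [101, 102], [101, 7, 8, 9, 102]])
print(batch)
print(mask)
```

After padding, every row has the same length, so the rows can be stacked into one rectangular tensor and passed through the model in a single call.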

Asyncio Implementation

from batched import aio

@aio.dynamically(batch_size=64, timeout_ms=20.0, small_batch_threshold=10)
def my_function(items: list[int]) -> list[int]:  # can also be an async function: async def ...
  # Custom processing logic here
  return [item * 2 for item in items]


# Allow single item
await my_function(2)

# Allow batch of items
await my_function([2, 3, 4])

# Support stat checking
print(my_function.stats)

from batched import aio
import torch

@aio.inference.dynamically(pad_token={"input_ids": 0})
async def my_inference_function(features: dict[str, torch.Tensor]) -> list[torch.Tensor]:
  # input_ids = features["input_ids"]
  # attention_mask = features["attention_mask"]
  # token_type_ids = features["token_type_ids"]

  logits1 = await model1(**features)
  logits2 = await model2(**features)
  return [logits1, logits2]


await my_inference_function(data)

print(my_inference_function.stats)
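To show how concurrent awaiting callers can end up sharing one batch, here is a toy asyncio batcher built only on the standard library. It is a sketch of the general technique and not the code behind `aio.dynamically`. The class `MicroBatcher` and its attributes are hypothetical.

```python
import asyncio


class MicroBatcher:
    """Toy asyncio batcher: each caller awaits one result, a worker processes groups."""

    def __init__(self, fn, batch_size: int, timeout_ms: float):
        self.fn = fn
        self.batch_size = batch_size
        self.timeout = timeout_ms / 1000.0
        self.queue: asyncio.Queue = asyncio.Queue()
        self.batch_sizes: list[int] = []  # recorded for inspection only

    async def __call__(self, item):
        # Enqueue the item with a future that the worker resolves later.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def run(self):
        loop = asyncio.get_running_loop()
        while True:
            pending = [await self.queue.get()]  # wait for the first request
            deadline = loop.time() + self.timeout
            while len(pending) < self.batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    pending.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            # One call to the wrapped function serves every caller in the batch.
            results = self.fn([item for item, _ in pending])
            self.batch_sizes.append(len(pending))
            for (_, future), result in zip(pending, results):
                future.set_result(result)


async def main():
    batcher = MicroBatcher(lambda xs: [x * 2 for x in xs], batch_size=4, timeout_ms=50.0)
    worker = asyncio.create_task(batcher.run())
    results = await asyncio.gather(*(batcher(i) for i in range(6)))
    worker.cancel()  # stop the background worker once all callers are served
    return results, batcher.batch_sizes


results, batch_sizes = asyncio.run(main())
print(results, batch_sizes)
```

Each caller sees an ordinary single-item coroutine call, while the worker decides how requests are grouped. Six concurrent calls with a cap of four are served in more than one batch, with the last one flushed by the timeout.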

Contributing

Contributions are welcome! Please feel free to submit a pull request or report an issue on GitHub.

Attribution

This project was inspired by the following projects:

License

This project is licensed under the Apache License, Version 2.0.