Flume

Flume is a job processing system backed by GenStage and Redis.

Features

Requirements

Installation

Add Flume to your list of dependencies in mix.exs:

def deps do
  [
    {:flume, github: "scripbox/flume"}
  ]
end

Then run mix deps.get to install Flume and its dependencies.

Usage

Add the Flume supervisor to your application's supervision tree:

defmodule MyApplication.Application do
  use Application

  import Supervisor.Spec

  def start(_type, _args) do
    children = [
      # Start Flume supervisor
      supervisor(Flume, [])
    ]

    opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Create config/flume.exs:

import Config

config :flume,
  name: Flume,
  # Redis host
  host: "127.0.0.1",
  # Redis port
  port: "6379",
  # Redis keys namespace
  namespace: "my-app",
  # Redis database
  database: 0,
  # Redis pool size
  redis_pool_size: 10,
  # Redis connection timeout in ms (Default 5_000 ms)
  redis_timeout: 10_000,
  # Initial retry backoff in ms (Default 500 ms)
  backoff_initial: 30_000,
  # Maximum retry backoff in ms (Default 10_000 ms)
  backoff_max: 3_600_000,
  # Maximum number of retries (Default 5)
  max_retries: 15,
  # Poll interval for scheduled jobs in ms (Default 10_000 ms)
  scheduler_poll_interval: 10_000,
  # Time in seconds after which a job still in the processing queue is moved to the retry queue (Default 600 s)
  visibility_timeout: 600,
  # TTL in ms of the lock acquired to fetch jobs for bulk pipelines (Default 30_000 ms)
  dequeue_lock_ttl: 30_000,
  # Timeout in ms of the process that fetches jobs for bulk pipelines (Default 10_000 ms)
  dequeue_process_timeout: 10_000,
  # Interval in ms before polling the queue again when it is locked by another process (Default 500 ms)
  dequeue_lock_poll_interval: 500
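The retry settings work together. A failed job is retried up to max_retries times, and the wait between attempts presumably grows from backoff_initial toward backoff_max. This section does not document the exact growth curve, so the sketch below assumes a simple doubling backoff capped at backoff_max. BackoffSketch is a hypothetical module and is not part of Flume.

```elixir
defmodule BackoffSketch do
  # Hypothetical model: the delay doubles after each failed attempt and is capped at max_ms.
  # Flume's real retry formula may differ (for example, it may add jitter).
  def delay(retry_count, initial_ms, max_ms)
      when is_integer(retry_count) and retry_count >= 0 do
    min(initial_ms * Integer.pow(2, retry_count), max_ms)
  end

  # Delays for every retry attempt, from the first up to max_retries.
  def schedule(max_retries, initial_ms, max_ms) do
    Enum.map(0..(max_retries - 1)//1, &delay(&1, initial_ms, max_ms))
  end
end

# Values from the config above: backoff_initial 30_000, backoff_max 3_600_000, max_retries 15.
IO.inspect(BackoffSketch.schedule(15, 30_000, 3_600_000))
```

With these values the model hits the one-hour cap from the eighth retry onward, because 30_000 ms × 2^7 = 3_840_000 ms exceeds 3_600_000 ms.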

Import the Flume config in config/config.exs as shown below:

...
import_config "#{Mix.env()}.exs"
+import_config "flume.exs"

Pipelines

Each pipeline is a GenStage pipeline with its own configuration. A basic pipeline needs a name, the queue to pull jobs from, and max_demand:

Configuration

config :flume,
  pipelines: [
    %{name: "default_pipeline", queue: "default", max_demand: 1000},
  ]

Flume supervisor will start these processes:

                  [Flume.Supervisor]   <- (Supervisor)
                         |
                         |
                         |
              [default_pipeline_producer]   <- (Producer)
                         |
                         |
                         |
          [default_pipeline_producer_consumer]   <- (ProducerConsumer)
                         |
                         |
                         |
         [default_pipeline_consumer_supervisor]   <- (ConsumerSupervisor)
                        / \
                       /   \
                      /     \
             [worker_1]     [worker_2]   <- (Worker Processes)
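To build intuition for the fan-out in this tree, here is a loose, stdlib-only analogue. It is not Flume's implementation. Task.async_stream runs each job in its own process, and max_concurrency plays the role of max_demand, roughly the way a ConsumerSupervisor limits how many workers run at once. PipelineSketch is a hypothetical name. The real pipeline uses GenStage and pulls jobs from Redis.

```elixir
defmodule PipelineSketch do
  # Stdlib-only analogue of producer -> consumer supervisor -> worker processes.
  # Each job runs in its own process, with at most max_demand running concurrently.
  def run(jobs, max_demand, worker_fun) when is_integer(max_demand) and max_demand > 0 do
    jobs
    |> Task.async_stream(worker_fun, max_concurrency: max_demand, ordered: true)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

PipelineSketch.run(1..5, 2, fn job -> job * 10 end) |> IO.inspect()
```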

Enqueuing Jobs

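Enqueuing a job into Flume requires a queue name, a worker module, the worker's function name (perform by default) and a list of arguments matching that function's arity.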

With default function

Flume.enqueue(:queue_name, MyApp.FancyWorker, [arg_1, arg_2])

With custom function

Flume.enqueue(:queue_name, MyApp.FancyWorker, :myfunc, [arg_1, arg_2])
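Both calls describe the same kind of job. The shorter form simply implies the default function. The hypothetical helper below is not a Flume API. It builds that job description as a map with the same keys (queue, worker, function_name, args) that the mock in the "Writing Tests" section asserts on, and it assumes :perform as the default function.

```elixir
defmodule EnqueueSketch do
  # Hypothetical helper: without an explicit function name, :perform is assumed.
  def build_job(queue, worker, args) when is_list(args) do
    build_job(queue, worker, :perform, args)
  end

  # Explicit function name, mirroring the four-argument enqueue form.
  def build_job(queue, worker, function_name, args)
      when is_atom(function_name) and is_list(args) do
    %{queue: queue, worker: worker, function_name: function_name, args: args}
  end
end

EnqueueSketch.build_job(:queue_name, MyApp.FancyWorker, [1, 2]) |> IO.inspect()
```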

Creating Workers

Worker modules are responsible for processing jobs. A worker module must define the function used when enqueuing the job (perform by default) with an arity equal to the number of arguments passed.

defmodule MyApp.FancyWorker do
  def perform(arg_1, arg_2) do
    # your job processing logic
  end
end
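The arity rule matters because the job's argument list is applied to the worker function as separate arguments. The minimal sketch below assumes an apply/3 style dispatch and checks the exported arity first. DispatchSketch and SketchWorker are illustrative names, not Flume modules.

```elixir
defmodule SketchWorker do
  # perform/2 matches jobs enqueued with exactly two arguments.
  def perform(arg_1, arg_2), do: {:done, arg_1 + arg_2}
end

defmodule DispatchSketch do
  # Hypothetical dispatcher: calls worker.function_name(args...) only when a function
  # with that name and arity (the length of args) is exported by the worker module.
  def run(%{worker: worker, function_name: fun, args: args}) do
    arity = length(args)
    Code.ensure_loaded(worker)

    if function_exported?(worker, fun, arity) do
      {:ok, apply(worker, fun, args)}
    else
      {:error, {:undefined_function, worker, fun, arity}}
    end
  end
end

DispatchSketch.run(%{worker: SketchWorker, function_name: :perform, args: [1, 2]}) |> IO.inspect()
DispatchSketch.run(%{worker: SketchWorker, function_name: :perform, args: [1]}) |> IO.inspect()
```

The first call returns {:ok, {:done, 3}}. The second returns an error tuple because SketchWorker does not export perform/1.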

Scheduled Jobs

With default function

# 10 seconds
schedule_time = 10_000

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, [arg_1, arg_2])

With custom function

# 10 seconds
schedule_time = 10_000

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, :myfunc, [arg_1, arg_2])
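Scheduled jobs are picked up by polling every scheduler_poll_interval, as set in the configuration. The following is an assumed model only. A scheduled job is treated as a run-at timestamp plus the job, and each poll splits out the entries that are due. ScheduleSketch is hypothetical and keeps everything in memory rather than in Redis.

```elixir
defmodule ScheduleSketch do
  # Hypothetical model: a scheduled entry is {run_at_ms, job}.
  def schedule(job, delay_ms, now_ms), do: {now_ms + delay_ms, job}

  # Returns {due, pending}: due entries are ready to be moved to their queue at now_ms.
  def split_due(entries, now_ms) do
    Enum.split_with(entries, fn {run_at, _job} -> run_at <= now_ms end)
  end
end

entries = [ScheduleSketch.schedule(:job_a, 10_000, 0), ScheduleSketch.schedule(:job_b, 30_000, 0)]
ScheduleSketch.split_due(entries, 20_000) |> IO.inspect()
```

In a polling design like this one, a job can start up to one poll interval after its run-at time.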

Rate Limiting

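Flume supports rate limiting for each configured pipeline.

Rate limiting has two key parameters: rate_limit_count, the number of jobs to process within the time window, and rate_limit_scale, the length of that window in milliseconds. The optional rate_limit_key lets several pipelines share one rate limit.

       Note: When rate_limit_key is not set, the rate limit is maintained per pipeline.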

rate_limit_count = 1000
rate_limit_scale = 6 * 1000

config :flume,
  pipelines: [
    # These pipelines share rate_limit_key "email", so together they process 1000 jobs every 6 seconds
    %{
      name: "promotional_email_pipeline",
      queue: "promotional_email",
      rate_limit_count: rate_limit_count,
      rate_limit_scale: rate_limit_scale,
      rate_limit_key: "email"
    },
    %{
      name: "transactional_email_pipeline",
      queue: "transactional_email",
      rate_limit_count: rate_limit_count,
      rate_limit_scale: rate_limit_scale,
      rate_limit_key: "email"
    }
  ]

OR

config :flume,
  pipelines: [
    %{
      name: "webhooks_pipeline",
      queue: "webhooks",
      rate_limit_count: 1000,
      rate_limit_scale: 5000
    }
  ]

Flume enforces the configured limit (rate_limit_count jobs per rate_limit_scale milliseconds) for each rate-limited pipeline, even when multiple instances of the application are running.
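To make the window semantics concrete, here is an in-memory fixed-window counter keyed by rate_limit_key. This sketch rests on assumptions and is not Flume's algorithm. RateLimitSketch is hypothetical, and a limiter that holds across application instances needs shared storage such as Redis rather than a local map.

```elixir
defmodule RateLimitSketch do
  # Hypothetical fixed-window limiter: at most `count` jobs per `scale_ms` window for each key.
  # State is an in-memory map of key => {window_start_ms, used_in_window}.
  def take(state, key, wanted, count, scale_ms, now_ms) when wanted >= 0 do
    {start, used} =
      case Map.get(state, key) do
        # Still inside the current window: keep counting.
        {start, used} when now_ms - start < scale_ms -> {start, used}
        # No window yet, or the previous one expired: open a fresh window.
        _ -> {now_ms, 0}
      end

    granted = min(wanted, count - used)
    {granted, Map.put(state, key, {start, used + granted})}
  end
end

# Two pipelines sharing rate_limit_key "email" (1000 jobs per 6_000 ms in total).
{first, state} = RateLimitSketch.take(%{}, "email", 600, 1000, 6_000, 0)
{second, _state} = RateLimitSketch.take(state, "email", 600, 1000, 6_000, 1_000)
IO.inspect({first, second})
# prints {600, 400}
```

Both pipelines draw from the same budget. Once 1000 jobs have been granted within a window, further requests receive 0 until the next window opens.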

Batch Processing

Flume supports batch processing for each configured pipeline. It groups jobs into batches of batch_size, and each worker process receives one batch of jobs.

config :flume,
  pipelines: [
    # This pipeline will pull (100 * 10) jobs from the queue
    # and group them in batches of 10.
    %{
      name: "batch_pipeline",
      queue: "batch-queue",
      max_demand: 100,
      batch_size: 10
    }
  ]

A batch worker defines perform/1, which receives the whole batch as a list of job arguments:

defmodule MyApp.BatchWorker do
  def perform(args) do
    # args will be a list of arguments
    # E.g - [[job_1_args], [job_2_args], ...]
    # your job processing logic
  end
end
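Conceptually, batching splits the pulled jobs' argument lists into chunks of batch_size and passes each chunk to one perform/1 call. The stdlib sketch below uses Enum.chunk_every. BatchSketch and SketchBatchWorker are hypothetical, and Flume's own grouping may differ in details such as how a final partial batch is handled.

```elixir
defmodule SketchBatchWorker do
  # Hypothetical batch worker: perform/1 receives the whole batch and here just counts it.
  def perform(batch) when is_list(batch), do: length(batch)
end

defmodule BatchSketch do
  # Splits job argument lists into batches of batch_size; the last batch may be smaller.
  def batches(job_args, batch_size) when is_integer(batch_size) and batch_size > 0 do
    Enum.chunk_every(job_args, batch_size)
  end

  # Calls worker.perform/1 once per batch and collects the results.
  def run(job_args, batch_size, worker) do
    job_args
    |> batches(batch_size)
    |> Enum.map(fn batch -> worker.perform(batch) end)
  end
end

BatchSketch.run([[1], [2], [3], [4], [5]], 2, SketchBatchWorker) |> IO.inspect()
```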

Pipeline Control

Flume supports pausing and resuming each pipeline. Once a pipeline is paused, its producer process stops pulling jobs from the queue. Jobs that were already pulled are still processed.

Refer to the "Options" section for supported options and default values.

Pause all pipelines

# Pause all pipelines permanently (in Redis) and asynchronously
Flume.pause_all(temporary: false, async: true)

Pause a pipeline

# Pause a pipeline temporarily (in current node) and asynchronously
Flume.pause(:default_pipeline, temporary: true, async: true)

Resume all pipelines

# Resume all pipelines temporarily (in current node) and synchronously with infinite timeout
Flume.resume_all(temporary: true, async: false, timeout: :infinity)

Resume a pipeline

# Resume a pipeline permanently (in Redis) and synchronously with a 10_000 ms timeout
Flume.resume(:default_pipeline, temporary: false, async: false, timeout: 10_000)
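The pause semantics described above can be modelled with a small GenServer: a paused producer stops handing out new jobs and leaves already-pulled jobs alone. This is a hypothetical toy, not Flume's producer. PausableProducerSketch ignores the temporary, async and timeout options and keeps its jobs in memory.

```elixir
defmodule PausableProducerSketch do
  # Hypothetical toy producer holding an in-memory job list and a paused flag.
  use GenServer

  def start_link(jobs), do: GenServer.start_link(__MODULE__, jobs)
  def pause(pid), do: GenServer.call(pid, :pause)
  def resume(pid), do: GenServer.call(pid, :resume)
  def pull(pid, demand), do: GenServer.call(pid, {:pull, demand})

  @impl true
  def init(jobs), do: {:ok, %{jobs: jobs, paused: false}}

  @impl true
  def handle_call(:pause, _from, state), do: {:reply, :ok, %{state | paused: true}}
  def handle_call(:resume, _from, state), do: {:reply, :ok, %{state | paused: false}}
  # While paused, no new jobs are handed out; jobs already pulled are not affected.
  def handle_call({:pull, _demand}, _from, %{paused: true} = state), do: {:reply, [], state}

  def handle_call({:pull, demand}, _from, state) do
    {taken, rest} = Enum.split(state.jobs, demand)
    {:reply, taken, %{state | jobs: rest}}
  end
end

{:ok, pid} = PausableProducerSketch.start_link([1, 2, 3, 4])
PausableProducerSketch.pull(pid, 2) |> IO.inspect()
```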

Options

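The following options can be passed when pausing or resuming pipelines: temporary (true applies the change only on the current node, false persists it in Redis), async (whether the call returns without waiting for the pause or resume to complete) and timeout (how long a synchronous call waits, in milliseconds or :infinity).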

Instrumentation

We use telemetry to emit metrics. The following metrics are emitted:

Writing Tests

To enable mocking in the test environment, add this to config/test.exs:

config :flume, mock: true

To mock an individual test, wrap it in with_flume_mock:

import Flume.Mock
...
describe "enqueue/4" do
  test "mock works" do
    with_flume_mock do
      Flume.enqueue(:test, List, :last, [[1]])

      assert_receive %{
        queue: :test,
        worker: List,
        function_name: :last,
        args: [[1]]
      }
    end
  end
end

To enable mocking for all tests in a module, use Flume.Mock:

defmodule ListTest do
  use ExUnit.Case, async: true
  use Flume.Mock

  describe "enqueue/4" do
    test "mock works" do
      Flume.enqueue(:test, List, :last, [[1]])

      assert_receive %{
        queue: :test,
        worker: List,
        function_name: :last,
        args: [[1]]
      }
    end
  end
end
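One way to picture a mocked enqueue: instead of writing the job to Redis, it sends the job description to the test process, which lets assert_receive match it in the tests above. The sketch below assumes that behaviour. MockSketch is hypothetical and does not reproduce Flume.Mock itself.

```elixir
ExUnit.start()

defmodule MockSketch do
  # Hypothetical mocked enqueue: sends the job map to the calling process instead of Redis.
  def enqueue(queue, worker, function_name, args) do
    send(self(), %{queue: queue, worker: worker, function_name: function_name, args: args})
    :ok
  end
end

defmodule MockSketchTest do
  use ExUnit.Case, async: true

  test "the enqueued job is delivered to the test process" do
    MockSketch.enqueue(:test, List, :last, [[1]])

    assert_receive %{queue: :test, worker: List, function_name: :last, args: [[1]]}
  end
end
```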

Roadmap

References

Contributing