Opus

A framework for pluggable business logic components.

Installation

The package can be installed by adding opus to your list of dependencies in mix.exs:

def deps do
  [{:opus, "~> 0.8"}]
end

Documentation

Conventions

Usage

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step  :add_one,         with: &(&1 + 1)
  check :even?,           with: &(rem(&1, 2) == 0), error_message: :expected_an_even
  tee   :publish_number,  if: &Publisher.publishable?/1, raise: [ExternalError]
  step  :double,          if: :lucky_number?
  step  :divide,          unless: :lucky_number?
  step  :randomize,       with: &(&1 * :rand.uniform)
  link  JSONPipeline

  def double(n), do: n * 2
  def divide(n), do: n / 2
  def lucky_number?(n) when n in 42..1337, do: true
  def lucky_number?(_), do: false
end

ArithmeticPipeline.call(41)
# {:ok, 84.13436750126804}

Read this blogpost to get started.

Pipeline

The core aspect of this library is defining pipeline modules. As in the example above, you add use Opus.Pipeline to turn a module into a pipeline. A pipeline module is a composition of stages executed in sequence.

Stages

There are a few different types of stages for different use-cases. All stage functions expect a single argument, which is either the value passed to the initial call/1 of the pipeline module or the return value of the previous stage.

An error value is either :error or {:error, any} and anything else is considered a success value.
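
The rule above can be sketched in plain Elixir. The StageResult module and its error?/1 function are hypothetical names used only for illustration; they are not part of Opus, and this is not how the library itself is implemented.

```elixir
defmodule StageResult do
  # Hypothetical helper for illustration only; it is not part of Opus.
  # Mirrors the rule: :error and {:error, any} are error values,
  # anything else counts as a success value.
  def error?(:error), do: true
  def error?({:error, _reason}), do: true
  def error?(_anything_else), do: false
end

IO.inspect(StageResult.error?(:error))              # true
IO.inspect(StageResult.error?({:error, :timeout}))  # true
IO.inspect(StageResult.error?({:ok, 42}))           # false
IO.inspect(StageResult.error?(nil))                 # false
```

Note that under this rule values such as nil or false are still success values.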

Step

This stage processes the input value. If it produces a success value, the next stage is called with that value. If it produces an error value, the pipeline is halted and an {:error, any} is returned.
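
As a minimal sketch of a step halting a pipeline, the script below assumes it is run with the elixir command and fetches Opus through Mix.install. ParsePipeline and its functions are made-up names for illustration. The error case is matched only as {:error, _reason}, since the exact shape of the error term is not described in this document.

```elixir
Mix.install([{:opus, "~> 0.8"}])

defmodule ParsePipeline do
  use Opus.Pipeline

  step :parse
  step :double

  # Returns an integer (a success value) or {:error, reason} (an error value).
  def parse(string) do
    case Integer.parse(string) do
      {number, ""} -> number
      _ -> {:error, :not_an_integer}
    end
  end

  def double(number), do: number * 2
end

# Both steps run: "21" is parsed to 21, then doubled.
{:ok, 42} = ParsePipeline.call("21")

# :parse yields an error value, so :double is never called.
{:error, _reason} = ParsePipeline.call("abc")
```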

Check

This stage is intended for validations.

It calls the stage function and halts the pipeline unless the function returns true.

Example:

defmodule CreateUserPipeline do
  use Opus.Pipeline

  check :valid_params?, with: &match?(%{email: email} when is_bitstring(email), &1)
  # other stages to actually create the user
end

Tee

This stage is intended for side effects, such as a notification or a call to an external system where the return value is not meaningful. It never halts the pipeline.
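
The following sketch illustrates the point, assuming the script is run with the elixir command and that Opus is fetched through Mix.install. TeeDemoPipeline and its functions are hypothetical names. The tee function returns :error on purpose, to show that its return value does not affect the stages that follow.

```elixir
Mix.install([{:opus, "~> 0.8"}])

defmodule TeeDemoPipeline do
  use Opus.Pipeline

  step :double
  tee :announce
  step :add_one

  def double(n), do: n * 2

  # Side effect only. The :error return value is deliberately "wrong"
  # to show that a tee does not halt the pipeline.
  def announce(n) do
    IO.puts("doubled value: #{n}")
    :error
  end

  def add_one(n), do: n + 1
end

# The stages after the tee still run, and :add_one receives the value
# produced by :double rather than the return value of :announce.
IO.inspect(TeeDemoPipeline.call(2))
```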

Link

This stage links to another Opus.Pipeline module by calling call/1 on the provided module. If the module is not an Opus.Pipeline, it is ignored.

Skip

The skip macro can be used for linked pipelines. A linked pipeline may act as a true bypass, based on a condition, expressed as either :if or :unless. When skipped, none of the stages are executed and it returns the input, to be used by any next stages of the caller pipeline. A very common use-case is illustrated in the following example:

defmodule RetrieveCustomerInformation do
  use Opus.Pipeline

  check :valid_query?
  link FetchFromCache,    if: :cacheable?
  link FetchFromDatabase, if: :db_backed?
  step :serialize
end

With skip it can be written as:

defmodule RetrieveCustomerInformation do
  use Opus.Pipeline

  check :valid_query?
  link FetchFromCache
  link FetchFromDatabase
  step :serialize
end

A linked pipeline becomes:

defmodule FetchFromCache do
  use Opus.Pipeline

  skip :assert_suitable, unless: :cacheable?
  step :retrieve_from_cache
end

Available options

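The behaviour of each stage can be configured with options. The ones used in the examples of this document are :with, :if, :unless, :raise, :error_message, :retry_times, :retry_backoff and :instrument?.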

Retries

defmodule ExternalApiPipeline do
  use Opus.Pipeline

  step :http_request, retry_times: 8, retry_backoff: fn -> linear_backoff(10, 30) |> cap(100) end

  def http_request(_input) do
    # code for the actual request
  end
end

In the module above, the http_request stage is retried up to 8 times. Before each retry a delay is applied, taken from the next value of the Stream returned by the retry_backoff function.

All the functions from the :retry package will be available to be used in retry_backoff.
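
To make the idea of a backoff Stream concrete, here is a plain-Elixir stand-in that does not use the :retry package at all. It assumes that linear_backoff(10, 30) yields delays that start at 10 and grow by 30, and that cap(100) clamps each delay to at most 100; the exact values produced by the real helpers may differ.

```elixir
# Assumption: a stand-in for `linear_backoff(10, 30) |> cap(100)`,
# built only from the standard library.
backoff =
  10
  |> Stream.iterate(&(&1 + 30))   # 10, 40, 70, 100, 130, ...
  |> Stream.map(&min(&1, 100))    # clamp every delay to at most 100

# One delay per retry attempt, for 8 retries.
delays = Enum.take(backoff, 8)

# charlists: :as_lists stops IO.inspect from printing the integers as text.
IO.inspect(delays, charlists: :as_lists)
# => [10, 40, 70, 100, 100, 100, 100, 100]
```

Because the stream is lazy and infinite, only as many delays as there are retries are ever computed.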

Stage Filtering

You can select the stages of a pipeline to run using call/2 with the :except and :only options.

Example:

# Runs only the stage with the :validate_params name
CreateUserPipeline.call(params, only: [:validate_params])

# Runs all the stages except the selected ones
CreateUserPipeline.call(params, except: :send_notification)
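
A small self-contained sketch of the same idea follows. It assumes the script is run with the elixir command and fetches Opus through Mix.install. FilterDemoPipeline is a hypothetical module used only for illustration.

```elixir
Mix.install([{:opus, "~> 0.8"}])

defmodule FilterDemoPipeline do
  use Opus.Pipeline

  step :add_one, with: &(&1 + 1)
  step :double, with: &(&1 * 2)
end

# Runs both stages: add_one, then double.
IO.inspect(FilterDemoPipeline.call(1))

# Runs only the :double stage.
IO.inspect(FilterDemoPipeline.call(1, only: [:double]))

# Runs every stage except :double, which leaves only :add_one.
IO.inspect(FilterDemoPipeline.call(1, except: [:double]))
```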

Instrumentation

The instrumentation hooks used in the examples of this section are :pipeline_started, :before_stage, :stage_completed and :pipeline_completed.

You can disable all instrumentation callbacks for a stage using instrument?: false.

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :double, instrument?: false
end

You can define module specific instrumentation callbacks using:

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :double, with: &(&1 * 2)
  step :triple, with: &(&1 * 3)

  instrument :before_stage, fn %{input: input} ->
    IO.inspect input
  end

  # Will be called only for the matching stage
  instrument :stage_completed, %{stage: %{name: :triple}}, fn %{time: time} ->
    # send to the monitoring tool of your choice
  end
end

You can define a default instrumentation module for all your pipelines by adding the following to your config/*.exs:

config :opus, :instrumentation, YourModule

# but you may choose to provide a list of modules
config :opus, :instrumentation, [YourModuleA, YourModuleB]

An instrumentation module has to export instrument/3 functions like:

defmodule CustomInstrumentation do
  def instrument(:pipeline_started, %{pipeline: ArithmeticPipeline}, %{input: input}) do
    # publish the metrics to specific backend
  end

  def instrument(:before_stage, %{stage: %{pipeline: pipeline}}, %{input: input}) do
    # publish the metrics to specific backend
  end

  def instrument(:stage_completed, %{stage: %{pipeline: ArithmeticPipeline}}, %{time: time}) do
    # publish the metrics to specific backend
  end

  def instrument(:pipeline_completed, %{pipeline: ArithmeticPipeline}, %{result: result, time: total_time}) do
    # publish the metrics to specific backend
  end

  def instrument(_, _, _), do: nil
end

Telemetry

Opus includes an instrumentation module which emits events using the :telemetry library.
To enable it, add the following to your config/config.exs:

config :opus, :instrumentation, [Opus.Telemetry]

Browse the available events here.

For instructions to integrate Opus Telemetry metrics in your Phoenix application, read this post.

Module-Global Options

You may choose to provide some common options to all the stages of a pipeline.

defmodule ArithmeticPipeline do
  use Opus.Pipeline, instrument?: false, raise: true
  # The pipeline opts will disable instrumentation for this module
  # and will not rescue exceptions from any of the stages

  step :double, with: &(&1 * 2)
  step :triple, with: &(&1 * 3)
end

Graph

You may visualise your pipelines using Opus.Graph:

Opus.Graph.generate(:your_app)
# => {:ok, "Graph file has been written to your_app_opus_graph.png"}

Note: this feature requires the opus_graph package to be installed. Add it to your mix.exs:

defp deps do
  [{:opus_graph, "~> 0.1", only: [:dev]}]
end

Setup

First make sure to add graphvix to your dependencies:

# in mix.exs

defp deps do
  [
    {:opus, "~> 0.5"},
    {:graphvix, "~> 0.5", only: [:dev]}
  ]
end

This feature uses graphviz, so make sure to have it installed. To install it:

# MacOS
brew install graphviz

# Debian / Ubuntu
apt-get install graphviz

Opus.Graph is in fact a pipeline and its visualisation is:

[Image: graph of the Opus.Graph pipeline]

You can customise the visualisation:

Opus.Graph.generate(:your_app, %{filetype: :svg})
# => {:ok, "Graph file has been written to your_app_opus_graph.svg"}

Read the available visualisation options here.

Influences

Press

Using Opus in your company / project?
Let us know by submitting an issue describing how you use it.

License

Copyright (c) 2018 Dimitris Zorbas, MIT License. See LICENSE.txt for further details.