<div align="center"> <h1><code>fluvio-ex</code></h1>

<strong>💧Elixir client for <a href="https://www.fluvio.io/">Fluvio</a> streaming platform</strong>

<p></p> <p> <a href="https://github.com/viniarck/fluvio-ex/actions/workflows/unit_tests.yml"><img src="https://github.com/viniarck/fluvio-ex/actions/workflows/unit_tests.yml/badge.svg" alt="unit tests status" /></a> <a href="https://coveralls.io/r/viniarck/fluvio-ex?branch=master"><img src="https://coveralls.io/repos/viniarck/fluvio-ex/badge.svg?branch=master" alt="test coverage" /></a> <a href="https://hex.pm/packages/fluvio"><img src="https://img.shields.io/hexpm/v/fluvio.svg" alt="hex.pm version" /></a> <a href="https://hex.pm/packages/fluvio"><img src="https://img.shields.io/hexpm/dt/fluvio.svg" alt="hex.pm downloads" /></a> </p> <h3> <a href="https://hexdocs.pm/fluvio/readme.html">Docs</a> </h3> </div>

Installation

You can add fluvio to your list of dependencies in mix.exs:

def deps do
  [
    {:fluvio, "~> 0.2"}
  ]
end

Features

The following Fluvio Rust interfaces are supported on Elixir:

| Rust | Elixir |
| --- | --- |
| fluvio::PartitionConsumer | Fluvio.Consumer |
| fluvio::TopicProducer | Fluvio.Producer |
| fluvio::FluvioAdmin | Fluvio.Admin |

Each Elixir module aims to provide functionality equivalent to what the fluvio crate exposes, adapted so that it integrates well with the Elixir ecosystem and OTP.

Support for fluvio::FluvioAdmin is minimal, since cluster resources are typically provisioned up front rather than at the application level. Fluvio.Admin is still useful for experimentation.

Examples

Consumer

This snippet illustrates a Fluvio.Consumer connected to a "lobby" topic and using an optional SmartModule filter. First, it lazily unfolds the infinite stream, takes 4 records, and groups them into chunks of 2. Then it keeps consuming the stream, handling each result as it arrives.

alias Fluvio.Consumer

{:ok, pid} =
  Consumer.start_link(%{
    topic: "lobby",
    offset: [from_beginning: 0],
    smartmodule_path: "./examples/wasm/map_reverse.wasm"
  })

Consumer.stream_unfold(pid)
|> Stream.take(4)
|> Stream.chunk_every(2)
|> Enum.to_list()
|> IO.inspect()

Consumer.stream_each(pid, fn result ->
  case result do
    {:ok, record} -> IO.inspect(record)
    {:error, msg} -> IO.inspect("Error: #{msg}")
  end
end)

To run this example:

MIX_ENV=prod mix run examples/consumer_smartmodule.exs
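To see what the lazy part of this pipeline yields without a running cluster, here is a minimal sketch that swaps Consumer.stream_unfold/1 for a hypothetical stand-in built with Stream.unfold/2. The "record-N" values and the {:ok, value} element shape are assumptions made for illustration; they are not output from Fluvio.

```elixir
# Hypothetical stand-in for Consumer.stream_unfold/1: an infinite, lazy stream.
# The {:ok, value} shape is an assumption for illustration only.
records = Stream.unfold(1, fn n -> {{:ok, "record-#{n}"}, n + 1} end)

chunks =
  records
  # Only 4 elements are ever pulled from the infinite stream.
  |> Stream.take(4)
  # Group the taken elements in pairs.
  |> Stream.chunk_every(2)
  |> Enum.to_list()

IO.inspect(chunks)
# [[ok: "record-1", ok: "record-2"], [ok: "record-3", ok: "record-4"]]
```

Because every step before Enum.to_list/1 is lazy, the infinite source is only evaluated for the four elements that Stream.take/2 requests.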

Producer

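This snippet illustrates a Fluvio.Producer connected to a "lobby" topic. First, the string value "hello" is sent and flushed synchronously. Then 20 values are sent asynchronously in two chunks of 10, with a flush after each chunk. Any send or flush that returns an error is collected, so the match against [] asserts that nothing failed.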

alias Fluvio.Producer

{:ok, pid} = Producer.start_link(%{topic: "lobby"})

{:ok, _} = Producer.send(pid, "hello")
{:ok, _} = Producer.flush(pid)

[] =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn chunk ->
    [
      chunk
      |> Enum.map(fn value ->
        Task.async(fn -> {Producer.send(pid, to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1)),
      [{Producer.flush(pid), :flush}]
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1))
    ]
  end)
  |> Stream.concat()
  |> Enum.to_list()

To run this example:

MIX_ENV=prod mix run examples/producer.exs
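The error-collecting pattern in this snippet can be tried without a cluster. The sketch below is a simplified variant that replaces Producer.send/2 and Producer.flush/1 with hypothetical stand-in functions (send_fn and flush_fn, neither of which is part of Fluvio) and makes one send fail on purpose, so the collected list is no longer empty.

```elixir
# Hypothetical stand-ins for Producer.send/2 and Producer.flush/1.
# send_fn fails for the value "13" to show how errors are collected.
send_fn = fn value ->
  if value == "13", do: {:error, "boom"}, else: {:ok, value}
end

flush_fn = fn -> {:ok, :flushed} end

errors =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn chunk ->
    # Send every value of the chunk concurrently and keep only failures.
    send_errors =
      chunk
      |> Enum.map(fn value ->
        Task.async(fn -> {send_fn.(to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> Enum.filter(&match?({{:error, _msg}, _value}, &1))

    # Flush once per chunk and keep the result only if it failed.
    flush_errors =
      Enum.filter([{flush_fn.(), :flush}], &match?({{:error, _msg}, _value}, &1))

    send_errors ++ flush_errors
  end)
  |> Enum.to_list()

IO.inspect(errors)
# [{{:error, "boom"}, 13}]
```

Each entry pairs the failed result with the value (or :flush) that caused it, which is why the original snippet can assert success by matching the whole pipeline against an empty list.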

Supervised Producer and Consumer

This example demonstrates a ping-pong application built from two Fluvio.Producer and Fluvio.Consumer pairs. Each pair runs in a task started under a Task.Supervisor with restart: :permanent, which gives the processes fault tolerance inside your app. On restart, you could also initialize the Fluvio.Consumer with a different offset.

alias Fluvio.Producer
alias Fluvio.Consumer
alias Fluvio.Record

defmodule App do
  def start(topic_1 \\ "ping", topic_2 \\ "pong", initial_value \\ "1") do
    children = [{Task.Supervisor, name: TaskSup}]
    {:ok, sup_pid} = Supervisor.start_link(children, strategy: :one_for_one)

    {:ok, pid_one} =
      Task.Supervisor.start_child(
        TaskSup,
        fn ->
          {:ok, p1_pid} = Producer.start_link(%{topic: topic_1})
          {:ok, c2_pid} = Consumer.start_link(%{topic: topic_2, offset: [from_end: 0]})
          IO.inspect("Bootstrapping '#{topic_1}' by sending '#{initial_value}'")
          {:ok, _} = Producer.send(p1_pid, initial_value)
          {:ok, _} = Producer.flush(p1_pid)
          App.PingPong.keep_consuming(c2_pid, p1_pid)
        end,
        restart: :permanent
      )

    {:ok, pid_two} =
      Task.Supervisor.start_child(
        TaskSup,
        fn ->
          {:ok, p2_pid} = Producer.start_link(%{topic: topic_2})
          {:ok, c1_pid} = Consumer.start_link(%{topic: topic_1, offset: [from_end: 0]})
          App.PingPong.keep_consuming(c1_pid, p2_pid)
        end,
        restart: :permanent
      )

    {sup_pid, pid_one, pid_two}
  end

  defmodule PingPong do
    defp produce(p_pid, value) do
      IO.inspect("Producing value: '#{value}'")
      {:ok, _} = Producer.send(p_pid, value)
      {:ok, _} = Producer.flush(p_pid)
    end

    defp do_consume({:ok, %Record{value: "4"}}, _c_pid, _p_pid) do
      raise("simulating an unrecoverable error")
    end

    defp do_consume({:ok, record}, _c_pid, p_pid) do
      IO.inspect("Consumed value: '#{record.value}'")
      produce(p_pid, to_string(String.to_integer(record.value) + 1))
    end

    defp do_consume({:error, _msg}, c_pid, _p_pid), do: Process.exit(c_pid, :kill)
    defp do_consume({:stop_next, _}, _c_pid, _p_pid), do: nil

    def keep_consuming(c_pid, p_pid, min_freq_ms \\ 1000) do
      Consumer.stream_each(c_pid, fn result ->
        Process.sleep(min_freq_ms)
        do_consume(result, c_pid, p_pid)
      end)
    end
  end
end

If you run this example, you'll see that once both producer and consumer pairs start, the initial value "1" is sent, and each pair keeps incrementing the value and sending it back. When the value "4" is reached, the consumer that receives it raises a simulated unrecoverable error. This crashes the task that owns it, and the supervisor restarts that task:

MIX_ENV=prod iex -S mix
iex(1)> c "examples/ping_pong.exs"
[App, App.PingPong]
iex(2)> App.start()
{#PID<0.220.0>, #PID<0.222.0>, #PID<0.223.0>}


"Bootstrapping 'ping' by sending '1'"
"Consumed value: '1'"
"Producing value: '2'"
"Consumed value: '2'"
"Producing value: '3'"
"Consumed value: '3'"
"Producing value: '4'"
iex(3)>
01:31:46.911 [error] Task #PID<0.222.0> started from #PID<0.207.0> terminating
** (RuntimeError) simulating an unrecoverable error
    examples/ping_pong.exs:46: App.PingPong.do_consume/3
    (fluvio 0.2.0) lib/fluvio/consumer.ex:172: Fluvio.Consumer.stream_each/3
    (elixir 1.14.0) lib/task/supervised.ex:89: Task.Supervised.invoke_mfa/2
    (stdlib 4.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: #Function<0.131083353/0 in App.start/3>
    Args: []
"Bootstrapping 'ping' by sending '1'"
"Consumed value: '1'"
"Producing value: '2'"
"Consumed value: '2'"
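
The restart behavior shown in this output can be reproduced with the standard library alone. The following is a minimal sketch, not the project's code: DemoSup, the attempts counter, and the {:recovered, attempt} message are hypothetical names used only for illustration. A task started with restart: :permanent crashes on its first run and is started again by the Task.Supervisor.

```elixir
# DemoSup is a hypothetical supervisor name used only in this sketch.
{:ok, _sup} = Task.Supervisor.start_link(name: DemoSup)

# Counts how many times the task body has been started.
{:ok, attempts} = Agent.start_link(fn -> 0 end)
parent = self()

{:ok, _pid} =
  Task.Supervisor.start_child(
    DemoSup,
    fn ->
      attempt = Agent.get_and_update(attempts, fn n -> {n + 1, n + 1} end)
      # Crash on the first run only, like the "4" clause in the example.
      if attempt == 1, do: raise("simulating an unrecoverable error")
      send(parent, {:recovered, attempt})
      # Keep running: a :permanent task is restarted even on normal exit.
      Process.sleep(:infinity)
    end,
    restart: :permanent
  )

result =
  receive do
    {:recovered, attempt} -> {:recovered, attempt}
  after
    2_000 -> :timeout
  end

IO.inspect(result)
# {:recovered, 2}
```

The first run logs a RuntimeError, much like the trace above, and the second run reports back to the caller. The task sleeps afterwards because a :permanent child is restarted whenever it exits, which is also why the ping-pong tasks loop forever in keep_consuming.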