Awesome
<p align="center"> <img alt="Machine" height="125" src="https://raw.githubusercontent.com/whitaker-io/machine/master/docs/static/Black-No-BG.png"> </p>Machine
is a library for creating data workflows. These workflows can be either very concise or quite complex, even allowing for cycles for flows that need retry or self healing mechanisms.
Implementations of the EdgeProvider
and Edge
interfaces can be used for fan-out so that large volumes of data can be processed by multiple nodes.
type EdgeProvider[T Identifiable] interface {
New(name string, option *Option[T]) Edge[T]
}
type Edge[T Identifiable] interface {
OutputTo(ctx context.Context, channel chan []T)
Input(payload ...T)
}
Examples of using both AWS SQS and Google Pub/Sub coming soon!
The main function types are:
// Applicative is a function that is applied on an individual
// basis for each Packet in the payload. The resulting data replaces
// the old data
type Applicative[T Identifiable] func(d T) T
// Combiner is a function used to combine a payload into a single Packet.
type Combiner[T Identifiable] func(payload []T) T
// Filter is a function that can be used to filter the payload.
type Filter[T Identifiable] func(d T) FilterResult
// Comparator is a function to compare 2 items
type Comparator[T Identifiable] func(a T, b T) int
// Window is a function to work on a window of data
type Window[T Identifiable] func(payload []T) []T
// Remover func that is used to remove Data based on a true result
type Remover[T Identifiable] func(index int, d T) bool
These all implement the Component
interface, which can be use to provide a Vertex
instance. The Vertex
type is the main building block of a
Stream
. The builder method creates these under the covers, and they can be used individually for testing of single operations.
// Component is an interface for providing a Vertex that can be used to run individual components on the payload.
type Component[T Identifiable] interface {
Component(e Edge[T]) Vertex[T]
}
Installation
Add the primary library to your project
go get -u github.com/whitaker-io/machine
Example
Basic receive
-> process
-> send
Flow
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/whitaker-io/machine"
)
type kv struct {
name string
value int
}
func (i *kv) ID() string {
return i.name
}
func deepcopyKV(k *kv) *kv { return &kv{name: k.name, value: k.value} }
func main() {
ctx, cancel := context.WithCancel(context.Background())
stream := machine.NewWithChannels(
"test_stream",
&Option[*kv]{
DeepCopy: deepcopyKV,
FIFO: false,
BufferSize: 0,
},
)
input := make(chan []*kv)
out := make(chan []*kv)
go func() {
input <- someData //get some data from somewhere
}()
// this is a very simple example try experimenting
// with more complex flows including loops and filters
stream.Builder().
Map(
func(m *kv) *kv {
// do some processing
return m
},
).OutputTo(out)
if err := streamm.StartWith(ctx, input); err != nil {
fmt.Println(err)
}
go func() {
Loop:
for {
select {
case <-ctx.Done():
break Loop
case data := <-out:
//handle the processed data
}
}
}()
// run until SIGTERM
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
cancel()
// give some time for a graceful shutdown
<-time.After(time.Second * 2)
}
🤝 Contributing
Contributions, issues and feature requests are welcome.<br /> Feel free to check issues page if you want to contribute.<br /> Check the contributing guide.<br />
Author
👤 Jonathan Whitaker
- Twitter: @io_whitaker
- Github: @jonathan-whitaker
Show your support
Please ⭐️ this repository if this project helped you!
License
Machine is provided under the MIT License.
The MIT License (MIT)
Copyright (c) 2020 Jonathan Whitaker