Home

Awesome

eGo

build Go Reference GitHub go.mod Go version GitHub Release codecov

eGo is a minimal library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their commands, events and states are defined using google protocol buffers. Under the hood, ego leverages Go-Akt to scale out and guarantee performant, reliable persistence.

Features

Domain Entity/Aggregate Root

The aggregate root is crucial for maintaining data consistency, especially in distributed systems. It defines how to handle the various commands (requests to perform actions) that are always directed at the aggregate root. In eGo commands sent the aggregate root are processed in order. When a command is processed, it may result in the generation of events, which are then stored in an event store. Every event persisted has a revision number and timestamp that can help track it. The aggregate root in eGo is responsible for defining how to handle events that are the result of command handlers. The end result of events handling is to build the new state of the aggregate root. When running in cluster mode, aggregate root are sharded.

Howto

To define an Aggregate Root, one needs to:

  1. the state of the aggregate root using google protocol buffers message
  2. the various commands that will be handled by the aggregate root
  3. the various events that are result of the command handlers and that will be handled by the aggregate root to return the new state of the aggregate root
  4. implements the EntityBehavior interface.

Events Stream

Every event handled by Aggregate Root are pushed to an events stream. That enables real-time processing of events without having to interact with the events store

Projection

One can add a projection to the eGo engine to help build a read model. Projections in eGo rely on an offset store to track how far they have consumed events persisted by the write model. The offset used in eGo is a timestamp-based offset. One can also:

Events Store

One can implement a custom events store. See EventsStore. eGo comes packaged with two events store:

Offsets Store

One can implement a custom offsets store. See OffsetStore. eGo comes packaged with two offset store:

Cluster

The cluster mode heavily relies on Go-Akt clustering.

Mocks

eGo ships in some mocks

Examples

Check the examples

Installation

go get github.com/tochemey/ego

Sample

package main

import (
  "context"
  "errors"
  "log"
  "os"
  "os/signal"
  "syscall"
  "time"

  "github.com/google/uuid"
  "google.golang.org/protobuf/proto"

  "github.com/tochemey/ego/v3"
  "github.com/tochemey/ego/v3/eventstore/memory"
  samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1"
)

func main() {
  // create the go context
  ctx := context.Background()
  // create the event store
  eventStore := memory.NewEventsStore()
  // connect the event store
  _ = eventStore.Connect(ctx)
  // create the ego engine
  engine := ego.NewEngine("Sample", eventStore)
  // start ego engine
  _ = engine.Start(ctx)
  // create a persistence id
  entityID := uuid.NewString()
  // create an entity behavior with a given id
  behavior := NewAccountBehavior(entityID)
  // create an entity
  _ = engine.Entity(ctx, behavior)

  // send some commands to the pid
  var command proto.Message
  // create an account
  command = &samplepb.CreateAccount{
    AccountId:      entityID,
    AccountBalance: 500.00,
  }
  // send the command to the actor. Please don't ignore the error in production grid code
  reply, _, _ := engine.SendCommand(ctx, entityID, command, time.Minute)
  account := reply.(*samplepb.Account)
  log.Printf("current balance: %v", account.GetAccountBalance())

  // send another command to credit the balance
  command = &samplepb.CreditAccount{
    AccountId: entityID,
    Balance:   250,
  }

  reply, _, _ = engine.SendCommand(ctx, entityID, command, time.Minute)
  account = reply.(*samplepb.Account)
  log.Printf("current balance: %v", account.GetAccountBalance())

  // capture ctrl+c
  interruptSignal := make(chan os.Signal, 1)
  signal.Notify(interruptSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
  <-interruptSignal

  // disconnect the event store
  _ = eventStore.Disconnect(ctx)
  // stop the actor system
  _ = engine.Stop(ctx)
  os.Exit(0)
}

// AccountBehavior implements EntityBehavior
type AccountBehavior struct {
  id string
}

// make sure that AccountBehavior is a true persistence behavior
var _ ego.EntityBehavior = (*AccountBehavior)(nil)

// NewAccountBehavior creates an instance of AccountBehavior
func NewAccountBehavior(id string) *AccountBehavior {
  return &AccountBehavior{id: id}
}

// ID returns the id
func (a *AccountBehavior) ID() string {
  return a.id
}

// InitialState returns the initial state
func (a *AccountBehavior) InitialState() ego.State {
  return ego.State(new(samplepb.Account))
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ ego.State) (events []ego.Event, err error) {
  switch cmd := command.(type) {
  case *samplepb.CreateAccount:
    // TODO in production grid app validate the command using the prior state
    return []ego.Event{
      &samplepb.AccountCreated{
        AccountId:      cmd.GetAccountId(),
        AccountBalance: cmd.GetAccountBalance(),
      },
    }, nil

  case *samplepb.CreditAccount:
    // TODO in production grid app validate the command using the prior state
    return []ego.Event{
      &samplepb.AccountCredited{
        AccountId:      cmd.GetAccountId(),
        AccountBalance: cmd.GetBalance(),
      },
    }, nil

  default:
    return nil, errors.New("unhandled command")
  }
}

// HandleEvent handles every event emitted
func (a *AccountBehavior) HandleEvent(_ context.Context, event ego.Event, priorState ego.State) (state ego.State, err error) {
  switch evt := event.(type) {
  case *samplepb.AccountCreated:
    return &samplepb.Account{
      AccountId:      evt.GetAccountId(),
      AccountBalance: evt.GetAccountBalance(),
    }, nil

  case *samplepb.AccountCredited:
    account := priorState.(*samplepb.Account)
    bal := account.GetAccountBalance() + evt.GetAccountBalance()
    return &samplepb.Account{
      AccountId:      evt.GetAccountId(),
      AccountBalance: bal,
    }, nil

  default:
    return nil, errors.New("unhandled event")
  }
}

Versioning

The version system adopted in eGo deviates a bit from the standard semantic versioning system. The version format is as follows:

The versioning will remain like v3.x.x until further notice.

Contribution

Contributions are welcome! The project adheres to Semantic Versioning and Conventional Commits. This repo uses Earthly.

There are two ways you can become a contributor:

  1. Request to become a collaborator, and then you can just open pull requests against the repository without forking it.
  2. Follow these steps

Test & Linter

Prior to submitting a pull request, please run:

earthly +test