Home

Awesome

Obvs: an observable microservice bus

observable services, obviously

Join the chat at https://gitter.im/inter8ection/Obvs

.NET

NuGet

Features

Versions/Roadmap

More Details

Extensions

Example

Define a root message type to identify messages as belonging to your service:

public interface IMyServiceMessage : IMessage { }

Create command/event/request/response message types:

public class MyCommand : IMyServiceMessage, ICommand { }

public class MyEvent : IMyServiceMessage, IEvent { }

public class MyRequest: IMyServiceMessage, IRequest { }

public class MyResponse : IMyServiceMessage, IResponse { }

Create your service bus:

IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .Create();

Send commands:

serviceBus.Commands.Subscribe(c => Console.WriteLine("Received a command!"));
await serviceBus.SendAsync(new MyCommand());

Publish events:

serviceBus.Events.Subscribe(e => Console.WriteLine("Received an event!"));
await serviceBus.PublishAsync(new MyEvent());

Request/response:

serviceBus.Requests
	  .OfType<MyRequest>()
	  .Subscribe(request => serviceBus.ReplyAsync(request, new MyResponse()));

serviceBus.GetResponses(new MyRequest())
	  .OfType<MyResponse>()
	  .Take(1)
	  .Timeout(TimeSpan.FromSeconds(1))
	  .Subscribe(r => Console.WriteLine("Received a response!"), err => Console.WriteLine("Oh no!"));

Define custom endpoints that can wrap API calls or integrations with other systems:

public class MyCustomEndpoint : IServiceEndpointClient
	{
    	Type _serviceType = typeof(IMyCustomServiceMessage);

    	public IObservable<IEvent> Events
    	{
        		get
        		{
            		// subscribe to external MQ broker
        		}
    	}

    	public Task SendAsync(ICommand command)
    	{
        		// call external API
    	}

    	public IObservable<IResponse> GetResponses(IRequest request)
    	{
        		// call external API and wrap response in observable
    	}

    	public bool CanHandle(IMessage message)
    	{
        		return _serviceType.IsInstanceOfType(message);
    	}
	}
	
...

IServiceBus serviceBus = ServiceBus.Configure()
      .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
  .WithEndpoints(new MyCustomEndpoint())
    .Create();

Run Examples in Docker

cd examples
docker-compose up

cd client
dotnet run -f netcoreapp3.1