Namotion.Messaging

Storage | Messaging | Reflection

<img align="left" src="https://raw.githubusercontent.com/RicoSuter/Namotion.Reflection/master/assets/Icon.png" width="48px" height="48px">

The Namotion.Messaging .NET libraries provide abstractions and implementations for message brokers, event queues and data ingestion services.

By programming against a messaging abstraction you enable the following scenarios:

Usage

To listen for messages, create a message receiver for a specific implementation and call its ListenAsync() or ListenWithRetryAsync() method (the call will never return while the receiver is listening):

IMessageReceiver receiver = ServiceBusMessageReceiver
    .Create("MyConnectionString", "myqueue");

await receiver.ListenAsync(async (messages, ct) =>
{
    ...
}, CancellationToken.None);

In another process or thread you can then publish messages to this listener:

IMessagePublisher publisher = ServiceBusMessagePublisher
    .Create(configuration["ServiceBusConnectionString"], "myqueue");

await publisher.PublishAsync(new Message(content: new byte[] { 1, 2, 3 }));

In practice, you should host the listener in a .NET Generic Host, which provides dependency injection, configuration and logging. To use the IMessageReceiver in a simple command line application, implement a BackgroundService and start message processing in ExecuteAsync:

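using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
// Also import the Namotion.Messaging namespaces that declare IMessageReceiver and ListenWithRetryAsync.

public class MyBackgroundService : BackgroundService
{
    private readonly IMessageReceiver _messageReceiver;
    private readonly ILogger<MyBackgroundService> _logger;

    // ILogger<T> (not the non-generic ILogger) is what the default container can resolve.
    public MyBackgroundService(IMessageReceiver messageReceiver, ILogger<MyBackgroundService> logger)
    {
        _messageReceiver = messageReceiver;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Listens until the host requests shutdown via stoppingToken.
        await _messageReceiver.ListenWithRetryAsync(async (messages, ct) =>
        {
            foreach (var message in messages)
            {
                try
                {
                    // TODO: Process message

                    // Confirm only after the message has been processed successfully.
                    await _messageReceiver.ConfirmAsync(message, ct);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "Error while processing message.");
                    await _messageReceiver.RejectAsync(message, ct);
                }
            }
        }, stoppingToken);
    }
}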

In your program's Main method, create a new HostBuilder and add the background service as a hosted service:

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .ConfigureServices(services => 
        {
            var receiver = ServiceBusMessageReceiver.Create("MyConnectionString", "myqueue");
            services.AddSingleton<IMessageReceiver>(receiver);
            services.AddHostedService<MyBackgroundService>();
        })
        .Build();

    await host.RunAsync();
}

Extensions

Behavior extensions, for example custom dead letter queues or large message handling, are implemented as interceptors which wrap publisher and receiver methods with custom code. These interceptors are added with the With* extension methods. Custom interceptors can be implemented with the MessagePublisher<T> and MessageReceiver<T> classes.
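The interceptor idea can be illustrated with a small, self-contained sketch. It does not use the library's own types: `ISimplePublisher`, `RecordingPublisher`, `MaxSizePublisher` and `WithMaxSize` are hypothetical names invented for this example, and the size check is just a placeholder for whatever custom behavior an interceptor might add.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for a publisher abstraction (not the library's interface).
public interface ISimplePublisher
{
    Task PublishAsync(byte[] content, CancellationToken cancellationToken = default);
}

// Hypothetical in-memory publisher that records everything it is asked to publish.
public sealed class RecordingPublisher : ISimplePublisher
{
    public List<byte[]> Published { get; } = new List<byte[]>();

    public Task PublishAsync(byte[] content, CancellationToken cancellationToken = default)
    {
        Published.Add(content);
        return Task.CompletedTask;
    }
}

// Interceptor: wraps another publisher and runs custom code before delegating to it.
public sealed class MaxSizePublisher : ISimplePublisher
{
    private readonly ISimplePublisher _inner;
    private readonly int _maxBytes;

    public MaxSizePublisher(ISimplePublisher inner, int maxBytes)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _maxBytes = maxBytes;
    }

    public Task PublishAsync(byte[] content, CancellationToken cancellationToken = default)
    {
        // Custom behavior: refuse payloads above the configured limit.
        if (content.Length > _maxBytes)
        {
            throw new InvalidOperationException(
                $"Message of {content.Length} bytes exceeds the limit of {_maxBytes} bytes.");
        }

        // Otherwise hand the message to the wrapped publisher unchanged.
        return _inner.PublishAsync(content, cancellationToken);
    }
}

// With*-style extension method so the interceptor can be chained fluently.
public static class SimplePublisherExtensions
{
    public static ISimplePublisher WithMaxSize(this ISimplePublisher publisher, int maxBytes)
        => new MaxSizePublisher(publisher, maxBytes);
}
```

With these types, `new RecordingPublisher().WithMaxSize(4)` returns a publisher that forwards messages of up to four bytes and throws for larger ones. Because the wrapper implements the same interface as the publisher it wraps, several such interceptors can be stacked without the calling code noticing.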

Core packages

Namotion.Messaging.Abstractions

Contains the messaging abstractions, mainly interfaces with a very small footprint and extremely stable contracts:

The idea behind the generic interfaces is to allow multiple instance registrations. See the article "Dependency Injection in .NET: A way to work around missing named registrations" for more information.
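As a rough illustration of why a generic type parameter helps here, the following self-contained sketch registers two publishers in a container that only supports lookups by service type. Everything in it is hypothetical and simplified: `ISimplePublisher<T>`, `QueuePublisher<T>`, `TinyContainer` and the two message classes are stand-ins written for this example, not the library's types or a real dependency injection container.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message types; each one doubles as the key that tells registrations apart.
public sealed class OrderCreatedMessage { }
public sealed class InvoicePaidMessage { }

// Hypothetical, simplified generic publisher abstraction.
public interface ISimplePublisher<T>
{
    string QueueName { get; }
}

public sealed class QueuePublisher<T> : ISimplePublisher<T>
{
    public QueuePublisher(string queueName)
    {
        QueueName = queueName;
    }

    public string QueueName { get; }
}

// Tiny stand-in for a container that resolves by service type only (no named registrations).
public sealed class TinyContainer
{
    private readonly Dictionary<Type, object> _services = new Dictionary<Type, object>();

    public void AddSingleton<TService>(TService instance) where TService : class
        => _services[typeof(TService)] = instance;

    public TService GetRequiredService<TService>() where TService : class
        => (TService)_services[typeof(TService)];
}

public static class RegistrationExample
{
    // Registers two publishers of the same implementation class under different closed generic types.
    public static TinyContainer Build()
    {
        var container = new TinyContainer();
        container.AddSingleton<ISimplePublisher<OrderCreatedMessage>>(
            new QueuePublisher<OrderCreatedMessage>("orders"));
        container.AddSingleton<ISimplePublisher<InvoicePaidMessage>>(
            new QueuePublisher<InvoicePaidMessage>("invoices"));
        return container;
    }
}
```

A consumer that asks for `ISimplePublisher<OrderCreatedMessage>` gets the publisher bound to the "orders" queue, while `ISimplePublisher<InvoicePaidMessage>` resolves to the one for "invoices". With a non-generic interface, the second registration would simply replace the first.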

Namotion.Messaging

Contains common helper methods and base implementations of the abstractions:

Extension methods to enhance or modify instances:

Other extension methods:

Namotion.Messaging.Storage

Extension methods to enhance or modify instances:

Dependencies:

Namotion.Messaging.Json

Provides extension methods on IMessagePublisher<T> and IMessageReceiver<T> to enable JSON serialization for messages:

Send a JSON encoded message:

var publisher = ServiceBusMessagePublisher
    .Create("MyConnectionString", "myqueue")
    .AsPublisher<OrderCreatedMessage>();

await publisher.PublishAsJsonAsync(new OrderCreatedMessage { ... });

Receive JSON encoded messages:

var receiver = ServiceBusMessageReceiver
    .Create("MyConnectionString", "myqueue")
    .AsReceiver<OrderCreatedMessage>();

await receiver.ListenAndDeserializeJsonAsync(async (messages, ct) => 
{
    foreach (OrderCreatedMessage message in messages.Select(m => m.Object))
    {
        ...
    }

    await receiver.ConfirmAsync(messages, ct);
});

Implementation packages

The following packages should only be used in the head project, i.e. directly in your application bootstrapping project where the dependency injection container is initialized.

| | Azure<br /> Service Bus | Azure<br /> Event Hub | Azure<br /> Storage Queue | RabbitMQ | Amazon SQS | InMemory |
|-|-|-|-|-|-|-|
| PublishAsync | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| ListenAsync | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| GetMessageCountAsync | :x: | :x: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| KeepAliveAsync | :heavy_check_mark: | :heavy_minus_sign: (1.) | :heavy_check_mark: | :x: | :heavy_check_mark: | :heavy_minus_sign: |
| ConfirmAsync | :heavy_check_mark: | :heavy_minus_sign: (1.) | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_minus_sign: |
| RejectAsync | :heavy_check_mark: | :heavy_minus_sign: (1.) | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| DeadLetterAsync | :heavy_check_mark: | :x: (2.) | :x: (2.) | :x: (2.) | :x: (2.) | :heavy_check_mark: |
| User properties | :heavy_check_mark: | :heavy_check_mark: | :x: (3.) | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |

  1. Because Event Hub is stream based and not transactional, these method calls are just ignored.
  2. Use receiver.WithDeadLettering(publisher) to enable dead letter support.
  3. Use receiver.WithPropertiesInContent() to enable user properties support (not implemented yet).

:heavy_minus_sign: = Noop/Ignored

Namotion.Messaging.Azure.ServiceBus

Implementations:

Behavior:

Dependencies:

Namotion.Messaging.Azure.EventHub

Implementations:

Behavior:

Dependencies:

Namotion.Messaging.Azure.Storage.Queue

Implementations:

Behavior:

Dependencies:

Namotion.Messaging.RabbitMQ

Implementations:

Behavior:

Dependencies:

Namotion.Messaging.Amazon.SQS

Implementations:

Behavior:

Dependencies: