Change Feed Processor library (v2)

This library helps distribute Azure Cosmos DB change feed events from a partitioned collection across multiple observers. Instances of the processor can be scaled up (by adding instances) or down (by removing them) dynamically, and the partition load is automatically redistributed among the active instances in a roughly equal way.
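To make "roughly equal" concrete, here is a minimal, self-contained sketch of one way partitions could be spread over hosts. It is an illustration only and is not the library's actual balancing strategy; `LeaseBalancingSketch` and `Distribute` are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; not part of the Change Feed Processor library.
static class LeaseBalancingSketch
{
    // Assigns partitions to hosts in round-robin order, so the number of
    // partitions per host differs by at most one.
    public static Dictionary<string, List<string>> Distribute(
        IReadOnlyList<string> partitionIds,
        IReadOnlyList<string> hostNames)
    {
        if (hostNames.Count == 0)
        {
            throw new ArgumentException("At least one host is required.", nameof(hostNames));
        }

        // Start every host with an empty partition list.
        var assignment = hostNames.ToDictionary(host => host, host => new List<string>());

        for (int i = 0; i < partitionIds.Count; i++)
        {
            string host = hostNames[i % hostNames.Count];
            assignment[host].Add(partitionIds[i]);
        }

        return assignment;
    }
}
```

With five partitions and two hosts, the first host ends up with three partitions and the second with two. Adding a third host and calling `Distribute` again would leave no host with more than two.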

Releases

The Change Feed Processor for .NET is released as a NuGet package.

Design highlights

For illustration, let's assume we are processing the change feed from the monitored collection, which is partitioned by city. The arrows in the preceding diagram indicate the current position (continuation) in the change feed for each partition.
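The idea of an independent position per partition can be sketched with a small in-memory model. This is a simplified illustration under stated assumptions: `PartitionCheckpoints` is a hypothetical type, and the position is modeled as a plain number, whereas the real continuation is a token managed by the service and the library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model for illustration; not a type from the library.
sealed class PartitionCheckpoints
{
    private readonly Dictionary<string, long> positions = new Dictionary<string, long>();

    // Returns the last recorded position for a partition, or 0 if nothing was recorded yet.
    public long GetPosition(string partition)
    {
        long position;
        return positions.TryGetValue(partition, out position) ? position : 0;
    }

    // Records progress for one partition only; positions never move backwards.
    public void Advance(string partition, long newPosition)
    {
        if (newPosition > GetPosition(partition))
        {
            positions[partition] = newPosition;
        }
    }
}
```

Advancing the position for one city leaves the positions of all other cities untouched, which is what lets each partition be processed, and resumed after a restart, independently of the others.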

Change Feed Processor

Usage

The workflow

Example

// Observer.cs
namespace Sample
{
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

    class SampleObserver : IChangeFeedObserver
    {
        public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
        {
            return Task.CompletedTask;  // Note: Task.CompletedTask requires targeting .NET Framework 4.6 or later.
        }

        public Task OpenAsync(IChangeFeedObserverContext context)
        {
            return Task.CompletedTask;
        }

        public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
        {
            Console.WriteLine("ProcessChangesAsync: partition {0}, {1} docs", context.PartitionKeyRangeId, docs.Count);
            return Task.CompletedTask;
        }
    }
}

// Main.cs
namespace Sample
{
    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents.ChangeFeedProcessor;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

    class ChangeFeedProcessorSample
    {
        public static void Run()
        {
            RunAsync().Wait();
        }

        static async Task RunAsync()
        {
            DocumentCollectionInfo feedCollectionInfo = new DocumentCollectionInfo()
            {
                DatabaseName = "DatabaseName",
                CollectionName = "MonitoredCollectionName",
                Uri = new Uri("https://sampleservice.documents.azure.com:443/"),
                MasterKey = "-- the auth key"
            };

            DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo()
            {
                DatabaseName = "DatabaseName",
                CollectionName = "leases",
                Uri = new Uri("https://sampleservice.documents.azure.com:443/"),
                MasterKey = "-- the auth key"
            };

            var builder = new ChangeFeedProcessorBuilder();
            var processor = await builder
                .WithHostName("SampleHost")
                .WithFeedCollection(feedCollectionInfo)
                .WithLeaseCollection(leaseCollectionInfo)
                .WithObserver<SampleObserver>()
                .BuildAsync();

            await processor.StartAsync();

            Console.WriteLine("Change Feed Processor started. Press <Enter> key to stop...");
            Console.ReadLine();

            await processor.StopAsync();
        }
    }
}
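As a variation on the observer above, the following sketch reads individual fields from each changed document instead of only counting them. It assumes the documents carry a `city` property (matching the partitioning example in the design section); that property name and the `CityObserver` class name are assumptions made for this example. `Id` and `GetPropertyValue<T>` are members of the `Document` type from the Azure Cosmos DB .NET SDK.

```csharp
// CityObserver.cs
namespace Sample
{
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

    class CityObserver : IChangeFeedObserver
    {
        public Task OpenAsync(IChangeFeedObserverContext context)
        {
            return Task.CompletedTask;
        }

        public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
        {
            // The close reason tells why this observer stopped receiving the partition.
            Console.WriteLine("Closing partition {0}: {1}", context.PartitionKeyRangeId, reason);
            return Task.CompletedTask;
        }

        public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
        {
            foreach (Document doc in docs)
            {
                // "city" is an assumed property name; adjust it to your document schema.
                string city = doc.GetPropertyValue<string>("city");
                Console.WriteLine("Changed document {0} (city: {1})", doc.Id, city);
            }

            return Task.CompletedTask;
        }
    }
}
```

To try it, replace `.WithObserver<SampleObserver>()` with `.WithObserver<CityObserver>()` in the builder chain shown in Main.cs.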

Note on obsolete API

The following v1 API is present in v2 for backward compatibility but is marked obsolete. We recommend using the new API.

Troubleshooting

How to enable tracing

The Change Feed Processor library uses LibLog and supports a few log providers out of the box.

The log provider for .NET tracing ships with the library but must be enabled explicitly. Follow the steps below to enable .NET tracing:

  1. Add the following code to your project:
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider();
using (tracelogProvider.OpenNestedContext(hostName))
{
    LogProvider.SetCurrentLogProvider(tracelogProvider);
    // After this, create IChangeFeedProcessor instance and start/stop it.
}
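  2. Do one of the following: add the configuration section below to your application's configuration file (for example, App.config), or create a custom TraceSource in code as in the snippet that follows it: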
  <system.diagnostics>
    <sharedListeners>
      <add name="file" type="System.Diagnostics.TextWriterTraceListener" initializeData="C:\ChangeFeedProcessorTrace.log" />
    </sharedListeners>
    <sources>
      <source name="ChangeFeedEventHost" switchType="System.Diagnostics.SourceSwitch" switchValue="Information">
        <!-- All, Verbose, Warning, Information, Error, Off -->
        <listeners>
          <clear />
          <add name="file" />
        </listeners>
      </source>
    </sources>
    <trace autoflush="true" useGlobalLock="false" />
  </system.diagnostics>
using System.Diagnostics;
using System.IO;  // Required for StreamWriter.
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

using (var writer = new StreamWriter(@"C:\ChangeFeedProcessorTrace.log") { AutoFlush = true })
{
    // Create custom TraceSource.
    var traceSource = new TraceSource("Change Feed Processor", SourceLevels.All);
    traceSource.Listeners.Clear();
    traceSource.Listeners.Add(new TextWriterTraceListener(writer));

    // Create TraceLogProvider from TraceSource.
    var tracelogProvider = new TraceLogProvider(traceSource);

    // Continue using TraceLogProvider in the same way as code snippet above.
    var hostName = "SampleHost";
    using (tracelogProvider.OpenNestedContext(hostName))
    {
        LogProvider.SetCurrentLogProvider(tracelogProvider);
        // After this, create IChangeFeedProcessor instance and start/stop it.
    }
}

See also

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.