<a href="https://cognite.com/"> <img src="https://images.squarespace-cdn.com/content/5bd167cf65a707203855d3c0/1540463676940-6USHZRRF36KCAZLUPM2P/Logo-H.jpg?format=300w&content-type=image%2Fjpeg" alt="Cognite logo" title="Cognite" align="right" height="40" /> </a>

.Net Utilities for Building Cognite Extractors

A library containing utilities for building extractors in .Net.

Documentation

Installation

The Cognite Extractor Utils can be downloaded from NuGet.

To create a console application and add the library:

Using .NET CLI:

mkdir NewExtractor
cd NewExtractor
dotnet new console
dotnet add package Cognite.ExtractorUtils

Quickstart

Create a config.yml file containing the extractor configuration:

version: 1

logger:
    console:
        level: "debug"

metrics:
    push-gateways:
      - host: "http://localhost:9091"
        job: "extractor-metrics"

cognite:
    project: ${COGNITE_PROJECT}
    # This is for Microsoft as IdP. To use a different provider,
    # set implementation: Basic, and use token-url instead of tenant.
    # See the example config for the full list of options.
    idp-authentication:
        # Directory tenant
        tenant: ${COGNITE_TENANT_ID}
        # Application Id
        client-id: ${COGNITE_CLIENT_ID}
        # Client secret
        secret: ${COGNITE_CLIENT_SECRET}
        # List of resource scopes, ex:
        # scopes:
        #   - scopeA
        #   - scopeB
        scopes:
          - ${COGNITE_SCOPE}

See the example configuration for a full example with all available options.

Set the COGNITE_PROJECT, COGNITE_TENANT_ID, COGNITE_CLIENT_ID, COGNITE_CLIENT_SECRET, and COGNITE_SCOPE environment variables. Include the metrics section only if the extractor needs to collect and push metrics. If using a Prometheus pushgateway, set host to a valid endpoint.
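
These variables are substituted into the config file at runtime. For local development they can also be set programmatically before the extractor starts; a minimal sketch (all values are placeholders, not real credentials):

using System;

// For local testing only; in production, set these in the environment
// or fetch them from a secret store.
Environment.SetEnvironmentVariable("COGNITE_PROJECT", "my-project");
Environment.SetEnvironmentVariable("COGNITE_TENANT_ID", "my-tenant-id");
Environment.SetEnvironmentVariable("COGNITE_CLIENT_ID", "my-client-id");
Environment.SetEnvironmentVariable("COGNITE_CLIENT_SECRET", "my-client-secret");
Environment.SetEnvironmentVariable("COGNITE_SCOPE", "https://api.cognitedata.com/.default");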

The easiest way to use the library is through the BaseExtractor class. The following is a working implementation of an extractor that writes a sine wave to CDF.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Cognite.Extractor.Utils;
using Cognite.Extensions;
using CogniteSdk;

class MyExtractor : BaseExtractor<BaseConfig>
{
    public MyExtractor(BaseConfig config, CogniteDestination destination)
        : base(config, destination)
    {
    }

    protected override async Task Start()
    {
        // Make sure the target time series exists in CDF, creating it if needed.
        await Destination.EnsureTimeSeriesExistsAsync(new[]
        {
            new TimeSeriesCreate {
                ExternalId = "sine-wave",
                Name = "Sine Wave"
            }
        }, RetryMode.OnError, SanitationMode.Clean, Source.Token);
        // Create an upload queue for data points, with max size 1000
        // and an upload interval of one second.
        CreateTimeseriesQueue(1000, TimeSpan.FromSeconds(1), null);
        // Produce a new data point every 100 milliseconds and enqueue it.
        ScheduleDatapointsRun("datapoints", TimeSpan.FromMilliseconds(100), token =>
        {
            var dp = (
                Identity.Create("sine-wave"),
                new Datapoint(DateTime.UtcNow, Math.Sin(DateTime.UtcNow.Ticks))
            );
            return Task.FromResult<IEnumerable<(Identity, Datapoint)>>(new[] { dp });
        });
    }
}

// Then, in the Main() method:
class Program
{
    static async Task Main()
    {
        await ExtractorRunner.Run<BaseConfig, MyExtractor>(
            "config.yml",           // path to the configuration file
            new[] { 1 },            // accepted values of the "version" config field
            "my-extractor",         // app id, reported to CDF
            "myextractor/1.0.0",    // user agent
            addStateStore: false,
            addLogger: true,
            addMetrics: true,
            restart: true,
            CancellationToken.None);
    }
}
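
The snippets below assume a CogniteDestination (and, where used, a logger and a cancellation token) obtained from a service collection. A minimal sketch of that setup, using the AddConfig and AddCogniteClient extension methods from this library:

using Microsoft.Extensions.DependencyInjection;
using Cognite.Extractor.Utils;

var services = new ServiceCollection();
// Read and verify the yaml configuration file, accepting config version 1.
services.AddConfig<BaseConfig>("config.yml", 1);
// Register the CDF client and destination; "my-extractor" is used as app id.
services.AddCogniteClient("my-extractor");

using (var provider = services.BuildServiceProvider())
{
    var destination = provider.GetRequiredService<CogniteDestination>();
    // ... use the destination as shown below
}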

Inserting data points:

// Create a dictionary of time series identities and data point objects
var datapoints = new Dictionary<Identity, IEnumerable<Datapoint>>() {
    { new Identity("externalId1"), new Datapoint[] { new Datapoint(DateTime.UtcNow, "A") } },
    { new Identity("externalId2"), new Datapoint[] { new Datapoint(DateTime.UtcNow, 1),
                                                     new Datapoint(DateTime.UtcNow, 2) } },
    { new Identity(12345789), new Datapoint[] { new Datapoint(DateTime.UtcNow, 1) } }
};

// Insert the data points, ignoring and returning any errors.
var errors = await destination.InsertDataPointsIgnoreErrorsAsync(
    datapoints,
    cancellationToken);
if (errors.IdsNotFound.Any() || errors.IdsWithMismatchedData.Any())
{
    logger.LogError("Ids not found: {NfIds}. Time series with mismatched type: {MmIds}",
        errors.IdsNotFound, errors.IdsWithMismatchedData);
}

Using Raw upload queues:

// Data type object representing raw columns
private class ColumnsDto
{
    public string Name { get; set; }
    public int Number { get; set; }
}

// Creates a queue that uploads rows to Raw every 5 seconds (or when the queue size reaches 1,000)
using (var queue = destination.CreateRawUploadQueue<ColumnsDto>("myDb", "myTable", TimeSpan.FromSeconds(5), 1_000,
    result => { /* handle the result of the upload here */ }))
{
    // Task to generate rows at regular intervals
    var enqueueTask = Task.Run(async () => {
        int index = 0;
        while (index < 2_000)
        {
            queue.EnqueueRow($"r{index}", new ColumnsDto { Name = "Test", Number = index });
            await Task.Delay(50, cancellationToken);
            index++;
        }
    });
    
    // Task to start the upload queue
    var uploadTask = queue.Start(cancellationToken);

    // wait for either the enqueue task to finish or the upload task to fail
    var t = Task.WhenAny(uploadTask, enqueueTask);
    await t;
    logger.LogInformation("Enqueueing task completed. Disposing of the upload queue");
} // disposing the queue will upload any rows left and stop the upload loop

Using the State Store
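
The state store persists extraction state, such as the ranges of data already written to CDF, so that an extractor can resume where it left off after a restart. It can be backed by a local database or by CDF Raw, configured in the state-store section of the config file.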

services.AddStateStore();

using (var provider = services.BuildServiceProvider()) {
    var stateStore = provider.GetRequiredService<IExtractionStateStore>();
    var destination = provider.GetRequiredService<CogniteDestination>();

    // Create a state for a node
    var myState = new BaseExtractionState("myState");
    var states = new[] { myState };
    var stateDict = states.ToDictionary(state => state.Id);

    // Restore any previously persisted ranges from the state store
    await stateStore.RestoreExtractionState(stateDict, "someTableOrCollection", cancellationToken);

    // After uploading points to CDF, update the ranges in your state
    var now = DateTime.UtcNow;
    var datapoints = new Dictionary<Identity, IEnumerable<Datapoint>>() {
        { new Identity("myState"), new Datapoint[] {
            new Datapoint(now - TimeSpan.FromHours(2), "B"), new Datapoint(now, "A") } }
    };

    await destination.InsertDataPointsIgnoreErrorsAsync(datapoints, cancellationToken);

    myState.UpdateDestinationRanges(now - TimeSpan.FromHours(2), now);

    await stateStore.StoreExtractionState(states, "someTableOrCollection", cancellationToken);
    // If the extractor stops here, the state is saved and can be restored after restart.
}
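
After RestoreExtractionState completes, each state holds the ranges previously committed to the store, which the extractor can use to decide where to resume reading from the source.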

Using the installer template

The installer template is mostly configured from the command line and the .json config file, but more advanced changes may require modifying the project files. Feel free to copy the installer project and modify it to your needs. The following command produces a single-file executable for a .NET Core project. It assumes that you have a config file named config.example.yml somewhere.

build.ps1 -b Path\To\MSBuild.exe -v 1.0.0 -d "some description" -c Path\To\setup-config.json

The configuration parameters are defined in the setup-config.json file passed with the -c option. See the ExampleExtractorSetup project in this repository for a full example.

In general the setup template assumes that this is a Cognite product, but changing this is no more difficult than replacing instances of Cognite in Product.wxs with whatever suits your purposes.

Modifying the installer template

Adding to the installer template is relatively easy. New builds can be added in the <Target Name="BeforeBuild"> block in SetupTemplate.wixproj; these should output to new folders. New files going in the bin/ folder can be added to <ComponentGroup Id="ExecutableComponentGroup">. Note that the executable is duplicated there due to conditionals on the service option. New components can be added after <?endif?>.

New folders can be added by adding new Directory tags in the first Fragment, a new ComponentGroupRef at the bottom of Product, and a new ComponentGroup somewhere in the last Fragment.

Code of Conduct

This project follows the Contributor Covenant code of conduct: https://www.contributor-covenant.org

License

Apache v2, see LICENSE.

Contributing

Due to restrictions on integration tests, PRs from external forks should be merged into the integration branch.

The project requires test coverage: if your change adds code, make sure to create a test for it.

External commits are merged; PRs should have a clean commit history with descriptive commit messages.

Style