Amazon Kinesis Aggregators


This project is now deprecated, and only updates for security vulnerabilities in dependencies will be made. We advise the use of Apache Flink on Amazon Kinesis Analytics instead.


Amazon Kinesis Aggregators is a Java framework that enables the automatic creation of real-time aggregated time series data from Amazon Kinesis streams.

You can use this data to answer questions such as "how many times per second has 'x' occurred?" or "what was the breakdown by hour over the day of the streamed data containing 'y'?". Using this framework, you describe the format of the data on your stream (CSV, JSON, and so on), the granularity of time series that you require (seconds, minutes, hours, and so on), and how the data elements that are streamed should be grouped; the framework handles all the time series calculations and data persistence. You then consume the time series aggregates in your application using Amazon DynamoDB, or interact with the time series using Amazon CloudWatch or the Web Query API.
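To make the idea of a time series aggregate concrete, the following is a minimal sketch, not the framework's implementation, of counting events per label and per time bucket. The class `TimeSeriesCountSketch`, the `Event` type, and the `label|bucket` key format are all hypothetical names chosen for illustration.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: counts events per (label, time bucket). Not part of the framework. */
final class TimeSeriesCountSketch {

    /** A hypothetical stream event: a label to group by and the event timestamp. */
    static final class Event {
        final String label;
        final Instant timestamp;

        Event(String label, Instant timestamp) {
            this.label = label;
            this.timestamp = timestamp;
        }
    }

    /**
     * Truncates each timestamp to the requested granularity (for example HOURS)
     * and counts how many events fall into each label/bucket pair.
     */
    static Map<String, Long> countByBucket(List<Event> events, ChronoUnit granularity) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            Instant bucket = e.timestamp.truncatedTo(granularity);
            String key = e.label + "|" + bucket; // hypothetical key layout
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

With hourly granularity, two events for the same label at 15:10 and 15:45 land in the same 15:00 bucket, while an event at 16:05 starts a new bucket.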

You can also analyze the data using Hive on Amazon Elastic MapReduce, or bulk import it to Amazon Redshift. The process runs as a standalone Amazon Kinesis-enabled application which only requires configuration, or can be integrated into existing Amazon Kinesis applications.

The data is stored in a time series based on how you aggregate it. A dataset aggregating Telecoms Call Data Records in DynamoDB might look like this:

Dynamo Real Time Aggregate Table

The corresponding data in CloudWatch would look like this:

CloudWatch Dashboard View

Building Aggregators

Amazon Kinesis Aggregators is built using Apache Maven. To build, simply run Maven from the amazon-kinesis-aggregators directory. The target directory contains the following build artifacts:

Running Aggregators

Amazon Kinesis Aggregators ships with several deployment options, which should enable you to run with minimal operational overhead while also accommodating advanced deployment use cases. You can run Amazon Kinesis Aggregators as:

Running Amazon Kinesis Aggregators Using Elastic Beanstalk

Amazon Kinesis Aggregators compiles a web application archive (WAR) file, which enables easy deployment on Java application servers, such as Apache Tomcat, using Elastic Beanstalk (http://aws.amazon.com/elasticbeanstalk). Amazon Kinesis Aggregators also includes configuration options that instruct Elastic Beanstalk to scale the application on CPU load, which is typically the bottleneck for applications as they scale up. This is the recommended deployment method.

To deploy Amazon Kinesis Aggregators as an Elastic Beanstalk application, start by creating a new Elastic Beanstalk web server application with the pre-configured Tomcat stack. When prompted by the AWS Management Console, upload the KinesisAggregators.war file from your local build. Select an instance type that is suitable for the type of aggregation that you are running (specifically, the higher the granularity of label items and the more fine-grained the TimeHorizon value, the larger the instance type you require). After deployment, click the URL for the application environment; the following message is displayed:

OK - Kinesis Aggregators Managed Application hosted in Elastic Beanstalk Online

Furthermore, if you request a log snapshot from the Elastic Beanstalk console, you see a log line indicating the following:

No Aggregators Configuration File found in Beanstalk Configuration config-file-url. Application is Idle

This indicates that the application is deployed but not configured. To configure the application, add these Elastic Beanstalk configuration parameters as required:

This is typically done by adding -D flags to the JVM command line options. Then, choose 'Save' and Elastic Beanstalk applies the changes to the environment. Wait a minute or so, and then snapshot logs to confirm that Amazon Kinesis Aggregators is running.

Running the Managed Java Client Application

This is a great option if you have data in Amazon Kinesis, but don’t want to use Elastic Beanstalk. You can start the application from a server using the following command:

java -cp AmazonKinesisAggregators.jar-complete.jar -Dconfig-file-path=<configuration> -Dstream-name=<stream name> -Dapplication-name=<application name> -Dregion=<region name - us-east-1, eu-west-1, etc> com.amazonaws.services.kinesis.aggregators.consumer.AggregatorConsumer

In addition to the configuration items outlined in the Elastic Beanstalk section, use the following configuration item:

We recommend that you run your servers in an Auto Scaling group to ensure fault tolerance if the host fails.
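The `-D` flags in the command above become JVM system properties. As a rough sketch of how a launcher script or wrapper might verify them before starting, the helper below reports which of the four property names from that command are absent. The class `LaunchPropertiesSketch` is hypothetical, and treating all four properties as mandatory is an assumption made for illustration, not something the framework documents here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Illustrative only: checks for the -D properties used in the launch command. */
final class LaunchPropertiesSketch {

    // Property names copied from the example command line; requiring all of
    // them is an assumption of this sketch.
    static final List<String> EXPECTED = Arrays.asList(
            "config-file-path", "stream-name", "application-name", "region");

    /** Returns the expected property names that are unset or blank. */
    static List<String> missing(Properties props) {
        List<String> absent = new ArrayList<>();
        for (String name : EXPECTED) {
            String value = props.getProperty(name);
            if (value == null || value.trim().isEmpty()) {
                absent.add(name);
            }
        }
        return absent;
    }
}
```

Calling `LaunchPropertiesSketch.missing(System.getProperties())` at startup would list any flags that were left off the command line.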

Configuration

You can use the configuration file to create one or more aggregations against the same stream. It is a JSON file that creates a set of aggregator objects managed by the framework. Create one aggregator for each distinct label that you want to aggregate on. Each aggregator can then have its own properties of time granularity, aggregator type, and so on.

The core structure of the configuration file is an array of aggregator objects. For example, the following configuration creates two aggregators:

[{aggregatorDef1}, {aggregatorDef2}]

Note that aggregatorDefN is an aggregator configuration. An aggregator configuration must include the following attributes:

You can also include the following options in the configuration:

Summary Items Mini-Language

You can configure summary items and the type of summary using a miniature specification language, and navigate complex document structures in JSON data. You can apply the following type of summary transformations:

Summary items can have aliases applied, as in SQL, to control the name of the generated attribute in the data store you write to. You simply add the name of the item you require to the definition of the summary item, including functions.

You can also navigate an entity structure in JSON-formatted stream data using dot notation; for example, given the following object, you can access the calculated duration using a summary item of 'timeValues.durations.calculated':

{
  "name": "Object To Be Aggregated",
  "timeValues": {
    "durations": {
      "calculated": 60,
      "recorded": 58
    },
    "endTime": "01/01/1970 01:00:00",
    "startTime": "01/01/1970 00:00:00"
  }
}
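Dot notation of this kind can be pictured as walking nested maps one segment at a time. The sketch below assumes the JSON document has already been parsed into nested `Map` objects; `DotPathSketch` and `resolve` are hypothetical names, and this is not how the framework itself is known to resolve paths.

```java
import java.util.Map;

/** Illustrative only: resolves a dot-separated path against nested maps. */
final class DotPathSketch {

    /**
     * Follows each path segment into the next nested map.
     * Returns null if any segment is missing or a non-map value is reached early.
     */
    static Object resolve(Map<String, Object> document, String path) {
        Object current = document;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }
}
```

For the object above, resolving `timeValues.durations.calculated` yields 60, and resolving `timeValues.durations.recorded` yields 58.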

These concepts can be combined into a mini-specification:

Example 1 - Calculate the min, max, and sum of value 7 in a CSV stream, giving them friendly names - ["min(7) min-purchase-price","max(7) max-purchase-price","sum(7) total-sales"]

Example 2 - Calculate the sum and maximum value of the calculated duration in the JSON stream - ["sum(timeValues.durations.calculated)","max(timeValues.durations.calculated)"]
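To show how a summary item string breaks down into its parts, here is a small sketch that splits an entry such as `min(7) min-purchase-price` into a function, an item, and an optional alias. The class `SummaryItemSketch` and its regular expression are assumptions for illustration only; the framework's own parser may accept a different grammar.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: splits a summary item into function, item, and alias. */
final class SummaryItemSketch {
    final String function; // for example "min"; null when the item has no function
    final String item;     // CSV index or JSON dot path
    final String alias;    // null when no alias was given

    // Either "fn(item)" or a bare "item", optionally followed by whitespace and an alias.
    private static final Pattern SPEC = Pattern.compile(
            "^\\s*(?:(\\w+)\\(([^()\\s]+)\\)|([^()\\s]+))(?:\\s+(\\S+))?\\s*$");

    private SummaryItemSketch(String function, String item, String alias) {
        this.function = function;
        this.item = item;
        this.alias = alias;
    }

    /** Parses one summary item, rejecting strings that do not fit the pattern. */
    static SummaryItemSketch parse(String spec) {
        Matcher m = SPEC.matcher(spec);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized summary item: " + spec);
        }
        String item = m.group(2) != null ? m.group(2) : m.group(3);
        return new SummaryItemSketch(m.group(1), item, m.group(4));
    }
}
```

Under these assumptions, `sum(timeValues.durations.calculated)` parses to the function `sum`, the item `timeValues.durations.calculated`, and no alias.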

Sample Configurations

Aggregator Data Structure

The data structure for aggregated data is arranged as a hash/range table in DynamoDB on the Label attributes and Date attribute at the configured granularity of time. Every table also includes the following:

The table also includes any summary values that were added to the aggregator configuration. The format of these summary attributes in DynamoDB follows the pattern <attribute>-<summary type>, or uses the alias provided.

Indexes

All aggregator data stores have global secondary indexes (logically) on the date value and on lastWriteSeq. To ensure adequate write performance, these indexes are structured as hash/range, with the scatterPrefix (a random number between 0 and 99) as the hash key and the indexed value as the range key.

Web-based Query API

The Amazon Kinesis Aggregators web application also provides several query API operations, which return data in the JSON format. When deployed, you can make an HTTP request to a variety of endpoints to retrieve different types of data. Currently, there is no security offered for the Web API operations, so you must ensure that they are only accessible from within your VPC using security group rules or similar. Do NOT make these endpoints publicly accessible.

Viewing the Running Configuration

You can view the configuration of your aggregators at the URL <web application>/configuration, which returns an object such as:

{
  "application-name": "EnergyRealTimeDataConsumer",
  "config-file-url": "s3://mybucket/kinesis/sensor-consumer-regex.json",
  "environment": null,
  "failures-tolerated": null,
  "max-records": "2500",
  "position-in-stream": "LATEST",
  "region": "eu-west-1",
  "stream-name": "EnergyPipelineSensors",
  "version": ".9.2.7.4"
}

Date-based Queries

Use the Date query to find data that has been aggregated on the basis of the stream timestamp value. For example, use this interface to periodically retrieve all new data that has been processed, or to pull data for specific time ranges for comparative analysis. The URL is:

<web application>/dateQuery?params

Parameters:

This returns all data from the aggregated table for the date period specified.

You can also use the internal Java API:

public List<Map<String, AttributeValue>> queryByDate(Date dateValue, TimeHorizon h,
ComparisonOperator comp, int threads) throws Exception

This method queries by the Date, TimeHorizon, and ComparisonOperator values you select. For example, to find all hourly aggregates after 3pm, use:

dateValue=Date('2014-01-01 15:00:00'), TimeHorizon.HOUR, ComparisonOperator.GT

The threads parameter is the number of threads used to run the query. Multiple threads help because the index is organized as hash/range on scatterPrefix/DateValue, so a date query has to cover every scatterPrefix value.
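The relationship between the scatterPrefix layout and the thread count can be sketched as a fan-out: one range lookup per prefix, spread across a fixed pool. The code below uses an in-memory map as a stand-in for the index and is not the framework's query code; `ScatterQuerySketch`, `queryAfter`, and the use of epoch milliseconds as the date value are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: fans a date query out across all scatter prefixes. */
final class ScatterQuerySketch {
    static final int PREFIXES = 100; // scatterPrefix values 0..99

    /**
     * index maps scatterPrefix -> (date value in epoch millis -> item id),
     * standing in for the hash/range index. Returns the ids of all items
     * whose date value is strictly greater than afterMillis.
     */
    static List<String> queryAfter(final Map<Integer, NavigableMap<Long, String>> index,
                                   final long afterMillis, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (int p = 0; p < PREFIXES; p++) {
                final int prefix = p;
                // One range lookup per hash key value.
                pending.add(pool.submit(() -> {
                    List<String> hits = new ArrayList<>();
                    NavigableMap<Long, String> range = index.get(prefix);
                    if (range != null) {
                        hits.addAll(range.tailMap(afterMillis, false).values());
                    }
                    return hits;
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<List<String>> f : pending) {
                results.addAll(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Query interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Query failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A larger thread count shortens the wall-clock time of the fan-out at the cost of more concurrent reads against the store.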

Query for Label/Date Values

To query the application to find the unique set of labels and date values that have been aggregated, use the following URL:

<web application>/keyQuery?params

Parameters:

This returns a unique list of all keys from the aggregated table.

You can also use the internal Java API:

public Map<String, AttributeValue> queryValue(String label, Date dateValue, TimeHorizon h)
throws Exception

This method takes the label you are interested in, as well as a date for the date value. If you have multiple TimeHorizon values configured on the aggregator, it generates the correct dateValue to query the underlying table with. You are likely to use this interface to query across aggregator data stores looking for related time-based values.

Integrating Aggregators into Existing Java Applications

In addition to running aggregators as stand-alone Amazon Kinesis applications, you can integrate them into existing Amazon Kinesis applications. You can:

Managed IRecordProcessorFactory

If you want to build your Amazon Kinesis worker and configure it explicitly, you can still use aggregators to create the IRecordProcessorFactory. In this case, create a new instance of com.amazonaws.services.kinesis.aggregators.processor.AggregatorProcessorFactory with the configured aggregators.

Integration with Existing IRecordProcessors

If you have an existing worker application and you simply want to add the aggregation capability, you can directly integrate with one or more aggregators. To do this, simply construct the aggregators using a configuration file, or using a pure Java configuration. Then, to inject new data into the aggregator, simply call:

void aggregate(List<Record> records)

This causes the time series calculations to be done based upon the configuration of the aggregators. Then, when your worker normally calls checkpoint(), also call:

void checkpoint()

This flushes the in-memory time series state to the backing data store. You must ensure that the aggregators are initialized correctly against the shard for the worker by calling this method in the existing KCL Application IRecordProcessor initialize() method:

void initialize(String shardId)

You must also ensure that if the shutdown() method is invoked on your Amazon Kinesis application, you call:

void shutdown(boolean flushState)

If the shutdown reason specified in the shutdown method for IRecordProcessor is ShutdownReason.ZOMBIE, set flushState to 'false' to allow the data to be re-aggregated by another worker. However, if the value is ShutdownReason.TERMINATE, you should flush the aggregator state on termination.
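The four calls described in this section can be pictured as a thin forwarding layer inside an existing record processor. The sketch below is self-contained and does not use the real KCL or aggregator types: `AggregatorCalls`, `Reason`, and `Processor` are hypothetical stand-ins, and records are plain strings rather than Kinesis `Record` objects.

```java
import java.util.List;

/** Illustrative only: where the aggregator calls sit in a processor lifecycle. */
final class AggregatorLifecycleSketch {

    /** Hypothetical stand-in for the four aggregator methods named in the text. */
    interface AggregatorCalls {
        void initialize(String shardId);
        void aggregate(List<String> records);
        void checkpoint();
        void shutdown(boolean flushState);
    }

    /** Mirrors the two shutdown reasons discussed in the text. */
    enum Reason { ZOMBIE, TERMINATE }

    /** A record-processor shape that forwards each lifecycle event to the aggregator. */
    static final class Processor {
        private final AggregatorCalls aggregator;

        Processor(AggregatorCalls aggregator) {
            this.aggregator = aggregator;
        }

        void initialize(String shardId) {
            aggregator.initialize(shardId); // bind the aggregator to this shard
        }

        void processRecords(List<String> records) {
            aggregator.aggregate(records); // update the in-memory time series
            aggregator.checkpoint();       // flush at the point the worker checkpoints
        }

        void shutdown(Reason reason) {
            // Flush only on TERMINATE; on ZOMBIE another worker re-aggregates the data.
            aggregator.shutdown(reason == Reason.TERMINATE);
        }
    }
}
```

In this sketch the processor checkpoints after every batch for simplicity; a real worker would call the aggregator's checkpoint wherever it already performs its own checkpoint.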

Configuring Aggregators in Existing Applications

There are a variety of ways to configure aggregators when you are integrating into existing applications. You might use a factory to create one or more aggregators from a simple set of arguments, or you can configure each aggregator directly and manage it as part of an aggregator group.

Aggregator Factories

There are a variety of aggregator factories available in the com.amazonaws.services.kinesis.aggregators.factory package, which generally map to the configuration types found in the configuration file. In fact, you can use configuration files to configure aggregators from Java using the following:

ExternallyConfiguredAggregatorFactory.buildFromConfig(  
String streamName,  
String applicationName,  
KinesisClientLibConfiguration config,  
String configFile)  

You can also take advantage of aggregators that are specific to the type of data to be aggregated:

JSON Data

JsonAggregatorFactory.newInstance(String streamName
, String appName
, KinesisClientLibConfiguration config
, String namespace
, TimeHorizon timeHorizon
, AggregatorType aggregatorType
, List<String> labelAttributes
, String dateAttribute
, String dateFormat
, List<String> summaryAttributes)

CSV Data

CsvAggregatorFactory.newInstance(String streamName
, String appName
, KinesisClientLibConfiguration config
, String namespace
, TimeHorizon timeHorizon
, AggregatorType aggregatorType
, String delimiter
, List<Integer> labelIndicies
, int dateIndex
, String dateFormat
, List<Object> summaryIndicies)

String Data parsed with Regular Expressions

RegexAggregatorFactory.newInstance(String streamName
, String appName
, KinesisClientLibConfiguration config
, String namespace
, List<TimeHorizon> timeHorizons
, AggregatorType aggregatorType
, String regularExpression
, List<Integer> labelIndicies
, int dateIndex
, String dateFormat
, List<Object> summaryIndicies)

Object Serialized Data

You can generate aggregators for object-serialized data using annotations:

ObjectAggregatorFactory.newInstance(String streamName  
, String appName  
, KinesisClientLibConfiguration config  
, Class clazz)  

Note that 'clazz' is a class that has been configured using annotations found in the com.amazonaws.services.kinesis.aggregators.annotations package. This factory method throws an error if the class is not annotated.

Alternatively, you can configure the aggregator directly:

ObjectAggregatorFactory.newInstance(String streamName  
, String appName  
, KinesisClientLibConfiguration config  
, String namespace  
, List<TimeHorizon> timeHorizons  
, AggregatorType aggregatorType  
, Class clazz  
, List<String> labelMethods  
, String dateMethod  
, List<String> summaryMethods)

Direct Configuration

If you want even more control over the configuration of a given set of aggregators, then you can configure them directly. To effectively do this, you must understand how aggregators work. Aggregators are built around several subsystems that their factory methods configure automatically. When you build aggregators directly, you must construct an aggregator from its constituent subsystems. For more information, see the 'Extending Aggregators' section of this document.

To configure an aggregator directly, you must configure two of the subsystems: the aggregator and the IDataExtractor that extracts the data from the stream.

IDataExtractor

When you create an aggregator directly, you must specify the IDataExtractor to get data out of the stream for aggregation. There are IDataExtractors in the com.amazonaws.services.kinesis.aggregators.io package. Each of these maps to one of the supported data formats, and provides relevant configuration options, including label, Date, and summary items. IDataExtractors use fluent builders for all optional configurations. For example, creating a JsonDataExtractor looks like this:

new JsonDataExtractor(labelAttributes)  
.withDateValueAttribute(dateAttribute)  
.withSummaryAttributes(summaryAttributes)  
.withDateFormat(dateFormat);  

Aggregator

You then create the aggregator with the options that are specific to it, including KinesisClientLibConfiguration, required TimeHorizon values, and options for emitting metrics. For example, using the example JsonDataExtractor, you might configure the aggregator as follows:

return new StreamAggregator(streamName, appName, namespace, config, dataExtractor)  
.withTimeHorizon(timeHorizons)  
.withAggregatorType(aggregatorType)  
.withCloudWatchMetrics(true);  

Extending Aggregators

You might want to extend aggregators for a variety of reasons. The use cases that we know of today that require extension include supporting data on a stream that is compressed, encrypted, or uses an object serialization format other than Jackson/JSON, or implementing large objects. We designed aggregators with extensibility in mind. You can extend the framework at the following integration points.

Data Format & Handling

The ability to support CSV, JSON, arbitrary string data and object serialization is provided by the IDataExtractor and IKinesisSerializer interfaces, residing at com.amazonaws.services.kinesis.aggregators.io and io.Serializer.

IKinesisSerializer

This interface interoperates between the internal data format used by IDataExtractors and the byte arrays used on the stream. You implement IKinesisSerializer to support compressed or encrypted stream data, for example. The implementation would conform to the following interface, which is identical to the Amazon Kinesis Connector ITransformer class:

/**  
* Transforms data from a Record (byte array) to the data  
* model class (T) for processing in the application and from the data model  
* class to the output type (U) for the emitter.  
* 
* @param <T> the data type stored in the record  
*/
public interface IKinesisSerializer<T, U> {  
/**
* Transform the record into an object of its original class.  
* 
* @param event raw record from the stream
* @return data using its original class  
* @throws IOException if it could not convert the record to a T  
*/
public T toClass(InputEvent event) throws IOException;  

/**
* Transform the record from its original class to a byte array.  
* 
* @param record data as its original class  
* @return a data byte array  
*/
public U fromClass(T record) throws IOException;  
}  
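As an illustration of the compressed-stream use case, the sketch below shows the two directions of a GZIP codec for UTF-8 text. It is deliberately simplified and does not implement the real interface: it works on raw `byte[]` rather than `InputEvent`, wraps `IOException` in `UncheckedIOException`, and the class name `GzipStringCodecSketch` is hypothetical. A real IKinesisSerializer implementation would adapt this logic to the method signatures shown above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Illustrative only: GZIP round trip between stream bytes and UTF-8 text. */
final class GzipStringCodecSketch {

    /** Decompresses GZIP bytes from the stream into the original text. */
    static String toClass(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compresses text into GZIP bytes suitable for writing to the stream. */
    static byte[] fromClass(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The decompressed string could then be handed to one of the existing string-based extractors, which is the division of labor the two interfaces suggest.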

IDataExtractor

IDataExtractors take the deserialized data and extract the relevant Label, Date, and Summary items. They also typically do any filtering that is exposed by the IDataExtractor. Implement a new IDataExtractor if the type of data returned by a custom IKinesisSerializer implementation is not compatible with the existing IDataExtractors in the io package. This new IDataExtractor would conform to:

/**
* Enables pluggable data extractors for different types of
* stream data. Aggregators use IDataExtractor to interoperate between the
* stream data format and the internal format required for aggregation.
* IDataExtractors likely use IKinesisSerializers to read and write to and from
* the stream
*/
public interface IDataExtractor {  
/**  
* Gets the name of the label value to be extracted.  
*   
* @return  
*/  
public String getAggregateLabelName();  

/**  
* Gets the name of the date value to be extracted.  
*   
* @return  
*/  
public String getDateValueName();  

/**
* Extracts one or more aggregatable items from an Amazon Kinesis record.
*  
* @param event The Amazon Kinesis record from which we want to extract data.  
* @return A list of ExtractedData elements that have been resolved from  
*         the input data.  
* @throws SerializationException  
*/
public List<AggregateData> getData(InputEvent event) throws SerialisationException;

/**
* Sets the type of aggregator that contains this IDataExtractor. Used to
* boost efficiency in that the extractor will not extract summary items for
* COUNT-based aggregator integration.
* 
* @param type
*/
public void setAggregatorType(AggregatorType type);

/**
* Validates that the extractor is well formed.
* 
* @throws Exception
*/
public void validate() throws Exception;

/**
* Gets the summary configuration that is driving data extraction against the
* data stream.
* 
* @return
*/
public SummaryConfiguration getSummaryConfig();

public IDataExtractor copy() throws Exception;
}

Also note that an IDataExtractor can return multiple aggregatable objects for each record on the stream. If you need to support an M:N mapping of Kinesis events to aggregatable events, an IDataExtractor could do the job using local state.

Note that the IDataExtractor is STATEFUL for the life of an aggregator running on a shard, and contains the configuration of the data that is to be extracted. Because a new IDataExtractor is generated when a new aggregator is initialized on a shard, you must ensure that it is thread-safe and implement the copy() interface correctly to ensure that multiple instances can operate within a single JVM.
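One way to read the copy() requirement is that each copy should carry the same configuration but start with its own, unshared state. The sketch below illustrates that separation with a hypothetical class, `CopyableExtractorSketch`, that is not an IDataExtractor implementation; the `recordsSeen` counter simply stands in for whatever per-shard state a real extractor keeps.

```java
/** Illustrative only: configuration is copied, per-shard state is not. */
final class CopyableExtractorSketch {
    private final String labelName; // configuration, identical in every copy
    private long recordsSeen;       // per-shard state, never shared between copies

    CopyableExtractorSketch(String labelName) {
        this.labelName = labelName;
    }

    /** Records that one more stream record was processed by this instance. */
    void observe() {
        recordsSeen++;
    }

    long recordsSeen() {
        return recordsSeen;
    }

    String labelName() {
        return labelName;
    }

    /** Returns a new instance with the same configuration and fresh state. */
    CopyableExtractorSketch copy() {
        return new CopyableExtractorSketch(labelName);
    }
}
```

Because each copy owns its state and is used by a single shard's processor, no mutable data is shared between instances running in the same JVM.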

Data Store

The Amazon Kinesis Aggregators framework backs its data onto DynamoDB, and takes advantage of powerful DynamoDB features such as hash/range keys, atomic increment, and conditional updates. It also implements a defensive flush mechanism, which means that at any provisioned I/O rate, the aggregator can flush its state to DynamoDB without timing out.

To extend aggregators with support for an alternate backing store, such as a relational database or Redis, implement com.amazonaws.services.kinesis.aggregators.datastore.IDataStore. This implementation must meet the following service levels:

The implementation of a new IDataStore must conform to the following:

/**
* Enables the in-memory cached aggregates 
* to be saved to a persistent store
*/
public interface IDataStore {
/**
* Writes a set of Update key/value pairs to the backing store
* 
* @param data The input dataset to be updated
* @return A data structure that maps a set of
*         AggregateAttributeModifications to the values that were
*         affected on the underlying data store, by UpdateKey
* @throws Exception
*/
public Map<UpdateKey, Map<String, AggregateAttributeModification>> write(
Map<UpdateKey, UpdateValue> data) throws Exception;

/**
* Method called on creation of the IDataStore
* 
* @throws Exception
*/
public void initialize() throws Exception;

/**
* Method that is periodically invoked to allow the IDataStore to
* refresh tolerated limits for how often write() should be called
* 
* @return
* @throws Exception
*/
public long refreshForceCheckpointThresholds() throws Exception;

/**
* Sets the region for the IDataStore
* 
* @param region
*/
public void setRegion(Region region);
}
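To give a feel for what an alternate backing store has to do, here is a minimal in-memory sketch of the write path with atomic increments. It does not implement IDataStore: `InMemoryStoreSketch` is a hypothetical class, string keys stand in for UpdateKey, and `Long` deltas stand in for UpdateValue. The returned map plays the role of the "values that were affected" described in the Javadoc above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: an in-memory store that applies increments atomically. */
final class InMemoryStoreSketch {
    private final ConcurrentHashMap<String, Long> table = new ConcurrentHashMap<>();

    /**
     * Adds each delta to the stored value for its key. ConcurrentHashMap.merge
     * performs the read-modify-write atomically per key.
     *
     * @return the value held for each written key after the update
     */
    Map<String, Long> write(Map<String, Long> deltas) {
        Map<String, Long> after = new HashMap<>();
        for (Map.Entry<String, Long> entry : deltas.entrySet()) {
            Long updated = table.merge(entry.getKey(), entry.getValue(), Long::sum);
            after.put(entry.getKey(), updated);
        }
        return after;
    }
}
```

A relational or Redis-backed store would replace the `merge` call with that system's own atomic increment or conditional update.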

Metrics Service

By default, Amazon Kinesis Aggregators integrates with CloudWatch for the purpose of metrics dashboards and alerts. However, you might want to push metrics to platforms such as Ganglia or New Relic. In these cases, you would provide an implementation of the com.amazonaws.services.kinesis.aggregators.metrics.IMetricsEmitter. This implementation would conform to the following:

/**
* Provides classes that can write to metrics services. 
* Receives the output of the IDataStore modifications, and applies the data to
* the metrics service.
*/
public interface IMetricsEmitter {
/**
* Emits a new set of metrics to the metrics service
* 
* @param metricData Input Data to be instrumented
* @throws Exception
*/
public void emit(Map<UpdateKey, Map<String, AggregateAttributeModification>> metricData)
throws Exception;

/**
* Sets the region of the metrics service
* 
* @param region
*/
public void setRegion(Region region);
}
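The shape of a custom emitter can be sketched without any metrics platform: take the nested map of modified values and turn each entry into one metric line. The class below is a hypothetical stand-in, not an IMetricsEmitter implementation; it uses string keys and `Long` values in place of UpdateKey and AggregateAttributeModification, and collects lines in a list where a real emitter would call its metrics service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: turns store modifications into one metric line per attribute. */
final class ListMetricsEmitterSketch {
    private final List<String> lines = new ArrayList<>();

    /** Emits "key attribute=value" for every modified attribute, in sorted order. */
    void emit(Map<String, Map<String, Long>> metricData) {
        Map<String, Map<String, Long>> sortedRows = new TreeMap<>(metricData);
        for (Map.Entry<String, Map<String, Long>> row : sortedRows.entrySet()) {
            Map<String, Long> sortedAttributes = new TreeMap<>(row.getValue());
            for (Map.Entry<String, Long> attribute : sortedAttributes.entrySet()) {
                lines.add(row.getKey() + " " + attribute.getKey() + "=" + attribute.getValue());
            }
        }
    }

    /** Returns every line emitted so far. */
    List<String> lines() {
        return lines;
    }
}
```

Sorting is only there to make the output deterministic for inspection; a real emitter would not need it.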

Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at

http://aws.amazon.com/asl/