# Finding the Top 10 using Aerospike Aggregations

## Problem

You want to create a leaderboard of the top 10 scores, or the 10 most recent events, using Aerospike as the data store.

## Solution

The solution is to use an Aggregation that processes the stream of tuples flowing from a query on a secondary index. Most of the aggregation is done in code on each node in the cluster, and the partial results are finally aggregated, or reduced, in the client.

### How to build

The source code for this solution is available on GitHub: https://github.com/helipilot50/aerospike-top-10-aggregation.git

This example requires a working Java development environment (Java 6 or later) including Maven (Maven 2 or later). The Aerospike Java client is downloaded from Maven Central as part of the build.

After cloning the repository, use Maven to build the JAR files. From the root directory of the project, issue the following command:

    mvn clean package

A JAR file, aerospike-top-10-1.0-full.jar, will be produced in the target directory.

### Running the solution

This is a runnable JAR, complete with all the dependencies packaged.

To load data, use this command:

    java -jar aerospike-top-10-1.0-full.jar -l

This generates 100,000 Event records, each with an event name and a timestamp.
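Each Event record has two bins, eventid and time; these are the bin names that the query and the UDF rely on below. Here is a minimal sketch of a loader of that shape using the Aerospike Java client and the documented defaults (namespace test, set demo). The class name is hypothetical, and the project's actual populateData() may differ in detail:

    import com.aerospike.client.AerospikeClient;
    import com.aerospike.client.Bin;
    import com.aerospike.client.Key;
    import com.aerospike.client.policy.WritePolicy;

    // Sketch only: writes Event records of the same shape as the loader.
    // Bin names match EVENT_ID_BIN ("eventid") and TIME_BIN ("time").
    public class LoadEvents {
        public static void main(String[] args) {
            AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);
            WritePolicy writePolicy = new WritePolicy();
            for (int i = 1; i <= 100000; i++) {
                String eventId = "Event:" + i;
                Key key = new Key("test", "demo", eventId);
                client.put(writePolicy, key,
                        new Bin("eventid", eventId),
                        new Bin("time", System.currentTimeMillis()));
            }
            client.close();
        }
    }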

To verify that the data loaded, use this command:

    java -jar aerospike-top-10-1.0-full.jar -S

You can run the aggregation with the following command:

    java -jar aerospike-top-10-1.0-full.jar -q

The program loads a User Defined Function (UDF) module when it starts. It looks for the module at udf/leaderboard.lua, a path relative to the working directory, so be sure the file is present at that path when you run the command.

#### Options

    -a,--all              Aggregate all using ScanAggregate.
    -h,--host <arg>       Server hostname (default: 127.0.0.1)
    -l,--load             Load data.
    -n,--namespace <arg>  Namespace (default: test)
    -p,--port <arg>       Server port (default: 3000)
    -q,--query            Aggregate with query.
    -s,--set <arg>        Set name (default: demo)
    -S,--scan             Scan all for testing.
    -u,--usage            Print usage.
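The connection options combine with any of the actions. For example, to run the query aggregation against a cluster node other than localhost (the address below is just a placeholder):

    java -jar aerospike-top-10-1.0-full.jar -h 192.168.1.100 -p 3000 -q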

#### Output

The output is a List of 10 Maps, ordered from the highest (most recent) timestamp to the lowest:

    {eventid=Event:100000, time=1421955197267}
    {eventid=Event:99999, time=1421955197266}
    {eventid=Event:99998, time=1421955197265}
    {eventid=Event:99996, time=1421955197259}
    {eventid=Event:99997, time=1421955197259}
    {eventid=Event:99995, time=1421955197259}
    {eventid=Event:99994, time=1421955197258}
    {eventid=Event:99993, time=1421955197258}
    {eventid=Event:99992, time=1421955197256}
    {eventid=Event:99991, time=1421955197255}

## Discussion

The Java code is quite simple. In the main() method, a secondary index is created on the time Bin, and the UDF module is registered with the cluster.

    /*
     * Create the secondary index used by the query.
     * Index creation only needs to be done once, and can also be done
     * with AQL or ASCLI.
     */
    IndexTask it = as.client.createIndex(null, as.namespace, as.set, "top-10", TIME_BIN, IndexType.NUMERIC);
    it.waitTillComplete();
    /*
     * Register the UDF module.
     * Registration only needs to be repeated after a change to the module.
     */
    RegisterTask rt = as.client.register(null, "udf/leaderboard.lua", "leaderboard.lua", Language.LUA);
    rt.waitTillComplete();

Depending on the option given on the command line, the code either loads data, runs an aggregation, or prints the usage message.


    if (cl.hasOption("l")) {
        as.populateData();
        return;
    } else if (cl.hasOption("q")) {
        as.queryAggregate();
        return;
    } else if (cl.hasOption("a")) {
        as.scanAggregate();
        return;
    } else {
        logUsage(options);
    }

Note the -a option: it performs the aggregation by scanning the whole set rather than by querying a secondary index. Scan aggregation is only supported on more recent versions of Aerospike.
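The scanAggregate() path presumably builds the same kind of Statement but without a filter; with the Aerospike Java client, a queryAggregate() call on a Statement that has no filter is executed as a scan of the whole set. A hedged sketch of what that looks like:

    // Sketch: aggregation over the whole set. With no filter on the
    // Statement, queryAggregate() visits every record in the set.
    Statement stmt = new Statement();
    stmt.setNamespace(this.namespace);
    stmt.setSetName(this.set);
    stmt.setBinNames(EVENT_ID_BIN, TIME_BIN);
    // note: no stmt.setFilters(...) call
    aggregate(stmt);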

The queryAggregate() method creates the Statement for the query and then calls the local aggregate() method, which uses the Aerospike queryAggregate() operation to query the data and invoke the stream UDF top() in the leaderboard module.

    public void queryAggregate() {
        // Query the last 24 hours of events
        long now = System.currentTimeMillis();
        long yesterday = now - 24 * 60 * 60 * 1000;
        Statement stmt = new Statement();
        stmt.setNamespace(this.namespace);
        stmt.setSetName(this.set);
        stmt.setBinNames(EVENT_ID_BIN, TIME_BIN);
        stmt.setFilters(Filter.range(TIME_BIN, yesterday, now));
        aggregate(stmt);
    }

    private void aggregate(Statement stmt) {
        // Invoke the stream UDF top() in the leaderboard module,
        // asking for the top 10 results
        ResultSet rs = this.client.queryAggregate(null, stmt,
                "leaderboard", "top", Value.get(10));
        try {
            while (rs.next()) {
                @SuppressWarnings("unchecked")
                List<Map<String, Object>> result =
                        (List<Map<String, Object>>) rs.getObject();
                for (Map<String, Object> element : result) {
                    System.out.println(element);
                }
            }
        } finally {
            rs.close();
        }
    }

The heavy lifting is done in the stream UDF, which builds a List of the top 10 (most recent) events as they arrive from the query stream.

The stream is processed by a map() function, then an aggregate() function, and finally a reduce() function.

Let's look at each one, then string them together to process the stream.

### Map()

The purpose of a map() function is to transform the current element in the stream into a new form. In this example we transform a Record into a Map.

While the output looks almost the same as the input, the transformer() function discards the metadata associated with the Record and retains only the two bins we are interested in. For example, a record holding eventid=Event:42 and time=1421955197267, plus its metadata, becomes the plain Map {eventid=Event:42, time=1421955197267}.

    local function transformer(rec)
        -- Keep only the bins we need, discarding record metadata
        local tuple = map()
        tuple["eventid"] = rec["eventid"]
        tuple["time"] = rec["time"]
        return tuple
    end

The map() function is invoked on each node in the cluster for every element in the stream.

### Aggregate()

The purpose of the aggregate() function is to accumulate a result from the elements in the stream. In this example, accumulate() maintains an ordered List of 10 elements; as each new element arrives, it is inserted into the List at the correct position. The local function movedown() does the insertion.

    local function movedown(theList, size, at, element)
        -- Insert element at position 'at', shifting the entries below it
        -- down one place and dropping the last (oldest) entry
        if at > size then
            return
        end
        local index = size - 1
        while index > at do
            theList[index + 1] = theList[index]
            index = index - 1
        end
        theList[at] = element
    end


    local function accumulate(aggregate, nextitem)
        -- Walk the ordered list; insert the new item at the first
        -- position whose time is older than the new item's time.
        -- top_size is the parameter of top(), captured as an upvalue.
        local index = 1
        for value in list.iterator(aggregate) do
            if nextitem.time > value.time then
                movedown(aggregate, top_size, index, nextitem)
                break
            end
            index = index + 1
        end
        return aggregate
    end

The aggregate() function is invoked for every element in the stream, on each node in the cluster. Note: the aggregate value is held in RAM, so watch for high memory usage when the accumulated value is large.
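The insertion logic is the same as inserting into a fixed-size array kept sorted in descending order. For readers more comfortable in Java, an illustrative equivalent (not part of the project) is:

    // Illustrative only: keep a fixed-size array of timestamps sorted
    // newest-first; insert a new value at its position, shifting the
    // rest down and dropping the oldest.
    static void accumulate(long[] topTimes, long newTime) {
        for (int i = 0; i < topTimes.length; i++) {
            if (newTime > topTimes[i]) {
                for (int j = topTimes.length - 1; j > i; j--) {
                    topTimes[j] = topTimes[j - 1];
                }
                topTimes[i] = newTime;
                break;
            }
        }
    }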

### Reduce()

The reduce() function combines all of the results from the stream into one final result. It is invoked on each node in the cluster, and a final reduce is performed in the client.

The reducer() function combines two elements: here, two ordered Lists that are the output of two aggregate() passes. It merges the two ordered Lists and returns a new ordered List of the top 10.

    local function reducer(this, that)
        -- Merge two lists, each already ordered newest-first,
        -- keeping only the 10 newest entries overall
        local merged_list = list()
        local this_index = 1
        local that_index = 1
        while list.size(merged_list) < 10 do
            if this[this_index].time >= that[that_index].time then
                list.append(merged_list, this[this_index])
                this_index = this_index + 1
            else
                list.append(merged_list, that[that_index])
                that_index = that_index + 1
            end
        end
        return merged_list
    end
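This is the merge step of a merge sort, cut off after 10 elements. An illustrative Java equivalent (again, not part of the project), assuming both inputs hold at least 10 entries sorted newest-first, as the seeded aggregate lists always do:

    // Illustrative only: merge two arrays, each sorted in descending
    // order, keeping only the 10 largest values.
    static long[] reduce(long[] a, long[] b) {
        long[] merged = new long[10];
        int ai = 0, bi = 0;
        for (int i = 0; i < merged.length; i++) {
            if (a[ai] >= b[bi]) {
                merged[i] = a[ai++];
            } else {
                merged[i] = b[bi++];
            }
        }
        return merged;
    }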

### The stream function: top()

The stream function top() is the UDF invoked by the client. It takes a stream object as a parameter and configures a map() function, an aggregate() function and a reduce() function on it.

The local functions we wrote for these three roles are passed in by name; in Lua, functions are first-class values.

NOTE: The aggregate() function also takes an additional parameter: an initial List for accumulate() to populate. Here it is seeded with ten placeholder Maps with time=0, so the List always holds exactly ten entries and any real event displaces a placeholder. (The seed and the reducer both hard-code ten entries, so the top_size value passed from the client must match.)

    function top(flow, top_size)

        . . .

        return flow:map(transformer):aggregate(
            list{
                map{eventid="ten",time=0},
                map{eventid="nine",time=0},
                map{eventid="eight",time=0},
                map{eventid="seven",time=0},
                map{eventid="six",time=0},
                map{eventid="five",time=0},
                map{eventid="four",time=0},
                map{eventid="three",time=0},
                map{eventid="two",time=0},
                map{eventid="one",time=0}
            }, accumulate):reduce(reducer)
    end