Aerospike Batch Processing

Problem

You want to process all of the records, perhaps billions of records, in an Aerospike database. You don't have enough heap space in your application to process them all at once, so you would like to "batch process" them, 1,000 at a time.

Solution

Use the Aerospike scanAll() method to scan an entire namespace, or a single set within a namespace. Each record is delivered to your callback one at a time from a buffered queue. You can collect the returned records into a "batch", and then process the batch of records.
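The collect-then-process pattern itself does not depend on Aerospike. Here is a minimal sketch in plain Java. It assumes a hypothetical Batcher helper, which is not part of the Aerospike client or the linked repository. The helper buffers items until a batch is full, hands the batch to a processor, and exposes flush() for the final partial batch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Aerospike client or the linked repository.
// Collects items into fixed-size batches and hands each batch to a processor.
class Batcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> processor;
    private final List<T> batch = new ArrayList<>();

    Batcher(int batchSize, Consumer<List<T>> processor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.processor = processor;
    }

    // Adds one item; processes and clears the batch once it reaches batchSize
    void add(T item) {
        batch.add(item);
        if (batch.size() == batchSize) {
            flush();
        }
    }

    // Processes whatever is buffered, e.g. the final partial batch
    void flush() {
        if (batch.isEmpty()) {
            return;
        }
        // pass a copy so the processor can keep it after the buffer is cleared
        processor.accept(new ArrayList<>(batch));
        batch.clear();
    }
}
```

For example, suppose you feed 2,500 items through a Batcher with a batch size of 1,000. The add() calls produce two full batches, and the remaining 500 are processed only when flush() is called. This class is not thread-safe.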

The source code for this solution, along with a README.md, is available on GitHub at https://github.com/helipilot50/aerospike-batch-processing.git.

This is a Java example, but the same principles can be applied to any programming language supported by Aerospike.

How to build

Clone the GitHub repository with:

git clone https://github.com/helipilot50/aerospike-batch-processing.git

The build uses Maven. To build the solution, use:

mvn clean package

This produces a complete jar, aerospike-batch-processing-1.0.0-full.jar, in the target directory.

How to run

The jar aerospike-batch-processing-1.0.0-full.jar is runnable with the following options:

options:
-g,--gen              Generate data
-h,--host <arg>       Server hostname (default: 127.0.0.1)
-n,--namespace <arg>  Namespace (default: test)
-p,--port <arg>       Server port (default: 3000)
-s,--set <arg>        Set (default: demo)
-u,--usage            Print usage.

To generate some test data, use the -g option:

java -jar aerospike-batch-processing-1.0.0-full.jar -g

This will generate 99,999 user records.

To run the batch processing, use the same command without any options:

java -jar aerospike-batch-processing-1.0.0-full.jar

Discussion

So how does it work?

A scan operation is sent from the Aerospike client to each node in the cluster. Each node responds with a stream of records. In the client, the records are put into a buffered queue. The protocol fills the buffer until it is full, then waits until the callback in your application has read records from the queue.

This way, your application does not run out of heap space.
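The back-pressure idea can be illustrated with a bounded java.util.concurrent queue. This is a hypothetical simulation, not the client's internal code. A producer thread stands in for the network side and blocks in put() whenever the queue already holds CAPACITY records. The consumer takes one record at a time, so memory use stays bounded no matter how many records are streamed. CAPACITY and the END sentinel are illustrative choices:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of a bounded, buffered record stream.
class BoundedQueueDemo {
    // Illustrative buffer size; the real client's buffering differs
    static final int CAPACITY = 100;
    // Sentinel marking the end of the stream (records are 0..total-1)
    static final Integer END = -1;

    // Streams `total` records through the queue; returns how many were consumed
    static int run(int total, AtomicInteger maxQueued) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);

        // Producer: stands in for records arriving from a cluster node
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer: stands in for the application reading one record at a time
        int consumed = 0;
        try {
            while (true) {
                maxQueued.accumulateAndGet(queue.size(), Math::max);
                Integer item = queue.take(); // blocks while the queue is empty
                if (item.equals(END)) {
                    break;
                }
                consumed++; // "process" the record here
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return consumed;
    }
}
```

Running it with 99,999 records consumes every record, while the queue never holds more than CAPACITY of them at once.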

Scan

Look at the batchUsingScanAll() method. It uses the scanAll() method on the Aerospike client, and you supply a user-written callback that processes each record as it is received from the cluster.

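	import java.util.ArrayList;
	import java.util.List;
	import com.aerospike.client.AerospikeException;
	import com.aerospike.client.Key;
	import com.aerospike.client.Record;
	import com.aerospike.client.ScanCallback;

	public void batchUsingScanAll() throws Exception {
		/*
		 * a List to contain a "batch" of records to process
		 */
		final List<Record> batchOfRecords = new ArrayList<Record>();
		/*
		 * Scan all records in "namespace" and "set". If server nodes are
		 * scanned in parallel, this callback may run on several threads
		 * at once, so access to the shared batch is synchronized.
		 */
		this.client.scanAll(null, this.namespace, this.set, 
				new ScanCallback() {

			@Override
			public void scanCallback(Key key, Record record) 
					throws AerospikeException {
				synchronized (batchOfRecords) {
					/*
					 * add each Record to the "batch"; process it when full
					 */
					batchOfRecords.add(record);
					if (batchOfRecords.size() == BATCH_SIZE){
						processBatch(batchOfRecords);
						batchOfRecords.clear();
					}
				}
			}
		}, "username","password","gender","region",
			"lasttweeted","tweetcount","interests");
		/*
		 * scanAll() returns when the scan is complete;
		 * process the final, partial batch
		 */
		if (!batchOfRecords.isEmpty()){
			processBatch(batchOfRecords);
			batchOfRecords.clear();
		}
	}

	private void processBatch(List<Record> batch){
		for (Record rec : batch){
			/*
			 * do something with each record in the batch 
			 */
			System.out.print("+");
		}
		System.out.println();
		System.out.println("Processed " + batch.size() + " records");
	}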

The callback receives a Key and a Record, and you can process these in any way you wish. This code builds a batch of records in a List. When the batch is full, we process the batch.
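Depending on the ScanPolicy, scanAll() may read server nodes in parallel. In that case the callback can be called from several threads at once, and a plain ArrayList shared between those calls is not thread-safe. Also, records that arrive after the last full batch still need processing once the scan returns. Below is a hypothetical simulation, not Aerospike code. Threads stand in for nodes, an int stands in for a Record, and onRecord() stands in for scanCallback(). It shows a lock-guarded batch plus a final flush after all records have arrived:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of a scan whose callback runs on several threads.
class ConcurrentBatchDemo {
    static final int BATCH_SIZE = 1000;

    private final List<Integer> batch = new ArrayList<>();
    final AtomicInteger processed = new AtomicInteger();
    final AtomicInteger batches = new AtomicInteger();

    // Stand-in for ScanCallback.scanCallback(Key, Record)
    void onRecord(int value) {
        synchronized (batch) {
            batch.add(value);
            if (batch.size() == BATCH_SIZE) {
                processBatch();
            }
        }
    }

    // Caller must hold the lock on `batch`
    private void processBatch() {
        processed.addAndGet(batch.size());
        batches.incrementAndGet();
        batch.clear();
    }

    // Runs `nodes` threads, each delivering `perNode` records, then flushes
    void scan(int nodes, int perNode) {
        List<Thread> threads = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perNode; i++) {
                    onRecord(i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
        }
        // every "node" has finished: process the final, partial batch
        synchronized (batch) {
            if (!batch.isEmpty()) {
                processBatch();
            }
        }
    }
}
```

With 3 simulated nodes delivering 33,333 records each, all 99,999 records are processed in 100 batches: 99 full batches of 1,000 and a final batch of 999.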