Home

Awesome

NPM Version Inline docs Build Status

CassanKnex

A fully tested Apache Cassandra CQL query builder with support for the DataStax NodeJS driver, written in the spirit of Knex for CQL 3.x.

Installation

npm install cassanknex

Index

<a name="WhyCassanknex"></a>Why (what's in a name)

CQL was purposefully designed to be SQL-esq to enhance ease of access for those familiar w/ relational databases while Knex is the canonical NodeJS query builder for SQL dialects; however, even given the lexical similarities, the difference between the usage of CQL vs SQL is significant enough that adding CQL as yet another Knex SQL dialect does not make sense. Thus, CassanKnex.

<a name="Usage"></a>Usage

CassanKnex can be used to execute queries against a Cassandra cluster via cassandra-driver (the official DataStax NodeJS driver) or as a simple CQL statement generator via the following relative instantiations:

<a name="GeneratingQueries"></a>As a query generator

Compiled CQL statements can be retrieved at any time via the cql method.

var cassanKnex = require("cassanknex")(<DRIVER_OPTIONS|undefined>);
var qb = cassanKnex(KEYSPACE).QUERY_COMMAND()
          .QUERY_MODIFIER_1()
          .
          .
          .QUERY_MODIFIER_N();

var cql = qb.cql(); // get the cql statement

Where KEYSPACE is the name of the relevant keyspace and QUERY_COMMAND and QUERY_MODIFIER are among the list of available Query Commands and Query Modifiers.

<DRIVER_OPTIONS> may be provided to configure the client, and is an object w/ the following optional fields:

<a name="ExecutingQueries"></a>As a query executor

Execution of a given query is performed by invoking either the exec, stream or eachRow methods (which are straight pass throughs to the DataStax driver's execute, stream and eachRow methods, respectively); batch queries may be executed via the batch method (again, a pass through to the DataStax driver's own batch method).

You may provide your own driver or use the included DataStax driver.

var cassanKnex = require("cassanknex")({
  connection: {
    contactPoints: ["LIST OF CONNECTION POINTS"]
  }
});

cassanKnex.on("ready", function (err) {

  if (err)
    console.error("Error Connecting to Cassandra Cluster", err);
  else
    console.log("Cassandra Connected");

  var qb = cassanKnex(KEYSPACE).QUERY_COMMAND()
          .QUERY_MODIFIER_1()
          .
          .
          .QUERY_MODIFIER_N();

  // pass through to the underlying DataStax nodejs-driver 'execute' method

  qb.exec(function(err, res) {
    // do something w/ your query response
  });

  // OR pass through to the underlying DataStax nodejs-driver 'stream' method

  var onReadable = function () {
      // Readable is emitted as soon a row is received and parsed
      var row;
      while (row = this.read()) {
        console.log(row);
        // do something w/ the row response
      }
    }
    , onEnd = function () {
      // Stream ended, there aren't any more rows
      console.log("query finished");
    }
    , onError = function (err) {
      // Something went wrong: err is a response error from Cassandra
      console.log("query error", err);
    };

  // Invoke the stream method
  qb.stream({
    "readable": onReadable,
    "end": onEnd,
    "error": onError
  });

  // OR pass through to the underlying DataStax nodejs-driver 'eachRow' method

  var rowCallback = function (n, row) {
      // The callback will be invoked per each row as soon as they are received
      console.log(row);
      // do something w/ the row response
    }
    , errorCb = function (err) {
      // Something went wrong: err is a response error from Cassandra
      console.log("query error", err);
    };

  // Invoke the eachRow method
  qb.eachRow(rowCallback, errorCb);

  // Invoke the batch method to process multiple requests
  cassanKnex().batch([qb, qb], function(err, res) {
    // do something w/ your response
  });
});

<a name="BYOD"></a>Bring your own Driver

While the package includes the vanilla Cassandra driver (supported by Datastax), and will use that driver to connect to your cluster if you provide a connection configuration, you may optionally provide your own initialized driver to the cassaknex constructor. This allows for using either the DSE driver or a different version of the Cassandra driver, per your applications needs.

e.g., w/ the built in cassandra-driver:

var cassanKnex = require("cassanknex")({
  connection: { // default is 'undefined'
    contactPoints: ["10.0.0.2"]
  },
  exec: { // default is '{}'
    prepare: false // default is 'true'
  }
});

cassanKnex.on("ready", function (err) {...});

or, using a custom dse-driver connection:

// create a new dse-driver connection
var dse = require("dse-driver");
var dseClient = new dse.Client({
  contactPoints: ["10.0.0.2"],
  queryOptions: {
    prepare: true
  },
  socketOptions: {
    readTimeout: 0
  },
  profiles: []
});

// initialize dse-driver connection
dseClient.connect(function (err) {
  if (err) {
    console.log("Error initializing dse-driver", err);
  }
  else {
    // provide connection to cassanknex constructor
    var cassanKnex = require("cassanknex")({
      connection: dseClient,
      debug: false
    });

    cassanKnex.on("ready", function (err) {
      // ...
    });
  }
});

<a name="Quickstart"></a>Quickstart

var cassanKnex = require("cassanknex")({
  connection: { // default is 'undefined'
    contactPoints: ["10.0.0.2"]
  },
  exec: { // default is '{}'
    prepare: false // default is 'true'
  },
  awsKeyspace: false // default is 'false'
});

cassanKnex.on("ready", function (err) {

  if (err)
    console.error("Error Connecting to Cassandra Cluster", err);
  else {
    console.log("Cassandra Connected");

  var qb("keyspace").select("id", "foo", "bar", "baz")
    .ttl("foo")
    .where("id", "=", "1")
    .orWhere("id", "in", ["2", "3"])
    .orWhere("baz", "=", "bar")
    .andWhere("foo", "IN", ["baz", "bar"])
    .limit(10)
    .from("table")
    .exec(function(err, res) {

      // executes query :
      //  'SELECT "id","foo","bar","baz",ttl("foo") FROM "keyspace"."table"
      //    WHERE "id" = ? OR "id" in (?, ?)
      //    OR "baz" = ? AND "foo" IN (?, ?)
      //    LIMIT 10;'
      // with bindings array  : [ '1', '2', '3', 'bar', 'baz', 'bar' ]

      if (err)
        console.error("error", err);
      else
        console.log("res", res);

    });
  }
});

<a name="Debugging"></a>Debugging

To enable debug mode pass { debug: true } into the CassanKnex require statement, e.g.

var cassanKnex = require("cassanknex")({ debug: true });

When debug is enabled the query object will be logged upon execution, and you'll receive two informational components provided to ease the act of debugging:

  1. _queryPhases:
  1. _methodStack:

So you'll see something akin to the following insert statement upon invoking either qb.cql() or qb.exec():

var values = {
  "id": "foo"
  , "bar": "baz"
  , "baz": ["foo", "bar"]
};
var qb = cassanknex("cassanKnexy");
qb.insert(values)
      .usingTimestamp(250000)
      .usingTTL(50000)
      .into("columnFamily")
      .cql();

// =>
{ _debug: true,
  _dialect: 'cql',
  _exec: {},
  _execPrepare: true,
  _keyspace: 'cassanKnexy',
  _columnFamily: 'columnFamily',
  _methodStack:
   [ 'insert',
     'usingTimestamp',
     'insert',
     'usingTTL',
     'insert',
     'into',
     'insert',
     'insert' ],
  _queryPhases:
   [ 'INSERT INTO  ("id","bar","baz") VALUES (?, ?, ?);',
     'INSERT INTO  ("id","bar","baz") VALUES (?, ?, ?) USING TIMESTAMP ?;',
     'INSERT INTO  ("id","bar","baz") VALUES (?, ?, ?) USING TIMESTAMP ? AND USING TTL ?;',
     'INSERT INTO "cassanKnexy"."columnFamily" ("id","bar","baz") VALUES (?, ?, ?) USING TIMESTAMP ? AND USING TTL ?;',
     'INSERT INTO "cassanKnexy"."columnFamily" ("id","bar","baz") VALUES (?, ?, ?) USING TIMESTAMP ? AND USING TTL ?;' ],
  _cql: 'INSERT INTO "cassanKnexy"."columnFamily" ("id","bar","baz") VALUES (?, ?, ?) USING TIMESTAMP ? AND USING TTL ?;',
  _bindings: [ 'foo', 'baz', [ 'foo', 'bar' ], 250000, 50000 ],
  _statements:
   [ { grouping: 'compiling', type: 'insert', value: [Object] },
     { grouping: 'using', type: 'usingTimestamp', val: 250000 },
     { grouping: 'using', type: 'usingTTL', val: 50000 } ],
  ... }

While fuller documentation for all methods is in the works, the test files provide thorough examples as to method usage.

<a name="QueryExecutors"></a>Query Executors

All methods take an optional options object as the first argument in the call signature; if provided, the options will be passed through to the corresponding cassandra-driver call.

exec
var item = {
  foo: "bar",
  bar: ["foo", "baz"]
};
var qb = cassanKnex("cassanKnexy")
  .insert(item)
  .into("columnFamily")
  .exec(function(err, result) {
    // do something w/ your err/result
  });

  // w/ options
  qb.exec({ prepare: false }, function(err, result) {
     // do something w/ your err/result
   });
eachRow
var rowCallback = function (n, row) {
    // Readable is emitted as soon a row is received and parsed
  }
  , errorCallback = function (err) {
    // Something went wrong: err is a response error from Cassandra
  };

var qb = cassanKnex("cassanKnexy")
  .select()
  .from("columnFamily");

// Invoke the eachRow method
qb.eachRow(rowCallback, errorCallback);
stream
var onReadable = function () {
    // Readable is emitted as soon a row is received and parsed
    var row;
    while (row = this.read()) {
      // do something w/ your row
    }
  }
  , onEnd = function () {
    // Stream ended, there aren't any more rows
  }
  , onError = function (err) {
    // Something went wrong: err is a response error from Cassandra
  };

var qb = cassanKnex("cassanKnexy")
  .select()
  .from("columnFamily");

// Invoke the stream method
qb.stream({
  "readable": onReadable,
  "end": onEnd,
  "error": onError
});
batch
var qb1 = cassanKnex("cassanKnexy")
  .insert({foo: "is bar"})
  .usingTimestamp(250000)
  .usingTTL(50000)
  .into("columnFamily");

var qb2 = cassanKnex("cassanKnexy")
  .insert({bar: "is foo"})
  .usingTimestamp(250000)
  .usingTTL(50000)
  .into("columnFamily");

// w/o options
cassanKnex().batch([qb1, qb2], function(err, res) {
    // do something w/ your err/result
});

// w/ options
cassanKnex().batch({prepare: true}, [qb1, qb2], function(err, res) {
    // do something w/ your err/result
});

<a name="QueryCommands"></a>Query Commands

<a name="QueryCommands-Rows"></a>For standard (row) queries:
<a name="QueryCommands-ColumnFamilies"></a>For column family queries:
<a name="QueryCommands-Keyspaces"></a>For keyspace queries:

<a name="QueryModifiers"></a>Query Modifiers

<a name="QueryModifiers-Rows"></a>For standard (row) queries:
<a name="QueryModifiers-ColumnFamilies"></a>For column family queries:
<a name="QueryModifiers-Keyspaces"></a>For keyspace queries:

<a name="UtilityMethods"></a>Utility Methods

var cassanKnex = require("cassanknex")({
  connection: {
    contactPoints: ["10.0.0.2"]
  }
});

cassanKnex.on("ready", function (err) {

  if (err)
    console.error("Error Connecting to Cassandra Cluster", err);
  else {
    console.log("Cassandra Connected");

    // get the Cassandra Driver
    var client = cassanKnex.getClient();
  }
});
var cassanKnex = require("cassanknex")();

// get the Cassandra Driver
var driver = cassanKnex.getDriver();

<a name="ChangeLog"></a>ChangeLog

<a name="ReleasingToNPM"></a>Releasing To NPM

All CICD logic is managed by CircleCI via the configuration in the .circleci/ directory.

This configuration will automatically package and publish a new version to NPM when an appropriate Github Release is created.

So, to publish a new version, simply create a new Github Release whose name matches the current release version (i.e., v1.20.5).