Home

Awesome

ErlCass

Build Status GitHub Hex.pm

An Erlang Cassandra driver, based on DataStax cpp driver focused on performance.

Note for v4.0.0

Update from 2.x to 3.0

This update breaks the compatibility with the other versions. All query results will return in case of success:

Implementation note

How ErlCass affects the Erlang schedulers

It's well-known that NIF's can affect the Erlang schedulers performances in case the functions are not returning in less than 1-2 ms and blocks the threads.

Because the DataStax cpp driver is async, ErlCass won't block the scheduler threads and all calls to the native functions will return immediately. The DataStax driver use its own thread pool for managing the requests. Also, the responses are received on these threads and sent back to Erlang calling processes using enif_send in an async manner.

Features

List of supported features:

Missing features from Datastax driver can be found into the Todo List.

Benchmark comparing with other drivers

The benchmark (benchmarks/benchmark.erl) is spawning N processes that will send a total of X request using the async api's and then waits to read X responses. In benchmarks/benchmark.config you can find the config's for every driver used in tests. During test in case of unexpected results from driver will log errors in console.

To run the benchmark yourself you should do:

The following test was run on a Ubuntu 16.04 LTS (Intel(R) Core(TM) i5-2500 CPU @ 3.30GHz 4 cores) and the cassandra cluster was running on other 3 physical machines in the same LAN. The schema is created using prepare_load_test_table from benchmarks/load_test.erl. Basically the schema contains all possible data types and the query is based on a primary key (will return the same row all the time which is fine because we test the driver performances and not the server one)

To create schema:

make setup_benchmark

To run the benchmark:

make benchmark MODULE=erlcass PROCS=100 REQ=100000

Where:

The results for 100 concurrent processes that sends 100k queries. Picked the average time from 3 runs:

cassandra driverTime (ms)Req/sec
erlcass v4.0.0947105544
marina 0.3.5236042369

Changelog

Changelog is available here.

Getting started:

The application is compatible with both rebar or rebar3.

In case you receive any error related to compiling of the DataStax driver you can try to run rebar with sudo in order to install all dependencies. Also you can check wiki section for more details

Data types

In order to see the relation between Cassandra column types and Erlang types please check this wiki section

Starting the application

application:start(erlcass).

Setting the log level

Erlcass is using OTP logger for logging the errors. Beside the fact that you can set in logger the desired log level, for better performances it's better to set also in erlcass the desired level otherwise there will be a lot of resources consumed for messages that are going to be dropped anyway. Also the native driver performances can decrease because of the time spent in generating the logs and sending them from C++ into Erlang.

Available Log levels are:

-define(CASS_LOG_DISABLED, 0).
-define(CASS_LOG_CRITICAL, 1).
-define(CASS_LOG_ERROR, 2).
-define(CASS_LOG_WARN, 3). % default
-define(CASS_LOG_INFO, 4).
-define(CASS_LOG_DEBUG,5).
-define(CASS_LOG_TRACE, 6).

In order to change the log level for the native driver you need to set the log_level environment variable for erlcass into your app config file, example: {log_level, 3}.

Setting the cluster options

The cluster options can be set inside your app.config file under the cluster_options key:

{erlcass, [
    {log_level, 3},
    {keyspace, <<"keyspace">>},
    {cluster_options,[
        {contact_points, <<"172.17.3.129,172.17.3.130,172.17.3.131">>},       
        {latency_aware_routing, true},
        {token_aware_routing, true},
        {number_threads_io, 4},
        {queue_size_io, 128000},
        {core_connections_host, 1},
        {tcp_nodelay, true},
        {tcp_keepalive, {true, 60}},
        {connect_timeout, 5000},
        {request_timeout, 5000},
        {retry_policy, {default, true}},
        {default_consistency_level, 6}
    ]}
]},

Tips for production environment:

All available options are described in the following wiki section.

Add a prepare statement

Example:

ok = erlcass:add_prepare_statement(select_blogpost,
                                   <<"select * from blogposts where domain = ? LIMIT 1">>),

In case you want to overwrite the default consistency level for that prepare statement use a tuple for the query argument: {Query, ConsistencyLevelHere}

Also this is possible using {Query, Options} where options is a proplist with the following options supported:

Example:

ok = erlcass:add_prepare_statement(select_blogpost,
        {<<"select * from blogposts where domain = ? LIMIT 1">>, ?CASS_CONSISTENCY_LOCAL_QUORUM}).

or

ok = erlcass:add_prepare_statement(insert_blogpost, {
        <<"UPDATE blogposts SET author = ? WHERE domain = ? IF EXISTS">>, [
        {consistency_level, ?CASS_CONSISTENCY_LOCAL_QUORUM},
        {serial_consistency_level, ?CASS_CONSISTENCY_LOCAL_SERIAL}]
}).

Run a prepared statement query

You can bind the parameters in 2 ways: by name and by index. You can use ?BIND_BY_INDEX and ?BIND_BY_NAME from execute/3 in order to specify the desired method. By default is binding by index.

Example:

%bind by name
erlcass:execute(select_blogpost, ?BIND_BY_NAME, [{<<"domain">>, <<"Domain_1">>}]).

%bind by index
erlcass:execute(select_blogpost, [<<"Domain_1">>]).

%bind by index
erlcass:execute(select_blogpost, ?BIND_BY_INDEX, [<<"Domain_1">>]).

In case of maps you can use key(field) and value(field) in order to bind by name.

%table: CREATE TABLE test_map(key int PRIMARY KEY, value map<text,text>)
%statement: UPDATE examples.test_map SET value[?] = ? WHERE key = ?

%bind by index

erlcass:execute(identifier, [<<"collection_key_here">>, <<"collection_value_here">>, <<"key_here">>]).

%bind by name

erlcass:execute(insert_test_bind, ?BIND_BY_NAME, [
    {<<"key(value)">>, CollectionIndex1},
    {<<"value(value)">>, CollectionValue1},
    {<<"key">>, Key1}
]),

Async queries and blocking queries

For sync operations use erlcass:execute, for async execution use : erlcass:async_execute.

The sync API will block the calling process (still async into the native code in order to avoid freezing of the VM threads) until will get the result from the cluster.

In case of an async execution the calling process will receive a message of the following format: {execute_statement_result, Tag, Result} when the data from the server was retrieved.

For example:

{ok, Tag} = erlcass:async_execute(...),
    receive
        {execute_statement_result, Tag, Result} ->
            Result
    end.

Non prepared statements queries

In order to run queries that you don't want to run them as prepared statements you can use: query/1, query_async/1 or query_new_statement/1 (in order to create a query statement that can be executed into a batch query along other prepared or not prepared statements)

The same rules apply for setting the desired consistency level as on prepared statements (see Add prepare statement section).

erlcass:query(<<"select * from blogposts where domain = 'Domain_1' LIMIT 1">>).

Batched queries

In order to perform batched statements you can use erlcass:batch_async_execute/3 or erlcass:batch_execute/3.

First argument is the batch type and is defined as:

-define(CASS_BATCH_TYPE_LOGGED, 0).
-define(CASS_BATCH_TYPE_UNLOGGED, 1).
-define(CASS_BATCH_TYPE_COUNTER, 2).

The second one is a list of statements (prepared or normal statements) that needs to be executed in the batch.

The third argument is a list of options in {Key, Value} format (proplist):

Example:

ok = erlcass:add_prepare_statement(insert_prep, <<"INSERT INTO table1(id, age, email) VALUES (?, ?, ?)">>),

{ok, Stm1} = erlcass:query_new_statement(<<"UPDATE table2 set foo = 'bar'">>),

{ok, Stm2} = erlcass:bind_prepared_statement(insert_prep),
ok = erlcass:bind_prepared_params_by_index(Stm2, [Id2, Age2, Email2]),

ok = erlcass:batch_execute(?CASS_BATCH_TYPE_LOGGED, [Stm1, Stm2], [
    {consistency_level, ?CASS_CONSISTENCY_QUORUM}
]).

Paged queries

In order to perform paged query statements you can use erlcass:async_execute_paged/2, erlcass:async_execute_paged/3 or erlcass:execute_paged/2.

Statement paging is set with erlcass:set_paging_size/2.

Example:

ok = erlcass:add_prepare_statement(paged_query_prep, <<"SELECT val FROM table1">>),
{ok, Stm} = erlcass:bind_prepared_statement(paged_query_prep),
PageSize = 3,
ok = erlcass:set_paging_size(Stm, PageSize),
{ok, Columns, Rows1, HasMore1} = erlcass:execute_paged(Stm, paged_query_prep),
% Continue get more rows from same Stm until HasMore is false
% In this example, Rows1 contains at most 3 rows [[val1], [val2], [val3]]
%{ok, Columns, Rows2, HasMore2} = erlcass:execute_paged(Stm, paged_query_prep),

Working with uuid or timeuuid fields:

Working with date, time fields:

Getting metrics

In order to get metrics from the native driver you can use erlcass:get_metrics().

requests
stats
errors

Low level methods

Each query requires an internal statement (prepared or not). You can reuse the same statement object for multiple queries performed in the same process.

Getting a statement reference for a prepared statement query
{ok, Statement} = erlcass:bind_prepared_statement(select_blogpost).
Getting a statement reference for a non prepared query
{ok, Statement} = erlcass:query_new_statement(<<"select * from blogposts where domain = 'Domain_1' LIMIT 1">>).
Bind the values for a prepared statement before executing
%bind by name
ok = erlcass:bind_prepared_params_by_name(select_blogpost, [{<<"domain">>, <<"Domain_1">>}]);

%bind by index
ok = erlcass:bind_prepared_params_by_index(select_blogpost, [<<"Domain_1">>]);

For mode details about bind by index and name please see: 'Run a prepared statement query' section