Overview

MergeIndex is an Erlang library for storing ordered sets on disk. It is very similar to an SSTable (in Google's Bigtable) or an HFile (in Apache HBase).

Basho Technologies developed MergeIndex to serve as the underlying index storage format for Riak Search and the upcoming Secondary Index functionality in Riak.

MergeIndex has the following characteristics:

And some tradeoffs:

Data Model

A MergeIndex database is a three-level hierarchy of data. (The chosen terminology reflects merge_index's roots as a storage engine for document data, but it can be considered a general purpose index.)

The hierarchy is:

- Index
- Field
- Term

Underneath each term, you can store one or more values, with associated properties and a timestamp:

- Value
- Properties
- Timestamp

These six fields together form a Posting. For example:

{Index, Field, Term, Value, Properties, Timestamp}
{<<"shoes">>, <<"color">>, <<"red">>, <<"SKU-52167">>, [], 23487197}

API

Example Usage

The example below opens a merge_index database, generates some dummy data using a list comprehension, indexes the dummy data, and then performs a lookup and range query.

%% Open a merge_index database.
application:start(merge_index),
{ok, Pid} = merge_index:start_link("./merge_index_data"),
Filter = fun(_,_) -> true end,
 
%% Index a posting...
merge_index:index(Pid, [{"index", "field", "term", "value1", [], 1}]),
 
%% Run a query, get results back as a list...
List1 = merge_index:lookup_sync(Pid, "index", "field", "term", Filter),
io:format("lookup_sync1:~n~p~n", [List1]),
 
%% Run a query, get results back as an iterator. 
%% Iterator returns {Result, NewIterator} or 'eof'.
Iterator1 = merge_index:lookup(Pid, "index", "field", "term", Filter),
{Result1, Iterator2} = Iterator1(),
eof = Iterator2(),
io:format("lookup:~n~p~n", [Result1]),
 
%% Index multiple postings...
merge_index:index(Pid, [
    {"index", "field", "term", "value1", [], 2},
    {"index", "field", "term", "value2", [], 2},
    {"index", "field", "term", "value3", [], 2}
]),
 
%% Run another query...
List2 = merge_index:lookup_sync(Pid, "index", "field", "term", Filter),
io:format("lookup_sync2:~n~p~n", [List2]),
 
%% Delete some postings...
merge_index:index(Pid, [
    {"index", "field", "term", "value1", undefined, 3},
    {"index", "field", "term", "value3", undefined, 3}
]),
 
%% Run another query...
List3 = merge_index:lookup_sync(Pid, "index", "field", "term", Filter),
io:format("lookup_sync2:~n~p~n", [List3]),
 
%% Delete the database.
merge_index:drop(Pid),
 
%% Close the database.
merge_index:stop(Pid).

Architecture

At a high level, MergeIndex is a collection of one or more in-memory buffers storing recently written data, plus one or more immutable segments storing older data. As data is written, the buffers are converted to segments, and small segments are compacted together to form larger segments. Each buffer is backed by an append-only disk log, ensuring that the buffer state is recoverable if the system is shut down before the buffer is converted to a segment.

Queries involve all active buffers and segments, but touch the disk as little as possible. Queries against buffers execute directly against memory, and as a result are fast. Queries against segments consult an in-memory offsets table with a bloom filter and signature table to determine key existence, and then seek directly to the correct disk position if the key is found within a given segment. If the key is not found, there is no disk penalty.

MI Server (mi_server module)

The mi_server module holds the coordinating logic of MergeIndex. It keeps track of which buffers and segments exist, handles incoming writes, manages locks on buffers and segments, and spawns new processes to respond to queries.

During startup, mi_server performs the following steps:

It then waits for incoming index, lookup, or range requests.

On an index request, mi_server is passed a list of postings of the form {Index, Field, Term, Value, Props, Timestamp}. As a speed optimization, the timestamp is inverted (multiplied by -1); a simple ascending sort then puts the most recently timestamped value first (otherwise the earliest timestamped value would sort first). The iterators used during querying and compaction later take advantage of this ordering to filter out duplicates. Each posting is also translated to {{Index, Field, Term}, Value, InvertedTimestamp, Props}, the format that the buffer expects, and the postings are then written to the buffer. If the index operation causes the buffer to exceed the buffer_rollover_size setting, the buffer is converted to a segment. More details are in the following sections.
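As a rough sketch of that translation (the fun below is illustrative, not merge_index's own code; the real logic lives in mi_server), using the posting from the Data Model section:

ToBufferFormat = fun(Postings) ->
    %% Re-nest the key and invert the timestamp so an ascending sort
    %% puts the newest value for each key first.
    [{{Index, Field, Term}, Value, -1 * Timestamp, Props}
     || {Index, Field, Term, Value, Props, Timestamp} <- Postings]
end,
ToBufferFormat([{<<"shoes">>, <<"color">>, <<"red">>, <<"SKU-52167">>, [], 23487197}]).
%% => [{{<<"shoes">>,<<"color">>,<<"red">>}, <<"SKU-52167">>, -23487197, []}]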

On a lookup request, mi_server is passed an Index, Field, and Term. It first puts a lock on all buffers and segments that will be used in the query. This ensures that buffers and segments won't be deleted before the query has completed. Next, it spawns a linked process that creates an iterator across each buffer and segment for the provided Index/Field/Term key, and returns the results in ascending sorted order.

A range request is similar to a lookup request, except the iterators return the values for a range of keys.

Buffers (mi_buffer module)

A buffer consists of an in-memory Erlang ETS table plus an append-only log file. All new data written to a MergeIndex database is first written to the buffer. Once a buffer reaches a certain size, it is converted to a segment.

MergeIndex opens the ETS table as a duplicate_bag, keyed on {Index, Field, Term}. Postings are written to the buffer in a batch.

At query time, MergeIndex performs an ets:lookup/2 on the buffer's table to retrieve the matching postings, sorts them, and wraps them in an iterator.
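A minimal sketch of that query path (module-level code; the function names are illustrative, and the iterator follows the {Result, NextIterator} | eof convention from the usage example above):

%% Look up all postings for a key in the buffer's ETS table, sort
%% them, and wrap the sorted list in an iterator fun.
buffer_iterator(Table, Index, Field, Term) ->
    Postings = lists:sort(ets:lookup(Table, {Index, Field, Term})),
    make_iterator(Postings).

make_iterator([]) ->
    fun() -> eof end;
make_iterator([H | T]) ->
    fun() -> {H, make_iterator(T)} end.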

Range queries work slightly differently. MergeIndex gets a list of keys from the table, filters the keys according to what matches the range, and then returns an iterator for each key.

Buffer contents are also stored on disk in an append-only log file, named buffer.<NUMBER>. The format is simple: each entry is a 4-byte unsigned integer giving the size of the entry, followed by the term_to_binary/1 encoded bytes for the list of postings.
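A sketch of writing and reading one such entry, assuming the 4-byte integer is the byte size of the encoded posting list and the file is opened in binary mode (the real code lives in mi_buffer):

%% Append one length-prefixed entry containing a list of postings.
write_log_entry(Fd, Postings) ->
    Bin = term_to_binary(Postings),
    file:write(Fd, <<(byte_size(Bin)):32/unsigned, Bin/binary>>).

%% Read back the next entry, or eof at the end of the log.
read_log_entry(Fd) ->
    case file:read(Fd, 4) of
        {ok, <<Size:32/unsigned>>} ->
            {ok, Bin} = file:read(Fd, Size),
            {ok, binary_to_term(Bin)};
        eof ->
            eof
    end.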

When a buffer exceeds buffer_rollover_size, it is converted to a segment. The system puts the contents of the ETS table into a list, sorts the list, constructs an iterator over the list, and then sends the iterator to the same process used to compact segments, described below.

Segments (mi_segment module)

A segment consists of a data file and an offsets table. It is immutable; once written, it is read only. (Though eventually it may be compacted into a larger segment and deleted.)

The data file is a flat file with the following format: a key, followed by a list of values, followed by another key, followed by another list of values, and so on. Both the keys and the values are sorted in ascending order. Conceptually, the data file is split into blocks (approximately 32k in size by default). The offsets table contains one entry per block.

A key is an {Index, Field, Term} tuple. To save space, if the key has the same Index as the previous key, and it is not the first key in a block, then the Index is omitted from the tuple; likewise for the Field. The key is stored as a single bit set to '1', followed by a 15-bit unsigned integer containing the size of the key on disk, followed by the term_to_binary representation of the key. The maximum on-disk key size is 32k.

A value is a {Value, Timestamp, Props} tuple; it is stored in this order to optimize sorting and comparisons during later operations. The list of values for a key is stored as a single bit set to '0', followed by a 31-bit unsigned integer containing the size of the list on disk, followed by the term_to_binary representation of the values. If the list of values is larger than the segment_values_compression_threshold, the values are compressed before being written. If the list of values grows larger than the segment_values_staging_size, it is broken up into multiple chunks. The maximum on-disk size of a values entry is theoretically 2GB.
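A sketch of that framing in Erlang bit syntax (compression and chunking omitted; the real encoder lives in mi_segment_writer, and these helper names are illustrative):

%% A key entry: flag bit 1, 15-bit size, encoded key.
encode_key(Key) ->
    Bin = term_to_binary(Key),
    <<1:1, (byte_size(Bin)):15/unsigned, Bin/binary>>.

%% A values entry: flag bit 0, 31-bit size, encoded list of values.
encode_values(Values) ->
    Bin = term_to_binary(Values),
    <<0:1, (byte_size(Bin)):31/unsigned, Bin/binary>>.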

The offsets table is an ETS ordered_set table with an entry for each block in the data file. The entry is keyed on the last key in the block (which makes lookup using ets:next/2 possible).
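Keying each entry on the last key of its block means the block that could contain a search key is the first entry at or after that key. A simplified sketch (the entry contents are omitted here; the function name is illustrative):

find_block(OffsetsTable, Key) ->
    case ets:lookup(OffsetsTable, Key) of
        [Entry] ->
            {ok, Entry};
        [] ->
            %% The first entry at or after Key covers the block that
            %% could contain it.
            case ets:next(OffsetsTable, Key) of
                '$end_of_table' -> not_found;
                NextKey -> {ok, hd(ets:lookup(OffsetsTable, NextKey))}
            end
    end.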

Each entry is a compressed tuple containing:

Compaction (mi_segment_writer module)

When the number of segments passes a threshold, the system compacts segments. This merges together the data files from multiple segments to create a new, larger segment, and deletes the old segments. In the process, duplicate or deleted values (determined by a tombstone) are removed. The mi_scheduler module ensures that only one compaction occurs at a time on a single Erlang VM, even when multiple MergeIndex databases are opened.

The advantage of compaction is that it moves the values for a given key closer together on disk, and reduces the number of disk seeks necessary to find the values for a given lookup. The disadvantage of a compaction is that it requires the system to rewrite all of the data involved in the compaction.

When the system decides to perform a compaction, it focuses on the smallest segments first. This ensures we get the optimal "bang for our buck" out of compaction, doing the most to reduce file handle usage and disk seeks during a query while touching the smallest amount of data.

To perform the compaction, the mi_server module spawns a new linked process. The process opens an iterator across each segment in the compaction set. Because each segment stores its data in sorted order by key and value, an iterator simply walks the file from beginning to end. The segment_compact_read_ahead_size setting determines how large a read-ahead cache is used when reading a segment. For small segments it can make sense to read the entire segment into memory; the segment_full_read_size setting determines that threshold, and when it applies, segment_compact_read_ahead_size is unused. The individual iterators are grouped into a single master iterator.
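A simplified sketch of that grouping (each iterator returns {Result, NextIterator} or eof; the real merge logic also drops duplicate values using the inverted timestamps, which is omitted here, and these function names are illustrative):

merge_iterators(Iterators) ->
    %% Advance each iterator once, dropping the ones that are already empty.
    Heads = [Step || Step <- [I() || I <- Iterators], Step =/= eof],
    fun() -> next(lists:sort(Heads)) end.

next([]) ->
    eof;
next([{Result, Iter} | Rest]) ->
    %% Emit the smallest head, then re-insert that iterator's next step.
    NewHeads = case Iter() of
                   eof  -> Rest;
                   Step -> lists:sort([Step | Rest])
               end,
    {Result, fun() -> next(NewHeads) end}.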

The mi_segment_writer module reads values from the master iterator, writing keys and values to the data file and offset information to the offsets table. While the new segment is being written, it is marked with a companion file of the same name plus a ".deleted" extension, ensuring that if the system crashes and restarts, the partially written segment will be removed. Once the compaction finishes, the now-obsolete source segments are marked with ".deleted" files of their own.

Note that this is the same process used when rolling a single buffer into a segment.

Locking (mi_locks module)

MergeIndex uses a functional locking structure to manage locks on buffers and segments. The locks are really a form of reference counting. During query time, the system opens iterators against all available buffers and segments. This increments a separate lock count for each buffer and segment. When the query ends, the system decrements the lock count. Once a buffer rollover (or segment compaction) makes a buffer (or segment) obsolete, the system registers a function to call when the lock count drops to zero. This is a simple and easy way to make sure that buffers and segments stay around as long as necessary to answer queries, but no longer.

New queries are directed only to the latest buffers and segments and never touch obsolete ones, so even during periods of high query load, the locks on obsolete buffers and segments are guaranteed to be released eventually and the files deleted.
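A minimal sketch of the reference-counting idea (the real structure lives in mi_locks; the function names and layout below are illustrative):

%% Each buffer or segment key maps to {Count, PendingDeleteFun}.
new() ->
    orddict:new().

claim(Key, Locks) ->
    orddict:update(Key, fun({N, F}) -> {N + 1, F} end, {1, undefined}, Locks).

release(Key, Locks) ->
    case orddict:fetch(Key, Locks) of
        {1, undefined} -> orddict:erase(Key, Locks);
        {1, Fun}       -> Fun(), orddict:erase(Key, Locks);
        {N, Fun}       -> orddict:store(Key, {N - 1, Fun}, Locks)
    end.

%% Register a function to call once the lock count for Key reaches zero.
delete_when_free(Key, Fun, Locks) ->
    case orddict:find(Key, Locks) of
        error        -> Fun(), Locks;
        {ok, {N, _}} -> orddict:store(Key, {N, Fun}, Locks)
    end.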

Configuration Settings

Overview

MergeIndex exposes a number of dials to tweak operations and RAM usage.

The most important MergeIndex setting in terms of memory usage is buffer_rollover_size. It controls how large, in bytes, the in-memory buffer is allowed to grow before it is converted to an on-disk segment. The higher this number, the more RAM each open database can use, but the less frequently it will need compactions.

The second most important settings for memory usage are segment_full_read_size and max_compact_segments, taken together. During compaction, the system will page any segment smaller than segment_full_read_size entirely into memory. This should generally be at least as large as buffer_rollover_size.

max_compact_segments is the maximum number of segments to compact at one time. The higher this number, the more segments MergeIndex can involve in each compaction. In the worst case, a compaction could take (segment_full_read_size * max_compact_segments) bytes of RAM.

The rest of the settings have a much smaller impact on performance and memory usage, and exist mainly for tweaking and special cases.
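For example, assuming the settings are read from the merge_index application environment (as in a Riak-style app.config), the three settings discussed above might be configured like this (values are illustrative, not recommendations):

{merge_index, [
    {buffer_rollover_size,   1048576},  %% ~1MB in-memory buffer
    {segment_full_read_size, 1048576},  %% segments this small are paged fully into RAM
    {max_compact_segments,   20}        %% compact at most 20 segments at once
]}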

Full List of Settings

A number of configuration settings are fuzzed:

"Fuzzed" means that the actual value is increased or decreased by a certain random percent. If you open multiple MergeIndex databases and write to them with an evenly balanced load, then all of the buffers tend to roll over at the same time. Fuzzing spaces out the rollovers.

Troubleshooting

Determine the Number of Open Buffers/Segments

Run the following command to check how many buffers are currently open:

find <PATH> -name "buffer.*" | wc -l

Run the following command to check how many segments are currently open:

find <PATH> -name "segment.*.data" | wc -l

Run the following command to determine whether a compaction is currently in progress:

find <PATH> -name "segment.*.data.deleted"

Check Memory Usage

Run the following code in the Erlang shell to see how much space the in-memory buffers are consuming:

WordSize = erlang:system_info(wordsize),
F = fun(X, Acc) -> 
    case ets:info(X, name) == 'buffer' of
        true -> Acc + (ets:info(X, memory) * WordSize);
        false -> Acc
    end
end,
lists:foldl(F, 0, ets:all()).

Run the following code in the Erlang shell to see how much space the segment offset tables are consuming:

WordSize = erlang:system_info(wordsize),
F = fun(X, Acc) -> 
    case ets:info(X, name) == 'segment_offsets' of
        true -> Acc + (ets:info(X, memory) * WordSize);
        false -> Acc
    end
end,
lists:foldl(F, 0, ets:all()).

Further Reading