Talaria
This repository contains a fork of TalariaDB, a distributed, highly available, and low latency time-series database for Big Data systems. It was originally designed and implemented at Grab, where millions of transactions and connections take place every day, which requires a scalable platform for data-driven decision making.
<p align="center"> <img width="746" height="348" src=".github/images/query-event.png"> </p>
Introduction
TalariaDB helped us to overcome the challenge of retrieving and acting upon the information from large amounts of data. It addressed our need to query at least 2-3 terabytes of data per hour with predictable low query latency and low cost. Most importantly, it plays very nicely with the different tools’ ecosystems and lets us query data using SQL.
Building on the original design, we have extended Talaria so that it can be set up in two possible ways:
- As an event ingestion platform. This allows you to track events using a simple gRPC endpoint from almost anywhere.
- As a data store for hot data. This allows you to query hot data (e.g. last 6 hours) as it goes through the data pipeline and ultimately ends up in your data lake when compacted.
Talaria is designed around an event-based data model. An event is essentially a set of key-value pairs; however, to keep events consistent we need to define a set of commonly used keys. Each event consists of the following:
- Hash key (e.g. using the "event" key). This represents the type of the event and could be prefixed with the source scope (e.g. "table1"), using a dot as a logical separator. The separation and namespacing are not required, but are strongly recommended to make your system more usable.
- Sort key (e.g. using the "time" key). This represents the time at which the update occurred, as a Unix timestamp (as precise as the source allows) encoded as a 64-bit integer value.
- Other key-value pairs will represent various values of the columns.
Below is an example of what a payload for an event describing a table update might look like.
KEY | VALUE | DATA TYPE |
---|---|---|
event | table1.update | string |
time | 1586500157 | int64 |
column1 | hello | string |
column2 | { "name": "roman" } | json |
Talaria supports string, int32, int64, bool, float64, timestamp and json data types, which are used to construct columns that can be exposed to Presto/SQL.
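To make the event model concrete, here is a minimal Python sketch that builds and checks a payload like the one in the table above. The helper make_event, the TYPE_CHECKS mapping and the way Talaria's type names are mapped onto Python types are assumptions made for illustration; they are not part of Talaria's API.

```python
import json
import time

# Hypothetical mapping from the type names listed above to Python checks.
# Illustration only, not Talaria's own type system. The timestamp type is
# left out because its wire representation is not described here.
TYPE_CHECKS = {
    "string": lambda v: isinstance(v, str),
    "int32": lambda v: isinstance(v, int) and not isinstance(v, bool) and -2**31 <= v < 2**31,
    "int64": lambda v: isinstance(v, int) and not isinstance(v, bool) and -2**63 <= v < 2**63,
    "bool": lambda v: isinstance(v, bool),
    "float64": lambda v: isinstance(v, float),
    "json": lambda v: isinstance(v, (dict, list)),
}


def make_event(scope, name, columns, timestamp=None):
    """Build an event dict with a namespaced hash key and an integer sort key.

    `columns` maps a column name to a (value, type_name) pair.
    """
    event = {
        "event": f"{scope}.{name}",  # hash key: "<scope>.<type>"
        "time": int(time.time()) if timestamp is None else int(timestamp),  # sort key
    }
    for key, (value, type_name) in columns.items():
        if key in ("event", "time"):
            raise ValueError(f"{key!r} is reserved for the hash/sort key")
        if not TYPE_CHECKS[type_name](value):
            raise TypeError(f"column {key!r} is not a valid {type_name}")
        event[key] = value
    return event


event = make_event(
    "table1",
    "update",
    {"column1": ("hello", "string"), "column2": ({"name": "roman"}, "json")},
    timestamp=1586500157,
)
print(json.dumps(event))
```

Keeping the hash key and sort key under fixed names ("event" and "time" here) is what lets every producer emit events that land in the same table layout.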
Event Ingestion with Talaria
If your organisation needs a reliable and scalable data ingestion platform, you can set up Talaria as one. The main advantage is that such a platform is cost-efficient, does not require a complex Kafka setup and even offers in-flight queries if you also point Presto at it. The basic setup allows you to track events using a simple gRPC endpoint from almost anywhere.
To set up Talaria as an ingestion platform, you will need to specify a table, in this case "eventlog", and enable compaction in the configuration, something along these lines:
mode: staging
env: staging
domain: "talaria-headless.default.svc.cluster.local"
storage:
  dir: "/data"
tables:
  eventlog:
    compact:                               # enable compaction
      interval: 60                         # compact every 60 seconds
      nameFunc: "s3://bucket/namefunc.lua" # file name function
      s3:                                  # sink to Amazon S3
        region: "ap-southeast-1"
        bucket: "bucket"
...
Once this is set up, you can point a gRPC client (see protobuf definition) directly to the ingestion endpoint. Note that we also offer some pre-generated or pre-made ingestion clients in this repository.
service Ingress {
  rpc Ingest(IngestRequest) returns (IngestResponse) {}
}
Below is a list of currently supported sinks and their example configurations:
- Amazon S3 using s3 sink.
- DigitalOcean Spaces using s3 sink, a custom endpoint and us-east-1 region.
- Google Cloud Storage using gcs sink.
- Local filesystem using file sink.
- Microsoft Azure Blob Storage using azure sink.
- MinIO using s3 sink, a custom endpoint and us-east-1 region.
- Google BigQuery using bigquery sink.
- Talaria itself using talaria sink.
For Microsoft Azure Blob Storage and Azure Data Lake Gen 2, we support writing across multiple storage accounts. We support two modes:
- Random choice, where each write is directed to a storage account at random; for this you only need to specify a list of storage accounts.
- Weighted choice, where a set of weights (positive integers) are assigned and each write is directed to a storage account based on the specified weights.
An example of weighted choice is shown below:
- azure:
    container: a_container
    prefix: a_prefix
    blobServiceURL: .storage.microsoft.net
    storageAccounts:
      - a_storage_account
      - b_storage_account
    storageAccountWeights: [1, 2]
Random choice and weighted choice are particularly useful in some key scenarios:
- High-throughput deployments where the I/O generated by Talaria exceeds the limits of a single storage account.
- Deployments on internal endpoints with multiple VPN links, where you want to split the network traffic across those links.
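As a rough illustration of the two modes described above, the following Python sketch picks a storage account per write, either uniformly or in proportion to integer weights. The function pick_storage_account is hypothetical and is not taken from Talaria's code; it only shows the selection behaviour that the weights [1, 2] imply.

```python
import random
from collections import Counter


def pick_storage_account(accounts, weights=None, rng=random):
    """Pick one storage account for a single write.

    Without weights the choice is uniform (random choice). With weights,
    each account is picked with probability proportional to its weight.
    """
    if weights is None:
        return rng.choice(accounts)
    if len(weights) != len(accounts):
        raise ValueError("need exactly one weight per storage account")
    if any(not isinstance(w, int) or w <= 0 for w in weights):
        raise ValueError("weights must be positive integers")
    return rng.choices(accounts, weights=weights, k=1)[0]


accounts = ["a_storage_account", "b_storage_account"]
rng = random.Random(42)  # seeded so that the run is repeatable

# Simulate many writes with the weights from the example configuration.
counts = Counter(pick_storage_account(accounts, [1, 2], rng) for _ in range(30000))
print(counts)
```

With weights [1, 2], roughly one third of the writes should go to a_storage_account and two thirds to b_storage_account, which is how the load is spread when a single account cannot absorb all the I/O.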
Hot Data Query with Talaria
If your organisation requires querying of either hot data (e.g. last n hours) or in-flight data (i.e. as ingested), you can also configure Talaria to serve it to Presto using the built-in Presto Thrift connector.
In the example configuration below we're setting up an s3 + sqs writer to continuously ingest files from an S3 bucket and an "eventlog" table which will be exposed to Presto.
mode: staging
env: staging
domain: "talaria-headless.default.svc.cluster.local"
writers:
  grpc:
    port: 8080
  s3sqs:
    region: "ap-southeast-1"
    queue: "queue-url"
    waitTimeout: 1
    retries: 5
readers:
  presto:
    schema: data
    port: 8042
storage:
  dir: "/data"
tables:
  eventlog:
    ttl: 3600         # data is persisted for 1 hour
    hashBy: event
    sortBy: time
...
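The effect of the ttl, hashBy and sortBy settings can be pictured with a small in-memory model. The sketch below is conceptual only: the class HotTable and its methods are hypothetical, and it makes no claim about how Talaria actually stores or expires data. It simply groups rows by the hash key, keeps them ordered by the sort key, and hides rows older than the TTL.

```python
import bisect
from collections import defaultdict


class HotTable:
    """Toy model of a table configured with ttl, hashBy and sortBy."""

    def __init__(self, ttl, hash_by="event", sort_by="time"):
        self.ttl = ttl          # seconds a row stays queryable
        self.hash_by = hash_by  # column used to group rows
        self.sort_by = sort_by  # column used to order rows within a group
        self._rows = defaultdict(list)  # hash key -> sorted [(sort key, seq, row)]
        self._seq = 0           # tie-breaker so row dicts are never compared

    def append(self, row):
        self._seq += 1
        entry = (row[self.sort_by], self._seq, row)
        bisect.insort(self._rows[row[self.hash_by]], entry)

    def query(self, hash_value, now):
        """Return rows for one hash key whose sort key is within the TTL."""
        entries = self._rows.get(hash_value, [])
        cutoff = now - self.ttl
        # (cutoff,) sorts before any (cutoff, seq, row), so this finds the
        # first entry whose sort key is >= cutoff.
        start = bisect.bisect_left(entries, (cutoff,))
        return [row for _, _, row in entries[start:]]


table = HotTable(ttl=3600)
table.append({"event": "table1.update", "time": 1586500157, "column1": "hello"})
table.append({"event": "table1.update", "time": 1586490000, "column1": "old"})
table.append({"event": "table2.insert", "time": 1586500100, "column1": "other"})

fresh = table.query("table1.update", now=1586500200)
print(fresh)
```

In this model a query for one event type only touches the rows stored under that hash key, and the time ordering makes it cheap to skip everything older than the TTL.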
Once you have set up Talaria, you'll need to configure Presto to talk to it using the Thrift Connector. You need to make sure that:
- The Presto properties file is configured to talk to Talaria through a Kubernetes load balancer.
- Presto can access the nodes directly, without going through the load balancer.
Once this is done, you should be able to query your data via Presto.
select *
from talaria.data.eventlog
where event = 'table1.update'
limit 1000
Ingesting Files Into Talaria
To ingest existing ORC, CSV or Parquet files from a storage URL (imagine S3 or Azure Blob Storage), use the Talaria File Ingestion Client:
https://github.com/atris/TalariaFileIngestionClient
Quick Start
The easiest way to get started is to use the provided helm chart.
Contributing
We are open to contributions. Feel free to submit a pull request and we'll review it as quickly as we can. TalariaDB is maintained by:
- Roman Atachiants
- Yichao Wang
- Chun Rong Phang
- Ankit Kumar Sinha
- Atri Sharma
- Qiao Wei
- Oscar Cassetti
- Manoj Babu Katragadda
- Jeffrey Lean
License
TalariaDB is licensed under the MIT License.