kwack - In-Memory Analytics for Kafka using DuckDB

kwack supports in-memory analytics for Kafka data using DuckDB.

Getting Started

Note that kwack requires Java 11 or higher.

To run kwack, download a release and unpack it. Then change to the kwack-${version} directory and run the following to see the command-line options:

$ bin/kwack -h

Usage: kwack [-hV] [-t=<topic>]... [-p=<partition>]... [-b=<broker>]...
             [-m=<ms>] [-F=<config-file>] [-o=<offset>] [-k=<topic=serde>]...
             [-v=<topic=serde>]... [-r=<url>] [-q=<query>] [-a=<attr>]...
             [-d=<db>] [-X=<prop=val>]...
In-Memory Analytics for Kafka using DuckDB.
  -t, --topic=<topic>               Topic(s) to consume from and produce to
  -p, --partition=<partition>       Partition(s)
  -b, --bootstrap-server=<broker>   Bootstrap broker(s) (host:[port])
  -m, --metadata-timeout=<ms>       Metadata (et.al.) request timeout
  -F, --file=<config-file>          Read configuration properties from file
  -o, --offset=<offset>             Offset to start consuming from:
                                      beginning | end |
                                      <value>  (absolute offset) |
                                      -<value> (relative offset from end)
                                      @<value> (timestamp in ms to start at)
                                      Default: beginning
  -k, --key-serde=<topic=serde>     (De)serialize keys using <serde>
  -v, --value-serde=<topic=serde>   (De)serialize values using <serde>
                                    Available serdes:
                                      short | int | long | float |
                                      double | string | json | binary |
                                      avro:<schema|@file> |
                                      json:<schema|@file> |
                                      proto:<schema|@file> |
                                      latest (use latest version in SR) |
                                      <id>   (use schema id from SR)
                                      Default for key:   binary
                                      Default for value: latest
                                    The proto/latest/<id> serde formats can
                                    also take a message type name, e.g.
                                      proto:<schema|@file>;msg:<name>
                                    in case multiple message types exist
  -r, --schema-registry-url=<url>   SR (Schema Registry) URL
  -q, --query=<query>               SQL query to execute. If none is specified,
                                      interactive sqlline mode is used
  -a, --row-attribute=<attr>        Row attribute(s) to show:
                                      none
                                      rowkey (record key)
                                      ksi    (key schema id)
                                      vsi    (value schema id)
                                      top    (topic)
                                      par    (partition)
                                      off    (offset)
                                      ts     (timestamp)
                                      tst    (timestamp type)
                                      epo    (leadership epoch)
                                      hdr    (headers)
                                      Default: rowkey,ksi,vsi,par,off,ts,hdr
  -d, --db=<db>                     DuckDB db, appended to 'jdbc:duckdb:'
                                      Default: :memory:
  -x, --skip-bytes=<bytes>          Extra bytes to skip when deserializing with
                                      an external schema
  -X, --property=<prop=val>         Set configuration property.
  -h, --help                        Show this help message and exit.
  -V, --version                     Print version information and exit.
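
For example, combining several of the options above (broker and topic names are placeholders), the following sketch counts the last 1000 records in partition 0 of a topic, using the relative-offset syntax described under -o:

$ bin/kwack -b mybroker -t mytopic -p 0 -o -1000 -q "SELECT COUNT(*) FROM mytopic"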

kwack shares many command-line options with kcat (formerly kafkacat). In addition, configuration properties can be read from a file; the available configuration properties are listed in the project documentation.
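
A minimal properties file might look like the following sketch; the property names shown follow standard Kafka client and Schema Registry conventions and are assumptions, so check the shipped config/kwack.properties for the authoritative names:

# Kafka broker(s) to connect to
bootstrap.servers=mybroker:9092
# Confluent Schema Registry URL
schema.registry.url=http://schema-registry-url:8081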

Simply modify config/kwack.properties to point to an existing Kafka broker and Schema Registry. Then run the following:

# Run with properties file
$ bin/kwack -F config/kwack.properties

Starting kwack is as easy as specifying a Kafka broker, topic, and Schema Registry URL:

$ bin/kwack -b mybroker -t mytopic -r http://schema-registry-url:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>

When kwack starts, it enters interactive mode, where you can enter SQL queries at the prompt to analyze Kafka data, for example (col1 is a placeholder column name):
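
jdbc:duckdb::memory:> SELECT col1, COUNT(*) FROM mytopic GROUP BY col1;

For non-interactive mode, specify a query on the command line: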

$ bin/kwack -b mybroker -t mytopic -r http://schema-registry-url:8081 -q "SELECT * FROM mytopic"

The output of the above command will be in JSON, and so can be piped to other commands like jq.
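
For example, assuming the output is one JSON object per row, a single field (here the placeholder col1) can be extracted as follows:

$ bin/kwack -b mybroker -t mytopic -r http://schema-registry-url:8081 -q "SELECT * FROM mytopic" | jq '.col1'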

One can load multiple topics, and then perform a query that joins the resulting tables on a common column:

$ bin/kwack -b mybroker -t mytopic -t mytopic2 -r http://schema-registry-url:8081 -q "SELECT * FROM mytopic JOIN mytopic2 USING (col1)"

One can convert Kafka data into Parquet format by using the COPY command in DuckDB:

$ bin/kwack -b mybroker -t mytopic -r http://schema-registry-url:8081 -q "COPY mytopic to 'mytopic.parquet' (FORMAT 'parquet')"
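
The same COPY command can target other DuckDB output formats; for instance, the following sketch writes CSV with a header row:

$ bin/kwack -b mybroker -t mytopic -r http://schema-registry-url:8081 -q "COPY mytopic TO 'mytopic.csv' (FORMAT 'csv', HEADER)"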

If not using Confluent Schema Registry, one can pass an external schema:

$ bin/kwack -b mybroker -t mytopic -v mytopic=proto:@/path/to/myschema.proto
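
If the schema file defines multiple message types, append the message name using the msg parameter shown in the help above (MyMessage is a placeholder; the quotes keep the shell from interpreting the semicolon):

$ bin/kwack -b mybroker -t mytopic -v 'mytopic=proto:@/path/to/myschema.proto;msg:MyMessage'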

For a given schema, kwack creates DuckDB columns by mapping Avro, Protobuf, and JSON Schema types to DuckDB types as follows:

| Avro                     | Protobuf                  | JSON Schema  | DuckDB       |
|--------------------------|---------------------------|--------------|--------------|
| boolean                  | boolean                   | boolean      | BOOLEAN      |
| int                      | int32, sint32, sfixed32   |              | INTEGER      |
|                          | uint32, fixed32           |              | UINTEGER     |
| long                     | int64, sint64, sfixed64   | integer      | BIGINT       |
|                          | uint64, fixed64           |              | UBIGINT      |
| float                    | float                     |              | FLOAT        |
| double                   | double                    | number       | DOUBLE       |
| string                   | string                    | string       | VARCHAR      |
| bytes, fixed             | bytes                     |              | BLOB         |
| enum                     | enum                      | enum         | ENUM         |
| record                   | message                   | object       | STRUCT       |
| array                    | repeated                  | array        | LIST         |
| map                      | map                       |              | MAP          |
| union                    | oneof                     | oneOf, anyOf | UNION        |
| decimal                  | confluent.type.Decimal    |              | DECIMAL      |
| date                     | google.type.Date          |              | DATE         |
| time-millis, time-micros | google.type.TimeOfDay     |              | TIME         |
| timestamp-millis         |                           |              | TIMESTAMP_MS |
| timestamp-micros         |                           |              | TIMESTAMP    |
| timestamp-nanos          | google.protobuf.Timestamp |              | TIMESTAMP_NS |
| duration                 | google.protobuf.Duration  |              | INTERVAL     |
| uuid                     |                           |              | UUID         |
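
Because nested schemas become DuckDB STRUCT and LIST columns, standard DuckDB syntax applies when querying them. As a sketch, assuming a hypothetical STRUCT column named address with a city field and a hypothetical LIST column named tags:

SELECT address.city, UNNEST(tags) AS tag FROM mytopic;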

For more on how to use kwack, see this blog.