dbt-incremental-stream

This dbt package is for Snowflake ❄️ only.

It reproduces dbt incremental materialization, leveraging Snowflake streams so that only new or changed rows are processed.

Installation Instructions

New to dbt packages? Read more about them here.

  1. Include this package in your packages.yml. An example can be found here in the working example project, and a minimal sketch is shown below
  2. Run dbt deps
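
A minimal packages.yml sketch, assuming installation from a git repository; the URL and revision below are placeholders, not the real coordinates (use the entry from the working example project linked above):

packages:
  # Placeholder coordinates: replace with the actual repository URL and release tag
  - git: "https://github.com/<org>/dbt-incremental-stream.git"
    revision: "<release_tag>"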

Usage

Use the incremental_stream materialization like a dbt incremental model:

dbt run --select 'conso_client_source'  --vars '{TIMESTAMP: 2024-04-13 00:00:00}' --full-refresh

[!NOTE] {{ incr_stream.get_stream_metadata_columns() }} should not be included if * is used to get all the columns from the source, as in the example below:

{{
    config(
        materialized='incremental_stream',
        unique_key=['ID']
    )
}}

select
    *
from {{ incr_stream.stream_source('src', 'src_table') }}

Basic examples using one table as input

Example using a unique key and a ref

A test model conso_client.sql following this pattern can be found in the integration_tests dbt sub-project included in the package.

{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model') }}

Example using a unique key, a ref, and an APPEND_ONLY stream

A test model conso_client_append_only.sql following this pattern can be found in the integration_tests dbt sub-project included in the package.

{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model', stream_type='APPEND_ONLY') }}

Example using a unique key and a source

A test model conso_client_source.sql following this pattern can be found in the integration_tests dbt sub-project included in the package.

Example with a source:

{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_source('src', 'src_table') }}

Example not using a unique key

An example can be found in this test model: conso_client_insert (a sketch of the pattern is shown below).
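
As an illustration only (column names are placeholders, and this is not necessarily the exact conso_client_insert code), such a model simply omits unique_key, so the rows captured by the stream are inserted rather than merged on a key:

{{
    config(
        materialized='incremental_stream'
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model') }}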

Example performing UNION on multiple tables

A test model conso_client_multiple_streams.sql implementing this pattern can be found in the integration_tests dbt sub-project included in the package.

[Lineage diagram: multiple streams]
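
A hedged sketch of the pattern (the model names input_model_1 and input_model_2 are illustrative, not the ones used in conso_client_multiple_streams.sql); each stream_ref call gets its own stream and the results are combined with a UNION:

{{
    config(
        materialized='incremental_stream',
        unique_key=['ID']
    )
}}

select *
from {{ incr_stream.stream_ref('input_model_1') }}

union all

select *
from {{ incr_stream.stream_ref('input_model_2') }}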

Macros

The stream_ref and stream_source macros include the created streams in the compiled SQL code of the model.

[!NOTE] When the --full-refresh flag is used, the macros return the ref and source (not streams) to perform a complete rebuild.

| macro | description |
| --- | --- |
| stream_ref (source) | Replaces the ref by the stream name (unless the --full-refresh flag is used) |
| stream_source (source) | Replaces the source by the stream name (unless the --full-refresh flag is used) |

[!NOTE] An optional argument stream_type can be defined for both macros to specify the Snowflake stream type.
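
For illustration only (the database, schema, and stream names below are hypothetical; the actual stream naming is handled by the package), a model selecting from {{ incr_stream.stream_ref('input_model') }} compiles to SQL that reads from the stream instead of the model's table:

-- Compiled SQL sketch (hypothetical object names)
select
    *
from MY_DB.MY_SCHEMA.S_MY_SCHEMA_INPUT_MODEL  -- stream created on the INPUT_MODEL table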

Materialization

The incremental_stream materialization performs the following tasks (a rough SQL sketch is shown after the list):

  1. Create {{this}} table IF NOT EXISTS
  2. Create required stream IF NOT EXISTS
  3. CREATE OR REPLACE a view to perform the transformation developed in the model
  4. If the stream is empty, the materialization stops, preventing the Snowflake warehouse from resuming
  5. MERGE the view into the {{this}} table based on the provided unique_key
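
As a rough illustration only (a minimal sketch with placeholder names and columns borrowed from the ADD_CLIENTS example below, not the exact SQL generated by the package), these steps correspond to statements like:

-- 1. Create the target table if it does not exist
create table if not exists MY_DB.MY_SCHEMA.CONSO_CLIENT (
    ID number, FIRST_NAME varchar, LAST_NAME varchar, BIRTHDATE date, LOADED_AT timestamp
);

-- 2. Create the required stream on the source table if it does not exist
create stream if not exists MY_DB.MY_SCHEMA.S_ADD_CLIENTS
    on table MY_DB.STG.ADD_CLIENTS;

-- 3. Create or replace the view holding the model's transformation
create or replace view MY_DB.MY_SCHEMA.CONSO_CLIENT_VW as
    select ID, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT
    from MY_DB.MY_SCHEMA.S_ADD_CLIENTS;

-- 4. If the stream is empty (e.g. SYSTEM$STREAM_HAS_DATA returns false),
--    stop here so the warehouse is never resumed.

-- 5. Merge the view into the target table on the unique_key
merge into MY_DB.MY_SCHEMA.CONSO_CLIENT tgt
using MY_DB.MY_SCHEMA.CONSO_CLIENT_VW src
    on tgt.ID = src.ID
when matched then update set
    tgt.FIRST_NAME = src.FIRST_NAME,
    tgt.LAST_NAME  = src.LAST_NAME,
    tgt.BIRTHDATE  = src.BIRTHDATE,
    tgt.LOADED_AT  = src.LOADED_AT
when not matched then insert (ID, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT)
    values (src.ID, src.FIRST_NAME, src.LAST_NAME, src.BIRTHDATE, src.LOADED_AT);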

Working Example

integration_test contains a working example with models and a test:

| model | description |
| --- | --- |
| add_clients (source) and add_clients_ (source) | Python 🐍 incremental models adding new random clients, to simulate a stream source like a Kafka topic |
| conso_client (source) | incremental_stream model de-duplicating clients on ID |
| conso_client_incr (source) | Standard dbt incremental model to compare results and performance |

And there is more to explore in the integration_tests dbt sub-project.

[Lineage diagram]

# 1. Add 30 new random clients to the ADD_CLIENTS table
# 2. Merge them into CONSO_CLIENT and CONSO_CLIENT_INCR
# 3. Test that CONSO_CLIENT and CONSO_CLIENT_INCR are equal
cd ./integration_test
dbt build

add_clients model

A Python 🐍 incremental model:

  1. Creating an ADD_CLIENTS table, if it does not exist, in the STG schema
  2. Adding random clients using the Faker library

ADD_CLIENTS Table

| ID | FIRST_NAME | LAST_NAME | BIRTHDATE | LOADED_AT |
| --- | --- | --- | --- | --- |
| 1 | Amy | Patton | 1985-10-08 | 2023-04-28 15:01:10 |
| 2 | Eric | Ferguson | 1973-09-15 | 2023-04-28 15:01:10 |
| 3 | Amy | Weston | 1985-10-08 | 2023-04-28 15:01:10 |

[!NOTE] The nb_clients variable defines the number of random clients to add (default: 30)

Sample commands

# Add 30 random clients in add_clients model (create table if needed)
dbt run --select add_clients

# Add 100 random clients in add_clients model (create table if needed)
dbt run --select add_clients --vars '{nb_clients: 100}'

# Rebuild ADD_CLIENTS table with 30 new random clients
dbt run --select add_clients --full-refresh

conso_client model

A sample model leveraging the incremental_stream custom materialization:

  1. Collecting the latest data ingested in add_clients
  2. De-duplicating it on ID, keeping the most recent LOADED_AT, from the add_clients stream or table
  3. Merging (MERGE) the data into the CONSO_CLIENT table with ID as the unique key (a sketch of this pattern is shown below)
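
A hedged sketch of what such a de-duplicating model could look like (an illustration of the pattern, not necessarily the exact conso_client.sql code; QUALIFY with ROW_NUMBER is one way to keep the most recent row per ID):

{{
    config(
        materialized='incremental_stream',
        unique_key=['ID']
    )
}}

select
    ID,
    FIRST_NAME,
    LAST_NAME,
    BIRTHDATE,
    LOADED_AT
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('add_clients') }}
-- keep only the most recent row per ID
qualify row_number() over (partition by ID order by LOADED_AT desc) = 1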

Sample commands

# Merge the latest clients inserted in the add_clients model
dbt run --select conso_client

# Drop the CONSO_CLIENT table and rebuild it from the ADD_CLIENTS table
dbt run --select conso_client --full-refresh

Limitations

Current known limitations:

If the base table of a stream is dropped and recreated (for instance when the upstream table is rebuilt), reading from the existing stream fails with an error such as:

 091901 (01000): Base table 'DB_NAME.SCH_NAME.XXX_TABLE' dropped, cannot read from stream 'S_YYY_XXX_TABLE' in line 1 position 21

Credits

Thanks to jeremiahhansen ❄️ for the inspiring implementation (done in 2020) of streams & tasks on Snowflake