# dbt-incremental-stream

This dbt package is for Snowflake ❄️ only.
It reproduces the dbt incremental materialization, leveraging Snowflake streams to:
- Improve model performance 💨. Streams speed up source table scans by focusing on new data only
- Optimise FinOps 💰. The Snowflake warehouse is started only if the stream is not empty
- Simplify code. `is_incremental()` is generally not required, as streams natively collect "new" rows
## Installation Instructions

New to dbt packages? Read more about them here.
- Include this package in your `packages.yml` (a sketch is shown after this list). An example can be found here in the working example project
- Run `dbt deps`
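As a minimal sketch only (the package name, git URL, and revision below are placeholders, not the package's actual coordinates; take the real entry from the working example project's `packages.yml`):

```yaml
packages:
  # Placeholder coordinates: replace with the package's actual git URL or hub
  # name and a pinned revision from the example project.
  - git: "https://github.com/<org>/dbt-incremental-stream.git"
    revision: "<release-tag>"
```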
## Usage
Use the `incremental_stream` materialization like the dbt incremental one:

- Replace the `ref` macro by `stream_ref` to add a stream on a dbt model
- Replace `source` by `stream_source` to add a stream on a source
- `incr_stream.get_stream_metadata_columns()` must be included to retrieve the METADATA columns of each stream. This lets the materialization deal accordingly with the changes on input tables (`INSERT`, `UPDATE`, `DELETE`); the metadata columns themselves (`METADATA$ACTION`, `METADATA$ISUPDATE`, `METADATA$ROW_ID`) are not included in the model
- Like in a dbt incremental model, the optional `unique_key` parameter is supported. dbt performs a `MERGE` when a `unique_key` is provided, and an `INSERT` otherwise
- Like in a dbt incremental model, `--full-refresh` rebuilds the target model from the source tables
- An optional dbt project variable can be added to create streams at a specific timestamp using Snowflake Time Travel. The example below shows how to force the recreation of a given model and its related streams (using the `--full-refresh` option) at a given TIMESTAMP with `--vars '{TIMESTAMP: 2024-04-13 00:00:00}'`. The date format is `yyyy-mm-dd hh24:mi:ss`
```bash
dbt run --select 'conso_client_source' --vars '{TIMESTAMP: 2024-04-13 00:00:00}' --full-refresh
```
> [!NOTE]
> `{{ incr_stream.get_stream_metadata_columns() }}` should not be included if `*` is used to get all the columns from the source, like in the example below:

```sql
{{
    config(
        materialized='incremental_stream',
        unique_key=['ID']
    )
}}

select *
from {{ incr_stream.stream_source('src', 'src_table') }}
```
### Basic examples using one table as input
#### Example using a unique key and a ref

A test model `conso_client.sql` following this pattern can be found in the `integration_tests` sub dbt project included in the package.
```sql
{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model') }}
```
#### Example using a unique key, a ref and an APPEND_ONLY stream

A test model `conso_client_append_only.sql` following this pattern can be found in the `integration_tests` sub dbt project included in the package.
```sql
{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model', stream_type='APPEND_ONLY') }}
```
#### Example using a unique key and a source

A test model `conso_client_source.sql` following this pattern can be found in the `integration_tests` sub dbt project included in the package.

Example with a source:
```sql
{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_source('src', 'src_table') }}
```
#### Example not using a unique key

This pattern can be found in the test model `conso_client_insert`.
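As an illustration only (column and model names are placeholders, not taken from `conso_client_insert`), a model without a `unique_key`, for which the materialization performs an `INSERT` instead of a `MERGE`, could look like:

```sql
{{
    config(
        materialized='incremental_stream'
    )
}}

select
    column1 as c_1,
    ...,
    columnN as c_n
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model') }}
```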
### Example performing UNION on multiple tables

A test model `conso_client_multiple_streams.sql` implementing this pattern can be found in the `integration_tests` sub dbt project included in the package.
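As a hedged sketch (model and column names are placeholders; this is not the actual `conso_client_multiple_streams.sql`), unioning two streamed inputs could look like:

```sql
{{
    config(
        materialized='incremental_stream',
        unique_key=['column1']
    )
}}

select
    column1 as c_1
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model_a') }}

union all

select
    column1 as c_1
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('input_model_b') }}
```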
## Macros

The `stream_ref` and `stream_source` macros include the created streams in the compiled SQL code of the model:

- The stream is created in `{{this}}.database` and `{{this}}.schema`
- The stream name follows this naming convention: `S_{{this.name}}_{{source_table_name}}`
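For example (hypothetical database and schema names, shown only to illustrate the convention), a model named `conso_client` selecting `from {{ incr_stream.stream_ref('add_clients') }}` would end up reading from a stream roughly like this in the compiled SQL:

```sql
-- MY_DB / MY_SCHEMA stand for the model's {{ this }}.database and {{ this }}.schema
select ...
from MY_DB.MY_SCHEMA.S_CONSO_CLIENT_ADD_CLIENTS
```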
`get_stream_metadata_columns` is used to easily retrieve the metadata columns from the stream (`METADATA$ACTION`, `METADATA$ISUPDATE`, `METADATA$ROW_ID`).
> [!NOTE]
> When using the `--full-refresh` flag, the macros return `ref` and `source` (not streams) to perform a complete rebuild.
| macro | description |
|---|---|
| stream_ref (source) | Replaces `ref` by the stream name (unless the `--full-refresh` flag is set) |
| stream_source (source) | Replaces `source` by the stream name (unless the `--full-refresh` flag is set) |
> [!NOTE]
> An optional argument `stream_type` can be defined for both macros to specify the Snowflake stream type.
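For example (assuming `stream_source` takes the same keyword argument shown for `stream_ref` in the APPEND_ONLY example above):

```sql
from {{ incr_stream.stream_source('src', 'src_table', stream_type='APPEND_ONLY') }}
```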
## Materialization

The incremental_stream materialization performs the following tasks (a SQL sketch follows the list):

- Create the `{{this}}` table `IF NOT EXISTS`
- Create the required stream `IF NOT EXISTS`
- `CREATE OR REPLACE` a view to perform the transformation developed in the model
- If the stream is empty, the materialization stops and prevents the Snowflake warehouse from `RESUME`
- `MERGE` the view into the `{{this}}` table based on the `unique_key` provided
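A rough sketch of what these steps correspond to on the Snowflake side. All object names (MY_DB, MY_SCHEMA, MY_MODEL, INPUT_MODEL, the `_VW` view suffix) and the column list are placeholders, and the exact generated SQL is owned by the materialization; the empty-stream check is presumably done with something like `SYSTEM$STREAM_HAS_DATA`, which does not resume the warehouse:

```sql
-- 1. Target table, created once
create table if not exists MY_DB.MY_SCHEMA.MY_MODEL (id integer, c_1 varchar, loaded_at timestamp);

-- 2. Stream on the input table, created once
create stream if not exists MY_DB.MY_SCHEMA.S_MY_MODEL_INPUT_MODEL
    on table MY_DB.MY_SCHEMA.INPUT_MODEL;

-- 3. View holding the model's transformation, reading from the stream
create or replace view MY_DB.MY_SCHEMA.MY_MODEL_VW as
    select id, c_1, loaded_at
    from MY_DB.MY_SCHEMA.S_MY_MODEL_INPUT_MODEL;

-- 4. Stop here when the stream is empty (no warehouse RESUME), e.g. by checking
--    SYSTEM$STREAM_HAS_DATA('MY_DB.MY_SCHEMA.S_MY_MODEL_INPUT_MODEL')

-- 5. Merge the view into the target on the unique_key (here: id)
merge into MY_DB.MY_SCHEMA.MY_MODEL t
using MY_DB.MY_SCHEMA.MY_MODEL_VW s
    on t.id = s.id
when matched then update set t.c_1 = s.c_1, t.loaded_at = s.loaded_at
when not matched then insert (id, c_1, loaded_at) values (s.id, s.c_1, s.loaded_at);
```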
## Working Example

`integration_test` contains a working example with models and a test:

- To validate that the `incremental_stream` and `incremental` materializations produce the same output
- To run performance tests
| model | description |
|---|---|
| add_clients (source) and add_clients_ (source) | Python 🐍 incremental models adding new random clients, to simulate a stream source like a Kafka topic |
| conso_client (source) | incremental_stream model de-duplicating clients on ID |
| conso_client_incr (source) | standard dbt incremental model to compare results and performance |
And there is more to explore in the `integration_tests` sub dbt project.

```bash
# 1. Add 30 new random clients to the ADD_CLIENTS table
# 2. Merge them into CONSO_CLIENT and CONSO_CLIENT_INCR
# 3. Test that CONSO_CLIENT and CONSO_CLIENT_INCR are equal
cd ./integration_test
dbt build
```
### add_clients model

A Python 🐍 incremental model (a hedged sketch follows the list):

- Creating an `ADD_CLIENTS` table, if it does not exist, in a `STG` schema
- Adding random clients using the Faker library
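The sketch below is only an approximation of what such a dbt Python model on Snowflake can look like; the real `add_clients` model in `integration_tests` may be written differently, and the ID generation here is a simplification:

```python
import datetime

import pandas as pd
from faker import Faker  # must be available in the Snowpark environment


def model(dbt, session):
    # Hypothetical sketch of a Python incremental model appending random clients.
    dbt.config(materialized="incremental", packages=["Faker"])

    fake = Faker()
    nb_clients = 30  # the project exposes an nb_clients variable (default 30)
    loaded_at = datetime.datetime.utcnow()

    rows = [
        {
            "ID": i,  # the real model presumably derives IDs from the existing table
            "FIRST_NAME": fake.first_name(),
            "LAST_NAME": fake.last_name(),
            "BIRTHDATE": fake.date_of_birth(),
            "LOADED_AT": loaded_at,
        }
        for i in range(nb_clients)
    ]

    # On incremental runs, dbt appends the returned rows to the existing ADD_CLIENTS table
    return pd.DataFrame(rows)
```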
ADD_CLIENTS table sample:

| ID | FIRST_NAME | LAST_NAME | BIRTHDATE | LOADED_AT |
|---|---|---|---|---|
| 1 | Amy | Patton | 1985-10-08 | 2023-04-28 15:01:10 |
| 2 | Eric | Ferguson | 1973-09-15 | 2023-04-28 15:01:10 |
| 3 | Amy | Weston | 1985-10-08 | 2023-04-28 15:01:10 |
> [!NOTE]
> The `nb_clients` variable defines the number of random clients to add (default 30)
#### Sample commands

```bash
# Add 30 random clients in the add_clients model (create the table if needed)
dbt run --select add_clients

# Add 100 random clients in the add_clients model (create the table if needed)
dbt run --select add_clients --vars '{nb_clients: 100}'

# Rebuild the ADD_CLIENTS table with 30 new random clients
dbt run --select add_clients --full-refresh
```
### conso_client model

A sample model leveraging the `incremental_stream` custom materialization (a hedged sketch follows the list):

- Collecting the latest data ingested in `add_clients`
- De-duplicating it based on `ID`, keeping the most recent `LOADED_AT`, from the `add_clients` stream or table
- `MERGE` the data into the `CONSO_CLIENT` table with `ID` as the unique key
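One way to express that de-duplication (a hedged sketch using Snowflake's `QUALIFY`; it is not necessarily the exact SQL of `conso_client.sql`):

```sql
{{
    config(
        materialized='incremental_stream',
        unique_key=['ID']
    )
}}

select
    ID,
    FIRST_NAME,
    LAST_NAME,
    BIRTHDATE,
    LOADED_AT
    {{ incr_stream.get_stream_metadata_columns() }}
from {{ incr_stream.stream_ref('add_clients') }}
-- keep only the most recent row per ID
qualify row_number() over (partition by ID order by LOADED_AT desc) = 1
```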
#### Sample commands

```bash
# Merge the last clients inserted in the add_clients model
dbt run --select conso_client

# Drop the CONSO_CLIENT table and rebuild it from the ADD_CLIENTS table
dbt run --select conso_client --full-refresh
```
## Limitations

Current known limitations:

- When a `stream_ref` or a `stream_source` table is dropped and recreated, the stream must be dropped as well (see the snippet after this list). Otherwise you will observe the following error:
  `091901 (01000): Base table 'DB_NAME.SCH_NAME.XXX_TABLE' dropped, cannot read from stream 'S_YYY_XXX_TABLE' in line 1 position 21`
- The `on_schema_change` parameter might work in certain conditions but is not currently supported
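For the first limitation, the workaround is to drop the orphaned stream manually and rebuild the model; the names below are the placeholders from the error message, and the stream lives in the model's database and schema:

```sql
-- Drop the stream left behind after its base table was dropped and recreated
drop stream if exists S_YYY_XXX_TABLE;
```

Then re-run the model, typically with `--full-refresh`, so the materialization recreates the stream and rebuilds the target from the recreated source table.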
## Credits
Thanks to jeremiahhansen ❄️ for the inspiring implementation (done in 2020) of streams & tasks on Snowflake