Fivetran Utilities for dbt

⚠️ Warning! ⚠️

You are viewing a deprecated branch, which is no longer maintained and not recommended for use. This branch remains the default branch to prevent breaking changes to any existing dbt projects that use this package.

To view the current release branch, please refer to the release list.

This package includes macros that are used in Fivetran's dbt packages.

Macros

_get_utils_namespaces (source)

This macro allows for namespacing macros throughout a dbt project. The macro currently consists of two namespaces: dbt_utils and fivetran_utils.


add_pass_through_columns (source)

This macro creates the proper name, datatype, and aliasing for user-defined pass through column variables. It allows pass through variables to be more dynamic and lets users alias the custom fields they are bringing in. This macro is typically used within the staging models of a Fivetran dbt source package to pass through user-defined custom fields.

Usage:

{{ fivetran_utils.add_pass_through_columns(base_columns=columns, pass_through_var=var('hubspot__deal_pass_through_columns')) }}
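
For reference, the pass through variable itself is declared in the dbt_project.yml file. A minimal sketch, assuming the variable accepts entries with a name and an optional alias (the field names below are illustrative):

# in dbt_project.yml
vars:
  hubspot__deal_pass_through_columns:
    - name: "property_custom_deal_amount"
      alias: "custom_deal_amount"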

Args:

- base_columns (required): The list of columns to which the pass through columns are added.
- pass_through_var (required): The pass through column variable to add to the staging model.


array_agg (source)

This macro allows for cross database field aggregation. It contains the database-specific field aggregation function for BigQuery, Snowflake, Redshift, and Postgres. By default, a comma (,) is used as the delimiter in the aggregation.

Usage:

{{ fivetran_utils.array_agg(field_to_agg="teams") }}
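
Since this macro produces an aggregate, it typically appears alongside a group by. A short sketch (the table and grouping column are illustrative):

select
    project_id,
    {{ fivetran_utils.array_agg(field_to_agg="teams") }} as teams
from source
group by 1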

Args:

- field_to_agg (required): The field to aggregate.


ceiling (source)

This macro allows for cross database use of the ceiling function. The ceiling function returns the smallest integer greater than or equal to the specified numeric expression. The ceiling macro is compatible with BigQuery, Redshift, Postgres, and Snowflake.

Usage:

{{ fivetran_utils.ceiling(num="target/total_days") }}
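
As a worked example, if target/total_days evaluates to 1.2 the expression returns 2, while a value of exactly 1 returns 1.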

Args:

- num (required): The numeric field or expression to apply the ceiling function to.


collect_freshness (source)

This macro overrides dbt's default collect_freshness macro that is called when running dbt source snapshot-freshness. It allows you to incorporate model enabling/disabling variables into freshness tests so that, if a source table does not exist, dbt will not run (and error on) a freshness test on that table. Any package that has a dependency on fivetran_utils will use this version of the macro. If no meta.is_enabled field is provided, collect_freshness runs exactly like dbt's default version.

Usage:

# in the sources.yml
sources:
  - name: source_name
    freshness:
      warn_after: {count: 84, period: hour}
      error_after: {count: 168, period: hour}
    tables:
      - name: table_that_might_not_exist
        meta:
          is_enabled: "{{ var('package__using_this_table', true) }}"
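
With that meta property in place, the freshness test for the table can be skipped simply by setting the variable it references:

# in dbt_project.yml
vars:
  package__using_this_table: false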

Args (sorta):

- meta.is_enabled (optional): Set on the source table in your sources.yml file, as above. When the value (or the variable it references) is false, the freshness test for that table is skipped.


dummy_coalesce_value (source)

This macro creates a dummy coalesce value based on the data type of the field.

Usage:

{{ fivetran_utils.dummy_coalesce_value(column="user_rank") }}

Args:

- column (required): The column for which to generate the dummy coalesce value.


empty_variable_warning (source)

This macro checks a declared variable and, if the variable is empty, surfaces a warning message before the models run. It is designed to be used within an on-run-start hook in the dbt_project.yml file.

Usage:

on-run-start: '{{ fivetran_utils.empty_variable_warning(variable="ticket_field_history_columns", downstream_model="zendesk_ticket_field_history") }}'

Args:

- variable (required): The variable to check for emptiness.
- downstream_model (required): The downstream model that depends on the variable, named in the warning message.


enabled_vars_one_true (source)

This macro references a set of specified boolean variables and returns true if any variable's value is equal to true.

Usage:

{{ fivetran_utils.enabled_vars_one_true(vars=["using_department_table", "using_customer_table"]) }}
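
For example, with the vars below declared in dbt_project.yml, the call above returns true:

# in dbt_project.yml
vars:
  using_department_table: false
  using_customer_table: true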

Args:

- vars (required): The list of boolean variables to check.


enabled_vars (source)

This macro references a set of specified boolean variables and returns false if any variable's value is equal to false.

Usage:

{{ fivetran_utils.enabled_vars(vars=["using_department_table", "using_customer_table"]) }}

Args:

- vars (required): The list of boolean variables to check.


fill_pass_through_columns (source)

This macro is used to generate the correct SQL within package staging models for user-defined pass through columns.

Usage:

{{ fivetran_utils.fill_pass_through_columns(pass_through_variable='hubspot__contact_pass_through_columns') }}

Args:

- pass_through_variable (required): The name of the pass through column variable to fill in the staging model.


fill_staging_columns (source)

This macro is used to generate the correct SQL for package staging models. It takes a list of columns that are expected/needed (staging_columns) and compares it with columns in the source (source_columns).

Usage:

select

    {{
        fivetran_utils.fill_staging_columns(
            source_columns=adapter.get_columns_in_relation(ref('stg_twitter_ads__account_history_tmp')),
            staging_columns=get_account_history_columns()
        )
    }}

from source

Args:

- source_columns (required): The columns present in the source table, typically obtained via adapter.get_columns_in_relation.
- staging_columns (required): The columns expected in the staging model, typically provided by a get_<table>_columns macro.


first_value (source)

This macro returns the value_expression for the first row in the current window frame with cross db functionality, ignoring null values. By default, the macro uses the first_value window function; the Redshift calculation also uses first_value, but includes the frame clause rows unbounded preceding.

Usage:

{{ fivetran_utils.first_value(first_value_field="created_at", partition_field="id", order_by_field="created_at", order="asc") }}
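
On most warehouses the usage above renders to a window function roughly along these lines (a sketch, not the exact macro output):

first_value(created_at ignore nulls) over (partition by id order by created_at asc)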

Args:

- first_value_field (required): The field for which to return the first value.
- partition_field (required): The field to partition the window by.
- order_by_field (required): The field to order the window by.
- order (required): The sort direction for order_by_field, asc or desc.


generate_columns_macro (source)

This macro is used to generate the macro that is passed as the staging_columns argument to the fill_staging_columns macro, listing all the expected columns within a respective table. The macro output will contain name and datatype; however, you may add an optional alias entry if you wish to rename the column within the macro.

The macro should be run using dbt's run-operation functionality, as used below. It will print out the macro text, which can be copied and pasted into the relevant macro directory file within the package.

Usage:

dbt run-operation fivetran_utils.generate_columns_macro --args '{"table_name": "admin", "schema_name": "intercom", "database_name": "dbt-package-testing"}'

Output:

{% macro get_admin_columns() %}

{% set columns = [
    {"name": "email", "datatype": dbt_utils.type_string()},
    {"name": "id", "datatype": dbt_utils.type_string(), "alias": "admin_id"},
    {"name": "job_title", "datatype": dbt_utils.type_string()},
    {"name": "name", "datatype": dbt_utils.type_string()},
    {"name": "_fivetran_deleted", "datatype": "boolean"},
    {"name": "_fivetran_synced", "datatype": dbt_utils.type_timestamp()}
] %}

{{ return(columns) }}

{% endmacro %}

Args:

- table_name (required): The table for which you want to generate the columns macro.
- schema_name (required): The schema in which the table resides.
- database_name (required): The database in which the schema resides.


get_columns_for_macro (source)

This macro returns all column names and datatypes for a specified table within a database and is used as part of the generate_columns_macro.

Usage:

{{ fivetran_utils.get_columns_for_macro(table_name="team", schema_name="my_teams", database_name="my_database") }}

Args:

- table_name (required): The table whose columns you want to return.
- schema_name (required): The schema in which the table resides.
- database_name (required): The database in which the schema resides.


json_extract (source)

This macro allows for cross database use of the json extract function, which returns data from within a json object. The data is returned based on the path you provide as the argument. The json_extract macro is compatible with BigQuery, Redshift, Postgres, and Snowflake.

Usage:

{{ fivetran_utils.json_extract(string="value", string_path="in_business_hours") }}
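
Within a model, the macro sits alongside the other selected fields. A short sketch (the table and surrounding column are illustrative):

select
    ticket_id,
    {{ fivetran_utils.json_extract(string="value", string_path="in_business_hours") }} as in_business_hours
from source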

Args:

- string (required): The name of the field containing the json object.
- string_path (required): The path of the data within the json object to return.


json_parse (source)

This macro allows for cross database use of the json extract function, specifically used to parse and extract a nested value from a json object. The data is returned based on the path you provide as the list within the string_path argument. The json_parse macro is compatible with BigQuery, Redshift, Postgres, Snowflake, and Databricks. For example, the usage below walks the receipt object down through charges, data[0], and balance_transaction to return exchange_rate.

Usage:

{{ fivetran_utils.json_parse(string="receipt", string_path=["charges","data",0,"balance_transaction","exchange_rate"]) }}

Args:

- string (required): The name of the field containing the json object.
- string_path (required): The list representing the path to the nested value within the json object.


pivot_json_extract (source)

This macro builds off of the json_extract macro in order to extract a list of fields from a json object and pivot the fields out into columns. The pivot_json_extract macro is compatible with BigQuery, Redshift, Postgres, and Snowflake.

Usage:

{{ fivetran_utils.pivot_json_extract(string="json_value", list_of_properties=["field 1", "field 2"]) }}
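
Conceptually, the usage above expands to one extract per property, roughly like the following (a sketch, not the exact macro output):

json_extract(json_value, 'field 1') as field_1,
json_extract(json_value, 'field 2') as field_2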

Args:

- string (required): The name of the field containing the json object.
- list_of_properties (required): The list of fields to extract from the json object and pivot out into columns.


max_bool (source)

This macro allows for cross database use of obtaining the max boolean value of a field. The macro treats true as 1 and false as 0, aggregates the boolean_field, and returns the max boolean value. The max_bool macro is compatible with BigQuery, Redshift, Postgres, and Snowflake.

Usage:

{{ fivetran_utils.max_bool(boolean_field="is_breach") }}

Args:

- boolean_field (required): The boolean field to aggregate.


percentile (source)

This macro returns the designated percentile of a field with cross db functionality. The percentile function stems from percentile_cont across databases. For Snowflake and Redshift, this macro uses the window function as opposed to the aggregate function for percentile.

Usage:

{{ fivetran_utils.percentile(percentile_field='time_to_close', partition_field='id', percent='0.5') }}

Args:

- percentile_field (required): The field to calculate the percentile of.
- partition_field (required): The field to partition by.
- percent (required): The percentile to calculate, e.g. '0.5' for the median.


remove_prefix_from_columns (source)

This macro removes a desired prefix from specified columns. A for loop within the macro iterates over the provided columns, so the prefix can be removed from multiple columns at once.

Usage:

{{ fivetran_utils.remove_prefix_from_columns(columns="names", prefix='', exclude=[]) }}

Args:

- columns (required): The columns to remove the prefix from.
- prefix (required): The prefix to strip from the column names.
- exclude (required): The list of columns to exclude from prefix removal.


snowflake_seed_data (source)

This macro is intended to be used when a source table column is a reserved keyword in Snowflake, and Circle CI is throwing a fit. It simply chooses which version of the data to seed (the Snowflake copy should capitalize and put three pairs of quotes around the problematic column).

Usage:

    # in integration_tests/dbt_project.yml
    vars:
        table_name: "{{ fivetran_utils.snowflake_seed_data(seed_name='user_data') }}"

Args:

- seed_name (required): The name of the seed for which a Snowflake-specific copy exists.


seed_data_helper (source)

This macro is intended to be used when a source table column is a reserved keyword in a warehouse, and Circle CI is throwing a fit. It simply chooses which version of the data to seed. Also note that the warehouses argument is a list, so multiple warehouses may be added based on the number of warehouse-specific seed data files you need for integration testing.

Usage:

    # in integration_tests/dbt_project.yml
    vars:
        table_name: "{{ fivetran_utils.seed_data_helper(seed_name='user_data', warehouses=['snowflake', 'postgres']) }}"

Args:

- seed_name (required): The name of the seed for which warehouse-specific copies exist.
- warehouses (required): The list of warehouses that need a warehouse-specific copy of the seed data.


staging_models_automation (source)

This macro is intended to be used as a run-operation when generating Fivetran dbt source package staging models/macros. It receives user input to create all the necessary bash commands, appended with && so they may all be run at once. The output of this macro within the CLI can then be copied and pasted as a command to generate the staging models/macros.

Usage:

dbt run-operation staging_models_automation --args '{package: asana, source_schema: asana_source, source_database: database-source-name, tables: ["user","tag"]}'

CLI Output:

source dbt_modules/fivetran_utils/columns_setup.sh '../dbt_asana_source' stg_asana dbt-package-testing asana_2 user && 
source dbt_modules/fivetran_utils/columns_setup.sh '../dbt_asana_source' stg_asana dbt-package-testing asana_2 tag

Args:

- package (required): The name of the package you are generating staging models/macros for.
- source_schema (required): The schema containing the source data.
- source_database (required): The database containing the source data.
- tables (required): The list of tables to generate staging models/macros for.


string_agg (source)

This macro allows for cross database field aggregation and delimiter customization. Database-specific field aggregation functions are supported for BigQuery, Snowflake, and Redshift.

Usage:

{{ fivetran_utils.string_agg(field_to_agg="issues_opened", delimiter='|') }}

Args:

- field_to_agg (required): The field to aggregate.
- delimiter (required): The delimiter to place between aggregated values.


timestamp_add (source)

This macro allows for cross database addition of a timestamp field and a specified datepart and interval for BigQuery, Redshift, Postgres, and Snowflake.

Usage:

{{ fivetran_utils.timestamp_add(datepart="day", interval="1", from_timestamp="last_ticket_timestamp") }}
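
Roughly, the usage above dispatches to each warehouse's native function, for example (a sketch, not the exact macro output):

-- BigQuery
timestamp_add(last_ticket_timestamp, interval 1 day)
-- Snowflake
dateadd(day, 1, last_ticket_timestamp)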

Args:

- datepart (required): The datepart to add (e.g. day).
- interval (required): The number of datepart units to add.
- from_timestamp (required): The timestamp field the interval is added to.


timestamp_diff (source)

This macro allows for cross database timestamp difference calculation for BigQuery, Redshift, Postgres, and Snowflake.

Usage:

{{ fivetran_utils.timestamp_diff(first_date="first_ticket_timestamp", second_date="last_ticket_timestamp", datepart="day") }}
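
As with timestamp_add, the macro dispatches to each warehouse's native function, for example (a sketch):

-- BigQuery
timestamp_diff(last_ticket_timestamp, first_ticket_timestamp, day)
-- Snowflake
datediff(day, first_ticket_timestamp, last_ticket_timestamp)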

Args:

- first_date (required): The timestamp to subtract.
- second_date (required): The timestamp to subtract from.
- datepart (required): The datepart in which to report the difference (e.g. day).


union_relations (source)

This macro unions together an array of Relations, even when columns have differing orders in each Relation and/or some columns are missing from some Relations. Any columns exclusive to a subset of these Relations will be filled with null where not present. A new column (_dbt_source_relation) is also added to indicate the source for each record.

Usage:

{{ fivetran_utils.union_relations(
    relations=[ref('my_model'), source('my_source', 'my_table')],
    exclude=["_loaded_at"]
) }}

Args:

- relations (required): The list of Relations to union together.
- exclude (optional): The list of columns to exclude from the unioned output.


union_data (source)

This macro unions together tables of the same structure so that users can treat data from multiple connectors as the 'source' to a package. Depending on which variables are set, it will either look for schemas of the same name across multiple databases, or schemas with different names in the same database.

If the var with the name of the schema_variable argument is set, the macro will union the table_identifier tables from each respective schema within the target database (or source database if set by a variable). If the var with the name of the database_variable argument is set, the macro will union the table_identifier tables from the source schema in each respective database.

When using this functionality, every _tmp table should use this macro as described below.

Usage:

{{
    fivetran_utils.union_data(
        table_identifier='customer', 
        database_variable='shopify_database', 
        schema_variable='shopify_schema', 
        default_database=target.database,
        default_schema='shopify',
        default_variable='customer_source'
    )
}}
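
As a sketch of how the variables drive the union, assuming the schema variable is set to a list of schemas (names illustrative):

# in dbt_project.yml
vars:
  shopify_schema: ['shopify_us', 'shopify_ca']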

Args:

- table_identifier (required): The name of the table to union across schemas/databases.
- database_variable (required): The name of the variable holding the databases to union from.
- schema_variable (required): The name of the variable holding the schemas to union from.
- default_database (required): The database to use when no union variable is set.
- default_schema (required): The schema to use when no union variable is set.
- default_variable (required): The name of the variable associated with the default source table.


source_relation (source)

This macro creates a new column that signifies which database/schema a record came from when using the union_data macro above. It should be added to all non-tmp staging models when using the union_data macro.

Usage:

{{ fivetran_utils.source_relation() }}

Bash Scripts

columns_setup.sh (source)

This bash file can be used to set up or update packages to use the fill_staging_columns macro above.

The usage is as follows, assuming you are executing via a zsh terminal and in a dbt project directory that has already imported this repo as a dependency:

source dbt_modules/fivetran_utils/columns_setup.sh "path/to/directory" file_prefix database_name schema_name table_name

As an example, assuming we are in a dbt project in an adjacent folder to dbt_marketo_source:

source dbt_modules/fivetran_utils/columns_setup.sh "../dbt_marketo_source" stg_marketo "digital-arbor-400" marketo_v3 deleted_program_membership

In that example, the script will generate the columns macro and fill in the staging columns for the deleted_program_membership table of the dbt_marketo_source package.