dbt-audit-helper

Useful macros when performing data audits

Installation instructions

New to dbt packages? Read more about them here.

  1. Include this package in your packages.yml file (check here for the latest version number).
  2. Run dbt deps to install the package.

Compare Data Outputs

compare_and_classify_query_results (source)

Generates a row-by-row comparison of two queries, as well as summary stats of added, removed, identical and modified records. This prevents you from having to query your comparison tables multiple times to get raw data and summary data.

Output

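| order_id | order_date | status | dbt_audit_in_a | dbt_audit_in_b | dbt_audit_row_status | dbt_audit_num_rows_in_status | dbt_audit_sample_number |
|---|---|---|---|---|---|---|---|
| 1 | 2024-01-01 | completed | True | True | identical | 1 | 1 |
| 2 | 2024-01-02 | completed | True | False | modified | 2 | 1 |
| 2 | 2024-01-02 | returned | False | True | modified | 2 | 1 |
| 3 | 2024-01-03 | completed | True | False | modified | 2 | 2 |
| 3 | 2024-01-03 | completed | False | True | modified | 2 | 2 |
| 4 | 2024-01-04 | completed | False | True | added | 1 | 1 |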

Note that there are 4 rows with the modified status, but dbt_audit_num_rows_in_status says 2. This is because it is counting each primary key only once.
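As a minimal sketch of how the row-level and summary columns relate, the macro output can be wrapped in a CTE and grouped by status. The CTE name classified and the aliases num_primary_keys and num_rows_returned are illustrative, and the sketch assumes your warehouse accepts the macro's SQL inside a CTE.

```sql
{% set old_query %}
  select id as order_id, amount, customer_id
  from old_database.old_schema.fct_orders
{% endset %}

{% set new_query %}
  select order_id, amount, customer_id
  from {{ ref('fct_orders') }}
{% endset %}

with classified as (
  {{ audit_helper.compare_and_classify_query_results(
      old_query,
      new_query,
      primary_key_columns=['order_id'],
      columns=['order_id', 'amount', 'customer_id']
  ) }}
)

select
  dbt_audit_row_status,
  -- the same value repeats on every row of a status, so max() just picks it up
  max(dbt_audit_num_rows_in_status) as num_primary_keys,
  -- counts only the rows the macro actually returned
  count(*) as num_rows_returned
from classified
group by 1
```

For the sample output above, the modified status would show 2 primary keys against 4 returned rows.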

Arguments

Usage


{% set old_query %}
  select
    id as order_id,
    amount,
    customer_id
  from old_database.old_schema.fct_orders
{% endset %}

{% set new_query %}
  select
    order_id,
    amount,
    customer_id
  from {{ ref('fct_orders') }}
{% endset %}

{{ 
  audit_helper.compare_and_classify_query_results(
    old_query, 
    new_query, 
    primary_key_columns=['order_id'], 
    columns=['order_id', 'amount', 'customer_id']
  )
}}

compare_and_classify_relation_rows (source)

A wrapper to compare_and_classify_query_results, except it takes two Relations (instead of two queries).

Each relation must have the same columns with the same names, but they do not have to be in the same order.

Arguments

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.compare_and_classify_relation_rows(
    a_relation = old_relation,
    b_relation = dbt_relation,
    primary_key_columns = ["order_id"],
    columns = None
) }}

quick_are_queries_identical (source)

On supported adapters (currently Snowflake and BigQuery), take a hash of all rows in two queries and compare them.

This can be calculated relatively quickly compared to other macros in this package and can efficiently provide reassurance that a refactor introduced no changes.

Output

are_tables_identical
true

Arguments

Usage


{% set old_query %}
    select * from old_database.old_schema.dim_product
{% endset %}

{% set new_query %}
    select * from {{ ref('dim_product') }}
{% endset %}

{{ audit_helper.quick_are_queries_identical(
    query_a = old_query,
    query_b = new_query,
    columns=['order_id', 'amount', 'customer_id']
  ) 
}}

quick_are_relations_identical (source)

A wrapper to quick_are_queries_identical, except it takes two Relations (instead of two queries).

Each relation must have the same columns with the same names, but they do not have to be in the same order. Build long lists with a few exclusions with dbt_utils.get_filtered_columns_in_relation, or pass None and the macro will find all intersecting columns automatically.

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.quick_are_relations_identical(
    a_relation = old_relation,
    b_relation = dbt_relation,
    columns = None
) }}
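When only a few columns should be left out, a sketch along these lines builds the list with dbt_utils.get_filtered_columns_in_relation instead of passing None. It assumes dbt_utils is installed in your project, and the loaded_at column is purely illustrative.

```sql
{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{# Every column of the old relation except loaded_at (hypothetical column name) #}
{% set columns = dbt_utils.get_filtered_columns_in_relation(old_relation, except=["loaded_at"]) %}

{{ audit_helper.quick_are_relations_identical(
    a_relation = old_relation,
    b_relation = dbt_relation,
    columns = columns
) }}
```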

compare_row_counts (source)

This macro does a simple comparison of the row counts in two relations.

Output

Calling this macro on two different relations will return a very simple table comparing the row counts in each relation.

| relation_name | total_records |
|---|---|
| target_database.target_schema.my_a_relation | 34,231 |
| target_database.target_schema.my_b_relation | 24,789 |

Arguments

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.compare_row_counts(
    a_relation = old_relation,
    b_relation = dbt_relation
) }}
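The same output can also back a simple check. The following is a sketch of a singular test, assuming the macro's SQL can be wrapped in a CTE on your warehouse. The CTE name row_counts and the output aliases are illustrative. It returns a row, and therefore fails, only when the two counts differ.

```sql
{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

with row_counts as (
    {{ audit_helper.compare_row_counts(
        a_relation = old_relation,
        b_relation = dbt_relation
    ) }}
)

-- One row comes back only when the two relations have different counts
select
    min(total_records) as smallest_count,
    max(total_records) as largest_count
from row_counts
having min(total_records) <> max(total_records)
```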

Compare Columns

compare_which_query_columns_differ (source)

This macro generates SQL that can be used to detect which columns returned by two queries contain any value level changes.

It does not return the magnitude of the change, only whether or not a difference has occurred. Only records that exist in both queries (as determined by the primary key) are considered.

Output

The generated query returns whether or not each column has any differences:

| column_name | has_difference |
|---|---|
| order_id | False |
| customer_id | False |
| order_date | True |
| status | False |
| amount | True |

Arguments
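A usage sketch, modeled on the compare_and_classify_query_results example and on the relation-based wrapper documented in the next section. It assumes the two queries are passed positionally and that primary_key_columns and columns are accepted as in the wrapper.

```sql
{% set old_query %}
  select
    id as order_id,
    amount,
    customer_id
  from old_database.old_schema.fct_orders
{% endset %}

{% set new_query %}
  select
    order_id,
    amount,
    customer_id
  from {{ ref('fct_orders') }}
{% endset %}

{# Assumption: queries are passed positionally, old first and new second #}
{{ audit_helper.compare_which_query_columns_differ(
    old_query,
    new_query,
    primary_key_columns = ["order_id"],
    columns = ["order_id", "amount", "customer_id"]
) }}
```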

compare_which_relation_columns_differ (source)

A wrapper to compare_which_query_columns_differ, except it takes two Relations (instead of two queries).

Each relation must have the same columns with the same names, but they do not have to be in the same order. Build long lists with a few exclusions with dbt_utils.get_filtered_columns_in_relation, or pass None and the macro will find all intersecting columns automatically.

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.compare_which_relation_columns_differ(
    a_relation = old_relation,
    b_relation = dbt_relation,
    primary_key_columns = ["order_id"],
    columns = None
) }}


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{% set columns = dbt_utils.get_filtered_columns_in_relation(old_relation, except=["loaded_at"]) %}

{{ audit_helper.compare_which_relation_columns_differ(
    a_relation = old_relation,
    b_relation = dbt_relation,
    primary_key_columns = ["order_id"],
    columns = columns
) }}

compare_column_values (source)

This macro generates SQL that can be used to compare a column's values across two queries. This macro is useful when you've used the compare_which_query_columns_differ macro to identify a column with differing values and want to understand how many discrepancies are caused by that column.

Output

The generated query returns a summary of the count of rows where the column's values:

| match_status | count | percent_of_total |
|---|---|---|
| ✅: perfect match | 37,721 | 79.03 |
| ✅: both are null | 5,789 | 12.13 |
| 🤷: missing from a | 5 | 0.01 |
| 🤷: missing from b | 20 | 0.04 |
| 🤷: value is null in a only | 59 | 0.12 |
| 🤷: value is null in b only | 73 | 0.15 |
| ❌: values do not match | 4,064 | 8.51 |

Arguments

Usage


{% set old_query %}
    select * from old_database.old_schema.dim_product
    where is_latest
{% endset %}

{% set new_query %}
    select * from {{ ref('dim_product') }}
{% endset %}

{{ audit_helper.compare_column_values(
    a_query = old_query,
    b_query = new_query,
    primary_key = "product_id",
    column_to_compare = "status"
) }}

compare_all_columns (source)

Similar to compare_column_values, except it can be used to compare all columns' values across two relations. This macro is useful when you've used the compare_queries macro and found that a significant number of your records don't match and want to understand how many discrepancies are caused by each column.

Output

By default, the generated query returns, for each column, a count of rows in each match category:

| column_name | perfect_match | null_in_a | null_in_b | missing_from_a | missing_from_b | conflicting_values |
|---|---|---|---|---|---|---|
| order_id | 10 | 0 | 0 | 0 | 0 | 0 |
| order_date | 2 | 0 | 0 | 0 | 0 | 8 |
| order_status | 6 | 4 | 4 | 0 | 0 | 0 |

Setting the summarize argument to false lets you check the match status of a specific column value in a specific row:

| primary_key | column_name | perfect_match | null_in_a | null_in_b | missing_from_a | missing_from_b | conflicting_values |
|---|---|---|---|---|---|---|---|
| 1 | order_id | true | false | false | false | false | false |
| 1 | order_date | false | false | false | false | false | true |
| 1 | order_status | false | true | true | false | false | false |
| ... | ... | ... | ... | ... | ... | ... | ... |
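A minimal sketch of requesting the row-level output, assuming summarize is passed as a keyword argument alongside the arguments used in the usage example for this macro:

```sql
{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{# summarize = false returns one row per primary key and column instead of counts #}
{{ audit_helper.compare_all_columns(
    a_relation = old_relation,
    b_relation = dbt_relation,
    primary_key = "order_id",
    summarize = false
) }}
```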

Arguments

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.compare_all_columns(
    a_relation = old_relation,
    b_relation = dbt_relation,
    primary_key = "order_id"
) }}

compare_relation_columns (source)

This macro generates SQL that can be used to compare the schema (ordinal position and data types of columns) of two relations. This is especially useful when:

Output

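| column_name | a_ordinal_position | b_ordinal_position | a_data_type | b_data_type | has_ordinal_position_match | has_data_type_match | in_a_only | in_b_only | in_both |
|---|---|---|---|---|---|---|---|---|---|
| order_id | 1 | 1 | integer | integer | True | True | False | False | True |
| customer_id | 2 | 2 | integer | integer | True | True | False | False | True |
| order_date | 3 | 3 | timestamp | date | True | False | False | False | True |
| status | 4 | 5 | character varying | character varying | False | True | False | False | True |
| amount | 5 | 4 | bigint | bigint | False | True | False | False | True |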

Note: For adapters other than BigQuery, Postgres, Redshift, and Snowflake, the ordinal position is inferred based on the response from dbt Core's adapter.get_columns_in_relation(), as opposed to being loaded from the information schema.
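To narrow the output to real type mismatches, the macro's SQL can be wrapped in a CTE and filtered on the flag columns shown above. This is a sketch that assumes the flags are returned as booleans and that your warehouse accepts the macro's SQL inside a CTE. The CTE name schema_comparison is illustrative.

```sql
{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

with schema_comparison as (
    {{ audit_helper.compare_relation_columns(
        a_relation = old_relation,
        b_relation = dbt_relation
    ) }}
)

-- Columns present in both relations whose data types disagree
select
    column_name,
    a_data_type,
    b_data_type
from schema_comparison
where in_both
  and not has_data_type_match
```

Against the sample output above, only the order_date row (timestamp versus date) would be returned.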

Arguments

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.compare_relation_columns(
    a_relation=old_relation,
    b_relation=dbt_relation
) }}

Advanced Usage

Print Output To Logs

You may want to print the output of the query generated by an audit helper macro to your logs (instead of previewing the results).

To do so, store the results of the query and print them to the logs.

For example, using the compare_column_values macro:

{% set old_query %}
    select * from old_database.old_schema.dim_product
    where is_latest
{% endset %}

{% set new_query %}
    select * from {{ ref('dim_product') }}
{% endset %}

{% set audit_query = audit_helper.compare_column_values(
    a_query = old_query,
    b_query = new_query,
    primary_key = "product_id",
    column_to_compare = "status"
) %}

{% set audit_results = run_query(audit_query) %}

{% if execute %}
{% do audit_results.print_table() %}
{% endif %}

The .print_table() function is not compatible with dbt Cloud, so an adjustment needs to be made in order to print the results. Add the following code to a new macro file:

{% macro print_audit_output() %}
{%- set columns_to_compare=adapter.get_columns_in_relation(ref('fct_orders'))  -%}

{% set old_etl_relation_query %}
    select * from public.fct_orders
{% endset %}

{% set new_etl_relation_query %}
    select * from {{ ref('fct_orders') }}
{% endset %}

{% if execute %}
    {% for column in columns_to_compare %}
        {{ log('Comparing column "' ~ column.name ~'"', info=True) }}
        {% set audit_query = audit_helper.compare_column_values(
                a_query=old_etl_relation_query,
                b_query=new_etl_relation_query,
                primary_key="order_id",
                column_to_compare=column.name
        ) %}

        {% set audit_results = run_query(audit_query) %}

        {% do log(audit_results.column_names, info=True) %}
            {% for row in audit_results.rows %}
                  {% do log(row.values(), info=True) %}
            {% endfor %}
    {% endfor %}
{% endif %}

{% endmacro %}

To run the macro, execute dbt run-operation print_audit_output in the command bar.

Use Output For Custom Singular Test

If desired, you can use the audit helper macros to add a dbt test to your project to protect against unwanted changes to your data outputs.

For example, using the compare_all_columns macro, you could set up a test that will fail if any column values do not match.

You can configure exactly what constitutes a value match or a failure. If a test fails, you can inspect the results in the warehouse. The primary key and the column name can be included in the test output that gets written to the warehouse, so you can join test results to the relevant tables in your dev or prod schema to investigate the error.

Note: this test should only be used on (and will only work on) models that have a primary key that is reliably unique and not_null. Generic dbt tests should be used to ensure the model being tested meets the requirements of unique and not_null.

To create a test for the stg_customers model, create a custom test in the tests subdirectory of your dbt project that looks like this:

{# a_relation: in a test, this ref will compile as your dev or PR schema. #}
{# b_relation: you can explicitly write a relation to select your production schema, or any other db/schema/table you'd like to use for comparison testing. #}
{{ 
  audit_helper.compare_all_columns(
    a_relation=ref('stg_customers'),
    b_relation=api.Relation.create(database='dbt_db', schema='analytics_prod', identifier='stg_customers'),
    exclude_columns=['updated_at'], 
    primary_key='id'
  ) 
}}
where not perfect_match

The where not perfect_match statement is an example of a filter you can apply to define what constitutes a test failure. The test will fail if any rows don't meet the requirement of a perfect match. Failures would include any row flagged as null_in_a, null_in_b, missing_from_a, missing_from_b, or conflicting_values.

If you'd like the test to only fail when there are conflicting values, you could configure it like this:

{{ 
  audit_helper.compare_all_columns(
    a_relation=ref('stg_customers'), 
    b_relation=api.Relation.create(database='dbt_db', schema='analytics_prod', identifier='stg_customers'),
    primary_key='id'
  ) 
}}
where conflicting_values

If you want to create test results that include columns from the model itself for easier inspection, that can be written into the test:

{{ 
  audit_helper.compare_all_columns(
    a_relation=ref('stg_customers'),
    b_relation=api.Relation.create(database='dbt_db', schema='analytics_prod', identifier='stg_customers'), 
    exclude_columns=['updated_at'], 
    primary_key='id'
  ) 
}}
left join {{ ref('stg_customers') }} using(id)

This structure also allows for the test to group or filter by any attribute in the model or in the macro's output as part of the test, for example:

with base_test_cte as (
  {{ 
    audit_helper.compare_all_columns(
      a_relation=ref('stg_customers'),
      b_relation=api.Relation.create(database='dbt_db', schema='analytics_prod', identifier='stg_customers'), 
      exclude_columns=['updated_at'], 
      primary_key='id'
    ) 
  }}
  left join {{ ref('stg_customers') }} using(id)
  where conflicting_values
)
select
  status, -- assume there's a "status" column in stg_customers
  count(distinct case when conflicting_values then id end) as conflicting_values
from base_test_cte
group by 1

You can write a compare_all_columns test for an individual table, and the test will run as part of the test suite for that model: dbt test --select stg_customers.

If you want to store results in the warehouse for further analysis, add the --store-failures flag.

Legacy Macros

compare_queries (source)

Tip: Consider compare_and_classify_query_results instead.

This macro generates SQL that can be used to do a row-by-row comparison of two queries. This macro is particularly useful when you want to check that a refactored model (or a model that you are moving over from a legacy system) is identical to the original. compare_queries provides flexibility when:

Output

By default, the generated query returns a summary of the count of rows that are unique to a, unique to b, and identical:

| in_a | in_b | count | percent_of_total |
|---|---|---|---|
| True | True | 6870 | 99.74 |
| True | False | 9 | 0.13 |
| False | True | 9 | 0.13 |

Setting the summarize argument to false lets you check which rows do not match between relations:

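| order_id | order_date | status | in_a | in_b |
|---|---|---|---|---|
| 1 | 2018-01-01 | completed | True | False |
| 1 | 2018-01-01 | returned | False | True |
| 2 | 2018-01-02 | completed | True | False |
| 2 | 2018-01-02 | returned | False | True |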
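A sketch of requesting this row-level output, assuming summarize is passed as a keyword argument alongside the arguments used in the usage example for this macro:

```sql
{% set old_query %}
  select
    id as order_id,
    amount,
    customer_id
  from old_database.old_schema.fct_orders
{% endset %}

{% set new_query %}
  select
    order_id,
    amount,
    customer_id
  from {{ ref('fct_orders') }}
{% endset %}

{# summarize = false returns the non-matching rows instead of summary counts #}
{{ audit_helper.compare_queries(
    a_query = old_query,
    b_query = new_query,
    primary_key = "order_id",
    summarize = false
) }}
```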

Arguments

Usage


{% set old_query %}
  select
    id as order_id,
    amount,
    customer_id
  from old_database.old_schema.fct_orders
{% endset %}

{% set new_query %}
  select
    order_id,
    amount,
    customer_id
  from {{ ref('fct_orders') }}
{% endset %}

{{ audit_helper.compare_queries(
    a_query = old_query,
    b_query = new_query,
    primary_key = "order_id"
) }}

compare_relations (source)

Tip: Consider compare_and_classify_relation_rows instead.

A wrapper to compare_queries, except it takes two Relations (instead of two queries).

Each relation must have the same columns with the same names, but they do not have to be in the same order. Use exclude_columns if some columns only exist in one relation.

Arguments

Usage


{% set old_relation = adapter.get_relation(
      database = "old_database",
      schema = "old_schema",
      identifier = "fct_orders"
) -%}

{% set dbt_relation = ref('fct_orders') %}

{{ audit_helper.compare_relations(
    a_relation = old_relation,
    b_relation = dbt_relation,
    exclude_columns = ["loaded_at"],
    primary_key = "order_id"
) }}

Internal Macros

Macros prefixed with an _ (such as those in the utils/ subdirectory) are for private use. They are not documented or contracted and can change without notice.