Deleting Records with dbt

dbt is a great tool for managing data transformations. It is built from the ground up to support version control and peer review, and it enables and encourages best practices such as automated testing, environment separation and modularity.

One of the most powerful features of dbt is the ability to transform and load only new and updated records. This is implemented through incremental models.

{{
config(
materialized='incremental'
)
}}

Figure: a comparison of an incremental load vs. a full dbt load of 40 models running on Snowflake. The incremental load processes only the new records. Please note that performance will vary in your case.

The above chart shows the performance advantage of an incrementally loaded dbt model over a fully refreshed model. This performance benefit translates to real-life savings (e.g. when using Snowflake) and also allows faster decision making off the back of analytics data. We now see why this approach is so appealing to data engineers.

However, it is often necessary to update a record after its initial ingestion, for instance when an order item is substituted because the original product is unavailable. In cases like this dbt helps us out: when defining an incremental model we also define a unique key, which dbt then uses to generate insert and update statements (or a merge where supported).
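As a rough sketch of what such a model might look like, the snippet below combines the incremental materialization with a unique key. The source name `shop.order_items` and the columns `order_item_id`, `product_id` and `updated_at` are assumptions made up for illustration; only `config`, `source`, `is_incremental()` and `this` are dbt features.

```sql
{{
  config(
    materialized='incremental',
    unique_key='order_item_id'
  )
}}

-- Hypothetical source and column names, for illustration only.
SELECT
    order_item_id,
    order_id,
    product_id,
    updated_at
FROM {{ source('shop', 'order_items') }}

{% if is_incremental() %}
-- On incremental runs, only pick up rows changed since the last load.
-- Rows whose order_item_id already exists in the target are updated,
-- the rest are inserted.
WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}
```

On a full refresh the filter is skipped and the whole source is loaded, so the same file serves both cases.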

Another approach often taken by data architects is to store every version of the record, together with a set of timestamp columns that capture the bi-temporal nature of the data.

With versioned data like this we can answer questions such as:

  • What is the total for order_id 1?
  • What was the total between the 16th and 17th of January 2020?
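To make the second question concrete, here is a minimal sketch of a point-in-time query. It assumes a hypothetical table `order_versions` with `valid_from` and `valid_to` timestamp columns (with `valid_to` left null for the current version); none of these names come from the original data, and a fully bi-temporal model would carry a second pair of timestamps for when each row was recorded.

```sql
-- Hypothetical table and column names, for illustration only.
-- Returns every version of order 1 that was valid at some point
-- between 16 and 17 January 2020.
SELECT
    order_id,
    version,
    total,
    valid_from,
    valid_to
FROM order_versions
WHERE order_id = 1
  AND valid_from < '2020-01-17'
  AND (valid_to IS NULL OR valid_to > '2020-01-16')
ORDER BY valid_from;
```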

Great!

While the above approaches will help us solve most cases, we recently stumbled upon one that does not quite fit. Our client required a data model that deletes records from a reporting table. The reasons why we needed to implement an actual delete are long and complicated, but they boil down to:

  • Lots of existing code queries this table, and changing that code to incorporate new flags/columns would be difficult and lengthy
  • A bespoke library was in use that generates requests against this table based on a UI tool; changing that would also be tedious!
  • A view would be impractical due to the sheer number of records (billions, and growing!)

 

As you all know, we can only write SELECT statements in our SQL model files, so how can one delete records with a dbt model if all you can write is a SELECT statement?! Thankfully, the folks at Fishtown Analytics (authors of dbt) provide various hooks, including ones that run after a model has executed.

The idea is pretty simple: add a column that flags a record as outdated and due for deletion, then add a post-model hook that deletes any record with that flag set!

Our input data contained a JSON message with a top-level order and a set of order items belonging to it. When a customer returns an item, an updated message is received that contains one fewer record in the order item set, as in the example below:

You can see that a customer has made one order with two items for a total of 11.50. They then decide to return one of the items:

A new and updated record is sent, but now it has one fewer order item!

It is rather difficult to update a record that is not in your input, so we ended up taking a union of two sets: the existing records that share an order_id with the input, with their deletion flag set to true, and the new records (which are now fewer), with their deletion flag set to false. In other words, we flag the existing records for the order for removal and then insert the new ones.

The post model hook then takes care of cleaning up the outdated records:

 

{{
config(
materialized='incremental',
unique_key='order_pk',
post_hook='delete from {{this}} where to_delete = true'
)
}}

{% if is_incremental() %}
-- Existing rows for any order present in the latest batch: flag them for deletion
SELECT
    order_pk,
    order_id,
    sub_id,
    type,
    total,
    version,
    true AS to_delete,
    current_timestamp(4) AS ts
FROM {{ this }}
WHERE order_id IN (SELECT DISTINCT order_id FROM latest_batch)
UNION
{% endif %}

-- Rows from the latest batch: these are the ones we keep
SELECT
    order_pk,
    order_id,
    sub_id,
    type,
    total,
    version,
    false AS to_delete,
    current_timestamp(4) AS ts
FROM latest_batch

A few points to note from the above code:

  • The first SELECT only runs when we are indeed performing an incremental load; otherwise it isn't needed
  • order_pk is a surrogate key, which we still use to update existing records
  • latest_batch is the result of a CTE that contains only the latest set of records (with filters for incremental loading etc.)
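The latest_batch CTE itself is not shown in this post. Purely as an illustration of the shape it could take, the sketch below assumes a hypothetical staging model `stg_orders` with a hypothetical `loaded_at` column; the actual filters we used were different and more involved.

```sql
-- Hypothetical staging model and loaded_at column, for illustration only.
WITH latest_batch AS (
    SELECT
        order_pk,
        order_id,
        sub_id,
        type,
        total,
        version
    FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    -- On incremental runs, only consider rows that arrived after the
    -- most recent timestamp already written to this model.
    WHERE loaded_at > (SELECT max(ts) FROM {{ this }})
    {% endif %}
)

SELECT * FROM latest_batch
```

In the real model the final SELECT is replaced by the conditional UNION, which reads from latest_batch in both branches.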

We complemented this with a set of tests! ✅

Ensure no records are left to be deleted:

SELECT 1
FROM {{ ref('ORDERS') }}
where to_delete = true

Ensure there are no records for the same order with different versions:

SELECT order_id, sub_id, count(*) as n_records
FROM {{ ref('ORDERS') }}
GROUP BY order_id, sub_id
HAVING n_records > 1

The End.
