...

How to Use Fivetran Upserting to Update Ingested Data

Quang Anh
June 5, 2023
Read: 8 min

Introducing Fivetran and its AWS Lambda connector

If you are a data engineer building your own ELT pipeline, then at some point in your career you must have heard of Fivetran, a data platform that automates your data ingestion process.

Fivetran comes packed with useful features. Most of them are connectors that make ingestion as simple as point and click. Want to get that financial report from the App Store? Click. Want to get data from those Google Sheets? Click click click.

If you have a source that is not supported by Fivetran yet, then you can certainly use their AWS Lambda Connector. You just need to put your script in your AWS Lambda that will handle data extraction, then Fivetran will handle the ingestion into the destination.

With sufficient information, Fivetran can even take care of upserting your data. Fivetran upserting means you are both updating old rows with new values and inserting new rows into a table. Fivetran could also act as an orchestrator that schedules triggers and sends state parameters into AWS Lambda.

State

Why do you need a state? State is an important concept in an ELT process as it tells you where you are so you can decide where you want to be next.

For example, if you are supposed to do daily ingestion and the state tells you your last successful ingestion was on 2023-05-10, you know that this time you should be ingesting data for 2023-05-11:

  • If that is successful, the state is updated, so the next time you run this is for 2023-05-12.
  • If it fails then the state remains unchanged then the next time you run it would be still data of 2023-05-11, as it should.

This is important because oftentimes your pipeline is going to fail due to external reasons, such as a flaky connection or expired credentials. This could be resolved with a retry, which Fivetran does provide after one hour with the original state to make sure that you do not miss any data because of a failed run.

Fivetran input and output

A typical Fivetran input into AWS Lambda could be like the following.

Note that here we introduced a new parameter – secrets – which, as the name suggests, contains the credentials that grant you access to the source. You can configure these values from the Fivetran configuration web UI.

Below is a simple example of an AWS Lambda script in Python that extracts data from source based on the given state, and returns the data with an updated state.

This is a highly simplified example to help us get the main points across. For instance, we assume that we have some records for the input state so it will keep running. In reality the state would be empty for the first run, so bear in mind that this would not run.

We also input secrets for requests headers parameters but in reality this would depend on the source. Additionally, consider that we introduce a new return parameter, hasMore, which tells Fivetran whether to fire another run after this one. In this example it defaults to False, so Fivetran would stop here, then pick it up with the returned state the next scheduled time.

Finally, in practice, your data could be too big to ingest in a single run, which would warrant partitioning into multiple smaller runs. AWS Lambda has a cap size of 6MB for the output payload, which, in most cases, should be large enough for all kinds of daily data. But if you try to ingest data for multiple days (e.g. weekly or monthly), you would need to break it down.

We also omit the parameter ‘schema’, which is optional but later plays an important role in solving our problem.

The setup in Fivetran is simple and straightforward. You just need to know your AWS Lambda Role ARN and name so Fivetran can have access to trigger it. Once you configure all of that, your pipeline would be ready to go.

Problem with data change after ingestion

Now that you know the basics of ingesting data with Fivetran and the AWS Lambda connector, let’s tackle a rare but unique challenge that occurs in your ELT process.

Take a look at the example below where we have 2 fields: Date and DailySale. This is all fairly self explanatory.

Let’s say today is 25 December 2022 and you are ingesting data of the day before, the 24th.

Date DailySale
2022-12-24 6

Then on the 26th, you are ingesting data from the 25th. The new row is appended to the old one.

Date DailySale
2022-12-24 6
2022-12-25 8

This is all good and you are happy with how it is going.

Date DailySale
2022-12-24 6
2022-12-25 8
2022-12-26 5
2022-12-27 8
2022-12-28 1
2022-12-29 1
2022-12-30 9
2022-12-31 5

You take the holiday break only to come back to your manager’s ominous request, “Hey this is weird, our Christmas sales should not be this low. Could you double check?”

So you pull up the source, hoping to be able to find the culprit for the inaccurate data. These are the numbers you see:

Date DailySale
2022-12-24 612
2022-12-25 849

At this point, you rush to check the policy on the source webpage only to find this:

“Our data is subject to change within 15 days of the first publication.”

First attempts at solving the problem

“The source is to blame, really,” you tell your manager but they still ask you to fix it. What could you do? Should you just tell them to wait 15 days to get the data? If that were acceptable, you would not be reading this blogpost now, would you?

Usually, the business cannot wait; stakeholders need the data now and they need accurate data.

The latest data is there but it will not get updated for a few days, so what you have is not accurate. Here is what you can do:

You can add another ingestion that extracts finalised data from 15 days before. This ingestion will store data in another table (let’s refer to it as the ‘accurate’ table as opposed to the ‘recent’ table). Once both tables land in your destination, the analytics engineer could merge them into one.

Let’s look at the example above again. If today is 2023-01-02, we will be ingesting data in 2023-01-01 for our ‘recent’ table and then data in 2022-12-18 for our ‘accurate’ table.

In the transformation layer, we will take all data from the ‘accurate’ table, then append with data from 2022-12-19 onward in the ‘recent’ table.

Great. Problem solved. You pat yourself on the back and proceed to show your manager the ingenious solution of the 2 interconnected tables. You guarantee that this way the data in the warehouse will be both accurate and up to date. Management should be able to see the latest data on the dashboard, although for accurate data they might need to wait 15 days.

‘But our Christmas sales are still low!’ your manager points out. You explain this is due to the accurate table not having caught up yet. It is only 2 January and we will need to wait for 6 more days in order to get the correct data. That will not work for management because they need access to the data from the source.

The correct way to solve the problem

At Infinite Lambda, we often find that the best way to help a client is to build what they need. In the case of this challenge, Fivetran already has a built-in feature called Upsert. This means you can either insert the row if it does not exist yet or update the value if the row is already there.

How to tell if a row is already there or not depends on the primary keys and this is declared in the schema part of the AWS return to Fivetran. As a best practice, we recommend always declaring the primary keys because otherwise if by any chance you have a duplicate ingestion, you will ingest data for a specific day more than once.

Fivetran could use the primary key information to prevent having duplicate data and the primary keys in your destination will always be unique. If you do not specify the schema, Fivetran will always append the data in ‘insert’, regardless of whether it is new or not.

In our example in 26 December, we have data from 2 previous days:

Date DailySale
2022-12-24 6
2022-12-25 8

But you know this data is not the end and can still be updated. On the 27th you would still want to ingest data from the 24th. Here, you make sure to let Fivetran know that Date is the primary key.

Date DailySale
2022-12-24 14
2022-12-25 12
2022-12-26 5

And then the 28th, you would ingest data from the 24th again, onward:

Date DailySale
2022-12-24 25
2022-12-25 29
2022-12-26 11
2022-12-27 8

So on and so forth until you reach the 15-day limit starting from the 24th (which makes 8 January 8). You would not need to ingest data from the 24th anymore because it would not change afterwards as you know from the policy.

On Jan 9th you would ingest data from 25 December to 8 January; on 10 January you would ingest from 26 December to 9 January. It is a rolling time window.

What would happen if you accidentally forgot to declare Date as the primary key? Then Fivetran would treat the new row as new and append to the old one instead of update, resulting in duplicates. This means that on the 27th you would end up with 5 rows instead of 3, 2 of them are duplicates.

Date DailySale
2022-12-24 6
2022-12-25 8
2022-12-24 14
2022-12-25 12
2022-12-26 5

Caveats about Fivetran Upserting

Our complete code looks like this:

With this solution we shall be getting fresh new data from the source everyday without having to wait for two weeks. However, there are a few caveats.

Say you need to ingest data from a few years ago. Knowing that the data would never change, would it be best to apply the rolling window method here? Certainly not as it could be quite inefficient in terms of resources to repeatedly ingest that data. Hence, we should only ingest the data in question once and use the rolling window method if the state date is within 15 days (or fewer) of the current date.

Another issue that could come up is that as you are ingesting multiple days worth of data instead of a single day, the daily data size ingested could get quite large and exceed AWS Lambda’s 6 MB output data limit in one run.

So you would need to be able to break down the ingested data into smaller runs too and this is a good time to take advantage of the hasMore output variable that I mentioned earlier so that you can have multiple Lambda runs within the span of one day.

If you are not too fond of this approach and you do not mind handling additional infrastructure, you can always opt for Fivetran’s S3 synchronisation option in the config as a workaround for the size limit.

Finally, all tables are different. Even when they came from the same source, they could have different primary keys, API endpoints and transformation methods. For instance you might encounter a table that places the date value as a column name, so every day the column name would appear differently. That table would certainly need to be pivoted before ingestion so date becomes row values. This is to maintain the schema integrity at the end, as you don’t want to change the schema every day, especially with a primary key like Date.

Some tables would need to be filtered and/or aggregated before ingestion. Some tables depend on the information of another table and in order to manage all of those properties and transformation methods for each table, you would probably need to build a class before you can define each of them. Plus a class could handle all of Fivetran state and output and facilitate the rolling window method.

Code structuring, while closely connected to this topic, will be discussed in a separate article. For now, we hope you have managed to get a good grasp of Fivetran’s AWS Lambda connector and are ready to apply it in your next data engineering project.

Now that you know how to leverage Fivetran upserting, go and optimise data integration for your project.

Visit the Infinite Lambda Blog for more tips and insights from the world of data.

Check out how Infinite Lambda has been leveraging Fivetran on client projects:

More on the topic

Everything we know, we are happy to share. Head to the blog to see how we leverage the tech.

Multitable SCD2 joins
Multitable SCD2 Joins: How to Unify Historical Changes
In the realm of data management, historical changes are conventionally stored in separate Slowly Changing Dimension Type 2 (SCD2) tables. However, extracting point-in-time insights from...
September 25, 2023
VaultSpeed Sapphire Partner
Infinite Lambda Becomes VaultSpeed Sapphire Partner
We are thrilled to share the exciting news that Infinite Lambda is now recognised as a VaultSpeed Sapphire partner. This development is extremely important to...
August 9, 2023
dbt Labs Premier Consulting Partner
Infinite Lambda Becomes dbt Labs Premier Consulting Partner
We are delighted to announce our new Premier Consulting Partner status with dbt Labs, marking a significant step in our successful collaboration. As two pioneers...
August 8, 2023
data contracts explained for non-tech audiences
Data Contracts for Non-Tech Readers: a Restaurant Analogy
Introducing the data kitchen Data contracts have been steadily gaining in popularity within the data community, providing a structured approach to manage and regulate the...
July 17, 2023
dbt Data Quality Tests Implementation
dbt Data Quality Tests Implementation
Data quality definition In this blog post, I am going to show you how to leverage dbt data quality tests to build robust pipelines that...
June 20, 2023
composability in data platforms
Towards Greater Composability in Data Platforms
This post is based on a talk of the same name that the author delivered at Data Mash #7 – London Edition in March 2023....
June 15, 2023

Everything we know, we are happy to share. Head to the blog to see how we leverage the tech.