Postgres CDC Solution with Debezium and Google Pub/Sub

If you have ever felt the need to track modifications happening in your database, whether it be the creation of a new table, the deletion of a record or a schema change, a Change Data Capture (CDC) solution might be a good fit for you. As the name suggests, CDC is a process that captures changes happening in a database; these changes are then often ingested into a destination, ensuring correct data replication.

One tool that could help with your CDC setup is Debezium.

In their own words:

“Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases.” – Debezium

Now that we have a tool to generate messages based on changes in our database, we need one to consume these messages. Traditionally, we would use Kafka for this purpose, which is no surprise since Debezium was built on top of Apache Kafka. However, as we love a good challenge at Infinite Lambda, we decided to try out Google Cloud Pub/Sub as our messaging service and, spoiler alert, we made it work.

Before we jump right into the final solution and the steps that led us there, it is important to understand the two main ideas we tried out and how we came to our decision.

Easier Maintainability vs More Customisation

[Diagram: the two CDC architecture options with Debezium]

Now let us dive in and analyse our 2 options in depth.

Option 1

Our first option was leveraging the Google Pub/Sub service, Cloud Functions and BigQuery. But wait! It seemed like we needed to introduce an extra step where dbt entered the equation. Let me explain why.

In this scenario, Debezium would stream everything into Pub/Sub, which would trigger a Cloud Function every time there was a new message. Next, the Cloud Function would extract the primary keys and table names from the payload and load the data into the corresponding BigQuery table. Still, at this stage we would only have the raw data, which means we needed a tool that could turn it into nice, usable tables. Here comes dbt to the rescue.

We could not imagine a better tool for this kind of task because it would allow us to process our raw data in any way we wanted. Nevertheless, you should keep in mind that at every schema change or new table, your dbt code would need to be updated.

Again, this would not be a serious drawback if you implemented some kind of notification into this flow and had a dedicated data team to monitor it and update dbt whenever necessary. Just keep in mind that, as the Cloud Function would be triggered for every event, you would need more hands to maintain the solution.

Option 2

As you can see above, option 2 focuses on simplification. Here, we would leverage Fivetran, another amazing tool that would make our lives much easier.

We are going to explore how to set up a Fivetran data connector a little bit further in this article but here is the gist:

You only need to set the source endpoint (in this case, the HTTP endpoint of our Google Cloud Function) and set the destination as a BigQuery dataset. Finally, you set up a schedule; whenever it fires, Fivetran calls our Cloud Function. Based on the response, Fivetran makes the necessary modifications on the target tables without any maintenance work from your side.

What was our choice?


We went with Option 2 and this was not a particularly difficult decision. We just love how easy it is to set up Fivetran, and we also appreciate that we will not need to worry about schema changes or the creation of new tables.

Let us now explore how to set up a CDC solution going from PostgreSQL through Debezium and Google Cloud to BigQuery with Fivetran.

The road to BigQuery

As discussed above, we decided to use Google’s Pub/Sub service on the Google Cloud Platform as our messaging service to handle the change data captured by Debezium. The main reason we decided to deviate from the recommended Kafka setup was that Pub/Sub is a fully managed Google service, so we would not lose any sleep over machines, parameter fine-tuning or cluster setup. In Pub/Sub you have topics, which are resources publishers send messages to, and subscriptions, through which subscriber applications consume that data.
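If you want to provision the topic and a pull subscription up front, a couple of gcloud commands are enough. The names below are only examples (the inventory.all topic is referenced later in this article):

```bash
# Create the topic that Debezium will publish change events to.
gcloud pubsub topics create inventory.all

# Create a pull subscription for the Cloud Function to consume from.
# --enable-message-ordering preserves ordering per ordering key, which matters
# because Debezium attaches the primary key information to the ordering key.
gcloud pubsub subscriptions create inventory-all-sub \
  --topic=inventory.all \
  --enable-message-ordering
```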

Postgres and Debezium deployment

Let us start with the source database which, in our case, is a PostgreSQL database. Since a plain PostgreSQL instance is not configured to stream these CDC events to Debezium out of the box, you have 2 options to make it work.

Option A

Modify your current Postgres database config and add these values:
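The exact values from our setup are not reproduced here, but the settings Debezium’s PostgreSQL connector typically needs for logical decoding look roughly like this (the numbers are example values to tune for your environment):

```conf
# postgresql.conf: settings commonly required for Debezium logical decoding
wal_level = logical            # emit the logical change stream that Debezium reads
max_wal_senders = 4            # processes allowed to stream WAL (example value)
max_replication_slots = 4      # replication slots available to connectors (example value)
```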

Option B

This is a good approach if you have just started your project and want to use Debezium from the very beginning, because this solution uses the debezium/postgres Docker image, which comes preconfigured for logical decoding. This way you will not have to worry about any PostgreSQL configuration.

Other than the configuration there is nothing tricky about this Postgres Kubernetes deployment. There is a README.md file in the repository where you can see how easy it is to deploy this to a cluster.
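The actual manifests live in the repository; purely as an illustration, a minimal Kubernetes Deployment built around the debezium/postgres image could look like the sketch below, where all names, labels and the secret are hypothetical:

```yaml
# Minimal sketch only; see the repository for the real manifests.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: debezium/postgres:13   # ships with logical decoding preconfigured
          ports:
            - containerPort: 5432
          env:
            - name: POSTGRES_USER
              value: inventory_user            # hypothetical user
            - name: POSTGRES_DB
              value: inventory                 # hypothetical database
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-credentials   # hypothetical secret
                  key: password
```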

Debezium deployment

Here are our insights into the Debezium deployment itself.

To make it simple, we used the debezium/server Docker image as the base of our custom debezium-server image. I say custom but fear not: we only created our own version because we wanted to copy the Debezium config and the Google service account key into the image.

Regarding the configuration, we had a big task ahead of us: making sure that we would not have to take any manual action (such as creating Pub/Sub topics) after deploying the whole solution. We are going to elaborate on this in a second.

Here is a quick snapshot of the .properties config file:
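The original snippet is not reproduced here; as a rough, representative sketch, a Debezium Server configuration with a Pub/Sub sink and a Postgres source contains lines along these lines, where the project ID, host and credentials are placeholders (depending on your Debezium version the server name property may instead be topic.prefix):

```properties
# application.properties: representative sketch, not our exact config
# Sink: Google Cloud Pub/Sub (primary keys travel on the ordering key)
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=my-gcp-project
debezium.sink.pubsub.ordering.enabled=true
# Source: PostgreSQL (host, credentials and database name are placeholders)
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=inventory_user
debezium.source.database.password=changeme
debezium.source.database.dbname=inventory
# The server name becomes the <server> part of the topic names
debezium.source.database.server.name=tutorial
debezium.source.plugin.name=pgoutput
# ...plus the property mentioned below that makes Debezium snapshot newly created
# tables, which is not reproduced here.
```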

Possible Debezium sink options:

“Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar or Redis (Stream).”

As you can see, we specified Pub/Sub as the debezium.sink and our PostgresConnector as the source. Most of the properties are fairly simple and almost mirror the default setup. The addition worth calling out is the line where we tell Debezium that it should create a snapshot of every new table with data. This also contributes to maintainability because we do not need to update the config or redeploy our Debezium connector every time we create a new table.


Single topic vs multiple topic

Early on, we realised that with Debezium’s default configuration you need a destination topic in Pub/Sub for each and every table. That makes sense, since naturally you would not want to be getting messages from all over the place. It also provides granularity, in the sense that message consumption can be fine-tuned per topic: event-based triggers could be set up for one topic and a scheduled job for another.

The problem became apparent when it transpired that new tables could be added to the database at any time. That is when the single-topic solution came into play.

The single-topic solution allows you to have all change data messages, well, in a single topic. The drawback is that, based on your use case, you might want to ingest data from each source table into a separate destination table. This setup would require you to do some coding afterwards to achieve that.

By default, Debezium will forward messages from each table to a corresponding Pub/Sub topic. The name of each Pub/Sub topic should match the full source table name, which is <server>.<schema>.<table>. In order to deviate from the default setup, you need to modify the topic rerouting parameters in the .properties file.

  • topic.regex
    Specifies a regex that the transformation applies to each change event record to determine if it should be routed to an appropriate topic.
  • topic.replacement
    Specifies a regex that denotes the destination topic name. The transformation routes each matching record to the topic identified by this regex.
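As a sketch of what these parameters can look like in the .properties file, using Debezium’s ByLogicalTableRouter transformation and assuming the inventory schema and a single inventory.all destination topic:

```properties
# Reroute every change event from the inventory schema into one topic.
debezium.transforms=Reroute
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
# Matches topic names of the form <server>.inventory.<table>
debezium.transforms.Reroute.topic.regex=(.*)inventory(.*)
# Everything that matches lands in <server>.inventory.all
debezium.transforms.Reroute.topic.replacement=$1inventory.all
```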

In the example above, we routed every table inside the inventory schema to publish its messages to the inventory.all Pub/Sub topic. Cool, now we have all messages in a single topic.

Primary key identification

Upon exploring the Debezium message in detail, we realised that even the schema information of the source table is included alongside the before and after state of the record. However, there was no way to identify the primary keys from that message, which we would need in order to replicate the source table correctly.

Diving deeper into the documentation and the data itself, it became clear that each Pub/Sub message has an ordering key attached to it, which Debezium uses to forward the crucial primary key information. This data is sent in 2 different ways. The first scenario is when the source table has no primary key, in which case the ordering key only contains the word “default”. In the other scenario, the ordering key contains JSON, where the schema.fields list holds the primary key columns, whether the table has a single or a composite primary key.
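Here is a minimal sketch of how that ordering key can be interpreted in Python; the exact JSON layout and the "field" attribute name are assumptions based on the Debezium key message format, so treat the parsing as illustrative:

```python
import json


def primary_keys_from_ordering_key(ordering_key: str) -> list[str]:
    """Extract primary key column names from a Debezium Pub/Sub ordering key.

    Debezium sends the literal string "default" when the source table has no
    primary key; otherwise the key is JSON whose schema.fields list holds one
    entry per primary key column (single or composite).
    """
    if ordering_key == "default":
        return []
    key_payload = json.loads(ordering_key)
    # "field" is the assumed attribute holding the column name in each entry.
    return [field["field"] for field in key_payload["schema"]["fields"]]
```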

Cloud functions

Our next task was to consume the messages from the single topic and ingest them into BigQuery. We decided to go with Cloud Functions, which is another Google service that allows you to run your code without any server management.

In our initial setup, we created a Cloud Function with an event-based trigger, which means our code would run every time a new message was published to our Pub/Sub topic. However, there was one problem with that approach: only the message body was pushed to our Cloud Function, so we could not retrieve the ordering key, which, as described above, held the primary key information.

To solve this, we created an HTTP-triggered Cloud Function and modified our code so that, upon invoking the function through the URL endpoint generated at its creation, we would consume a batch of messages from the Pub/Sub topic’s subscription.

Opting for this type of Cloud Function was also necessary because Fivetran needs an endpoint that can be triggered at a set interval. The only thing left at this stage was to formulate a response out of the Pub/Sub messages for Fivetran to consume.

For the correct response structure, please refer to the Fivetran documentation.
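As a rough sketch under several assumptions (project and subscription names are hypothetical, only inserts are handled, and the response keys follow the Fivetran function connector format as we understand it from their documentation), such an HTTP-triggered function could look like this:

```python
import json

import functions_framework
from google.cloud import pubsub_v1

PROJECT_ID = "my-gcp-project"          # hypothetical project
SUBSCRIPTION_ID = "inventory-all-sub"  # hypothetical subscription
MAX_MESSAGES = 100                     # batch size pulled per invocation


@functions_framework.http
def pull_changes(request):
    """Pull a batch of Debezium events from Pub/Sub and return them in a
    Fivetran function-connector style response."""
    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    pulled = subscriber.pull(
        request={"subscription": sub_path, "max_messages": MAX_MESSAGES}
    )

    inserts: dict[str, list[dict]] = {}
    ack_ids = []
    for received in pulled.received_messages:
        event = json.loads(received.message.data.decode("utf-8"))
        # The Debezium envelope carries the source table name and the new row state.
        table = event["payload"]["source"]["table"]
        after = event["payload"]["after"]
        if after is not None:
            inserts.setdefault(table, []).append(after)
        ack_ids.append(received.ack_id)

    if ack_ids:
        subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})

    # Response shape expected by Fivetran's function connector (see their docs).
    return {
        "state": {},        # a cursor could be kept here if needed
        "insert": inserts,
        "hasMore": False,
    }
```

A production version would also use the ordering key to declare primary keys in the schema section of the response and handle deletes, but that is beyond this sketch.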

fivetran-cdc

Fivetran

We had successfully delivered our Debezium events to Google Pub/Sub and we had the ideal solution to pull the messages. What is next? Enter Fivetran.

Fivetran has a really nice feature which allows you to use a data connector to run a custom function on any of the three main cloud platforms: AWS Lambda, Azure Functions and Google Cloud Functions.

Setting up this connector and a destination for your data is as easy as 1-2-3. The folks over at Fivetran have done a fantastic job with the documentation, so we strongly encourage you to check it out.

Finally, after we set up our data connector and our destination on Fivetran, we could trigger the previously created Cloud Function, which would pull the messages from the Pub/Sub topic and send a JSON payload back to Fivetran in response. Fivetran would then process the JSON payload and make the necessary changes on the BigQuery tables.

And voila! You are now ready to set up an end-to-end CDC solution between your PostgreSQL source database and BigQuery on your own. If you have any questions about this process, we would be happy to help, so do not hesitate to reach out.

Find the repo and refer to the deployment instructions on GitHub.

Check out the other articles on the blog for more technical insights.
