Many companies today use different database technologies for their application and their data platform. This creates the challenge of enabling analytics on application data without giving analysts access to the application database. A highly efficient solution is using Change Data Capture (CDC) to sync application data with the data warehouse. Now, we are going to show you how.
This article aims to show you how to create a PostgreSQL to BigQuery sync pipeline using Debezium and Kafka. What is special about this Change Data Capture (CDC) solution is that it allows you to transfer data that exceeds 1 MB in size.
We have already explored another solution that has this limitation, so if you are looking to process smaller messages, you can also check out our solution using a Debezium server to capture changes in a Postgres database paired with Google Pub/Sub as a broker and Fivetran to ingest the data into BigQuery.
At Infinite Lambda, we have been exploring solutions to sync a Postgres database to BigQuery for our clients at Hivebrite who build all-in-one, easy-to-set-up, highly customisable community portals.
We had already identified Debezium as the right tool to extract change event data from a Postgres database and we needed a way to handle the messages. Redis, Apache Pulsar, Amazon Kinesis and Apache Kafka were some of the sink options available for Debezium Server. We opted for Apache Kafka, which operates similarly to Google Pub/Sub but does not have the 1 MB limitation we needed to get around.
We set out to make a few adjustments to comply with Debezium’s recommendation of using its Kafka Connect Source connector with Kafka rather than Debezium Server.
All of the components in our target stack were open source, and thus we set out to create a reference implementation locally where we could run our tests and find the best settings to cover our needs.
We wanted to have a sync pipeline that would support the following features:
- 1:1 data sync from a list of tables in a source Postgres Database to a BigQuery dataset;
- Schema change sync support from Postgres to BigQuery;
- Support for an arbitrary message size.
We used Kubernetes with the Strimzi Kafka operator to set up Kafka and Kafka Connect, and opted for a custom Debezium PostgreSQL image, pre-configured with CDC support that would allow us to avoid a manual CDC configuration.
After installing the Kafka operator, we spun up Kafka and Kafka Connect with their default values. We only customised the settings to add the Debezium Postgres source and BigQuery sink kafka-connect plugins:
After applying these, we were ready to configure our connectors.
Configuring the PostgreSQL source connector
Here we are using JSON to serialise the data. This is great for testing, but Avro would be preferred in a production context.
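As a rough illustration of what such a connector definition can contain, here is a minimal sketch that assembles a Strimzi KafkaConnector manifest in Python and prints it as JSON (which is also valid YAML). It is not the configuration used in this project: the hostname, credentials, database name, table list, connector name and Connect cluster name are all hypothetical placeholders, and the password would normally come from a secret rather than plain text. Note that topic.prefix is the Debezium 2.x name of the setting that older versions call database.server.name.

```python
import json


def build_postgres_connector(name, connect_cluster, tables):
    """Build a KafkaConnector manifest for the Debezium Postgres source.

    Every concrete value below is a hypothetical placeholder.
    """
    config = {
        # Connection settings (placeholders, not taken from the article)
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "app",
        # Logical name used as the topic prefix
        # (called database.server.name before Debezium 2.0)
        "topic.prefix": "app",
        # Logical decoding plugin built into recent Postgres versions
        "plugin.name": "pgoutput",
        # Only capture changes from the listed tables
        "table.include.list": ",".join(tables),
        # JSON serialisation, as discussed above
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    }
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaConnector",
        "metadata": {
            "name": name,
            # Tells the Strimzi operator which Kafka Connect cluster runs it
            "labels": {"strimzi.io/cluster": connect_cluster},
        },
        "spec": {
            "class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasksMax": 1,
            "config": config,
        },
    }


manifest = build_postgres_connector(
    "postgres-connector", "my-connect-cluster", ["public.users"]
)
print(json.dumps(manifest, indent=2))
```

Generating the manifest from code is only a convenience for this example; a hand-written YAML file with the same keys would work equally well.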
Let us create this connector by issuing kubectl apply -f postgres-connector.yml and watch it start doing its magic:
By default, Debezium will create CDC events structured like this:
As you can see, this is not the same schema as our source table, but a full CDC record: a rather large payload containing our data wrapped inside a metadata structure. This is also what would be picked up by BigQuery, which would end up with something like this:
The data we need can be accessed as an object in the after field (meaning the data after the change), so we have some additional modelling work to do to extract it and maintain the model over time. Fortunately, we have a more efficient approach to ensure a 1:1 representation.
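To make the shape of such an event more tangible, the following sketch models a simplified change event as a Python dictionary and pulls the row out of the after field by hand. The table, column names and values are made up for illustration, and a real event carries more metadata than shown here.

```python
# Hypothetical change event for an UPDATE on a "users" table (values are made up).
event = {
    "before": None,  # previous row state (may be null)
    "after": {"id": 1, "name": "Ada", "email": "ada@example.com"},
    "source": {
        "connector": "postgresql",
        "db": "app",
        "schema": "public",
        "table": "users",
        "ts_ms": 1650000000000,
    },
    "op": "u",  # c = create, u = update, d = delete, r = read (snapshot)
    "ts_ms": 1650000000123,
}


def extract_row(event):
    """Return the row as it looks after the change; None for a delete event."""
    return event.get("after")


row = extract_row(event)
print(sorted(event.keys()))
print(row)
```

Doing this extraction downstream for every table is exactly the extra modelling work we would like to avoid.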
Single Message Transformations (SMT)
Single Message Transformations are a way of applying transformations to the event messages on the fly as they go through the pipeline. In Kafka Connect, you can have SMTs running at source or sink level. Transformers are simply Java classes which you can include as plugins. Debezium provides some specific ones.
In our case, the lines that do the trick are:
- transforms: unwrap
- transforms.unwrap.add.fields: op,table,source.ts_ms
- transforms.unwrap.delete.handling.mode: rewrite
- transforms.unwrap.drop.tombstones: false
- transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
Let us go through them one by one, just as they will be processed:
- transforms: unwrap
This tells Kafka Connect that we are starting to declare transformations, more specifically that we are declaring only one and that it is named unwrap. We could have declared more than one using a comma-separated list.
- transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
Here, we define the transformation class to be used. This is one of the plugins provided by Debezium, which will extract the after value of the original CDC record, so that the actual data will be passed as top-level fields, allowing schema replication on the target sink.
- transforms.unwrap.add.fields: op,table,source.ts_ms
Our unwrap transformation will add the listed metadata fields to each record. They appear in the final table with a __ prefix and can be useful for debugging. Specifically, op is the operation (read, create, update, delete), table is the name of the source table, and source.ts_ms is the timestamp of the source event.
- transforms.unwrap.delete.handling.mode: rewrite
This rewrites each deletion event to include a __deleted: true flag and replicates all the fields in the schema as null values, to allow proper handling by the sink.
- transforms.unwrap.drop.tombstones: false
This indicates whether our transformation should drop tombstone events, which instruct Kafka to remove all records for the deleted key during log compaction. This is not really relevant in our case, so we could also drop them.
This is the resulting schema in the replicated table. The kafka_data field contains Kafka metadata, while kafka_key is required to support upsert operations.
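The effect of the unwrap settings walked through above can be approximated in a few lines of Python. This is a simplified model for illustration only, not Debezium's implementation: the sample events are made up, the __deleted flag is represented as a Python boolean for simplicity, and the delete event is assumed to carry only the key in its before image.

```python
def lookup(event, field):
    """Resolve an add.fields entry such as "op", "table" or "source.ts_ms"."""
    if "." in field:
        value = event
        for part in field.split("."):
            value = value[part]
        return value
    # Bare names are taken from the envelope first, then from the source block.
    return event[field] if field in event else event["source"][field]


def unwrap(event, add_fields=("op", "table", "source.ts_ms")):
    """Flatten a change event roughly the way the settings above describe."""
    is_delete = event["op"] == "d"
    # For deletes there is no "after" image, so fall back to "before".
    row = dict(event["before"] if is_delete else event["after"])
    # add.fields: copy the metadata to the top level with a "__" prefix.
    for field in add_fields:
        row["__" + field.replace(".", "_")] = lookup(event, field)
    # delete.handling.mode=rewrite: keep the record and flag it.
    row["__deleted"] = is_delete
    return row


source = {"table": "users", "ts_ms": 1650000000000}
update = {"op": "u", "before": None,
          "after": {"id": 1, "name": "Ada"}, "source": source}
delete = {"op": "d", "before": {"id": 1, "name": None},
          "after": None, "source": source}

print(unwrap(update))
print(unwrap(delete))
```

The flattened records have the source columns at the top level, which is what lets the sink create a table with the same columns as the source.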
We are now ready to configure our sink connector, so let us move on to the next steps.
Configuring the BigQuery sink connector
To use the BigQuery connector, you will need to create a service account and generate a key file as described here.
The service account will either need the BigQueryEditor primitive IAM role or the bigquery.dataEditor predefined IAM role. The minimum permissions are as follows:
Setting up a secret in Kubernetes is beyond the scope of this article, but the instructions are easy to follow and can be accessed here.
Let us have a look at the configuration:
Most of the properties are pretty self-explanatory and we will focus on the more interesting ones.
- sanitizeTopics: true
Debezium will create topic names using this structure: <db>.<schema>.<table>, and the tables will be created by simply replacing “.” with “_”. BigQuery might not like some of the characters in there, so this option will ensure the topic names are valid for BigQuery in order to avoid errors while creating tables.
- sanitizeFieldNames: true
Similarly to the one above, this makes sure field names play well with BigQuery.
- allowBigQueryRequiredFieldRelaxation: true
This will allow required fields to become nullable, which is necessary in order to support schema evolution (i.e. dropping columns).
- allowNewBigQueryFields: true
If a column does not exist while inserting a record, the sink will create it. This is also required for schema change support.
- allowSchemaUnionization: false
When set to false, the connector uses the last record in a batch to work out whether the target table needs to be updated. If you are getting errors, setting it to true can increase success rates.
- upsertEnabled: true
Update events will cause such records to be updated in the target table, maintaining the sync with the source table. This is only possible for tables with a primary key.
- deleteEnabled: true
This will cause deleted records in the source table to be deleted in the target table as well. This will only work for tables with a primary key. Setting it to false will enable soft deletes, and the __deleted field will be set to true instead.
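To tie a few of these options together, the sketch below models in plain Python how a topic name could be turned into a valid table name and how upserts, hard deletes and soft deletes differ on a table keyed by primary key. It follows the behaviour as described above and is not the connector's actual logic: the sanitising rule is an approximation, and the topic name, key column and records are hypothetical.

```python
import re


def sanitize_name(name):
    """Approximate sanitising: keep letters, digits and underscores only."""
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "_", name)
    # Names must not start with a digit, so prefix an underscore if needed.
    if not re.match(r"[a-zA-Z_]", cleaned):
        cleaned = "_" + cleaned
    return cleaned


def apply_record(rows, record, key_field="id", delete_enabled=True):
    """Apply one unwrapped record to rows, a dict keyed by primary key."""
    key = record[key_field]
    if record.get("__deleted") and delete_enabled:
        rows.pop(key, None)  # hard delete: the row disappears from the target
    else:
        rows[key] = record   # upsert; a soft delete keeps the flagged row
    return rows


records = [
    {"id": 1, "name": "Ada", "__deleted": False},       # create
    {"id": 1, "name": "Ada L.", "__deleted": False},    # update
    {"id": 1, "name": None, "__deleted": True},         # delete
]

hard, soft = {}, {}
for record in records:
    apply_record(hard, record, delete_enabled=True)
    apply_record(soft, record, delete_enabled=False)

print(sanitize_name("app.public.users"))
print(hard)
print(soft)
```

The key lookup in apply_record is also why a primary key is needed: without one there is nothing to match an update or delete against.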
Although this is a highly efficient setup, we would like to point out some limitations for you to consider.
First of all, enabling CDC on Postgres will cause additional overhead on the server and will take up additional space for the WAL files, so make sure you plan ahead. Also, in cluster settings, you can only enable CDC on the master server, so make sure your failover process covers pointing Debezium to the new master.
Schema change sync is only partially supported: columns can only be added. This means you might need to introduce some changes to your schema evolution process to ensure compatibility.
Only tables with a primary key can be upserted. Tables without primary keys have to be configured manually before they can be synced. If no natural unique key exists, the sync will be append-only, basically generating a log of changes.
We would love to hear your thoughts on this solution, so reach out with any comments at firstname.lastname@example.org
Explore more solutions in the data and cloud space on the Infinite Lambda blog.