This post follows an earlier article on streaming ingestion of event data from sources such as a clickstream, a stock ticker, or, in our case, event-driven microservices into Apache Iceberg tables, using the Iceberg Sink Connector for Kafka Connect. We discussed some of the challenges of streaming ingestion and how the Iceberg Sink Connector helps to solve them.
But there is another use case for streaming ingestion that might be even more common, and that comes with some challenges of its own: mirroring operational database tables by streaming the individual inserts, updates, and deletes using change data capture (CDC).
In this post, we’ll see how to take data from Debezium, a popular CDC framework, and use it to maintain a mirror of the source table in Iceberg.
Debezium and Kafka Connect
The Debezium project provides a set of Kafka Connect-based source connectors. There are options available for many popular databases, such as PostgreSQL, MySQL, and MongoDB. When one of these source connectors is launched in Kafka Connect, every change to the source database will be sent as an event into a Kafka topic. Here’s an example of the value of one of these events:
{
  "op": "u",
  "payload": {
    "before": {
      "id": "68e5f78c-f3d6-d894-d68f-40d7b52db0a8",
      "firstName": "Stanford",
      "lastName": "Legros",
      "email": "[email protected]",
      "last_updated": "2024-01-05T23:11:57"
    },
    "after": {
      "id": "68e5f78c-f3d6-d894-d68f-40d7b52db0a8",
      "firstName": "Stanford",
      "lastName": "Bechtelar",
      "email": "[email protected]",
      "last_updated": "2024-01-25T23:41:38"
    }
  }
}
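Each event will have an “op” code of either “i”, “u”, or “d”, for insert, update, or delete. (Raw Debezium events label inserts “c”, for create; this post uses “i” throughout.) Since this one is an update (“u”), it contains both the before and after state of the row. An insert would only have the after state, and a delete would only have the before state.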
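As a minimal sketch of how a consumer might interpret these events, the Python function below picks the row key and the resulting row state based on the op code. The function name interpret_event is hypothetical (it is not part of Debezium or the Iceberg Sink Connector), the ids are shortened for readability, and the code assumes the simplified event layout shown above.

```python
from typing import Optional, Tuple


def interpret_event(event: dict) -> Tuple[str, Optional[dict]]:
    """Return (row id, new row state); the state is None for a delete.

    Assumes the simplified layout shown above: a top-level "op" code and a
    "payload" holding optional "before" and "after" row images.
    """
    op = event["op"]
    payload = event["payload"]
    if op == "d":
        # A delete carries only the before image, so the key comes from there.
        return payload["before"]["id"], None
    if op in ("i", "u"):
        # Inserts and updates carry the new row in the after image.
        after = payload["after"]
        return after["id"], after
    raise ValueError(f"unexpected op code: {op!r}")


update = {
    "op": "u",
    "payload": {
        "before": {"id": "68e5", "lastName": "Legros"},
        "after": {"id": "68e5", "lastName": "Bechtelar"},
    },
}
delete = {"op": "d", "payload": {"before": {"id": "68e5", "lastName": "Bechtelar"}}}

print(interpret_event(update))
print(interpret_event(delete))
```

Note that the key has to be read from a different place for deletes, which matters again when we write the merge query.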
A Tale of Two Tables
Before we set out to bring this CDC data into Iceberg, let’s talk about what we’re going to do with it. We could use this data to directly update a mirror table and keep it in sync with the source table, but a much more flexible pattern is to keep this data in a separate change log table in Iceberg and then use that change log to update our mirror table. This pattern provides the following benefits:
- Fast append-only writes of CDC data
- A mirror based on individual changes, with configurable freshness
- Accurate record of all state transitions
- Time travel to any point
- Ability to recreate current state at any time
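To make the last two benefits concrete, here is a small Python sketch of replaying a change log to rebuild the table state as of a chosen moment. It is only an in-memory illustration, not how Iceberg or Spark does it: the function state_as_of and the flattened record shape (op, id, ts, row) are assumptions made for this example.

```python
from typing import Dict, List


def state_as_of(change_log: List[dict], as_of: str) -> Dict[str, dict]:
    """Rebuild the table state by replaying changes up to and including `as_of`.

    Each entry is a hypothetical, flattened change record with keys
    "op" ("i", "u", or "d"), "id", "ts" (ISO-8601 string), and "row".
    """
    state: Dict[str, dict] = {}
    # ISO-8601 timestamps in the same format sort correctly as plain strings.
    for change in sorted(change_log, key=lambda c: c["ts"]):
        if change["ts"] > as_of:
            break
        if change["op"] == "d":
            state.pop(change["id"], None)
        else:
            state[change["id"]] = change["row"]
    return state


change_log = [
    {"op": "i", "id": "a", "ts": "2024-01-05T23:11:57", "row": {"lastName": "Legros"}},
    {"op": "u", "id": "a", "ts": "2024-01-25T23:41:38", "row": {"lastName": "Bechtelar"}},
    {"op": "d", "id": "a", "ts": "2024-02-01T08:00:00", "row": None},
]

print(state_as_of(change_log, "2024-01-10T00:00:00"))  # only the insert applies
print(state_as_of(change_log, "2024-02-02T00:00:00"))  # the row has been deleted
```

Because the log is append-only, the same replay with a later cutoff always yields the current state.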
So, that’s our plan. We’ll use Kafka Connect to write Debezium CDC data to our change log table, and we’ll use it to update a mirror table that we’ll create in Iceberg. Let’s dive in.
Our scenario
For our example, we’ll be mirroring customer data from a relational database and bringing it into Iceberg. Since there are already plenty of resources to help you get started with Debezium, we’ll start with the CDC data already in a Kafka topic called cdc-customer. With that starting point, we have the following tasks:
- Load the data in the cdc-customer topic into a table called cdc_customer. This is our change log table.
- Create an Iceberg table called customer_mirror with the same schema as our source customer table.
- Write the merge query that will update the mirror table with data from the change log table.
Kafka Connect and the Iceberg Connector
In my earlier post, Streaming Event Data to Iceberg with Kafka Connect, I talked about Kafka Connect and the Iceberg Sink Connector, including how to find and install it, so I’ll refer you to that to save some space here. We’ll jump in at the configuration of the connector since it is slightly different in this use case.
Configure the connector
With Kafka Connect, configuration is done via a JSON document. This document contains all the information Kafka Connect needs to access and read from our Kafka cluster and then access and write to our Iceberg warehouse. Let’s look at the completed configuration file and talk about the individual items in more detail.
Note that these configuration values are specific to our demo project and that some variables will change based on details such as the storage layer, Iceberg catalog type, and data format of the incoming Kafka records.
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "3",
  "topics": "cdc-customer",
  "iceberg.tables": "rpc.cdc_customer",
  "iceberg.catalog": "demo",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "http://iceberg-rest:8181",
  "iceberg.catalog.client.region": "us-east-1",
  "iceberg.catalog.s3.endpoint": "http://minio:9000",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.control.commit.interval-ms": 300000,
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
If you’re familiar with Kafka Connect, you’ll see that some of these configuration items are standard for all connectors. Others are specific to Apache Iceberg and the Iceberg Connector, as denoted by the iceberg. prefix.
connector.class | The fully-qualified class name of the connector. |
tasks.max | The maximum number of tasks that will be launched. All the partitions of the Kafka topic (or topics) we are reading will be spread across these tasks. |
topics | The Kafka topic or topics to be read from. |
iceberg.tables | The table, or tables, that we want our Kafka data to be written to. |
iceberg.catalog | The name of the catalog we will connect to. |
iceberg.catalog.type | The type of catalog we will be working with (REST, Hive, Hadoop). |
iceberg.catalog.uri | The location where we can reach that catalog. |
iceberg.catalog.client.region | The region in which our catalog resides. |
iceberg.catalog.s3.endpoint | The endpoint for your S3 storage (or equivalent). |
iceberg.catalog.s3.path-style-access | Whether to use path-style access for S3 buckets. |
iceberg.tables.auto-create-enabled | Whether to auto-create the Iceberg tables when a new connector is launched. |
iceberg.tables.evolve-schema-enabled | Whether to evolve the Iceberg table schema if the Kafka schema changes. (Recommended when auto-create-enabled is true.) |
iceberg.control.commit.interval-ms | How often the controller should commit writes from all workers, in milliseconds. Default is 300000. |
key.converter | A Java class that Kafka Connect should use to read the Kafka record keys. |
key.converter.schemas.enable | Whether to use a schema associated with the Kafka record key. |
value.converter | A Java class that Kafka Connect should use to read the Kafka record values. |
value.converter.schemas.enable | Whether to use a schema associated with the Kafka record value. |
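As a quick illustration of the split between standard Kafka Connect settings and connector-specific ones, the following Python sketch separates a configuration by the iceberg. prefix. The helper split_config is hypothetical, and the JSON is only a subset of the demo configuration above.

```python
import json
from typing import Dict, Tuple


def split_config(config: Dict[str, object]) -> Tuple[Dict[str, object], Dict[str, object]]:
    """Split a connector config into (standard Kafka Connect keys, iceberg.* keys)."""
    standard = {k: v for k, v in config.items() if not k.startswith("iceberg.")}
    iceberg = {k: v for k, v in config.items() if k.startswith("iceberg.")}
    return standard, iceberg


# A subset of the demo configuration shown above.
raw = """
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "3",
  "topics": "cdc-customer",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "http://iceberg-rest:8181",
  "iceberg.control.commit.interval-ms": 300000
}
"""

standard, iceberg = split_config(json.loads(raw))
print(sorted(standard))
print(sorted(iceberg))
```

A check like this can be handy before launching, for example to confirm that the catalog settings you expect are actually present.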
Launch the connector
Now we’ll launch the connector using the Kafka Connect REST API. To make the call simpler, we can save the configuration to a file. We’ll call it customer_iceberg.json.
We can either use POST or PUT to send our configuration to Kafka Connect. PUT has the advantage of being able to update the connector if it has already been posted. Let’s send this configuration data to Kafka Connect using the following curl command.
curl -X PUT http://localhost:8083/connectors/customer-iceberg/config \
-i -H "Content-Type: application/json" -d @customer_iceberg.json
A couple of notes on this API call:
- The host and port we are using – localhost and 8083 – should be replaced by those of your Kafka Connect cluster.
- The string customer-iceberg is the name we are giving our connector instance. This can be any name, but it is what you will use to inspect, update, pause, or shut down the connector going forward.
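If you would rather script this call than use curl, the same request can be sketched with Python's standard library. The helper build_put_request is a hypothetical name, the one-key config is a stand-in for the full file, and the example only builds the request; the commented lines at the end show how it could be sent to a running Kafka Connect cluster.

```python
import json
import urllib.request


def build_put_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the PUT request that creates or updates a connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Host, port, and connector name match the curl example above.
request = build_put_request(
    "http://localhost:8083", "customer-iceberg", {"tasks.max": "3"}
)
print(request.get_method(), request.full_url)

# To send it against a running Kafka Connect cluster:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```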
Now that we’ve launched our connector, let’s check the status to see if it is up and running.
curl -s http://localhost:8083/connectors/customer-iceberg/status | jq
By calling the same endpoint but replacing config with status, we can check the status of our newly launched connector. The response to this call is a big chunk of JSON on the command line, which is why we’re piping it to jq. (If you weren’t already familiar with jq, you’re welcome!)
This is what our response should look like:
{
  "name": "customer-iceberg",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    },
    ...
  ],
  "type": "sink"
}
The task information repeats for all three of the tasks we launched. If any one of them were in a non-running state, this output would let us know.
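For unattended checks, scanning that JSON by eye does not scale, so here is a small Python sketch that lists any tasks not in the RUNNING state. The helper non_running_tasks is hypothetical, and the sample response is made up to show a failed task; it is not output from our demo.

```python
import json
from typing import List


def non_running_tasks(status: dict) -> List[int]:
    """Return the ids of tasks whose state is anything other than RUNNING."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") != "RUNNING"]


# A hypothetical status response in which task 1 has failed.
status = json.loads("""
{
  "name": "customer-iceberg",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "connect:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "connect:8083"},
    {"id": 2, "state": "RUNNING", "worker_id": "connect:8083"}
  ],
  "type": "sink"
}
""")

print(non_running_tasks(status))
```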
Now there should be data flowing from our Kafka topic to our Iceberg change log table. Let’s query the table to confirm this.
Checking our changelog table in Iceberg
Since we set the iceberg.tables property in our connector configuration and enabled auto-creation, the connector will have created the cdc_customer table for us. Let’s verify that using Spark SQL.
spark-sql ()> SHOW TABLES FROM rpc;
cdc_customer
Time taken: 0.096 seconds, Fetched 1 row(s)
OK. Our table was created, but let’s also see if the data is landing successfully.
spark-sql ()> SELECT COUNT(*) FROM rpc.cdc_customer;
597864
Time taken: 0.218 seconds, Fetched 1 row(s)
As it is, the change log table is not very useful for querying current data. It holds only the historical sequence of changes made to each key, whereas users typically want the latest state of a given key. What we really want, then, is to transform these records into a table with one row per key.
Create the mirror table
Although the Iceberg Sink Connector will auto-create our change log table, we need to explicitly create a mirror table that looks like the source table in our relational database. Let’s take care of that now.
spark-sql ()> CREATE TABLE rpc.customer_mirror (
> id string,
> firstName string,
> lastName string,
> email string,
> last_updated string)
> USING iceberg;
Time taken: 0.062 seconds
Merging with our mirror table
The change log data is in Iceberg, and now we’re ready to perform a merge into the customer_mirror table. An important consideration at this point is that we are likely to have multiple updates to individual rows in the merge, so we need to structure the merge query to ensure that the most recent state for each row is reflected in the end result. We do that using common table expressions to select the most recent update for the merge.
Once we have that set of distinct updates, we will use it to perform the merge. Notice how the merge statement handles inserts, updates, and deletes.
spark-sql ()> WITH windowed_changes AS (
> SELECT
>   op,
>   payload,
>   -- Deletes carry only the before state, so fall back to it for the key
>   coalesce(payload.after.id, payload.before.id) AS id,
>   row_number() OVER (
>     PARTITION BY coalesce(payload.after.id, payload.before.id)
>     ORDER BY coalesce(payload.after.last_updated, payload.before.last_updated) DESC,
>       -- On a timestamp tie, rank the delete first
>       CASE WHEN op = 'd' THEN 0 ELSE 1 END) AS row_num
> FROM rpc.cdc_customer
> ),
> customer_changes AS (
> SELECT * FROM windowed_changes WHERE row_num = 1
> )
>
> MERGE INTO rpc.customer_mirror a USING customer_changes c
> ON a.id = c.id
> WHEN MATCHED AND c.op = 'd' THEN DELETE
> WHEN MATCHED THEN UPDATE
>   SET a.firstName = c.payload.after.firstName,
>       a.lastName = c.payload.after.lastName,
>       a.email = c.payload.after.email,
>       a.last_updated = c.payload.after.last_updated
> WHEN NOT MATCHED AND c.op != 'd' THEN
>   INSERT (id, firstName, lastName, email, last_updated)
>   VALUES (c.id, c.payload.after.firstName, c.payload.after.lastName,
>           c.payload.after.email, c.payload.after.last_updated);
Time taken: 4.953 seconds
After this statement executes, our mirror table will be an exact replica of the upstream relational table at the point in time of the last commit from the Iceberg Sink Connector.
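To make the semantics of that merge easier to reason about, here is an in-memory Python sketch of the same two steps: keep the latest change per key, then apply deletes, updates, and inserts to the mirror. The function names, the flattened record shape, and the sample rows are all made up for this illustration; it mimics the logic of the query, not how Spark or Iceberg executes it.

```python
from typing import Dict, List


def latest_per_key(changes: List[dict]) -> Dict[str, dict]:
    """Keep only the most recent change for each id (the row_num = 1 step)."""
    latest: Dict[str, dict] = {}
    for change in changes:
        current = latest.get(change["id"])
        # >= lets a record that appears later in the log win a timestamp tie.
        if current is None or change["ts"] >= current["ts"]:
            latest[change["id"]] = change
    return latest


def merge_into(mirror: Dict[str, dict], changes: List[dict]) -> None:
    """Apply deduplicated changes to the mirror in place, like the MERGE above."""
    for key, change in latest_per_key(changes).items():
        if change["op"] == "d":
            # Matched delete removes the row; an unmatched delete is a no-op.
            mirror.pop(key, None)
        else:
            # Matched rows are updated; unmatched rows are inserted.
            mirror[key] = change["row"]


mirror = {"a": {"lastName": "Legros"}, "b": {"lastName": "Smith"}}
changes = [
    {"op": "u", "id": "a", "ts": "2024-01-25T23:41:38", "row": {"lastName": "Bechtelar"}},
    {"op": "u", "id": "a", "ts": "2024-01-10T09:00:00", "row": {"lastName": "Stale"}},
    {"op": "d", "id": "b", "ts": "2024-01-20T12:00:00", "row": None},
    {"op": "i", "id": "c", "ts": "2024-01-21T12:00:00", "row": {"lastName": "Jones"}},
]

merge_into(mirror, changes)
print(mirror)
```

The older update to row a is discarded by the deduplication step, which is exactly why the query ranks changes before merging.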
Conclusion
We now have a mirror of an operational database table in an Iceberg data lake along with a complete change log table that can long outlive the CDC data in Kafka and be used to reproduce the table’s state for any point in time. Now we’ll end with some further considerations and homework for the reader.
The merge query we used to populate the mirror table will need to be run periodically. How often will depend on your specific use case. And unless you are working with a small table or one with infrequent updates, it will make sense to do this merge processing incrementally. Otherwise, you’d be reprocessing all of the change log data from the beginning. While we don’t cover incremental processing in this post, you can learn more about it in the Incremental Processing recipe of the Apache Iceberg Cookbook.
As we noted earlier, the merge query will bring the mirror table up to date as frequently as we run it, but there is a cost to running that merge. Another option is to create a view based on both the mirror table and the change log table to support low-latency queries with less frequent merges. Stay tuned for more details in a future post.