Event-driven architectures are one of the most popular use cases for streaming data. Producing and consuming events asynchronously is a great way to let microservices communicate with each other while keeping them decoupled.
However, the events that power these business applications contain data that can provide even more business value downstream in a data lakehouse. To ingest that data safely and efficiently, we can use Kafka Connect with the Iceberg Sink Connector.
Streaming ingestion
Streaming data arrives quickly, at tens of thousands of records per second or faster. For maximum efficiency, we need to parallelize the writes, but concurrent writes to data lakehouses can cause exceptions and even corrupt data. Yet another challenge of streaming ingestion is the potential for missed or duplicate data.
Let’s see how the Apache Iceberg connector for Kafka Connect solves these problems and provides safe, efficient ingestion from Kafka topics.
Setting the stage
A while back, I wrote a blog post about using Apache Kafka to build event-driven microservices. In that article, we looked at an application that produced random pizzas because sometimes it’s just too hard to decide what pizza you want. There were services to handle the different types of toppings that might be on a pizza, and each service produced to and/or consumed from a Kafka topic. The in-progress pizza was passed as a payload in each event. This worked great for our application, and the events in the interim topics were helpful if we ever needed to replay a batch of orders or troubleshoot our system.
The events in the final topic, however, have potential analytical value long after the pizza order has been delivered. For example, we might want to track top-producing stores.
Fast-forward to today, and our random pizza business has taken off. We now have over a hundred franchises selling randomly-generated pizzas by the thousands. We need a way to get the event data from that final topic, which contains the completed pizzas, into our data lake. We need to do this soon, as the storage cost of keeping this data in Kafka is getting painful.
The payload in our events contains each randomly-generated pizza in a JSON string format. Here’s a sample of one of our events:
{
  "order_id": "af69865f-61a7-4e39-824e-cbdcf4a0c45e",
  "store_id": 42,
  "sauce": "extra",
  "cheese": "three cheese",
  "meats": "ham",
  "veggies": "onions & pineapple",
  "date_ordered": "2024-01-04 12:28:13"
}
We’ll be loading these records into an Iceberg table with these fields.
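In our demo the connector will create this table for us automatically, but as a mental model of the target schema, here is a rough Spark SQL sketch run from PySpark. The column types are assumptions inferred from the sample payload; the demo.rpc.pizzas name matches the catalog and table we configure later in this article.
# Hypothetical DDL for the target table, run from a PySpark session that
# already has the "demo" Iceberg catalog configured (see the PySpark section below).
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo.rpc.pizzas (
        order_id     string,
        store_id     int,
        sauce        string,
        cheese       string,
        meats        string,
        veggies      string,
        date_ordered timestamp
    )
    USING iceberg
""")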
You can try this demo yourself by cloning this GitHub repository and following the instructions in the README.
Kafka Connect
Kafka Connect is an application that makes it easier to integrate Apache Kafka with external systems, using plugins called connectors. It can act as both a producer (for source connectors) and as a consumer (for sink connectors). Source connectors read from an external system and produce the data to Kafka. Sink connectors consume data from Kafka and write it to an external system.
Since we want to get streaming data from Kafka into Iceberg, we will focus on sink connectors. As mentioned earlier, sink connectors operate as Kafka consumers. And, as with all Kafka consumers, we can effectively have as many instances of a single consumer as there are partitions in our Kafka topic. In Kafka Connect, these consumer instances are called tasks. By having more tasks running, we can consume from Kafka and write to Iceberg in parallel.
The Iceberg Connector
We will use the Iceberg Sink Connector, which is available on the Confluent Hub or on GitHub. This connector is designed for streaming ingestion, with features such as:
- Central timed commits
- Exactly-once delivery semantics
- Multi-table fanout
- Message conversion based on Iceberg table properties
We’ll dive deeper into these features in future articles, but for now you can get an overview in this article by Brian Olsen and Bryan Keller.
Now, let’s get started building our pipeline.
Install the connector
We can configure and launch the Iceberg connector using the Kafka Connect REST API, but before we can do that, we must install the connector in our Kafka Connect instance.
To install the connector, download it from the Confluent Hub or from the GitHub repository, then unzip it into the location pointed to by Kafka Connect’s plugin.path configuration. There are other ways to install connectors, including Confluent’s confluent-hub CLI or installing it directly in a Dockerfile, as we do in our demo. For more tips on installing connectors, check out this helpful video.
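Once Kafka Connect restarts with the plugin in place, a quick way to confirm that it was picked up is to list the installed plugins through the Kafka Connect REST API. Here is a minimal Python sketch, assuming Kafka Connect is reachable at localhost:8083 and that the requests library is installed:
import requests

# List the connector plugins this Kafka Connect instance has loaded
# and check that the Iceberg sink connector is among them.
plugins = requests.get("http://localhost:8083/connector-plugins").json()
for plugin in plugins:
    print(plugin["class"])

assert any("IcebergSinkConnector" in p["class"] for p in plugins), \
    "Iceberg sink connector not found on the plugin path"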
Once we’ve installed the Iceberg connector, we can configure and launch it.
Configure the connector
With Kafka Connect, configuration is done via a JSON document. This document contains all the information Kafka Connect needs to access and read from our Kafka cluster and then access and write to our Iceberg warehouse. Let’s look at the completed configuration file and talk about the individual items in more detail.
Note that these configuration values are specific to our demo project and that some variables will change based on things like the storage layer, Iceberg catalog type, and data format of the incoming Kafka records.
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "6",
  "topics": "completed-pizzas",
  "iceberg.tables": "rpc.pizzas",
  "iceberg.catalog": "demo",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "http://iceberg-rest:8181",
  "iceberg.catalog.client.region": "us-east-1",
  "iceberg.catalog.s3.endpoint": "http://minio:9000",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.control.commit.interval-ms": 180000,
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
If you’re familiar with Kafka Connect, you’ll see that some of these configuration items are standard for all connectors. Others are specific to Apache Iceberg and the Iceberg connector, as denoted by the iceberg. prefix.
Configuration | Description
---|---
connector.class | The fully-qualified class name of the connector.
tasks.max | The maximum number of tasks that will be launched. All the partitions of the Kafka topic (or topics) we are reading will be spread across these tasks.
topics | The Kafka topic or topics to be read from.
iceberg.tables | The table, or tables, that we want our Kafka data to be written to.
iceberg.catalog | The name of the catalog we will connect to.
iceberg.catalog.type | The type of catalog we will be working with (REST, Hive, Hadoop).
iceberg.catalog.uri | The location where we can reach that catalog.
iceberg.catalog.client.region | The region in which our catalog resides.
iceberg.catalog.s3.endpoint | The endpoint for your S3 storage (or equivalent).
iceberg.catalog.s3.path-style-access | Whether to use path-style access for S3 buckets.
iceberg.tables.auto-create-enabled | Whether to auto-create the Iceberg tables when a new connector is launched.
iceberg.tables.evolve-schema-enabled | Whether to evolve the Iceberg table schema if the Kafka schema changes.
iceberg.control.commit.interval-ms | How often the controller should commit writes from all workers, in milliseconds. Default is 300000.
key.converter | A Java class that Kafka Connect should use to read the Kafka record keys.
value.converter | A Java class that Kafka Connect should use to read the Kafka record values.
value.converter.schemas.enable | Whether to use a schema associated with the Kafka records.
Launch the connector
The final step is to post this configuration document to Kafka Connect, which launches the connector and begins sending data to our Iceberg table. The simplest way to do this is to use the Kafka Connect REST API. To make the call simpler, we can save the configuration to a file; we’ll call it pizzas_on_ice.json.
We can use either POST or PUT to send our configuration to Kafka Connect. PUT has the advantage of being able to update the connector if it has already been created.
curl -X PUT http://localhost:8083/connectors/pizzas-on-ice/config \
-i -H "Content-Type: application/json" -d @pizzas_on_ice.json
A couple of notes on this API call:
- The host and port we are using (localhost and 8083) should be replaced by those of your Kafka Connect cluster.
- The string pizzas-on-ice is the name we are giving our connector instance. This can be any name, but it is what you will use to inspect, update, pause, or shut down the connector going forward; a few of those calls are sketched below.
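For example, here is a minimal Python sketch of a few of those lifecycle calls, again assuming Kafka Connect at localhost:8083 and using the requests library:
import requests

BASE = "http://localhost:8083/connectors/pizzas-on-ice"

# Temporarily stop consuming without deleting the connector.
requests.put(f"{BASE}/pause")

# Resume a paused connector.
requests.put(f"{BASE}/resume")

# Remove the connector and its tasks entirely.
requests.delete(BASE)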
Now that we’ve launched our connector, let’s check its status to see if it is up and running.
curl -s http://localhost:8083/connectors/pizzas-on-ice/status | jq
By calling the same endpoint but replacing config with status, we can check the status of our newly launched connector. The response to this call is a big chunk of JSON on the command line, which is why we’re piping it to jq. (If you weren’t already familiar with jq, you’re welcome!)
This is what our response should look like:
{
  "name": "pizzas-on-ice",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    },
    ...
  ],
  "type": "sink"
}
The task information repeats for all six of the tasks we launched. If any one of them were in a non-running state, this output would let us know.
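If one of the tasks does fail, we can restart it in place through the same REST API without re-posting the whole configuration. A minimal sketch, once more assuming Kafka Connect at localhost:8083 and the requests library:
import requests

# Restart task 0 of the pizzas-on-ice connector without touching the
# other tasks or the connector configuration.
resp = requests.post("http://localhost:8083/connectors/pizzas-on-ice/tasks/0/restart")
print(resp.status_code)  # a 2xx response means the restart was accepted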
Now there should be data flowing from our Kafka topic to our Iceberg table. Let’s query the table to confirm this.
Checking our data in Iceberg
First, we’ll perform a sanity check on our Iceberg table using PySpark.
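The demo repository handles the Spark setup for us, but for reference, a PySpark session wired up to the demo REST catalog might look roughly like the sketch below. The configuration keys mirror the connector settings above; the session also needs the Iceberg Spark runtime and AWS bundle jars on its classpath, which are omitted here.
from pyspark.sql import SparkSession

# A rough sketch of a Spark session configured for the demo REST catalog
# and MinIO storage used elsewhere in this example.
spark = (
    SparkSession.builder.appName("pizza-checks")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "rest")
    .config("spark.sql.catalog.demo.uri", "http://iceberg-rest:8181")
    .config("spark.sql.catalog.demo.client.region", "us-east-1")
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
    .config("spark.sql.catalog.demo.s3.path-style-access", "true")
    .getOrCreate()
)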
>>> df = spark.table("demo.rpc.pizzas")
>>> df.show(5)
+--------+--------+-------+------------+------------+---------+-------------------+
|store_id|order_id| sauce| cheese| meats| veggies| date_ordered|
+--------+--------+-------+------------+------------+---------+-------------------+
| 1| 1000371|regular| none| pepperoni| olives|2024-01-06 17:13:33|
| 7| 1000372| light| extra| sausage| onions|2024-01-06 17:13:35|
| 23| 1000373| bbq| goat cheese| ham| peppers|2024-01-06 17:13:37|
| 6| 1000374|alfredo|three cheese| anchovies|pineapple|2024-01-06 17:13:39|
| 42| 1000371| none| none| pepperoni| olives|2024-01-06 17:13:41|
+--------+--------+-------+------------+------------+---------+-------------------+
Now we can easily find those top-producing stores.
>>> df.groupBy("store_id").count().sort("count", ascending=False).show(5)
+--------+---------+
|store_id| count|
+--------+---------+
| 1| 85294|
| 37| 82786|
| 19| 77981|
| 7| 74227|
| 82| 73849|
+--------+---------+
Digging a little deeper
We can see that our Kafka event data is being written to Iceberg. So now let’s look at some other interesting details about what the Iceberg Kafka connector is doing under the hood.
One of the challenges of streaming ingestion is its potential to create large numbers of small files. In our example, we have six Kafka Connect tasks reading and writing data from six partitions in our Kafka topic. Each of these partitions could be receiving hundreds of records per second. To prevent an explosion of small files, the Iceberg Kafka connector uses a central controller to commit the writes being done by each task at regular intervals. The length of that interval determines our latency, or how quickly the new data will be available.
We can manage this using the iceberg.control.commit.interval-ms configuration value. The default is 5 minutes (300000 ms), but in our example, we set it to 3 minutes (180000 ms). If we don’t need to see new data as quickly, we can raise it to a higher value.
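If we do change our minds later, there is no need to recreate the connector; we can update its configuration in place with another PUT to the config endpoint. Here is a minimal Python sketch that raises the interval back to the 5-minute default, again assuming Kafka Connect at localhost:8083 and the requests library:
import requests, json

# Load the existing configuration, bump the commit interval back to the
# default of 5 minutes, and PUT it to update the running connector.
with open("pizzas_on_ice.json") as f:
    config = json.load(f)

config["iceberg.control.commit.interval-ms"] = 300000

requests.put(
    "http://localhost:8083/connectors/pizzas-on-ice/config",
    json=config,
)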
Let’s query our Iceberg metadata to see the impact of this setting.
>>> spark.table("demo.rpc.pizzas.history").show(10)
+--------------------+-------------------+-------------------+-------------------+
| made_current_at| snapshot_id| parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2024-01-17 20:07:...|6132070624874175021| NULL| true|
|2024-01-17 20:10:...|1678576419348871552|6132070624874175021| true|
|2024-01-17 20:13:...|8700917005153727716|1678576419348871552| true|
|2024-01-17 20:16:...|6881818396142418964|8700917005153727716| true|
|2024-01-17 20:19:...|7937257347777140988|6881818396142418964| true|
|2024-01-17 20:22:...|5943366865362558251|7937257347777140988| true|
|2024-01-17 20:25:...|7089307519364594733|5943366865362558251| true|
|2024-01-17 20:28:...|6682824190627074932|7089307519364594733| true|
|2024-01-17 20:31:...| 479230327362088171|6682824190627074932| true|
|2024-01-17 20:34:...|1884450110816643033| 479230327362088171| true|
+--------------------+-------------------+-------------------+-------------------+
From this result, we can see that a new snapshot is created every three minutes, in keeping with the 180000 milliseconds we set for iceberg.control.commit.interval-ms.
Conclusion
We’ve only touched on a few of the features of the Iceberg Connector for Kafka Connect. But we hope you’ve got a starting point for your own future exploration. The amount of data flowing through streaming frameworks such as Apache Kafka is only growing, and Apache Iceberg will continue to be a great place to store that data to provide long-term value.
Stay tuned for more articles on this powerful combination of technologies.