Incremental processing

DATA ENGINEERING

Incremental processing is a tried and true approach to improving data transformation performance and reducing cost. The basic idea is that you avoid reprocessing source data multiple times by having each job process only new or changed rows. This recipe shows how to implement incremental processing with Apache Iceberg tables.

Traditionally, this pattern has been implemented with a shared control table that tracks the most recently processed source record as a high watermark on a data column. The implementation using Iceberg tables improves on this idea by taking advantage of immutable snapshot lineage, a data-independent way to find unprocessed records.

This recipe also shows how to track the high watermark in snapshot metadata. One of the advantages of this approach is that processing state is stored with the data. This avoids a number of operational issues from correctness to migrations. While these examples are shown using Apache Spark, this pattern can be implemented in any Iceberg-compatible environment.

Incremental reads with snapshot watermarks

This example will enhance the idempotent log message deduplication example from the MERGE recipe. That recipe used a SQL MERGE to add new rows to a logs table.

MERGE INTO logs AS t
USING log_source AS s
 ON t.log_id = s.log_id
WHEN NOT MATCHED THEN INSERT *

This recipe will produce the log_source data as a view. The raw, potentially duplicated log records come from the raw.log_source table and the target table is final.logs.

Retrieve source table watermark

To keep track of what has already been processed, this pattern stores the last processed snapshot ID as a high watermark. This is the snapshot ID that was current in the source table the last time the incremental job ran. The rows that need to be processed in the current run are rows written in newer snapshots.

The last processed snapshot ID is stored in the final table’s snapshot summary metadata. This same mechanism can be used to track the high watermarks for multiple source tables, so it is recommended to include the source database and table as part of the property key.

// load the logs table (this is the target table that has been deduplicated)
Table logsTable = Spark3Util.loadIcebergTable(spark, "final.logs");
// Retrieve last processed snapshot ID from the table
String lastProcessedId = logsTable.currentSnapshot()
    .summary()
    .get("watermark:raw.log_source");

The next piece of information needed is the source table’s current snapshot. The two snapshots define the range of commits that need to be processed from the source table.

// load the source table, log_source
Table logSourceTable = Spark3Util.loadIcebergTable(spark, "raw.log_source");
String toProcessId = String.valueOf(logSourceTable.currentSnapshot().snapshotId());

Note: These code examples use Spark3Util.loadIcebergTable to load the Iceberg tables. This is a helper method that uses the catalogs already defined in your Spark configuration.
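
For context, this recipe assumes an Iceberg catalog is already configured for the Spark session. A minimal sketch of one possible setup (illustrative values; your catalog name, type, and metastore will differ) that also enables the SQL extensions used later by MERGE INTO and CALL:

// Illustrative only: expose the Spark session catalog as an Iceberg catalog
// and enable the Iceberg SQL extensions used by MERGE INTO and CALL procedures
SparkSession spark = SparkSession.builder()
    .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive") // assumed Hive metastore
    .getOrCreate();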

Create a view containing unprocessed logs

To load the data between the last processed snapshot and the source table’s current snapshot, use the Spark DataFrame reader, which allows passing additional options to Iceberg. In this case, passing start-snapshot-id and end-snapshot-id tells Iceberg to produce only the incremental view: the rows that were appended in that range, after the start snapshot (exclusive) through the end snapshot (inclusive).

It is important to set the end-snapshot-id, to guarantee that the high watermark you set at job completion will be accurate, even if additional updates are made to the raw.log_source table during processing (those will be picked up on a subsequent run).

// do the incremental read from the source table
Dataset<Row> newLogs = spark.read()
    .format("iceberg")
    .option("start-snapshot-id", lastProcessedId)
    .option("end-snapshot-id", toProcessId)
    .table("raw.log_source");
// create a temp view from the result so we can use it in the MERGE INTO sql
newLogs.createOrReplaceTempView("log_source");

The last line in this code sample calls createOrReplaceTempView to register the incremental DataFrame under the name log_source for SQL. This makes the incremental data available to the MERGE command in the next step.

Note: This incremental read syntax only supports reading data from append commits. Replacements from compaction are ignored and other concurrent commits like deletes can be optionally skipped. See the Iceberg documentation for ways to consume incremental changes from tables that are not fact tables.

Update the target table and high watermark atomically

Once the job has successfully processed the incremental changes from the source table, the watermark on the target table can be updated to the end-snapshot-id that was used for the incremental read. This will signal to subsequent job runs that all of the data from the source table up through that snapshot has already been loaded.

Storing the last processed snapshot ID in the snapshot summary sets it atomically at the same time as the commit to the target table. This is important to ensure that the data and the watermark are never out of sync, which could easily happen when using a separate control table.

To set the watermark property on the snapshot produced by a SQL command, use CommitMetadata.withCommitProperties to wrap the command. Properties can also be set when writing with the DataFrame writer APIs, which allow passing custom snapshot properties directly as write options.

// Update the target table and set the watermark in the same commit
CommitMetadata.withCommitProperties(
    Map.of("watermark:raw.log_source", toProcessId),
    () -> {
        spark.sql("MERGE INTO ...");
        return 0;
    },
    RuntimeException.class);
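
If a job writes with the DataFrame API instead of SQL, the watermark can be attached to the write itself. The sketch below assumes Iceberg's snapshot-property. write option prefix and uses a plain append for illustration (this recipe still needs the MERGE above for deduplication):

// Sketch only: snapshot-property.* write options are added to the snapshot summary
newLogs.writeTo("final.logs")
    .option("snapshot-property.watermark:raw.log_source", toProcessId)
    .append();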

Rollback & replay

There is a hidden operational bonus that comes along with using the snapshot summary to store the watermark in this pattern — recovering from an errant code update is trivial. All you have to do is use Iceberg’s rollback ability to return the target table to a known good state, such as the last snapshot prior to the first run of the buggy transformation code.
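
One way to find that snapshot is to inspect the target table's snapshot history, for example by querying the snapshots metadata table (a sketch; the column selection is illustrative):

// List recent snapshots of final.logs along with their watermark annotations
spark.sql("SELECT snapshot_id, committed_at, operation, " +
        "summary['watermark:raw.log_source'] AS watermark " +
        "FROM final.logs.snapshots " +
        "ORDER BY committed_at DESC")
    .show(20, false);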

CALL spark_catalog.system.rollback_to_snapshot('final.logs', <LAST-CORRECT-SNAPSHOT-ID>);

That’s it! Now push an update with corrected code and the next run will automatically start reading from the source data as if the errant code had never run, because rolling back the table also rolls back the watermark state. No backfilling required!

Advanced topics

Intermediate snapshots

To keep the example simple, the code blocks above assumed that the logs table’s latest snapshot was annotated with the watermark. But compacting or removing old rows would produce snapshots that don’t have a watermark. The solution is to do a little more work to find the last snapshot with a watermark property. Iceberg provides utilities in SnapshotUtil to make this easier.

// Find the last processed ID when the last incremental run isn't the current snapshot
String lastProcessedId = null;
for (Snapshot snap : SnapshotUtil.currentAncestors(logsTable)) {
  lastProcessedId = snap.summary().get("watermark:raw.log_source");
  if (lastProcessedId != null) {
    break;
  }
}

Controlling incremental batch sizes

There is an additional challenge related to incremental processing that isn’t often covered — handling arbitrary input data volumes. For most batch processing workloads that operate on fixed intervals, the amount of data that needs to be processed by each run is consistent. This enables relatively simple allocation of computing resources for the job.

However, when using incremental processing there can be much higher variability in volume. If a job run fails, the next job will likely be responsible for the original set of new data plus whatever data has landed in the meantime. On the one hand this is desirable, as pipelines are “self-healing” and require less manual backfilling for transient job failures. On the other hand, it comes at the cost of making resource allocation more complicated.

Iceberg’s rich snapshot metadata and the incremental read pattern can be combined to build powerful and flexible solutions. For example, starting with the previous watermark, the snapshot lineage and associated commit statistics can be incrementally walked to build source batches of data based on data volumes or number of records. Or those same statistics can be used to dynamically allocate job resources to “right size” them for each job run.
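
As a rough sketch of the first idea (the threshold and variable names are illustrative, reusing lastProcessedId and logSourceTable from earlier), the loop below walks the source table's snapshot lineage back to the watermark and then accumulates the added-records commit statistic, oldest snapshot first, until a cap is reached to choose this run's end snapshot:

// Sketch only: cap each run at a maximum number of newly added records
long maxRecordsPerRun = 50_000_000L; // illustrative threshold; tune per workload
long lastProcessed = Long.parseLong(lastProcessedId); // first-run handling omitted

// collect unprocessed snapshots; currentAncestors walks newest-first from the current snapshot
List<Snapshot> unprocessed = new ArrayList<>();
for (Snapshot snap : SnapshotUtil.currentAncestors(logSourceTable)) {
  if (snap.snapshotId() == lastProcessed) {
    break;
  }
  unprocessed.add(snap);
}
Collections.reverse(unprocessed); // oldest-first, so batches respect commit order

// accumulate per-commit statistics until the cap is reached
long recordCount = 0L;
String batchEndId = lastProcessedId;
for (Snapshot snap : unprocessed) {
  recordCount += Long.parseLong(snap.summary().getOrDefault("added-records", "0"));
  batchEndId = String.valueOf(snap.snapshotId());
  if (recordCount >= maxRecordsPerRun) {
    break;
  }
}
// batchEndId becomes the end-snapshot-id for this run's incremental read;
// any remaining snapshots are picked up by the next run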