Using MERGE

One of the most useful tools that Iceberg enables is the SQL MERGE command. This recipe will show you how to use MERGE to create an idempotent pipeline that applies incoming changes to existing rows in a table.

Why MERGE?

The MERGE command is designed to handle use cases that need to use or modify existing rows in a table.

Before Iceberg enabled fine-grained changes to a table with ACID guarantees, most data pipelines writing to Hive tables were built around INSERT, because the safest operation on a Hive table was to append new partitions. INSERT works well for fact tables, but building pipelines around it is limiting. Many common use cases don’t fit the pattern, including simple ones like incoming fact data that contains duplicate rows.

MERGE reads data from a source and combines it with existing rows in the destination table, which is called the target. MERGE syntax is declarative, which has two benefits. First, it is powerful but also straightforward because it breaks down changes into readable case statements. Second, it can be optimized by the query engine for much better efficiency than making the same changes with INSERT OVERWRITE. In particular, a MERGE can identify just the files with records that need to be updated and rewrite only those files, while an INSERT OVERWRITE will replace whole partitions.

MERGE command basics

The MERGE command has four important parts:

  1. A target table where the results of the command are stored; it may be read to find existing rows
  2. A set of source rows that are matched against the target table’s rows
  3. An ON clause that matches source to target rows, just like a join’s ON clause
  4. A list of actions that produce output for a given source row

Here’s an example that merges transfers into a table of bank accounts:

MERGE INTO accounts AS t              -- Target rows (1)
USING transfers AS s                  -- Source rows (2)
 ON t.account_id = s.account_id AND   -- ON clause (3)
    s.ts > ${last_processed_ts}
-- Action cases (4)
WHEN MATCHED THEN
    UPDATE SET t.balance = t.balance + s.amount
WHEN NOT MATCHED THEN
    INSERT (t.account_id, t.balance) VALUES (s.account_id, s.amount)

This example shows two cases. When a transfer matches an existing account, it updates the account balance by the transfer amount. And when a transfer doesn’t match an existing account, it creates a new account with the transferred amount as the balance.

You can add more WHEN MATCHED or WHEN NOT MATCHED cases by adding extra conditions before the THEN keyword. The first action case whose conditions are satisfied is the one applied.

In the command above, a transfer to an unmatched account ID creates an account. Similarly, you could use a NULL amount to delete an account:

MERGE INTO accounts AS t              -- Target rows (1)
USING transfers AS s                  -- Source rows (2)
 ON t.account_id = s.account_id AND   -- ON clause (3)
    s.ts > ${last_processed_ts}
-- Action cases (4)
WHEN MATCHED AND s.amount IS NULL THEN
    DELETE
WHEN MATCHED THEN
    UPDATE SET t.balance = t.balance + s.amount
WHEN NOT MATCHED THEN
    INSERT (t.account_id, t.balance) VALUES (s.account_id, s.amount)

Multiple updates for a target row

Engines will fail the MERGE when there are multiple updates for the same row in the target table. SQL requires that there is at most one update for any target row; otherwise, the final output is ambiguous because the result could differ depending on the order in which the changes are applied. The query failure will look similar to this one, from Trino:

TrinoException: One MERGE target table row matched more than one source row

The solution is to use a subquery or a common table expression (CTE) to ensure there is only one matching source row per target row. For the example above, this CTE aggregates transfers by account ID to ensure there is only one update:

WITH transfers AS (
    SELECT r.account_id, sum(r.amount) AS amount
    FROM raw_transfers AS r WHERE r.ts > ${last_processed_ts}
    GROUP BY r.account_id
  ) 
MERGE INTO accounts AS t              -- Target rows (1)
USING transfers AS s                  -- Source rows (2)
 ON ...

Idempotent writes with MERGE

A common data engineering use case is to make writes idempotent, either by writing only the incoming records that are not already present in the target table (deduplication) or by replacing existing rows with new values from the source row (UPSERT). This example shows how to use MERGE to implement such an idempotent pipeline.

This example uses simple log data, consisting of common log fields: log_id, level, message, event_ts, classname, and line_no. The log ID is a unique ID generated by the logging framework on every call, to avoid accidental deduplication when multiple logs happen to have the same message and timestamp. In this example, deduplication is needed because logs may be sent twice from the running application in some failure cases.
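For context, here is a minimal sketch of what the target table could look like, assuming Spark SQL DDL; the column types are assumptions, while the hours(event_ts) partitioning matches what is described later in this recipe:

CREATE TABLE logs (
    log_id     string,       -- unique ID generated per logging call
    level      string,
    message    string,
    event_ts   timestamp,
    classname  string,
    line_no    int)
USING iceberg
PARTITIONED BY (hours(event_ts))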

Let’s assume that incremental consumption of the source data is handled elsewhere (as in this recipe) and exposed to the MERGE command as log_source with the same schema as the target table.

Deduplicating messages in a MERGE requires just one action: insert if the log isn’t present.

MERGE INTO logs AS t
USING log_source AS s
 ON t.log_id = s.log_id
WHEN NOT MATCHED THEN INSERT *

To change this to an UPSERT pattern, all you need to do is add a WHEN MATCHED clause that replaces columns with the values from the incoming row.
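For example, assuming an engine such as Spark that supports the SET * and INSERT * shorthand for copying all columns from the source row, the upsert variant might look like this:

MERGE INTO logs AS t
USING log_source AS s
 ON t.log_id = s.log_id
WHEN MATCHED THEN UPDATE SET *      -- replace existing row with incoming values
WHEN NOT MATCHED THEN INSERT *      -- insert new rows as before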

Degrading performance

Query engines typically do a good job optimizing MERGE commands using techniques such as dynamic partition pruning, rewriting to use an ANTI JOIN, and broadcasting source data. However, there are still cases where performance degrades over time as the target table grows. That is usually because the query engine is unable to filter the target table effectively.

The log example is instructive because it is one of those cases. A query engine might attempt to use dynamic partition pruning by log ID, but because the target table is partitioned by time using hours(event_ts), every file in the table appears to match a random set of IDs.

To avoid or fix situations like this, you can add filters to the ON clause that will be pushed down to the target table. A basic fix in this case is to add a limit for late-arriving data, such as only accepting logs that are less than one day old:

MERGE INTO logs AS t
USING log_source AS s
 ON t.log_id = s.log_id AND
    t.event_ts >= date_sub(current_date(), 1) AND
    s.event_ts >= date_sub(current_date(), 1)
WHEN NOT MATCHED THEN INSERT *

Another solution is to use the time range from the source table to filter the target table. This is a little harder to get right because not all engines support pushdown using scalar subqueries; you may need to run a query on the source data yourself and embed the results into your SQL.

This example shows the scalar subquery option:

MERGE INTO logs AS t
USING log_source AS s
 ON t.log_id = s.log_id AND
    t.event_ts >= (SELECT min(event_ts) FROM log_source)
WHEN NOT MATCHED THEN INSERT *
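
If your engine doesn’t push the scalar subquery down to the target table, a sketch of the alternative is to run the bounding query first (for example, SELECT min(event_ts) FROM log_source) and embed its result as a literal; the timestamp below is just a placeholder for that value:

MERGE INTO logs AS t
USING log_source AS s
 ON t.log_id = s.log_id AND
    t.event_ts >= TIMESTAMP '2024-01-01 00:00:00'  -- placeholder for min(event_ts) from the source
WHEN NOT MATCHED THEN INSERT *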