ray.data.Dataset.write_iceberg

Dataset.write_iceberg(table_identifier: str, catalog_kwargs: Dict[str, Any] | None = None, snapshot_properties: Dict[str, str] | None = None, mode: SaveMode = SaveMode.APPEND, overwrite_filter: Expr | None = None, upsert_kwargs: Dict[str, Any] | None = None, overwrite_kwargs: Dict[str, Any] | None = None, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None, upsert_commit_memory: int | None = None) -> None

Writes the Dataset to an Iceberg table.

Tip

For more details on PyIceberg, see https://py.iceberg.apache.org/

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

import ray
import pandas as pd
from ray.data import SaveMode
from ray.data.expressions import col

# Basic append (the default mode)
docs = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
ds = ray.data.from_pandas(pd.DataFrame(docs))
ds.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"}
)

# Upsert mode - update existing rows or insert new ones
updated_docs = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
ds_updates = ray.data.from_pandas(pd.DataFrame(updated_docs))
ds_updates.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"},
    mode=SaveMode.UPSERT,
    upsert_kwargs={"join_cols": ["id"]},
)

# Schema evolution is automatic: new columns are added to the table schema
enriched_docs = [{"id": i, "title": f"Doc {i}", "category": "new"} for i in range(3)]
ds_enriched = ray.data.from_pandas(pd.DataFrame(enriched_docs))
ds_enriched.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"}
)

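# Partial overwrite with a Ray Data expression: only rows matching the filter are replaced
# (here ds stands for any dataset whose schema matches events.user_activity)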
ds.write_iceberg(
    table_identifier="events.user_activity",
    catalog_kwargs={"name": "default", "type": "rest"},
    mode=SaveMode.OVERWRITE,
    overwrite_filter=col("date") >= "2024-10-28"
)
Parameters:
  • table_identifier – Fully qualified table identifier, in the form db_name.table_name.

  • catalog_kwargs – Optional arguments to pass to PyIceberg’s catalog.load_catalog() function (such as name, type). For the function definition, see the PyIceberg catalog documentation.

  • snapshot_properties – Custom properties to write to the snapshot when committing to an Iceberg table.

  • mode

    Write mode, given as a SaveMode enum value. Options:

    • SaveMode.APPEND (default): Add new data to the table without checking for duplicates.

    • SaveMode.UPSERT: Update existing rows that match on the join condition (join_cols in upsert_kwargs), or insert new rows if they don’t exist in the table.

    • SaveMode.OVERWRITE: Replace all existing data in the table with new data, or replace data matching overwrite_filter if specified.

  • overwrite_filter – Optional filter for OVERWRITE mode to perform partial overwrites. Must be a Ray Data expression from ray.data.expressions. Only rows matching this filter are replaced. If None with OVERWRITE mode, replaces all table data. Example: col("date") >= "2024-01-01" or (col("region") == "US") & (col("status") == "active")

  • upsert_kwargs – Optional arguments for upsert operations. Supported parameters: join_cols (List[str]), case_sensitive (bool), branch (str). Note: Ray Data uses a copy-on-write strategy that always updates all columns for matched keys and inserts all new keys for optimal parallelism.

  • overwrite_kwargs – Optional arguments to pass through to PyIceberg’s table.overwrite() method. Supported parameters: case_sensitive (bool), branch (str). See PyIceberg documentation for details.

  • ray_remote_args – Keyword arguments passed to ray.remote() for the write tasks.

  • concurrency – The maximum number of Ray tasks to run concurrently. This limits how many tasks run at once; it doesn’t change the total number of tasks run. By default, concurrency is decided dynamically based on the available resources.

  • upsert_commit_memory – The heap memory, in bytes, to reserve for the upsert commit operation. Only applicable when mode is SaveMode.UPSERT. The commit operation merges join keys from all workers and commits the transaction. Set this to avoid out-of-memory (OOM) errors when upserting with a large number of join keys. If None, uses Ray’s default memory allocation.
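The three write modes described above can be pictured with a small in-memory model. This is only a conceptual sketch in plain Python, not how Ray Data or PyIceberg implement the write: the helper names (append_rows, upsert_rows, overwrite_rows) are hypothetical, rows are plain dicts, and the overwrite filter is modeled as an ordinary Python predicate rather than a Ray Data expression.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence

Row = Dict[str, Any]


def append_rows(table: List[Row], new_rows: List[Row]) -> List[Row]:
    """APPEND: add the new rows without checking for duplicates."""
    return table + new_rows


def upsert_rows(table: List[Row], new_rows: List[Row], join_cols: Sequence[str]) -> List[Row]:
    """UPSERT: replace whole rows whose join key matches, insert the rest."""

    def key(row: Row) -> tuple:
        return tuple(row[c] for c in join_cols)

    incoming = {key(row): row for row in new_rows}
    # Existing rows with a matching key are dropped and rewritten in full.
    kept = [row for row in table if key(row) not in incoming]
    return kept + list(incoming.values())


def overwrite_rows(
    table: List[Row],
    new_rows: List[Row],
    overwrite_filter: Optional[Callable[[Row], bool]] = None,
) -> List[Row]:
    """OVERWRITE: replace everything, or only rows matching the filter."""
    if overwrite_filter is None:
        return list(new_rows)
    kept = [row for row in table if not overwrite_filter(row)]
    return kept + list(new_rows)


# Hypothetical data mirroring the examples above.
table = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
updates = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]

appended = append_rows(table, updates)            # id 2 now appears twice
upserted = upsert_rows(table, updates, ["id"])    # id 2 replaced, id 5 added
replaced = overwrite_rows(table, updates)         # only the two new rows remain
```

Row order in the model is incidental; the point is which rows survive each mode.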

Note

Schema evolution is always enabled. New columns in the incoming data are added to the table schema automatically.
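As a rough illustration of what adding new columns means for existing data, the sketch below models the table schema as an ordered list of column names. The helpers evolve_columns and read_with_schema are hypothetical and exist only for this example; the assumption that previously written rows read back as None (null) for a newly added column is not stated on this page.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def evolve_columns(table_columns: List[str], incoming_rows: List[Row]) -> List[str]:
    """Return the table columns extended with any new incoming column names."""
    evolved = list(table_columns)
    for row in incoming_rows:
        for name in row:
            if name not in evolved:
                evolved.append(name)
    return evolved


def read_with_schema(rows: List[Row], columns: List[str]) -> List[Row]:
    """Project rows onto the schema; missing columns read as None (assumption)."""
    return [{name: row.get(name) for name in columns} for row in rows]


old_rows = [{"id": 9, "title": "Old Doc"}]
new_rows = [{"id": 0, "title": "Doc 0", "category": "new"}]

columns = evolve_columns(["id", "title"], new_rows)
all_rows = read_with_schema(old_rows + new_rows, columns)
```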

Raises:

ValueError – If mode is SaveMode.UPSERT, join_cols is not provided in upsert_kwargs, and the table has no identifier fields.
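The error condition can be read as a key-resolution rule. The sketch below is an assumption drawn from the wording of the Raises entry, not the library's actual code: resolve_join_cols is a hypothetical helper, and the fallback to the table's identifier fields when join_cols is omitted is inferred rather than documented here.

```python
from typing import Any, Dict, List, Optional


def resolve_join_cols(
    upsert_kwargs: Optional[Dict[str, Any]],
    identifier_fields: List[str],
) -> List[str]:
    """Pick the upsert key columns, or raise if none can be determined."""
    join_cols = (upsert_kwargs or {}).get("join_cols")
    if join_cols:
        # Explicit join_cols take precedence.
        return list(join_cols)
    if identifier_fields:
        # Assumed fallback: the table's own identifier fields.
        return list(identifier_fields)
    raise ValueError(
        "UPSERT requires join_cols in upsert_kwargs or identifier fields on the table"
    )


explicit = resolve_join_cols({"join_cols": ["id"]}, [])
fallback = resolve_join_cols(None, ["doc_id"])
```

Passing join_cols explicitly, as in the upsert example above, avoids depending on how the table was defined.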

PublicAPI (alpha): This API is in alpha and may change before becoming stable.