ray.data.Dataset.write_iceberg
- Dataset.write_iceberg(table_identifier: str, catalog_kwargs: Dict[str, Any] | None = None, snapshot_properties: Dict[str, str] | None = None, mode: SaveMode = SaveMode.APPEND, overwrite_filter: Expr | None = None, upsert_kwargs: Dict[str, Any] | None = None, overwrite_kwargs: Dict[str, Any] | None = None, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None) -> None
Writes the Dataset to an Iceberg table.
Tip
For more details on PyIceberg, see the PyIceberg documentation at https://py.iceberg.apache.org/
Note
This operation will trigger execution of the lazy transformations performed on this dataset.
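To make this note concrete, here is a toy, pure-Python model of lazy evaluation. It is not Ray Data's execution engine: `ToyLazyDataset` and its `write` method are hypothetical names used only for illustration. Calling `map` only records a transformation, and the recorded transformations run when the write is requested, which mirrors how calling write_iceberg() triggers the pending transformations on a dataset.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


class ToyLazyDataset:
    """Hypothetical stand-in for a lazy dataset; not part of Ray."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows
        self._ops: List[Callable[[Row], Row]] = []  # pending transformations

    def map(self, fn: Callable[[Row], Row]) -> "ToyLazyDataset":
        # Record the transformation without running it.
        new = ToyLazyDataset(self._rows)
        new._ops = self._ops + [fn]
        return new

    def write(self, sink: List[Row]) -> None:
        # Writing is the point where the recorded transformations execute.
        for row in self._rows:
            for fn in self._ops:
                row = fn(row)
            sink.append(row)


calls: List[int] = []


def add_flag(row: Row) -> Row:
    calls.append(row["id"])  # records when the transformation actually runs
    return {**row, "flag": True}


ds = ToyLazyDataset([{"id": 0}, {"id": 1}]).map(add_flag)
assert calls == []  # nothing has executed yet
table: List[Row] = []
ds.write(table)  # the transformation runs here, once per row
```

The design point is that building the pipeline is cheap; the cost of every transformation is paid at write time.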
Examples
import pandas as pd
import ray
from ray.data import SaveMode
from ray.data.expressions import col

# catalog_kwargs are forwarded to PyIceberg's load_catalog(). A real "sql" or
# "rest" catalog also needs connection properties such as "uri", supplied here
# or in the PyIceberg configuration (for example a .pyiceberg.yaml file).

# Basic append (default behavior)
docs = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
ds = ray.data.from_pandas(pd.DataFrame(docs))
ds.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"},
)

# Schema evolution is automatic: new columns are added to the table schema
enriched_docs = [{"id": i, "title": f"Doc {i}", "category": "new"} for i in range(3)]
ds_enriched = ray.data.from_pandas(pd.DataFrame(enriched_docs))
ds_enriched.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"},
)

# Upsert mode: update existing rows or insert new ones
updated_docs = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
ds_updates = ray.data.from_pandas(pd.DataFrame(updated_docs))
ds_updates.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"},
    mode=SaveMode.UPSERT,
    upsert_kwargs={"join_cols": ["id"]},
)

# Partial overwrite with Ray Data expressions. The filter selects existing
# table rows to replace, so the target table is assumed to have a "date" column.
ds.write_iceberg(
    table_identifier="events.user_activity",
    catalog_kwargs={"name": "default", "type": "rest"},
    mode=SaveMode.OVERWRITE,
    overwrite_filter=col("date") >= "2024-10-28",
)
- Parameters:
table_identifier – Fully qualified table identifier (db_name.table_name).
catalog_kwargs – Optional arguments to pass to PyIceberg's catalog.load_catalog() function (such as name and type). For the function definition, see the PyIceberg catalog documentation.
snapshot_properties – Custom properties to attach to the snapshot created when committing to the Iceberg table.
mode –
Write mode, given as a SaveMode enum value. Options:
SaveMode.APPEND (default): Add new data to the table without checking for duplicates.
SaveMode.UPSERT: Update existing rows that match on the join columns (join_cols in upsert_kwargs), or insert new rows if they don't exist in the table.
SaveMode.OVERWRITE: Replace all existing data in the table with new data, or only the data matching overwrite_filter if specified.
overwrite_filter – Optional filter for OVERWRITE mode to perform partial overwrites. Must be a Ray Data expression from ray.data.expressions. Only rows matching this filter are replaced. If None in OVERWRITE mode, all table data is replaced. Examples: col("date") >= "2024-01-01" or (col("region") == "US") & (col("status") == "active").
upsert_kwargs – Optional arguments for upsert operations. Supported parameters: join_cols (List[str]), case_sensitive (bool), branch (str). Note: Ray Data uses a copy-on-write strategy that always updates all columns for matched keys and inserts all new keys, to maximize parallelism.
overwrite_kwargs – Optional arguments to pass through to PyIceberg's table.overwrite() method. Supported parameters: case_sensitive (bool), branch (str). See the PyIceberg documentation for details.
ray_remote_args – kwargs passed to ray.remote() in the write tasks.
concurrency – The maximum number of Ray tasks to run concurrently. This doesn't change the total number of tasks run. By default, concurrency is decided dynamically based on the available resources.
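The effect of each write mode on the table contents can be modeled on in-memory rows. This is a simplified, single-process sketch for intuition only: `apply_write` and the `Mode` enum are hypothetical names (`Mode` only mirrors the member names of SaveMode), tables are lists of dicts, and the overwrite filter is an ordinary Python predicate standing in for a Ray Data expression such as col("date") >= "2024-01-01". It does not model Iceberg snapshots, partitioning, branches, or Ray's distributed execution. For upserts it follows the behavior described for upsert_kwargs: every column of a matched row is replaced by the incoming row, and unmatched incoming rows are inserted.

```python
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


class Mode(Enum):
    # Illustrative mirror of the SaveMode member names; not Ray's enum.
    APPEND = "append"
    UPSERT = "upsert"
    OVERWRITE = "overwrite"


def apply_write(
    table: List[Row],
    new_rows: List[Row],
    mode: Mode = Mode.APPEND,
    join_cols: Optional[List[str]] = None,
    overwrite_filter: Optional[Callable[[Row], bool]] = None,
) -> List[Row]:
    """Hypothetical model of the resulting table contents for each mode."""
    if mode is Mode.APPEND:
        # No duplicate checks: incoming rows are simply added.
        return table + new_rows
    if mode is Mode.OVERWRITE:
        if overwrite_filter is None:
            return list(new_rows)  # full overwrite replaces everything
        # Partial overwrite: drop only existing rows that match the filter.
        kept = [row for row in table if not overwrite_filter(row)]
        return kept + new_rows
    if mode is Mode.UPSERT:
        if not join_cols:
            raise ValueError("UPSERT requires join_cols")

        def key(row: Row) -> tuple:
            return tuple(row[c] for c in join_cols)

        incoming_keys = {key(row) for row in new_rows}
        # Matched rows are replaced wholesale; unmatched incoming rows are inserted.
        kept = [row for row in table if key(row) not in incoming_keys]
        return kept + new_rows
    raise ValueError(f"unsupported mode: {mode}")


table = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
updates = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
upserted = apply_write(table, updates, Mode.UPSERT, join_cols=["id"])
partial = apply_write(table, updates, Mode.OVERWRITE, overwrite_filter=lambda r: r["id"] >= 2)
```

In this model, the upsert leaves ids 0, 1, and 3 untouched, replaces id 2, and inserts id 5, while the partial overwrite removes every existing row with id >= 2 before adding the incoming rows.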
Note
Schema evolution is enabled automatically. The schema of the incoming data is extracted from the data being written, and any new columns it contains are added to the table schema.
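Conceptually, adding new columns amounts to taking the union of the table's columns and the incoming columns. The pure-Python sketch below models only that union: `evolve_schema` and `read_row` are hypothetical helpers, and schemas are plain ordered mappings from column name to a type name. Rows already in the table read as None for a column added later. Rejecting a column whose type changes is a simplifying assumption of this sketch, not documented Ray or PyIceberg behavior, and real Iceberg schema evolution also tracks field IDs, which this sketch ignores.

```python
from typing import Any, Dict, Optional

Schema = Dict[str, str]  # column name -> type name (illustrative only)


def evolve_schema(table_schema: Schema, incoming_schema: Schema) -> Schema:
    """Hypothetical helper: union of columns; existing columns keep their order."""
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name not in merged:
            merged[name] = dtype  # new column appended to the schema
        elif merged[name] != dtype:
            # Simplifying assumption of this sketch, not documented behavior.
            raise ValueError(f"type change for {name!r} is not modeled here")
    return merged


def read_row(row: Dict[str, Any], schema: Schema) -> Dict[str, Optional[Any]]:
    # Rows written before a column existed read as None for that column.
    return {name: row.get(name) for name in schema}


table_schema = {"id": "long", "title": "string"}
incoming = {"id": "long", "title": "string", "category": "string"}
schema = evolve_schema(table_schema, incoming)
old_row = read_row({"id": 0, "title": "Doc 0"}, schema)
```

This matches the enriched-docs example above in spirit: after the second write, the table has a category column, and rows from the first write have no value for it.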
PublicAPI (alpha): This API is in alpha and may change before becoming stable.