ray.data.Dataset.write_clickhouse

Dataset.write_clickhouse(table: str, dsn: str, *, mode: SinkMode = SinkMode.CREATE, schema: pyarrow.Schema | None = None, client_settings: Dict[str, Any] | None = None, client_kwargs: Dict[str, Any] | None = None, table_settings: ClickHouseTableSettings | None = None, max_insert_block_rows: int | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None) -> None

Write the dataset to a ClickHouse table.

To control the number of parallel write tasks, use .repartition() before calling this method.
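
For instance, a minimal sketch (the table name and DSN here are placeholders, not real endpoints):

import ray

ds = ray.data.range(1_000)
# Repartitioning into 8 blocks allows up to 8 parallel write tasks.
ds = ds.repartition(8)
ds.write_clickhouse(
    table="default.example",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
)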

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

import ray
import pyarrow as pa
import pandas as pd

docs = [{"id": key, "title": "ClickHouse Datasink test"} for key in range(4)]
ds = ray.data.from_pandas(pd.DataFrame(docs))
user_schema = pa.schema(
    [
        ("id", pa.int64()),
        ("title", pa.string()),
    ]
)
ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.OVERWRITE,
    schema=user_schema,
    table_settings=ray.data.ClickHouseTableSettings(
        engine="ReplacingMergeTree()",
        order_by="id",
    ),
)
Parameters:
  • table – Fully qualified table identifier (e.g., “default.my_table”). The table is created if it doesn’t already exist.

  • dsn – A string in DSN (Data Source Name) HTTP format (e.g., “clickhouse+http://username:password@host:8123/default”). For more information, see ClickHouse Connection String doc.

  • mode

    One of SinkMode.CREATE, SinkMode.APPEND, or SinkMode.OVERWRITE; a SinkMode.APPEND sketch follows this parameter list:

    • SinkMode.CREATE: Create a new table; fail if it already exists. If the table does not exist, you must provide a schema (either via the schema argument or as part of the dataset’s first block).

    • SinkMode.APPEND: If the table exists, append data to it; if not, create the table using the provided or inferred schema. If the table does not exist, you must supply a schema.

    • SinkMode.OVERWRITE: Drop any existing table of this name, then create a new table and write data to it. You must provide a schema in this case, as the table is being re-created.

  • schema – Optional pyarrow.Schema specifying column definitions. A schema is needed whenever a new table must be created: in CREATE or APPEND mode when the table doesn’t exist, and always in OVERWRITE mode, since the table is re-created. If the schema argument is omitted in those cases, it is inferred from the first block of the dataset. When appending to an existing table, a schema is optional; you can provide one to enforce column types or cast data as needed, and if omitted, the existing table definition is used.

  • client_settings – Optional ClickHouse server settings to apply to the session and to every request. For more information, see the ClickHouse Client Settings doc.

  • client_kwargs – Optional keyword arguments to pass to the ClickHouse client. For more information, see the ClickHouse Core Settings doc. A sketch using both of these arguments follows this parameter list.

  • table_settings

    An optional ClickHouseTableSettings dataclass that specifies additional table creation instructions (a sketch follows this parameter list), including:

    • engine (default: "MergeTree()"): Specifies the engine for the CREATE TABLE statement.

    • order_by: Sets the ORDER BY clause in the CREATE TABLE statement, if provided. If omitted, an ORDER BY is chosen automatically: when overwriting an existing table, its previous ORDER BY (if any) is reused; otherwise, a “best” column is selected (favoring a timestamp column, then a non-string column, and lastly the first column).

    • partition_by: If present, adds a PARTITION BY <value> clause to the CREATE TABLE statement.

    • primary_key: If present, adds a PRIMARY KEY (<value>) clause.

    • settings: Appends a SETTINGS <value> clause to the CREATE TABLE statement, allowing custom ClickHouse settings.

  • max_insert_block_rows – If you have extremely large blocks, specifying a limit here will chunk the insert into multiple smaller insert calls. Defaults to None (no chunking).

  • ray_remote_args – Kwargs passed to ray.remote() in the write tasks.

  • concurrency – The maximum number of Ray tasks to run concurrently. This doesn’t change the total number of tasks run. By default, concurrency is decided dynamically based on the available resources. The final sketch below combines this argument with max_insert_block_rows and ray_remote_args.
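
A minimal sketch of SinkMode.APPEND with an explicit schema; the table and DSN are the placeholders from the example above:

import ray
import pyarrow as pa

ds = ray.data.from_items([{"id": 5, "title": "appended row"}])
ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.APPEND,
    # With APPEND, the schema is required only if the table doesn't exist yet;
    # providing it anyway enforces column types on the incoming blocks.
    schema=pa.schema([("id", pa.int64()), ("title", pa.string())]),
)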
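
A sketch of client_settings and client_kwargs; the specific keys shown (the max_execution_time server setting and a connect_timeout client argument) are illustrative assumptions; consult the linked ClickHouse docs for the valid options:

import ray

ds = ray.data.from_items([{"id": 1, "title": "settings demo"}])
ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.APPEND,
    # Server-side settings applied to the session / every request.
    client_settings={"max_execution_time": 300},
    # Keyword arguments forwarded to the underlying ClickHouse client.
    client_kwargs={"connect_timeout": 10},
)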
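
A sketch of the remaining ClickHouseTableSettings fields; the table name, column names, partition expression, and setting value are assumptions chosen for illustration:

import ray
from datetime import datetime

ds = ray.data.from_items([{"ts": datetime(2024, 1, 1), "id": 1}])
ds.write_clickhouse(
    table="default.events",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.CREATE,  # schema is inferred from the first block
    table_settings=ray.data.ClickHouseTableSettings(
        order_by="ts",
        partition_by="toYYYYMM(ts)",          # PARTITION BY toYYYYMM(ts)
        primary_key="ts",                     # PRIMARY KEY (ts)
        settings="index_granularity = 8192",  # SETTINGS index_granularity = 8192
    ),
)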
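
Finally, a sketch combining the throughput-related arguments; the values are illustrative, not recommendations:

import ray

ds = ray.data.range(1_000_000)
ds.write_clickhouse(
    table="default.numbers",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    # Split any block larger than 100,000 rows into multiple smaller INSERTs.
    max_insert_block_rows=100_000,
    # Kwargs forwarded to ray.remote() for each write task.
    ray_remote_args={"num_cpus": 1},
    # Run at most 4 write tasks at a time.
    concurrency=4,
)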