ray.data.Dataset.write_clickhouse
- Dataset.write_clickhouse(table: str, dsn: str, *, mode: SinkMode = SinkMode.CREATE, schema: pyarrow.Schema | None = None, client_settings: Dict[str, Any] | None = None, client_kwargs: Dict[str, Any] | None = None, table_settings: ClickHouseTableSettings | None = None, max_insert_block_rows: int | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None) -> None
Write the dataset to a ClickHouse table.
To control the number of parallel write tasks, use .repartition() before calling this method (see the second sketch under Examples).

Note
This operation will trigger execution of the lazy transformations performed on this dataset.
Examples
import ray
import pyarrow as pa
import pandas as pd

docs = [{"title": "ClickHouse Datasink test"} for key in range(4)]
ds = ray.data.from_pandas(pd.DataFrame(docs))

user_schema = pa.schema(
    [
        ("id", pa.int64()),
        ("title", pa.string()),
    ]
)

ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.OVERWRITE,
    schema=user_schema,
    table_settings=ray.data.ClickHouseTableSettings(
        engine="ReplacingMergeTree()",
        order_by="id",
    ),
)
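To increase write parallelism, repartition the dataset before writing. A minimal sketch continuing the example above; the partition count of 8 is illustrative:

ds.repartition(8).write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.APPEND,
)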
- Parameters:
table – Fully qualified table identifier (e.g., “default.my_table”). The table is created if it doesn’t already exist.
dsn – A string in DSN (Data Source Name) HTTP format (e.g., “clickhouse+http://username:password@host:8123/default”). For more information, see ClickHouse Connection String doc.
mode – One of SinkMode.CREATE, SinkMode.APPEND, or SinkMode.OVERWRITE (see the sketches after this parameter list):
- SinkMode.CREATE: Create a new table; fail if it already exists. If the table does not exist, you must provide a schema (either via the schema argument or as part of the dataset’s first block).
- SinkMode.APPEND: If the table exists, append data to it; if not, create the table using the provided or inferred schema. If the table does not exist, you must supply a schema.
- SinkMode.OVERWRITE: Drop any existing table of this name, then create a new table and write data to it. You must provide a schema in this case, as the table is being re-created.
schema – Optional pyarrow.Schema specifying column definitions. This is mandatory if you are creating a new table (i.e., the table doesn’t exist in CREATE or APPEND mode) or overwriting an existing table (OVERWRITE). When appending to an existing table, a schema is optional, though you can provide one to enforce column types or cast data as needed. If omitted (and the table already exists), the existing table definition is used. If omitted and the table must be created, the schema is inferred from the first block in the dataset.
client_settings – Optional ClickHouse server settings to be used with the session/every request. For more information, see ClickHouse Client Settings doc.
client_kwargs – Optional keyword arguments to pass to the ClickHouse client. For more information, see ClickHouse Core Settings doc.
table_settings – An optional ClickHouseTableSettings dataclass that specifies additional table creation instructions (see the sketch after this parameter list), including:
- engine (default: "MergeTree()"): Specifies the engine for the CREATE TABLE statement.
- order_by: Sets the ORDER BY clause in the CREATE TABLE statement. If not provided, a value is chosen for you: when overwriting an existing table, its previous ORDER BY (if any) is reused; otherwise, a “best” column is selected automatically (favoring a timestamp column, then a non-string column, and lastly the first column).
- partition_by: If present, adds a PARTITION BY <value> clause to the CREATE TABLE statement.
- primary_key: If present, adds a PRIMARY KEY (<value>) clause.
- settings: Appends a SETTINGS <value> clause to the CREATE TABLE statement, allowing custom ClickHouse settings.
max_insert_block_rows – If you have extremely large blocks, specifying a limit here will chunk the insert into multiple smaller insert calls. Defaults to None (no chunking).
ray_remote_args – Kwargs passed to ray.remote() in the write tasks.
concurrency – The maximum number of Ray tasks to run concurrently. This caps how many write tasks run at once but doesn’t change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources.
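The sketches below continue the ds and user_schema objects from the example above. First, appending to an existing table; passing schema on append is optional but enforces column types:

ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.APPEND,
    schema=user_schema,  # optional for an existing table; enforces types
)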
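A sketch of the client options; max_execution_time is a standard ClickHouse server setting, while connect_timeout is assumed to be a keyword accepted by the underlying client:

ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.APPEND,
    client_settings={"max_execution_time": 300},  # sent with every request
    client_kwargs={"connect_timeout": 10},        # assumed client keyword
)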
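A sketch of the table creation knobs; the default.events table and its ts timestamp column are hypothetical:

import pandas as pd
import pyarrow as pa
import ray

events = ray.data.from_pandas(
    pd.DataFrame({"id": [1, 2], "ts": pd.to_datetime(["2024-01-01", "2024-02-01"])})
)
events.write_clickhouse(
    table="default.events",  # hypothetical table
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.OVERWRITE,
    schema=pa.schema([("id", pa.int64()), ("ts", pa.timestamp("us"))]),
    table_settings=ray.data.ClickHouseTableSettings(
        engine="MergeTree()",                 # ENGINE clause
        order_by="id",                        # ORDER BY clause
        partition_by="toYYYYMM(ts)",          # PARTITION BY clause
        primary_key="id",                     # PRIMARY KEY clause
        settings="index_granularity = 8192",  # SETTINGS clause
    ),
)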
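A sketch of the write-throughput knobs; the row limit and task cap are illustrative:

ds.write_clickhouse(
    table="default.my_table",
    dsn="clickhouse+http://user:pass@localhost:8123/default",
    mode=ray.data.SinkMode.APPEND,
    max_insert_block_rows=500_000,  # split very large blocks into smaller inserts
    concurrency=4,                  # at most 4 concurrent write tasks
)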