ray.data.read_databricks_tables#

ray.data.read_databricks_tables(*, warehouse_id: str, table: str | None = None, query: str | None = None, catalog: str | None = None, schema: str | None = None, credential_provider: DatabricksCredentialProvider | None = None, parallelism: int = -1, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None, override_num_blocks: int | None = None) Dataset[source]#

Read a Databricks Unity Catalog table or the result of a Databricks SQL query.

Before calling this API, set the DATABRICKS_TOKEN environment variable to your Databricks warehouse access token.

export DATABRICKS_TOKEN=...

If you’re not running your program on the Databricks runtime, also set the DATABRICKS_HOST environment variable.

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
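If exporting shell variables isn't convenient, the same configuration can be done from Python before the read call. A minimal sketch; the token and host values below are placeholders, and real code would pull them from a secrets manager rather than hard-coding them:

```python
import os

# Equivalent to the export commands above. Set these before calling
# ray.data.read_databricks_tables(); placeholder values shown.
os.environ["DATABRICKS_TOKEN"] = "dapi-example-token"
os.environ["DATABRICKS_HOST"] = "adb-1234567890123456.7.azuredatabricks.net"
```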

Alternatively, you can provide a custom credential provider for more advanced authentication scenarios (e.g., token refresh, dynamic credentials). Create a subclass of DatabricksCredentialProvider and pass it via the credential_provider parameter.

Note

This function is built on the Databricks statement execution API.

Examples

Read using environment variables:

import ray

ds = ray.data.read_databricks_tables(
    warehouse_id='...',
    catalog='catalog_1',
    schema='db_1',
    query='select id from table_1 limit 750000',
)

Read using a custom credential provider:

from ray.data._internal.datasource.databricks_credentials import (
    DatabricksCredentialProvider,
)

class MyCredentialProvider(DatabricksCredentialProvider):
    def get_token(self) -> str:
        return "my-token"  # Fetch token from custom source

    def get_host(self) -> str:
        return "my-host.databricks.com"

    def invalidate(self) -> None:
        pass  # Clear cached credentials if applicable

ds = ray.data.read_databricks_tables(
    warehouse_id='...',
    catalog='catalog_1',
    schema='db_1',
    query='select id from table_1 limit 750000',
    credential_provider=MyCredentialProvider(),
)
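Because the provider instance is pickled and shipped to Ray workers, it's worth verifying that your implementation is picklable before launching a job. A minimal local sketch, using a plain stand-in base class so it runs without Ray installed; the class and attribute names are illustrative, not part of the Ray API:

```python
import pickle


class CredentialProviderBase:
    """Stand-in for DatabricksCredentialProvider (assumed interface)."""

    def get_token(self) -> str:
        raise NotImplementedError

    def get_host(self) -> str:
        raise NotImplementedError

    def invalidate(self) -> None:
        raise NotImplementedError


class StaticProvider(CredentialProviderBase):
    """A provider holding only picklable state (plain strings)."""

    def __init__(self, token: str, host: str):
        self.token = token
        self.host = host

    def get_token(self) -> str:
        return self.token

    def get_host(self) -> str:
        return self.host

    def invalidate(self) -> None:
        pass  # Nothing cached, so nothing to clear.


provider = StaticProvider("tok", "adb-123.azuredatabricks.net")
# Round-trip through pickle, as Ray does when sending it to workers.
restored = pickle.loads(pickle.dumps(provider))
assert restored.get_token() == "tok"
assert restored.get_host() == provider.get_host()
```

Providers that hold open connections, locks, or thread handles won't survive this round trip; keep only plain state on the instance and open resources lazily inside the getter methods.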
Parameters:
  • warehouse_id – The ID of the Databricks warehouse. The query statement is executed on this warehouse.

  • table – The name of the Unity Catalog table you want to read. If you set this argument, you can’t set the query argument; the reader generates a query of the form select * from {table} under the hood.

  • query – The query you want to execute. If you set this argument, you can’t set the table argument.

  • catalog – (Optional) The default catalog name used by the query.

  • schema – (Optional) The default schema used by the query.

  • credential_provider – (Optional) A custom credential provider for authentication. Must be a subclass of DatabricksCredentialProvider implementing get_token(), get_host(), and invalidate(). The provider must be picklable (serializable) as it is sent to Ray workers for distributed execution. If provided, the provider is used exclusively and environment variables are ignored.

  • parallelism – This argument is deprecated. Use the override_num_blocks argument instead.

  • num_cpus – The number of CPUs to reserve for each parallel read worker.

  • num_gpus – The number of GPUs to reserve for each parallel read worker. For example, specify num_gpus=1 to request 1 GPU for each parallel read worker.

  • memory – The heap memory in bytes to reserve for each parallel read worker.

  • ray_remote_args – kwargs passed to ray.remote() in the read tasks.

  • concurrency – The maximum number of Ray tasks to run concurrently. This doesn’t change the total number of tasks run or the total number of output blocks. By default, concurrency is dynamically decided based on the available resources.

  • override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

Returns:

A Dataset containing the queried data.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.