ray.data.read_datasource#

ray.data.read_datasource(datasource: Datasource, *, parallelism: int = -1, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None, compute: ComputeStrategy | None = None, override_num_blocks: int | None = None, **read_args) Dataset[source]#

Read a stream from a custom Datasource.

Parameters:
  • datasource – The Datasource to read data from.

  • parallelism – This argument is deprecated. Use override_num_blocks argument.

  • num_cpus – The number of CPUs to reserve for each parallel read worker.

  • num_gpus – The number of GPUs to reserve for each parallel read worker. For example, specify num_gpus=1 to request 1 GPU for each parallel read worker.

  • memory – The heap memory in bytes to reserve for each parallel read worker.

  • ray_remote_args – kwargs passed to ray.remote() in the read tasks.

  • concurrency – The maximum number of Ray tasks to run concurrently. Set this to control number of tasks to run concurrently. This doesn’t change the total number of tasks run or the total number of output blocks. By default, concurrency is dynamically decided based on the available resources.

  • compute – The compute strategy to use for reading. Pass an ActorPoolStrategy instance to use an actor pool, or a TaskPoolStrategy instance (default) to use Ray tasks. If not specified, defaults to TaskPoolStrategy(concurrency). If both compute and concurrency are specified, concurrency takes precedence.

  • override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

  • **read_args – Additional kwargs to pass to the Datasource implementation.

Returns:

Dataset that reads data from the Datasource.

Examples

Read using default task-based execution:

>>> import ray
>>> from ray.data._internal.datasource.range_datasource import RangeDatasource
>>> datasource = RangeDatasource(n=1000, block_format="arrow")
>>> ds = ray.data.read_datasource(datasource) 

Read using actors for stateful operations:

>>> from ray.data import ActorPoolStrategy
>>> ds = ray.data.read_datasource( 
...     datasource,
...     compute=ActorPoolStrategy(size=4)  # Use 4 actors
... )

Note

The use of ActorPoolStrategy is currently experimental and comes with caveats, such as additional overhead due to limited operator fusion opportunities.