Dataset API Reference

Creating a Dataset

ray.data.range(n: int, *, parallelism: int = 200) → ray.data.dataset.Dataset[int][source]

Create a dataset from a range of integers [0..n).

Examples

>>> ray.data.range(10000).map(lambda x: x * 2).show()
Parameters
  • n – The upper bound of the range of integers.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the integers.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.range_arrow(n: int, *, parallelism: int = 200) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from a range of integers [0..n).

Examples

>>> ds = ray.data.range_arrow(1000)
>>> ds.map(lambda r: {"v2": r["value"] * 2}).show()

This is similar to range(), but uses Arrow tables to hold the integers in Arrow records. The dataset elements take the form {“value”: N}.

Parameters
  • n – The upper bound of the range of integer records.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the integers as Arrow records.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.range_tensor(n: int, *, shape: Tuple = (1, ), parallelism: int = 200) → ray.data.dataset.Dataset[numpy.ndarray][source]

Create a Tensor dataset from a range of integers [0..n).

Examples

>>> ds = ray.data.range_tensor(1000, shape=(3, 10))
>>> ds.map_batches(lambda arr: arr ** 2).show()

This is similar to range(), but uses np.ndarrays to hold the integers in tensor form. The overall shape of the dataset is (n,) + shape.

Parameters
  • n – The upper bound of the range of integer records.

  • shape – The shape of each record.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the integers as tensors.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_csv(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **arrow_csv_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from csv files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • arrow_csv_args – Other csv read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_json(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **arrow_json_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from json files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_json(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • arrow_json_args – Other json read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_parquet(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, _tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, …]]]] = None, **arrow_parquet_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from parquet files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The amount of parallelism to use for the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • _tensor_column_schema – A dict of column name –> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_numpy(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **numpy_load_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from numpy files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_numpy("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_numpy(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • numpy_load_args – Other options to pass to np.load.

Returns

Dataset holding Tensor records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_text(paths: Union[str, List[str]], *, encoding: str = 'utf-8', filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None) → ray.data.dataset.Dataset[str][source]

Create a dataset from lines stored in text files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_text("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_text(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • encoding – The encoding of the files (e.g., “utf-8” or “ascii”).

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

Returns

Dataset holding lines of text read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_binary_files(paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None) → ray.data.dataset.Dataset[Union[Tuple[str, bytes], bytes]][source]

Create a dataset from binary files of arbitrary contents.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_binary_files(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • include_paths – Whether to include the full path of the file in the dataset records. When specified, the dataset records will be a tuple of the file path and the file contents.

  • filesystem – The filesystem implementation to read from.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • parallelism – The amount of parallelism to use for the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_datasource(datasource: ray.data.datasource.datasource.Datasource[T], *, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, _spread_resource_prefix: Optional[str] = None, **read_args) → ray.data.dataset.Dataset[T][source]

Read a dataset from a custom data source.
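
Example (an illustrative sketch; MyDatasource is a hypothetical Datasource implementation, and the n read argument is specific to it):

>>> # Read from a user-defined datasource; read_args are forwarded to it.
>>> ds = ray.data.read_datasource(MyDatasource(), n=1000, parallelism=8)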

Parameters
  • datasource – The datasource to read data from.

  • parallelism – The requested parallelism of the read.

  • read_args – Additional kwargs to pass to the datasource impl.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

Returns

Dataset holding the data read from the datasource.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_items(items: List[Any], *, parallelism: int = 200) → ray.data.dataset.Dataset[Any][source]

Create a dataset from a list of local Python objects.

Examples

>>> ray.data.from_items([1, 2, 3, 4, 5])
Parameters
  • items – List of local Python objects.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the items.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_arrow(tables: Union[pyarrow.Table, bytes, List[Union[pyarrow.Table, bytes]]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a list of Arrow tables.
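
Example (a minimal sketch; assumes pyarrow is installed and the table contents here are illustrative):

>>> import pyarrow as pa
>>> t = pa.table({"value": [1, 2, 3]})
>>> ray.data.from_arrow(t)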

Parameters

tables – An Arrow table, or a list of Arrow tables, or the table(s) serialized as bytes in the Arrow streaming format.

Returns

Dataset holding Arrow records from the tables.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_arrow_refs(tables: Union[ray.types.ObjectRef[Union[pyarrow.Table, bytes]], List[ray.types.ObjectRef[Union[pyarrow.Table, bytes]]]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a set of Arrow tables.
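
Example (a minimal sketch; assumes pyarrow is installed; ray.put is used here only to create the object reference):

>>> import pyarrow as pa
>>> table_ref = ray.put(pa.table({"value": [1, 2, 3]}))
>>> ray.data.from_arrow_refs([table_ref])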

Parameters

tables – A Ray object reference to an Arrow table, or a list of Ray object references to Arrow tables (or to tables serialized as bytes in the Arrow streaming format).

Returns

Dataset holding Arrow records from the tables.

DeveloperAPI: This API may change across minor Ray releases.

ray.data.from_spark(df: pyspark.sql.DataFrame, *, parallelism: Optional[int] = None) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a Spark dataframe.
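
Example (an illustrative sketch; assumes RayDP is installed and that spark is a SparkSession created through RayDP, e.g. via raydp.init_spark):

>>> df = spark.range(100)
>>> ray.data.from_spark(df)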

Parameters
  • df – A Spark dataframe, which must be created by RayDP (Spark-on-Ray).

  • parallelism – The amount of parallelism to use for the dataset. If not provided, it will be equal to the number of partitions of the original Spark dataframe.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_dask(df: dask.DataFrame) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a Dask DataFrame.
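
Example (a minimal sketch; assumes dask and pandas are installed; the dataframe contents are illustrative):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({"value": range(100)}), npartitions=4)
>>> ray.data.from_dask(ddf)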

Parameters

df – A Dask DataFrame.

Returns

Dataset holding Arrow records read from the DataFrame.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_modin(df: modin.DataFrame) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a Modin dataframe.

Parameters

df – A Modin dataframe, which must be using the Ray backend.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_mars(df: mars.DataFrame, *, parallelism: int = 200) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a MARS dataframe.

Parameters

df – A MARS dataframe, which must be executed by MARS-on-Ray.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_pandas(dfs: Union[pandas.DataFrame, List[pandas.DataFrame]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a list of Pandas dataframes.
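
Example (a minimal sketch; the dataframe contents are illustrative):

>>> import pandas as pd
>>> ray.data.from_pandas(pd.DataFrame({"value": [1, 2, 3]}))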

Parameters

dfs – A Pandas dataframe or a list of Pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_pandas_refs(dfs: Union[ray.types.ObjectRef[pandas.DataFrame], List[ray.types.ObjectRef[pandas.DataFrame]]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a list of Ray object references to Pandas dataframes.
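
Example (a minimal sketch; ray.put is used here only to create the object reference):

>>> import pandas as pd
>>> df_ref = ray.put(pd.DataFrame({"value": [1, 2, 3]}))
>>> ray.data.from_pandas_refs([df_ref])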

Parameters

dfs – A Ray object reference to a pandas dataframe, or a list of Ray object references to pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

DeveloperAPI: This API may change across minor Ray releases.

ray.data.from_numpy(ndarrays: List[ray.types.ObjectRef[numpy.ndarray]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a set of NumPy ndarrays.
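
Example (a minimal sketch; the array shape is illustrative):

>>> import numpy as np
>>> arr_ref = ray.put(np.zeros((100, 3)))
>>> ray.data.from_numpy([arr_ref])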

Parameters

ndarrays – A list of Ray object references to NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

Dataset API

class ray.data.Dataset(blocks: ray.data.impl.block_list.BlockList[T], epoch: int)[source]

Implements a distributed Arrow dataset.

Datasets are implemented as a list of ObjectRef[Block]. The block also determines the unit of parallelism. The default block type is the pyarrow.Table. Arrow-incompatible objects are held in list blocks.

Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors just like any other object. Datasets support conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also compatible with distributed TensorFlow / PyTorch.

Dataset supports parallel transformations such as .map(), .map_batches(), and simple repartition, but currently not aggregations and joins.

PublicAPI (beta): This API is in beta and may change before becoming stable.

map(fn: Union[type, Callable[[T], U]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset.Dataset[U][source]

Apply the given function to each record of this dataset.

This is a blocking operation. Note that mapping individual records can be quite slow. Consider using .map_batches() for performance.

Examples

>>> # Transform python objects.
>>> ds.map(lambda x: x * 2)
>>> # Transform Arrow records.
>>> ds.map(lambda record: {"v2": record["value"] * 2})
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, batch):
...        return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map(CachedModel, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

map_batches(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]]], *, batch_size: int = None, compute: Optional[str] = None, batch_format: str = 'native', **ray_remote_args) → Dataset[Any][source]

Apply the given function to batches of records of this dataset.

This is a blocking operation.

Examples

>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, item):
...        return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map_batches(
...    CachedModel,
...    batch_size=256, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable.

  • batch_size – Request a specific batch size, or leave unspecified to use entire blocks as batches.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • batch_format – Specify “native” to use the native block format, “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

flat_map(fn: Union[type, Callable[[T], Iterable[U]]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset.Dataset[U][source]

Apply the given function to each record and then flatten results.

This is a blocking operation. Consider using .map_batches() for better performance (the batch size can be altered in map_batches).

Examples

>>> ds.flat_map(lambda x: [x, x ** 2, x ** 3])

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

filter(fn: Union[type, Callable[[T], bool]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset.Dataset[T][source]

Filter out records that do not satisfy the given predicate.

This is a blocking operation. Consider using .map_batches() for better performance (you can implement filter by dropping records).

Examples

>>> ds.filter(lambda x: x % 2 == 0)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The predicate to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

repartition(num_blocks: int, *, shuffle: bool = False) → ray.data.dataset.Dataset[T][source]

Repartition the dataset into exactly this number of blocks.

This is a blocking operation. After repartitioning, all blocks in the returned dataset will have approximately the same number of rows.

Examples

>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(10).write_parquet(...)

Time complexity: O(dataset size / parallelism)

Parameters
  • num_blocks – The number of blocks.

  • shuffle – Whether to perform a distributed shuffle during the repartition. When shuffle is enabled, each output block contains a subset of data rows from each input block, which requires all-to-all data movement. When shuffle is disabled, output blocks are created from adjacent input blocks, minimizing data movement.

Returns

The repartitioned dataset.

random_shuffle(*, seed: Optional[int] = None, num_blocks: Optional[int] = None, _move: Optional[bool] = False, _spread_resource_prefix: Optional[str] = None) → ray.data.dataset.Dataset[T][source]

Randomly shuffle the elements of this dataset.

This is a blocking operation similar to repartition().

Examples

>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
>>> # Shuffle this dataset with a fixed random seed.
>>> ds.random_shuffle(seed=12345)

Time complexity: O(dataset size / parallelism)

Parameters
  • seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

  • num_blocks – The number of output blocks after the shuffle, or None to retain the number of blocks.

Returns

The shuffled dataset.

split(n: int, *, equal: bool = False, locality_hints: List[Any] = None) → List[ray.data.dataset.Dataset[T]][source]

Split the dataset into n disjoint pieces.

This returns a list of sub-datasets that can be passed to Ray tasks and actors and used to read the dataset records in parallel.

Examples

>>> # Split up a dataset to process over `n` worker actors.
>>> shards = ds.split(len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)

Time complexity: O(1)

See also: Dataset.split_at_indices

Parameters
  • n – Number of child datasets to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith dataset with the ith actor to maximize data locality.

Returns

A list of n disjoint dataset splits.

split_at_indices(indices: List[int]) → List[ray.data.dataset.Dataset[T]][source]

Split the dataset at the given indices (like np.split).

Examples

>>> d1, d2, d3 = ray.data.range(10).split_at_indices([2, 5])
>>> d1.take()
[0, 1]
>>> d2.take()
[2, 3, 4]
>>> d3.take()
[5, 6, 7, 8, 9]

Time complexity: O(num splits)

See also: Dataset.split

Parameters

indices – List of sorted integers which indicate where the dataset will be split. If an index exceeds the length of the dataset, an empty dataset will be returned.

Returns

The dataset splits.

union(*other: List[Dataset[T]]) → ray.data.dataset.Dataset[T][source]

Combine this dataset with others of the same type.

The order of the blocks in the datasets is preserved, as is the relative ordering between the datasets passed in the argument list.
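
Example (illustrative; the output shows the blocks of the argument dataset appended after this dataset's blocks):

>>> ds1 = ray.data.range(3)
>>> ds2 = ray.data.range(3)
>>> ds1.union(ds2).take()
[0, 1, 2, 0, 1, 2]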

Parameters

other – List of datasets to combine with this one. The datasets must have the same schema as this dataset, otherwise the behavior is undefined.

Returns

A new dataset holding the union of their data.

groupby(key: Callable[[T], Any]) → GroupedDataset[T][source]

Group the dataset by the specified key function (Experimental).

This is a lazy operation. Currently only simple block datasets are supported.

Examples

>>> # Group by a key function and aggregate.
>>> ray.data.range(100).groupby(lambda x: x % 3).count()

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters

key – A key function.

Returns

A lazy GroupedDataset that can be aggregated later.

sort(key: Union[None, str, List[str], Callable[[T], Any]] = None, descending: bool = False) → ray.data.dataset.Dataset[T][source]

Sort the dataset by the specified key column or key function. (experimental support)

This is a blocking operation.

Examples

>>> # Sort using the entire record as the key.
>>> ds.sort()
>>> # Sort by a single column in descending order.
>>> ds.sort("field1", descending=True)
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["field1"] % 100)
>>> # Sort by multiple columns (not yet supported).
>>> ds.sort([("field1", "ascending"), ("field2", "descending")])

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters
  • key

    • For Arrow tables, key must be a single column name.

    • For datasets of Python objects, key can be either a lambda function that returns a comparison key to sort by, or None to sort by the original value.

  • descending – Whether to sort in descending order.

Returns

A new, sorted dataset.

zip(other: Dataset[U]) → Dataset[T, U][source]

Zip this dataset with the elements of another.

The datasets must have identical num rows, block types, and block sizes (e.g., one was produced from a .map() of another). For Arrow blocks, the schema will be concatenated, and any duplicate column names disambiguated with _1, _2, etc. suffixes.

Time complexity: O(dataset size / parallelism)

Parameters

other – The dataset to zip with on the right hand side.

Examples

>>> ds = ray.data.range(5)
>>> ds.zip(ds).take()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
Returns

A Dataset with (k, v) pairs (or concatenated Arrow schema) where k comes from the first dataset and v comes from the second.

limit(limit: int) → ray.data.dataset.Dataset[T][source]

Limit the dataset to the first number of records specified.

Examples

>>> ds.limit(100).map(lambda x: x * 2).take()

Time complexity: O(limit specified)

Parameters

limit – The size of the dataset to truncate to.

Returns

The truncated dataset.

take(limit: int = 20) → List[T][source]

Take up to the given number of records from the dataset.
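
Example (illustrative):

>>> ray.data.range(5).take(3)
[0, 1, 2]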

Time complexity: O(limit specified)

Parameters

limit – The max number of records to return.

Returns

A list of up to limit records from the dataset.

take_all(limit: int = 100000) → List[T][source]

Take all the records in the dataset.

Time complexity: O(dataset size)

Parameters

limit – Raise an error if the size exceeds the specified limit.

Returns

A list of all the records in the dataset.

show(limit: int = 20) → None[source]

Print up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to print.

count() → int[source]

Count the number of records in the dataset.

Time complexity: O(dataset size / parallelism), O(1) for parquet

Returns

The number of records in the dataset.

sum() → int[source]

Sum up the elements of this dataset.

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset.

schema() → Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.

num_blocks() → int[source]

Return the number of blocks of this dataset.

Time complexity: O(1)

Returns

The number of blocks of this dataset.

size_bytes() → int[source]

Return the in-memory size of the dataset.

Time complexity: O(1)

Returns

The in-memory size of the dataset in bytes, or None if the in-memory size is not known.

input_files() → List[str][source]

Return the list of input files for the dataset.

Time complexity: O(num input files)

Returns

The list of input files used to create the dataset, or an empty list if the input files is not known.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_parquet_args) → None[source]

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> ds.write_parquet("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used instead of arrow_parquet_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • arrow_parquet_args – Options to pass to pyarrow.parquet.write_table(), which is used to write out each block to a file.

write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **pandas_json_args) → None[source]

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> ds.write_json("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where json files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from pandas_json_args. This should be used instead of pandas_json_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • pandas_json_args – These args will be passed to pandas.DataFrame.to_json(), which we use under the hood to write out each Datasets block. These are dict(orient=”records”, lines=True) by default.

write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_csv_args) → None[source]

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> ds.write_csv("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where csv files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. This should be used instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • arrow_csv_args – Other CSV write options to pass to pyarrow.

write_numpy(path: str, *, column: str = 'value', filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None) → None[source]

Write a tensor column of the dataset to npy files.

This is only supported for datasets convertible to Arrow records that contain a TensorArray column. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset.

Examples

>>> ds.write_numpy("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where npy files will be written to.

  • column – The name of the table column that contains the tensor to be written. This defaults to “value”.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

write_datasource(datasource: ray.data.datasource.datasource.Datasource[T], **write_args) → None[source]

Write the dataset to a custom datasource.

Examples

>>> ds.write_datasource(CustomDatasourceImpl(...))

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • write_args – Additional write args to pass to the datasource.

iter_rows(*, prefetch_blocks: int = 0) → Iterator[T][source]

Return a local row iterator over the dataset.

Examples

>>> for i in ray.data.range(1000000).iter_rows():
...     print(i)

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the entire dataset.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'native', drop_last: bool = False) → Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]][source]

Return a local batched iterator over the dataset.

Examples

>>> for batch in ray.data.range(1000000).iter_batches():
...     print(batch)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “native” to use the current block format, “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “native”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

to_torch(*, label_column: str, feature_columns: Optional[List[str]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[List[torch.dtype]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False) → torch.utils.data.IterableDataset[source]

Return a Torch IterableDataset over this dataset.

This is only supported for datasets convertible to Arrow records.

It is recommended to use the returned IterableDataset directly instead of passing it into a torch DataLoader.

Each element in IterableDataset will be a tuple consisting of 2 elements. The first item is the features tensor, and the second item is the label tensor. The features tensor will be of shape (N, n), and the label tensor will be of shape (N, 1), where N is the batch_size used by the DataLoader, and n is the number of features.

Note that you probably want to call .split() on this dataset if there are to be multiple Torch workers consuming the data.
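
Example (an illustrative sketch; assumes ds has a "label" column plus numeric feature columns, and that torch is installed):

>>> torch_ds = ds.to_torch(label_column="label", batch_size=32)
>>> for features, labels in torch_ds:
...     print(features.shape, labels.shape)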

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output list).

  • feature_columns (Optional[List[str]]) – The names of the columns to use as the features. If None, then use all columns except the label columns as the features.

  • label_column_dtype (Optional[torch.dtype]) – The torch dtype to use for the label column. If None, then automatically infer the dtype.

  • feature_column_dtypes (Optional[List[torch.dtype]]) – The dtypes to use for the feature columns. The len of this list must be equal to the len of feature_columns. If None, then automatically infer the dtype.

  • batch_size (int) – How many samples per batch to yield at a time. Defaults to 1.

  • prefetch_blocks (int) – The number of blocks to prefetch ahead of the current block during the scan.

  • drop_last (bool) – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

Returns

A torch IterableDataset.

to_tf(*, label_column: str, output_signature: Tuple[tf.TypeSpec, tf.TypeSpec], feature_columns: Optional[List[str]] = None, prefetch_blocks: int = 0, batch_size: int = 1) → tf.data.Dataset[source]

Return a TF Dataset over this dataset.

The TF Dataset will be created from the generator returned by the iter_batches method. prefetch_blocks and batch_size arguments will be passed to that method.

This is only supported for datasets convertible to Arrow records.

Requires all datasets to have the same columns.

It is recommended to call .split() on this dataset if there are to be multiple TensorFlow workers consuming the data.

The elements generated must be compatible with the given output_signature argument (same as in tf.data.Dataset.from_generator).
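
Example (an illustrative sketch; assumes ds has a "label" column and two numeric feature columns; the TensorSpec shapes and dtypes are placeholders):

>>> import tensorflow as tf
>>> tf_ds = ds.to_tf(
...     label_column="label",
...     output_signature=(
...         tf.TensorSpec(shape=(None, 2), dtype=tf.float32),
...         tf.TensorSpec(shape=(None,), dtype=tf.float32)),
...     batch_size=32)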

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output tuple).

  • output_signature (Tuple[tf.TypeSpec, tf.TypeSpec]) – A 2-element tuple of tf.TypeSpec objects corresponding to (features, label).

  • feature_columns (Optional[List[str]]) – List of columns in datasets to use. If None, all columns will be used.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size. Defaults to 1.

Returns

A tf.data.Dataset.

to_dask() → dask.DataFrame[source]

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will set the Dask scheduler to Dask-on-Ray globally, via the config.

Time complexity: O(dataset size / parallelism)

Returns

A Dask DataFrame created from this dataset.

to_mars() → mars.DataFrame[source]

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

to_modin() → modin.DataFrame[source]

Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of Pandas dataframes (using .to_pandas_refs()). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using modin.distributed.dataframe.pandas.partitions.from_partitions().

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

to_spark(spark: pyspark.sql.SparkSession) → pyspark.sql.DataFrame[source]

Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

to_pandas(limit: int = 100000) → pandas.DataFrame[source]

Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow records. An error is raised if the number of records exceeds the provided limit. Note that you can use .limit() on the dataset beforehand to truncate the dataset manually.
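
Example (illustrative):

>>> df = ray.data.range_arrow(100).to_pandas()
>>> len(df)
100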

Time complexity: O(dataset size)

Parameters

limit – The maximum number of records to return. An error will be raised if the limit is exceeded.

Returns

A Pandas DataFrame created from this dataset, containing a limited number of records.

to_pandas_refs() → List[ray.types.ObjectRef[pandas.DataFrame]][source]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

to_numpy_refs(*, column: Optional[str] = None) → List[ray.types.ObjectRef[numpy.ndarray]][source]

Convert this dataset into a distributed set of NumPy ndarrays.

This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Parameters

column – The name of the column to convert to numpy, or None to specify the entire row. Required for Arrow tables.

Returns

A list of remote NumPy ndarrays created from this dataset.

to_arrow_refs() → List[ray.types.ObjectRef[pyarrow.Table]][source]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

repeat(times: int = None) → DatasetPipeline[T][source]

Convert this into a DatasetPipeline by looping over this dataset.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset.

Note that every repeat of the dataset is considered an “epoch” for the purposes of DatasetPipeline.iter_epochs().

Examples

>>> # Infinite pipeline of numbers [0, 5)
>>> ray.data.range(5).repeat().take()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, ...]
>>> # Can apply transformations to the pipeline.
>>> ray.data.range(5).repeat().map(lambda x: -x).take()
[0, -1, -2, -3, -4, 0, -1, -2, -3, -4, ...]
>>> # Can shuffle each epoch (dataset) in the pipeline.
>>> ray.data.range(5).repeat().random_shuffle().take()
[2, 3, 0, 4, 1, 4, 0, 2, 1, 3, ...]
Parameters

times – The number of times to loop over this dataset, or None to repeat indefinitely.

window(*, blocks_per_window: int = 10) → DatasetPipeline[T][source]

Convert this into a DatasetPipeline by windowing over data blocks.

Transformations prior to the call to window() are evaluated in bulk on the entire dataset. Transformations done on the returned pipeline are evaluated incrementally per window of blocks as data is read from the output of the pipeline.

Windowing execution allows for output to be read sooner without waiting for all transformations to fully execute, and can also improve efficiency if transforms use different resources (e.g., GPUs).

Without windowing:

[preprocessing......]
                      [inference.......]
                                         [write........]
Time ----------------------------------------------------------->

With windowing:

[prep1] [prep2] [prep3]
        [infer1] [infer2] [infer3]
                 [write1] [write2] [write3]
Time ----------------------------------------------------------->

Examples

>>> # Create an inference pipeline.
>>> ds = ray.data.read_binary_files(dir)
>>> pipe = ds.window(blocks_per_window=10).map(infer)
DatasetPipeline(num_windows=40, num_stages=2)
>>> # The higher the stage parallelism, the shorter the pipeline.
>>> pipe = ds.window(blocks_per_window=20).map(infer)
DatasetPipeline(num_windows=20, num_stages=2)
>>> # Outputs can be incrementally read from the pipeline.
>>> for item in pipe.iter_rows():
...    print(item)
Parameters

blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

get_internal_block_refs() → List[ray.types.ObjectRef[Union[List[T], pyarrow.Table, bytes]]][source]

Get a list of references to the underlying blocks of this dataset.

This function can be used for zero-copy access to the data.

Time complexity: O(1)

Returns

A list of references to this dataset’s blocks.

DeveloperAPI: This API may change across minor Ray releases.

DatasetPipeline API

class ray.data.dataset_pipeline.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.data.dataset.Dataset[T]]], stages: List[Callable[[ray.data.dataset.Dataset[Any]], ray.data.dataset.Dataset[Any]]] = None, length: int = None, progress_bars: bool = True, _executed: List[bool] = None)[source]

Implements a pipeline of Datasets.

Unlike Datasets, which execute all transformations synchronously, DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by either repeating a Dataset (ds.repeat(times=None)), by turning a single Dataset into a pipeline (ds.window(blocks_per_window=10)), or defined explicitly using DatasetPipeline.from_iterable().

DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).

PublicAPI (beta): This API is in beta and may change before becoming stable.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'pandas', drop_last: bool = False) → Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]][source]

Return a local batched iterator over the data in the pipeline.

Examples

>>> for pandas_df in ray.data.range(1000000).iter_batches():
...     print(pandas_df)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “pandas”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

split(n: int, *, equal: bool = False, locality_hints: List[Any] = None) → List[ray.data.dataset_pipeline.DatasetPipeline[T]][source]

Split the pipeline into n disjoint pipeline shards.

This returns a list of sub-pipelines that can be passed to Ray tasks and actors and used to read the pipeline records in parallel.

Examples

>>> # Split up a pipeline to process over `n` worker actors.
>>> shards = pipe.split(len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)

Time complexity: O(1)

Implementation detail: this launches a coordinator actor that is used to execute the pipeline and push data blocks to each pipeline shard. Reading from an individual shard will be blocked if other shards are falling behind. A warning will be printed if a shard has been blocked on read for more than 10 seconds.

Parameters
  • n – Number of child pipelines to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith pipeline shard with the ith actor to maximize data locality.

Returns

A list of n disjoint pipeline splits.

split_at_indices(indices: List[int]) → List[ray.data.dataset_pipeline.DatasetPipeline[T]][source]

Split the datasets within the pipeline at the given indices (like np.split).

This will split each dataset contained within this pipeline, thereby producing len(indices) + 1 pipelines: the first pipeline contains the [0, indices[0]) slice from each dataset, the second contains the [indices[0], indices[1]) slice from each dataset, and so on, with the final pipeline containing the [indices[-1], self.count()) slice from each dataset.

Examples

>>> p1, p2, p3 = ray.data.range(
...     8).repeat(2).split_at_indices([2, 5])
>>> p1.take()
[0, 1, 0, 1]
>>> p2.take()
[2, 3, 4, 2, 3, 4]
>>> p3.take()
[5, 6, 7, 5, 6, 7]

Time complexity: O(num splits)

See also: DatasetPipeline.split

Parameters

indices – List of sorted integers which indicate where the pipeline will be split. If an index exceeds the length of the pipeline, an empty pipeline will be returned.

Returns

The pipeline splits.

rewindow(*, blocks_per_window: int, preserve_epoch: bool = True) → ray.data.dataset_pipeline.DatasetPipeline[T][source]

Change the windowing (blocks per dataset) of this pipeline.

Changes the windowing of this pipeline to the specified size. For example, if the current pipeline has two blocks per dataset, and .rewindow(blocks_per_window=4) is requested, adjacent datasets will be merged until each dataset is 4 blocks. Conversely, if a smaller blocks_per_window is requested, each dataset will be split into smaller windows.
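
Example (an illustrative sketch; the resulting window sizes depend on how many blocks the source dataset has):

>>> pipe = ray.data.range(1000).window(blocks_per_window=2)
>>> pipe = pipe.rewindow(blocks_per_window=4)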

Parameters
  • blocks_per_window – The new target blocks per window.

  • preserve_epoch – Whether to preserve epoch boundaries. If set to False, then windows can contain data from two adjacent epochs.

repeat(times: int = None) → ray.data.dataset_pipeline.DatasetPipeline[T][source]

Repeat this pipeline a given number of times, or indefinitely.

This operation is only allowed for pipelines of a finite length. An error will be raised for pipelines of infinite length.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the repeated pipeline are evaluated on each loop of the pipeline over the base pipeline.

Note that every repeat of the pipeline is considered an “epoch” for the purposes of iter_epochs(). If there are multiple repeat calls, the latest repeat takes precedence for the purpose of defining epochs.

Parameters

times – The number of times to loop over this pipeline, or None to repeat indefinitely.

schema() → Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset pipeline.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.

count() → int[source]

Count the number of records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The number of records in the dataset pipeline.

sum() → int[source]

Sum the records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset pipeline.

show_windows(limit_per_dataset: int = 10) → None[source]

Print up to the given number of records from each window/dataset.

This is helpful as a debugging tool for understanding the structure of dataset pipelines.

Parameters

limit_per_dataset – Rows to print per window/dataset.

iter_epochs() → Iterator[ray.data.dataset_pipeline.DatasetPipeline[T]][source]

Split this pipeline up by epoch.

This allows reading of data per-epoch for repeated Datasets, which is useful for ML training. For example, ray.data.range(10).repeat(50) generates a pipeline with 500 rows total split across 50 epochs. This method allows iterating over the data individually per epoch (repetition) of the original data.

Examples

>>> epochs = ray.data.range(10).repeat(50).iter_epochs()
>>> for i, epoch in enumerate(epochs):
...     print("Epoch", i)
...     for row in epoch.iter_rows():
...         print(row)
Returns

Iterator over epoch objects, where each epoch is a DatasetPipeline containing data from that epoch only.

iter_datasets() → Iterator[ray.data.dataset.Dataset[T]][source]

Iterate over the output datasets of this pipeline.

Returns

Iterator over the datasets outputted from this pipeline.

DeveloperAPI: This API may change across minor Ray releases.

foreach_window(fn: Callable[[ray.data.dataset.Dataset[T]], ray.data.dataset.Dataset[U]]) → ray.data.dataset_pipeline.DatasetPipeline[U][source]

Apply a transform to each dataset/window in this pipeline.

Parameters

fn – The function to transform each dataset with.

Returns

The transformed DatasetPipeline.

DeveloperAPI: This API may change across minor Ray releases.

static from_iterable(iterable: Iterable[Callable[[], ray.data.dataset.Dataset[T]]]) → ray.data.dataset_pipeline.DatasetPipeline[T][source]

Create a pipeline from a sequence of Dataset-producing functions.
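
Example (a minimal sketch):

>>> from ray.data.dataset_pipeline import DatasetPipeline
>>> pipe = DatasetPipeline.from_iterable(
...     [lambda: ray.data.range(10), lambda: ray.data.range(20)])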

Parameters

iterable – A finite or infinite-length sequence of functions that each produce a Dataset when called.

filter(fn: Union[type, Callable[[T], bool]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.filter to each dataset/window in this pipeline.

flat_map(fn: Union[type, Callable[[T], Iterable[U]]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.flat_map to each dataset/window in this pipeline.

iter_rows(*, prefetch_blocks: int = 0) → Iterator[T]

Call Dataset.iter_rows over the stream of output batches from the pipeline.

map(fn: Union[type, Callable[[T], U]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.map to each dataset/window in this pipeline.

map_batches(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]]], *, batch_size: int = None, compute: Optional[str] = None, batch_format: str = 'native', **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.map_batches to each dataset/window in this pipeline.

random_shuffle_each_window(*, seed: Optional[int] = None, num_blocks: Optional[int] = None, _move: Optional[bool] = False, _spread_resource_prefix: Optional[str] = None) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.random_shuffle to each dataset/window in this pipeline.

repartition_each_window(num_blocks: int, *, shuffle: bool = False) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.repartition to each dataset/window in this pipeline.

show(limit: int = 20) → None

Call Dataset.show over the stream of output batches from the pipeline.

sort_each_window(key: Union[None, str, List[str], Callable[[T], Any]] = None, descending: bool = False) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.sort to each dataset/window in this pipeline.

take(limit: int = 20) → List[T]

Call Dataset.take over the stream of output batches from the pipeline.

take_all(limit: int = 100000) → List[T]

Call Dataset.take_all over the stream of output batches from the pipeline.

to_tf(*, label_column: str, output_signature: Tuple[tf.TypeSpec, tf.TypeSpec], feature_columns: Optional[List[str]] = None, prefetch_blocks: int = 0, batch_size: int = 1) → tf.data.Dataset

Call Dataset.to_tf over the stream of output batches from the pipeline.

to_torch(*, label_column: str, feature_columns: Optional[List[str]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[List[torch.dtype]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False) → torch.utils.data.IterableDataset

Call Dataset.to_torch over the stream of output batches from the pipeline.
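
A hedged sketch of feeding a repeating pipeline into a Torch training loop; the column names, the lambda producing a label, and the use of Dataset.repeat() to build an epoch pipeline are illustrative assumptions:

import ray

# Build an Arrow dataset with a feature column and a label column.
ds = ray.data.range_arrow(1000).map(
    lambda r: {"x": float(r["value"]), "label": r["value"] % 2})
pipe = ds.repeat(2)  # two passes over the data
torch_ds = pipe.to_torch(label_column="label", batch_size=32)
for features, labels in torch_ds:
    pass  # feed the batch into a model / optimizer step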

write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_csv_args) → None

Call Dataset.write_csv on each output dataset of this pipeline.

write_datasource(datasource: ray.data.datasource.datasource.Datasource[T], **write_args) → None

Call Dataset.write_datasource on each output dataset of this pipeline.

write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **pandas_json_args) → None

Call Dataset.write_json on each output dataset of this pipeline.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_parquet_args) → None

Call Dataset.write_parquet on each output dataset of this pipeline.
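
For example, the following sketch writes every output window of a pipeline to Parquet under a single directory (the exact file layout is an implementation detail):

import ray

pipe = ray.data.range_arrow(1000).window(blocks_per_window=5)
pipe.write_parquet("/tmp/pipeline_out")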

GroupedDataset API

class ray.data.grouped_dataset.GroupedDataset(dataset: ray.data.dataset.Dataset[T], key: Callable[[T], KeyType])[source]

Implements a lazy dataset grouped by key (Experimental).

The actual groupby is deferred until an aggregation is applied.

PublicAPI (beta): This API is in beta and may change before becoming stable.

aggregate(init: Callable[[KeyType], AggType], accumulate: Callable[[KeyType, AggType, T], AggType], merge: Callable[[KeyType, AggType, AggType], AggType], finalize: Callable[[KeyType, AggType], U] = <function GroupedDataset.<lambda>>) → ray.data.dataset.Dataset[Tuple[KeyType, U]][source]

Implements an accumulator-based aggregation.

This is a blocking operation. See https://www.sigops.org/s/conferences/sosp/2009/papers/yu-sosp09.pdf for more details about accumulator-based aggregation.

Examples

>>> grouped_ds.aggregate(
...     init=lambda k: [],
...     accumulate=lambda k, a, r: a + [r],
...     merge=lambda k, a1, a2: a1 + a2,
...     finalize=lambda k, a: a
... )
Parameters
  • init – This is called once for each key to return the empty accumulator. For example, an empty accumulator for a sum would be 0.

  • accumulate – This is called once per row with the same key. It combines the accumulator and the row, returning the updated accumulator.

  • merge – This may be called multiple times, each time to merge two accumulators into one.

  • finalize – This is called once to compute the final aggregation result from the fully merged accumulator.

Returns

A new dataset of (k, v) pairs where k is the groupby key and v is the corresponding aggregation result.
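
As a fuller sketch, the per-group mean can be computed by carrying a (sum, count) pair as the accumulator:

import ray

grouped = ray.data.range(100).groupby(lambda x: x % 3)
means = grouped.aggregate(
    init=lambda k: (0, 0),                                   # empty accumulator
    accumulate=lambda k, a, r: (a[0] + r, a[1] + 1),          # fold in one row
    merge=lambda k, a1, a2: (a1[0] + a2[0], a1[1] + a2[1]),   # combine partial results
    finalize=lambda k, a: a[0] / a[1],                        # sum / count
)
print(means.take(3))  # a list of (key, mean) pairs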

count() → ray.data.dataset.Dataset[Tuple[KeyType, int]][source]

Compute count of each group.

This is a blocking operation.

Example

>>> ray.data.range(100).groupby(lambda x: x % 3).count()
Returns

A new dataset of (k, v) pairs where k is the groupby key and v is the number of rows with that key.

sum() → ray.data.dataset.Dataset[Tuple[KeyType, T]][source]

Compute sum of each group.

This is a blocking operation.

Example

>>> ray.data.range(100).groupby(lambda x: x % 3).sum()
Returns

A new dataset of (k, v) pairs where k is the groupby key and v is the sum of the group.

min() → ray.data.dataset.Dataset[Tuple[KeyType, T]][source]

Compute min of each group.

This is a blocking operation.

Example

>>> ray.data.range(100).groupby(lambda x: x % 3).min()
Returns

A new dataset of (k, v) pairs where k is the groupby key and v is the min of the group.

max() → ray.data.dataset.Dataset[Tuple[KeyType, T]][source]

Compute max of each group.

This is a blocking operation.

Example

>>> ray.data.range(100).groupby(lambda x: x % 3).max()
Returns

A new dataset of (k, v) pairs where k is the groupby key and v is the max of the group.

mean() → ray.data.dataset.Dataset[Tuple[KeyType, U]][source]

Compute mean of each group.

This is a blocking operation.

Example

>>> ray.data.range(100).groupby(lambda x: x % 3).mean()
Returns

A new dataset of (k, v) pairs where k is the groupby key and v is the mean of the group.

Tensor Column Extension API

class ray.data.extensions.tensor_extension.TensorDtype[source]

Pandas extension type for a column of fixed-shape, homogeneous-typed tensors.

See: https://github.com/pandas-dev/pandas/blob/master/pandas/core/dtypes/base.py for up-to-date interface documentation and the subclassing contract. The docstrings of the below properties and methods were copied from the base ExtensionDtype.

Examples

>>> # Create a DataFrame with a list of ndarrays as a column.
>>> df = pd.DataFrame({
...     "one": [1, 2, 3],
...     "two": list(np.arange(24).reshape((3, 2, 2, 2)))})
>>> # Note the opaque np.object dtype for this column.
>>> df.dtypes
one     int64
two    object
dtype: object
>>> # Cast column to our TensorDtype extension type.
>>> df["two"] = df["two"].astype(TensorDtype())
>>> # Note that the column dtype is now TensorDtype instead of
>>> # np.object.
>>> df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # Pandas is now aware of this tensor column, and we can do the
>>> # typical DataFrame operations on this column.
>>> col = 2 * (df["two"] + 10)
>>> # The ndarrays underlying the tensor column will be manipulated,
>>> # but the column itself will continue to be a Pandas type.
>>> type(col)
pandas.core.series.Series
>>> col
0   [[[ 2  4]
      [ 6  8]]
     [[10 12]
       [14 16]]]
1   [[[18 20]
      [22 24]]
     [[26 28]
      [30 32]]]
2   [[[34 36]
      [38 40]]
     [[42 44]
      [46 48]]]
Name: two, dtype: TensorDtype
>>> # Once you do an aggregation on that column that returns a single
>>> # row's value, you get back our TensorArrayElement type.
>>> tensor = col.mean()
>>> type(tensor)
ray.data.extensions.tensor_extension.TensorArrayElement
>>> tensor
array([[[18., 20.],
        [22., 24.]],
       [[26., 28.],
        [30., 32.]]])
>>> # This is a light wrapper around a NumPy ndarray, and can easily
>>> # be converted to an ndarray.
>>> type(tensor.to_numpy())
numpy.ndarray
>>> # In addition to doing Pandas operations on the tensor column,
>>> # you can now put the DataFrame into a Dataset.
>>> ds = ray.data.from_pandas([ray.put(df)])
>>> # Internally, this column is represented by the corresponding
>>> # Arrow tensor extension type.
>>> ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> # You can write the dataset to Parquet.
>>> ds.write_parquet("/some/path")
>>> # And you can read it back.
>>> read_ds = ray.data.read_parquet("/some/path")
>>> read_ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> read_df = ray.get(read_ds.to_pandas_refs())[0]
>>> read_df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # The tensor extension type is preserved along the
>>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
>>> # conversion chain.
>>> read_df.equals(df)
True

PublicAPI (beta): This API is in beta and may change before becoming stable.

property type

The scalar type for the array, e.g. int. It’s expected that ExtensionArray[item] returns an instance of ExtensionDtype.type for scalar item, assuming that value is valid (not NA). NA values do not need to be instances of type.

property name

A string identifying the data type. Will be used for display in, e.g. Series.dtype

classmethod construct_from_string(string: str)[source]

Construct this type from a string.

This is useful mainly for data types that accept parameters. For example, a period dtype accepts a frequency parameter that can be set as period[H] (where H means hourly frequency).

By default, in the abstract class, just the name of the type is expected. But subclasses can overwrite this method to accept parameters.

Parameters

string (str) – The name of the type, for example category.

Returns

Instance of the dtype.

Return type

ExtensionDtype

Raises

TypeError – If a class cannot be constructed from this ‘string’.

Examples

For extension dtypes with arguments the following may be an adequate implementation.

>>> import re
>>> @classmethod
... def construct_from_string(cls, string):
...     pattern = re.compile(r"^my_type\[(?P<arg_name>.+)\]$")
...     match = pattern.match(string)
...     if match:
...         return cls(**match.groupdict())
...     else:
...         raise TypeError(
...             f"Cannot construct a '{cls.__name__}' from '{string}'"
...         )

classmethod construct_array_type()[source]

Return the array type associated with this dtype.

Returns

Return type

type

class ray.data.extensions.tensor_extension.TensorArray(values: Union[numpy.ndarray, pandas.core.dtypes.generic.ABCSeries, Sequence[Union[numpy.ndarray, ray.data.extensions.tensor_extension.TensorArrayElement]], ray.data.extensions.tensor_extension.TensorArrayElement, Any])[source]

Pandas ExtensionArray representing a tensor column, i.e. a column consisting of ndarrays as elements. All tensors in a column must have the same shape.

Examples

>>> # Create a DataFrame with a list of ndarrays as a column.
>>> df = pd.DataFrame({
...     "one": [1, 2, 3],
...     "two": TensorArray(np.arange(24).reshape((3, 2, 2, 2)))})
>>> # Note that the column dtype is TensorDtype.
>>> df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # Pandas is aware of this tensor column, and we can do the
>>> # typical DataFrame operations on this column.
>>> col = 2 * (df["two"] + 10)
>>> # The ndarrays underlying the tensor column will be manipulated,
>>> # but the column itself will continue to be a Pandas type.
>>> type(col)
pandas.core.series.Series
>>> col
0   [[[ 2  4]
      [ 6  8]]
     [[10 12]
       [14 16]]]
1   [[[18 20]
      [22 24]]
     [[26 28]
      [30 32]]]
2   [[[34 36]
      [38 40]]
     [[42 44]
      [46 48]]]
Name: two, dtype: TensorDtype
>>> # Once you do an aggregation on that column that returns a single
>>> # row's value, you get back our TensorArrayElement type.
>>> tensor = col.mean()
>>> type(tensor)
ray.data.extensions.tensor_extension.TensorArrayElement
>>> tensor
array([[[18., 20.],
        [22., 24.]],
       [[26., 28.],
        [30., 32.]]])
>>> # This is a light wrapper around a NumPy ndarray, and can easily
>>> # be converted to an ndarray.
>>> type(tensor.to_numpy())
numpy.ndarray
>>> # In addition to doing Pandas operations on the tensor column,
>>> # you can now put the DataFrame into a Dataset.
>>> ds = ray.data.from_pandas([ray.put(df)])
>>> # Internally, this column is represented by the corresponding
>>> # Arrow tensor extension type.
>>> ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> # You can write the dataset to Parquet.
>>> ds.write_parquet("/some/path")
>>> # And you can read it back.
>>> read_ds = ray.data.read_parquet("/some/path")
>>> read_ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> read_df = ray.get(read_ds.to_pandas_refs())[0]
>>> read_df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # The tensor extension type is preserved along the
>>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
>>> # conversion chain.
>>> read_df.equals(df)
True

PublicAPI (beta): This API is in beta and may change before becoming stable.

property dtype

An instance of ‘ExtensionDtype’.

property nbytes

The number of bytes needed to store this object in memory.

isna() → ray.data.extensions.tensor_extension.TensorArray[source]

A 1-D array indicating if each value is missing.

Returns

na_values – In most cases, this should return a NumPy ndarray. For exceptional cases like SparseArray, where returning an ndarray would be expensive, an ExtensionArray may be returned.

Return type

Union[np.ndarray, ExtensionArray]

Notes

If returning an ExtensionArray, then

  • na_values._is_boolean should be True

  • na_values should implement ExtensionArray._reduce()

  • na_values.any and na_values.all should be implemented

take(indices: Sequence[int], allow_fill: bool = False, fill_value: Any = None) → ray.data.extensions.tensor_extension.TensorArray[source]

Take elements from an array.

Parameters
  • indices (sequence of int) – Indices to be taken.

  • allow_fill (bool, default False) –

    How to handle negative values in indices.

    • False: negative values in indices indicate positional indices from the right (the default). This is similar to numpy.take().

    • True: negative values in indices indicate missing values. These values are set to fill_value. Any other negative values raise a ValueError.

  • fill_value (any, optional) –

    Fill value to use for NA-indices when allow_fill is True. This may be None, in which case the default NA value for the type, self.dtype.na_value, is used.

    For many ExtensionArrays, there will be two representations of fill_value: a user-facing “boxed” scalar, and a low-level physical NA value. fill_value should be the user-facing version, and the implementation should handle translating that to the physical version for processing the take if necessary.

Returns

Return type

ExtensionArray

Raises
  • IndexError – When the indices are out of bounds for the array.

  • ValueError – When indices contains negative values other than -1 and allow_fill is True.

See also

numpy.take()

Take elements from an array along an axis.

api.extensions.take()

Take elements from an array.

Notes

ExtensionArray.take is called by Series.__getitem__, .loc, iloc, when indices is a sequence of values. Additionally, it’s called by Series.reindex(), or any other method that causes realignment, with a fill_value.

Examples

Here’s an example implementation, which relies on casting the extension array to object dtype. This uses the helper method pandas.api.extensions.take().

def take(self, indices, allow_fill=False, fill_value=None):
    from pandas.core.algorithms import take

    # If the ExtensionArray is backed by an ndarray, then
    # just pass that here instead of coercing to object.
    data = self.astype(object)

    if allow_fill and fill_value is None:
        fill_value = self.dtype.na_value

    # fill value should always be translated from the scalar
    # type for the array, to the physical storage type for
    # the data, before passing to take.

    result = take(data, indices, fill_value=fill_value,
                  allow_fill=allow_fill)
    return self._from_sequence(result, dtype=self.dtype)

copy() → ray.data.extensions.tensor_extension.TensorArray[source]

Return a copy of the array.

Returns

Return type

ExtensionArray

to_numpy(dtype: numpy.dtype = None, copy: bool = False, na_value: Any = <no_default>)[source]

Convert to a NumPy ndarray.

New in version 1.0.0.

This is similar to numpy.asarray(), but may provide additional control over how the conversion is done.

Parameters
  • dtype (str or numpy.dtype, optional) – The dtype to pass to numpy.asarray().

  • copy (bool, default False) – Whether to ensure that the returned value is not a view on another array. Note that copy=False does not ensure that to_numpy() is no-copy. Rather, copy=True ensures that a copy is made, even if not strictly necessary.

  • na_value (Any, optional) – The value to use for missing values. The default value depends on dtype and the type of the array.

Returns

Return type

numpy.ndarray

property numpy_dtype

Get the dtype of the tensor. Returns the numpy dtype of the backing ndarray.

property numpy_ndim

Get the number of tensor dimensions. Returns an integer for the number of dimensions.

property numpy_shape

Get the shape of the tensor. Returns a tuple of integers for the numpy shape of the backing ndarray.

astype(dtype, copy=True)[source]

Cast to a NumPy array with ‘dtype’.

Parameters
  • dtype (str or dtype) – Typecode or data-type to which the array is cast.

  • copy (bool, default True) – Whether to copy the data, even if not necessary. If False, a copy is made only if the old dtype does not match the new dtype.

Returns

array – NumPy ndarray with ‘dtype’ for its dtype.

Return type

ndarray

any(axis=None, out=None, keepdims=False)[source]

Test whether any array element along a given axis evaluates to True.

See the numpy.any() documentation for more information: https://numpy.org/doc/stable/reference/generated/numpy.any.html#numpy.any

Parameters
  • axis – Axis or axes along which a logical OR reduction is performed.

  • out – Alternate output array in which to place the result.

  • keepdims – If this is set to True, the axes which are reduced are left in the result as dimensions with size one.

Returns

A single boolean if axis is None; otherwise a TensorArray.

all(axis=None, out=None, keepdims=False)[source]

Test whether all array elements along a given axis evaluate to True.

Parameters
  • axis – Axis or axes along which a logical AND reduction is performed.

  • out – Alternate output array in which to place the result.

  • keepdims – If this is set to True, the axes which are reduced are left in the result as dimensions with size one.

Returns

A single boolean if axis is None; otherwise a TensorArray.
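
A small sketch of these reductions on a boolean tensor column:

import numpy as np
from ray.data.extensions.tensor_extension import TensorArray

# Three 2x2 boolean tensors.
ta = TensorArray(np.arange(12).reshape((3, 2, 2)) > 4)
print(ta.any())  # True: at least one element is True
print(ta.all())  # False: some elements are False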

class ray.data.extensions.tensor_extension.ArrowTensorType(shape: Tuple[int, …], dtype: pyarrow.lib.DataType)[source]

Arrow ExtensionType for an array of fixed-shaped, homogeneous-typed tensors.

This is the Arrow side of TensorDtype.

See Arrow extension type docs: https://arrow.apache.org/docs/python/extending_types.html#defining-extension-types-user-defined-types

PublicAPI (beta): This API is in beta and may change before becoming stable.

property shape

Shape of contained tensors.

to_pandas_dtype()[source]

Convert Arrow extension type to corresponding Pandas dtype.

Returns

An instance of pd.api.extensions.ExtensionDtype.

class ray.data.extensions.tensor_extension.ArrowTensorArray[source]

An array of fixed-shape, homogeneous-typed tensors.

This is the Arrow side of TensorArray.

See Arrow docs for customizing extension arrays: https://arrow.apache.org/docs/python/extending_types.html#custom-extension-array-class

PublicAPI (beta): This API is in beta and may change before becoming stable.

OFFSET_DTYPE

alias of numpy.int32

to_pylist(self)[source]

Convert to a list of native Python objects.

Returns

lst

Return type

list

classmethod from_numpy(arr)[source]

Convert an ndarray or an iterable of fixed-shape ndarrays to an array of fixed-shape, homogeneous-typed tensors.

Parameters

arr – An ndarray or an iterable of fixed-shape ndarrays.

Returns

An ArrowTensorArray containing len(arr) tensors of fixed shape.

to_numpy()[source]

Convert the entire array of tensors into a single ndarray.

Returns

A single ndarray representing the entire array of tensors.
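
A round-trip sketch between ndarrays and the Arrow extension array:

import numpy as np
from ray.data.extensions.tensor_extension import ArrowTensorArray

arr = np.arange(24).reshape((3, 2, 2, 2))
ta = ArrowTensorArray.from_numpy(arr)  # 3 tensors of shape (2, 2, 2)
assert ta.to_numpy().shape == (3, 2, 2, 2)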

Custom Datasource API

class ray.data.Datasource[source]

Interface for defining a custom ray.data.Dataset datasource.

To read a datasource into a dataset, use ray.data.read_datasource(). To write to a writable datasource, use Dataset.write_datasource().

See RangeDatasource and DummyOutputDatasource for examples of how to implement readable and writable datasources.

PublicAPI (beta): This API is in beta and may change before becoming stable.

prepare_read(parallelism: int, **read_args) → List[ray.data.datasource.datasource.ReadTask[T]][source]

Return the list of tasks needed to perform a read.

Parameters
  • parallelism – The requested read parallelism. The number of read tasks should be as close to this value as possible.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

A list of read tasks that can be executed to read blocks from the datasource in parallel.
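
A hedged sketch of a minimal readable datasource. SquaresDatasource and its n read argument are hypothetical, and the BlockMetadata fields used below (num_rows, size_bytes, schema, input_files) are an assumption about this Ray version; plain Python lists serve as blocks:

import ray
from ray.data.block import BlockMetadata

class SquaresDatasource(ray.data.Datasource):
    """Produces the squares 0**2 .. (n-1)**2 as a dataset of ints."""

    def prepare_read(self, parallelism, n=1000, **read_args):
        read_tasks = []
        block_size = max(1, n // parallelism)
        for start in range(0, n, block_size):
            end = min(start + block_size, n)
            meta = BlockMetadata(
                num_rows=end - start,  # field names assumed for this version
                size_bytes=None,
                schema=int,
                input_files=None,
            )
            # Each ReadTask is a zero-argument callable returning one block.
            read_tasks.append(ray.data.ReadTask(
                lambda s=start, e=end: [i * i for i in range(s, e)], meta))
        return read_tasks

ds = ray.data.read_datasource(SquaresDatasource(), parallelism=4, n=100)
print(ds.count())  # 100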

do_write(blocks: List[ray.types.ObjectRef[Union[List[T], pyarrow.Table, bytes]]], metadata: List[ray.data.block.BlockMetadata], **write_args) → List[ray.types.ObjectRef[Any]][source]

Launch Ray tasks for writing blocks out to the datasource.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of the output of the write tasks.
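
A hedged sketch of the write path: do_write launches one Ray task per block and returns their futures, whose resolved results are later passed to on_write_complete. CountingDatasource is hypothetical; it assumes list blocks (as produced by ray.data.range()) and that the callbacks run in the driver process so mutating self is visible:

import ray

class CountingDatasource(ray.data.Datasource):
    """Counts rows instead of persisting them (illustrative only)."""

    def __init__(self):
        self.rows_written = 0

    def do_write(self, blocks, metadata, **write_args):
        @ray.remote
        def write_block(block):
            # "Write" the block by reporting its number of rows.
            return len(block)

        return [write_block.remote(b) for b in blocks]

    def on_write_complete(self, write_results, **kwargs):
        self.rows_written += sum(write_results)

sink = CountingDatasource()
ray.data.range(100).write_datasource(sink)
print(sink.rows_written)  # expected: 100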

on_write_complete(write_results: List[Any], **kwargs) → None[source]

Callback for when a write job completes.

This can be used to “commit” a write output. This method must succeed prior to write_datasource() returning to the user. If this method fails, then on_write_failed() will be called.

Parameters
  • write_results – The list of the write task results.

  • kwargs – Forward-compatibility placeholder.

on_write_failed(write_results: List[ray.types.ObjectRef[Any]], error: Exception, **kwargs) → None[source]

Callback for when a write job fails.

This is called on a best-effort basis on write failures.

Parameters
  • write_results – The list of the write task result futures.

  • error – The first error encountered.

  • kwargs – Forward-compatibility placeholder.

class ray.data.ReadTask(read_fn: Callable[[], Union[List[T], pyarrow.Table, bytes]], metadata: ray.data.block.BlockMetadata)[source]

A function used to read a block of a dataset.

Read tasks are generated by datasource.prepare_read(), and return a ray.data.Block when called. Metadata about the read operation can be retrieved via get_metadata() prior to executing the read.

Ray will execute read tasks in remote functions to parallelize execution.

PublicAPI (beta): This API is in beta and may change before becoming stable.

Utility

ray.data.set_progress_bars(enabled: bool) → bool[source]

Set whether progress bars are enabled.

Returns

Whether progress bars were previously enabled.

PublicAPI: This API is stable across Ray releases.
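
For example, to silence the bars around a block of work and then restore the prior setting:

import ray

previously_enabled = ray.data.set_progress_bars(False)
ray.data.range(1000).map(lambda x: x + 1).count()  # runs without progress bars
ray.data.set_progress_bars(previously_enabled)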