Dataset API Reference

Creating a Dataset

ray.data.range(n: int, *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[int]

Create a dataset from a range of integers [0..n).

Examples

>>> ray.data.range(10000).map(lambda x: x * 2).show()
Parameters
  • n – The upper bound of the range of integers.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the integers.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.range_arrow(n: int, *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create an Arrow dataset from a range of integers [0..n).

Examples

>>> ds = ray.data.range_arrow(1000)
>>> ds.map(lambda r: {"v2": r["value"] * 2}).show()

This is similar to range(), but uses Arrow tables to hold the integers in Arrow records. The dataset elements take the form {“value”: N}.

Parameters
  • n – The upper bound of the range of integer records.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the integers as Arrow records.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_csv(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, **arrow_csv_args) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create an Arrow dataset from csv files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

  • arrow_csv_args – Other csv read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_json(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, **arrow_json_args) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create an Arrow dataset from json files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_json(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

  • arrow_json_args – Other json read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_parquet(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = 200, **arrow_parquet_args) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create an Arrow dataset from parquet files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The amount of parallelism to use for the dataset.

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_binary_files(paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[Union[Tuple[str, bytes], bytes]]

Create a dataset from binary files of arbitrary contents.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_binary_files(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • include_paths – Whether to include the full path of the file in the dataset records. When set to True, each dataset record will be a tuple of the file path and the file contents.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding the file contents as bytes, or (path, contents) tuples if include_paths is True.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_datasource(datasource: ray.experimental.data.datasource.datasource.Datasource[T], *, parallelism: int = 200, **read_args) → ray.experimental.data.dataset.Dataset[T]

Read a dataset from a custom data source.
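
Examples

An illustrative call; MyDatasource is a placeholder for a user-defined Datasource implementation (see the Custom Datasource API section below), not part of the Ray API.

>>> ds = ray.data.read_datasource(MyDatasource(), parallelism=8)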

Parameters
  • datasource – The datasource to read data from.

  • parallelism – The requested parallelism of the read.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

Dataset holding the data read from the datasource.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_arrow(tables: List[ray.types.ObjectRef[pyarrow.Table]], *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create a dataset from a set of Arrow tables.
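
Examples

An illustrative conversion from an in-memory Arrow table:

>>> import pyarrow as pa
>>> table = pa.Table.from_pydict({"value": [1, 2, 3]})
>>> ds = ray.data.from_arrow([ray.put(table)])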

Parameters
  • tables – A list of Ray object references to Arrow tables.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding Arrow records from the tables.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_spark(df: pyspark.sql.DataFrame, *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create a dataset from a Spark dataframe.
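
Examples

An illustrative call; spark_df is assumed to be a pyspark DataFrame created through RayDP (Spark-on-Ray):

>>> ds = ray.data.from_spark(spark_df)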

Parameters
  • df – A Spark dataframe, which must be created by RayDP (Spark-on-Ray).

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_dask(df: dask.DataFrame, *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create a dataset from a Dask DataFrame.
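
Examples

An illustrative conversion from a small in-memory Dask DataFrame (assumes both Ray and Dask are installed):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({"value": [1, 2, 3]}), npartitions=2)
>>> ds = ray.data.from_dask(ddf)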

Parameters
  • df – A Dask DataFrame.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding Arrow records read from the DataFrame.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_modin(df: modin.DataFrame, *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create a dataset from a Modin dataframe.
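
Examples

An illustrative conversion; Modin must be configured to use its Ray backend:

>>> import modin.pandas as mpd
>>> ds = ray.data.from_modin(mpd.DataFrame({"value": [1, 2, 3]}))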

Parameters
  • df – A Modin dataframe, which must be using the Ray backend.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_mars(df: mars.DataFrame, *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create a dataset from a MARS dataframe.
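
Examples

An illustrative call; mars_df is assumed to be a MARS dataframe executed through MARS-on-Ray:

>>> ds = ray.data.from_mars(mars_df)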

Parameters
  • df – A MARS dataframe, which must be executed by MARS-on-Ray.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_pandas(dfs: List[ray.types.ObjectRef[pandas.DataFrame]], *, parallelism: int = 200) → ray.experimental.data.dataset.Dataset[ray.experimental.data.impl.arrow_block.ArrowRow]

Create a dataset from a set of Pandas dataframes.
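
Examples

An illustrative conversion from an in-memory pandas dataframe:

>>> import pandas as pd
>>> df = pd.DataFrame({"value": [1, 2, 3]})
>>> ds = ray.data.from_pandas([ray.put(df)])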

Parameters
  • dfs – A list of Ray object references to pandas dataframes.

  • parallelism – The amount of parallelism to use for the dataset.

Returns

Dataset holding Arrow records read from the dataframes.

PublicAPI (beta): This API is in beta and may change before becoming stable.

Dataset API

class ray.data.Dataset(blocks: ray.experimental.data.impl.block_list.BlockList[T])

Implements a distributed Arrow dataset.

Datasets are implemented as a list of ObjectRef[Block]. The block is the unit of parallelism. The default block type is pyarrow.Table; Arrow-incompatible objects are held in plain Python list blocks.

Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors just like any other object. Datasets support conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also compatible with distributed TensorFlow / PyTorch.

Dataset supports parallel transformations such as .map(), .map_batches(), and simple repartition, but does not currently support aggregations or joins.

PublicAPI (beta): This API is in beta and may change before becoming stable.

map(fn: Union[type, Callable[[T], U]], *, compute: Optional[str] = None, **ray_remote_args) → ray.experimental.data.dataset.Dataset[U]

Apply the given function to each record of this dataset.

This is a blocking operation. Note that mapping individual records can be quite slow. Consider using .map_batches() for performance.

Examples

>>> # Transform python objects.
>>> ds.map(lambda x: x * 2)
>>> # Transform Arrow records.
>>> ds.map(lambda record: {"v2": record["value"] * 2})
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, batch):
...        return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map(CachedModel, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

map_batches(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, list]], Union[pandas.DataFrame, pyarrow.Table, list]]], *, batch_size: int = None, compute: Optional[str] = None, batch_format: str = 'pandas', **ray_remote_args) → Dataset[Any]

Apply the given function to batches of records of this dataset.

This is a blocking operation.

Examples

>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, item):
...        return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map_batches(
...    CachedModel,
...    batch_size=256, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable.

  • batch_size – Request a specific batch size, or leave unspecified to use entire blocks as batches.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • batch_format – Specify “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

flat_map(fn: Union[type, Callable[[T], Iterable[U]]], *, compute: Optional[str] = None, **ray_remote_args) → ray.experimental.data.dataset.Dataset[U]

Apply the given function to each record and then flatten results.

This is a blocking operation. Consider using .map_batches() for better performance (the batch size can be altered in map_batches).

Examples

>>> ds.flat_map(lambda x: [x, x ** 2, x ** 3])

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

filter(fn: Union[type, Callable[[T], bool]], *, compute: Optional[str] = None, **ray_remote_args) → ray.experimental.data.dataset.Dataset[T]

Filter out records that do not satisfy the given predicate.

This is a blocking operation. Consider using .map_batches() for better performance (you can implement filter by dropping records).

Examples

>>> ds.filter(lambda x: x % 2 == 0)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The predicate to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

repartition(num_blocks: int) → ray.experimental.data.dataset.Dataset[T]

Repartition the dataset into exactly this number of blocks.

This is a blocking operation.

Examples

>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(100).write_parquet(...)

Time complexity: O(dataset size / parallelism)

Parameters

num_blocks – The number of blocks.

Returns

The repartitioned dataset.

random_shuffle(*, seed: Optional[int] = None, num_blocks: Optional[int] = None) → ray.experimental.data.dataset.Dataset[T]

Randomly shuffle the elements of this dataset.

This is a blocking operation similar to repartition().

Examples

>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
>>> # Shuffle this dataset with a fixed random seed.
>>> ds.random_shuffle(seed=12345)

Time complexity: O(dataset size / parallelism)

Parameters
  • seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

  • num_blocks – The number of output blocks after the shuffle, or None to retain the number of blocks.

Returns

The shuffled dataset.

split(n: int, *, equal: bool = False, locality_hints: List[Any] = None) → List[ray.experimental.data.dataset.Dataset[T]]

Split the dataset into n disjoint pieces.

This returns a list of sub-datasets that can be passed to Ray tasks and actors and used to read the dataset records in parallel.

Examples

>>> # Split up a dataset to process over `n` worker actors.
>>> shards = ds.split(len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)

Time complexity: O(1)

Parameters
  • n – Number of child datasets to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith dataset with the ith actor to maximize data locality.

Returns

A list of n disjoint dataset splits.

sort(key: Union[None, str, List[str], Callable[[T], Any]] = None, descending: bool = False) → ray.experimental.data.dataset.Dataset[T]

Sort the dataset by the specified key column or key function. (experimental support)

This is a blocking operation.

Examples

>>> # Sort using the entire record as the key.
>>> ds.sort()
>>> # Sort by a single column in descending order.
>>> ds.sort("field1", descending=True)
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["field1"] % 100)
>>> # Sort by multiple columns (not yet supported).
>>> ds.sort([("field1", "ascending"), ("field2", "descending")])

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters
  • key

    • For Arrow tables, key must be a single column name.

    • For datasets of Python objects, key can be either a lambda function that returns a comparison key to sort by, or None to sort by the original value.

  • descending – Whether to sort in descending order.

Returns

A new, sorted dataset.

limit(limit: int) → ray.experimental.data.dataset.Dataset[T]

Truncate the dataset to the first limit records.

Examples

>>> ds.limit(100).map(lambda x: x * 2).take()

Time complexity: O(limit specified)

Parameters

limit – The size of the dataset to truncate to.

Returns

The truncated dataset.

take(limit: int = 20) → List[T]

Take up to the given number of records from the dataset.
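
Examples

An illustrative call on a small synthetic dataset:

>>> ray.data.range(100).take(5)
[0, 1, 2, 3, 4]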

Time complexity: O(limit specified)

Parameters

limit – The max number of records to return.

Returns

A list of up to limit records from the dataset.

show(limit: int = 20) → None

Print up to the given number of records from the dataset.
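
Examples

An illustrative call on a small synthetic dataset:

>>> ray.data.range(3).show()
0
1
2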

Time complexity: O(limit specified)

Parameters

limit – The max number of records to print.

count() → int

Count the number of records in the dataset.
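
Examples

An illustrative call on a small synthetic dataset:

>>> ray.data.range(100).count()
100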

Time complexity: O(dataset size / parallelism), O(1) for parquet

Returns

The number of records in the dataset.

sum() → int

Sum up the elements of this dataset.
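
Examples

An illustrative call on a small synthetic dataset:

>>> ray.data.range(4).sum()
6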

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset.

schema() → Union[type, pyarrow.lib.Schema]

Return the schema of the dataset.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.
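
Examples

An illustrative call; the output shown is for a simple dataset of Python ints:

>>> ray.data.range(10).schema()
<class 'int'>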

Time complexity: O(1)

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.

num_blocks() → int

Return the number of blocks of this dataset.

Time complexity: O(1)

Returns

The number of blocks of this dataset.

size_bytes() → int

Return the in-memory size of the dataset.

Time complexity: O(1)

Returns

The in-memory size of the dataset in bytes, or None if the in-memory size is not known.

input_files() → List[str]

Return the list of input files for the dataset.

Time complexity: O(num input files)

Returns

The list of input files used to create the dataset, or an empty list if the input files are not known.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None) → None

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> ds.write_parquet("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

write_json(path: str) → None

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> ds.write_json("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters

path – The path to the destination root directory, where json files will be written to.

write_csv(path: str) → None

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> ds.write_csv("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters

path – The path to the destination root directory, where csv files will be written to.

write_datasource(datasource: ray.experimental.data.datasource.datasource.Datasource[T], **write_args) → None

Write the dataset to a custom datasource.

Examples

>>> ds.write_datasource(CustomDatasourceImpl(...))

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • write_args – Additional write args to pass to the datasource.

iter_rows(*, prefetch_blocks: int = 0) → Iterator[T]

Return a local row iterator over the dataset.

Examples

>>> for i in ray.data.range(1000000).iter_rows():
...     print(i)

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the entire dataset.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'pandas', drop_last: bool = False) → Iterator[Union[pandas.DataFrame, pyarrow.Table, list]]

Return a local batched iterator over the dataset.

Examples

>>> for pandas_df in ray.data.range(1000000).iter_batches():
...     print(pandas_df)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “pandas”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

to_torch(*, label_column: str, feature_columns: Optional[List[str]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[List[torch.dtype]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False) → torch.utils.data.IterableDataset

Return a Torch IterableDataset over this dataset.

It is recommended to use the returned IterableDataset directly instead of passing it into a torch DataLoader.

Each element in IterableDataset will be a tuple consisting of 2 elements. The first item is a list of the feature tensors. The second item is the label tensor. Each tensor will be of shape (N, 1), where N is the batch_size used by the DataLoader.

Note that you probably want to call .split() on this dataset if there are to be multiple Torch workers consuming the data.
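
Examples

A sketch of a typical consumption loop, assuming the dataset has a column named "label" plus numeric feature columns; train_step is a hypothetical training function:

>>> torch_ds = ds.to_torch(label_column="label", batch_size=64)
>>> for features, label in torch_ds:
...     train_step(features, label)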

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output tuple).

  • feature_columns (Optional[List[str]]) – The names of the columns to use as the features. If None, then use all columns except the label columns as the features.

  • label_column_dtype (Optional[torch.dtype]) – The torch dtype to use for the label column. If None, then automatically infer the dtype.

  • feature_column_dtypes (Optional[List[torch.dtype]]) – The dtypes to use for the feature columns. The len of this list must be equal to the len of feature_columns. If None, then automatically infer the dtype.

  • batch_size (int) – How many samples per batch to yield at a time. Defaults to 1.

  • prefetch_blocks (int) – The number of blocks to prefetch ahead of the current block during the scan.

  • drop_last (bool) – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

Returns

A torch IterableDataset.

to_tf(*, label_column: str, output_signature: List[tf.TypeSpec], feature_columns: Optional[List[str]] = None, prefetch_blocks: int = 0, batch_size: int = 1) → tf.data.Dataset

Return a TF Dataset over this dataset.

The TF Dataset will be created from the generator returned by the iter_batches method. prefetch_blocks and batch_size arguments will be passed to that method.

This is only supported for datasets convertible to Arrow records.

Requires all datasets to have the same columns.

Note that you probably want to call .split() on this dataset if there are to be multiple TensorFlow workers consuming the data.

The elements generated must be compatible with the given output_signature argument (same as in tf.data.Dataset.from_generator).
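
Examples

A sketch assuming a dataset with two float feature columns and a float label column named "label"; the shapes and dtypes in output_signature are placeholders for your own data:

>>> import tensorflow as tf
>>> tf_ds = ds.to_tf(
...     label_column="label",
...     output_signature=[
...         tf.TensorSpec(shape=(None, 2), dtype=tf.float32),
...         tf.TensorSpec(shape=(None,), dtype=tf.float32)],
...     batch_size=32)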

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output tuple).

  • output_signature (List[tf.TypeSpec]) – A 2-element list of tf.TypeSpec objects corresponding to (features, label).

  • feature_columns (Optional[List[str]]) – List of columns in datasets to use. If None, all columns will be used.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size. Defaults to 1.

Returns

A tf.data.Dataset.

to_dask() → dask.DataFrame

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will set the Dask scheduler to Dask-on-Ray globally, via the config.

Time complexity: O(dataset size / parallelism)

Returns

A Dask DataFrame created from this dataset.

to_mars() → mars.DataFrame

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

to_modin() → modin.DataFrame

Convert this dataset into a Modin dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

to_spark() → pyspark.sql.DataFrame

Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

to_pandas() → List[ray.types.ObjectRef[pandas.DataFrame]]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_blocks().
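
Examples

An illustrative use; ds is assumed to be a dataset of Arrow records:

>>> df_refs = ds.to_pandas()
>>> dfs = ray.get(df_refs)  # fetch the dataframes to the local process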

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

to_arrow() → List[ray.types.ObjectRef[pyarrow.Table]]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

repeat(times: int = None) → DatasetPipeline[T]

Convert this into a DatasetPipeline by looping over this dataset.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset.

Examples

>>> # Infinite pipeline of numbers [0, 5)
>>> ray.data.range(5).repeat().take()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, ...]
>>> # Can apply transformations to the pipeline.
>>> ray.data.range(5).repeat().map(lambda x: -x).take()
[0, -1, -2, -3, -4, 0, -1, -2, -3, -4, ...]
>>> # Can shuffle each epoch (dataset) in the pipeline.
>>> ray.data.range(5).repeat().random_shuffle().take()
[2, 3, 0, 4, 1, 4, 0, 2, 1, 3, ...]
Parameters

times – The number of times to loop over this dataset, or None to repeat indefinitely.

pipeline(*, parallelism: int = 10) → DatasetPipeline[T]

Pipeline the dataset execution by splitting its blocks into groups.

Transformations prior to the call to pipeline() are evaluated in bulk on the entire dataset. Transformations done on the returned pipeline are evaluated incrementally per group of blocks as data is read from the output of the pipeline.

Pipelining execution allows for output to be read sooner without waiting for all transformations to fully execute, and can also improve efficiency if transforms use different resources (e.g., GPUs).

Without pipelining:

[preprocessing......]
                      [inference.......]
                                         [write........]
Time ----------------------------------------------------------->

With pipelining:

[prep1] [prep2] [prep3]
        [infer1] [infer2] [infer3]
                 [write1] [write2] [write3]
Time ----------------------------------------------------------->

Examples

>>> # Create an inference pipeline.
>>> ds = ray.data.read_binary_files(dir)
>>> pipe = ds.pipeline(parallelism=10).map(infer)
DatasetPipeline(num_stages=2, length=40)
>>> # The higher the stage parallelism, the shorter the pipeline.
>>> pipe = ds.pipeline(parallelism=20).map(infer)
DatasetPipeline(num_stages=2, length=20)
>>> # Outputs can be incrementally read from the pipeline.
>>> for item in pipe.iter_rows():
...    print(item)
Parameters

parallelism – The parallelism (number of blocks) per stage. Increasing parallelism increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

get_blocks() → List[ray.types.ObjectRef[Union[List[T], pyarrow.Table]]]

Get a list of references to the underlying blocks of this dataset.

This function can be used for zero-copy access to the data.

Time complexity: O(1)

Returns

A list of references to this dataset’s blocks.

DeveloperAPI: This API may change across minor Ray releases.

DatasetPipeline API

class ray.experimental.data.dataset_pipeline.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.experimental.data.dataset.Dataset[T]]], stages: List[Callable[[ray.experimental.data.dataset.Dataset[Any]], ray.experimental.data.dataset.Dataset[Any]]] = None, length: int = None, progress_bars: bool = True)

Implements a pipeline of Datasets.

Unlike Datasets, which execute all transformations synchronously, DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by repeating a Dataset (ds.repeat(times=None)), by turning a single Dataset into a pipeline (ds.pipeline(parallelism=10)), or explicitly via DatasetPipeline.from_iterable().

DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).

PublicAPI (beta): This API is in beta and may change before becoming stable.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'pandas', drop_last: bool = False) → Iterator[Union[pandas.DataFrame, pyarrow.Table, list]]

Return a local batched iterator over the data in the pipeline.

Examples

>>> for pandas_df in ray.data.range(1000000).repeat(2).iter_batches():
...     print(pandas_df)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “pandas”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

split(n: int, *, equal: bool = False, locality_hints: List[Any] = None) → List[ray.experimental.data.dataset_pipeline.DatasetPipeline[T]]

Split the pipeline into n disjoint pipeline shards.

This returns a list of sub-pipelines that can be passed to Ray tasks and actors and used to read the pipeline records in parallel.

Examples

>>> # Split up a pipeline to process over `n` worker actors.
>>> shards = pipe.split(len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)

Time complexity: O(1)

Implementation detail: this launches a coordinator actor that is used to execute the pipeline and push data blocks to each pipeline shard. Reading from an individual shard will be blocked if other shards are falling behind. A warning will be printed if a shard has been blocked on read for more than 10 seconds.

Parameters
  • n – Number of child pipelines to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith pipeline shard with the ith actor to maximize data locality.

Returns

A list of n disjoint pipeline splits.

schema() → Union[type, pyarrow.lib.Schema]

Return the schema of the dataset pipeline.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.

count() → int

Count the number of records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The number of records in the dataset pipeline.

sum() → int

Sum the records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset pipeline.

iter_datasets() → Iterator[ray.experimental.data.dataset.Dataset[T]]

Iterate over the output datasets of this pipeline.

Returns

Iterator over the datasets produced by this pipeline.

DeveloperAPI: This API may change across minor Ray releases.

foreach_dataset(fn: Callable[[ray.experimental.data.dataset.Dataset[T]], ray.experimental.data.dataset.Dataset[U]]) → ray.experimental.data.dataset_pipeline.DatasetPipeline[U]

Apply a transform to each dataset in this pipeline.
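
Examples

An illustrative use, repartitioning each dataset (epoch) produced by the pipeline:

>>> pipe = ray.data.range(10).repeat(3)
>>> pipe = pipe.foreach_dataset(lambda ds: ds.repartition(2))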

Parameters

fn – The function to transform each dataset with.

Returns

The transformed DatasetPipeline.

DeveloperAPI: This API may change across minor Ray releases.

static from_iterable(iterable: Iterable[Callable[[], ray.experimental.data.dataset.Dataset[T]]]) → ray.experimental.data.dataset_pipeline.DatasetPipeline[T]

Create a pipeline from a sequence of Dataset-producing functions.
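
Examples

An illustrative pipeline that produces the same 10-element dataset four times (DatasetPipeline is imported from the module path shown in the class signature above):

>>> pipe = DatasetPipeline.from_iterable(
...     [lambda: ray.data.range(10) for _ in range(4)])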

Parameters

iterable – A finite or infinite-length sequence of functions that each produce a Dataset when called.

filter(fn: Union[type, Callable[[T], bool]], *, compute: Optional[str] = None, **ray_remote_args) → DatasetPipeline[T]

Filter out records that do not satisfy the given predicate.

This is a blocking operation. Consider using .map_batches() for better performance (you can implement filter by dropping records).

Examples

>>> ds.filter(lambda x: x % 2 == 0)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The predicate to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

flat_map(fn: Union[type, Callable[[T], Iterable[U]]], *, compute: Optional[str] = None, **ray_remote_args) → DatasetPipeline[U]

Apply the given function to each record and then flatten results.

This is a blocking operation. Consider using .map_batches() for better performance (the batch size can be altered in map_batches).

Examples

>>> ds.flat_map(lambda x: [x, x ** 2, x ** 3])

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

iter_rows(*, prefetch_blocks: int = 0) → Iterator[T]

Return a local row iterator over the dataset.

Examples

>>> for i in ray.data.range(1000000).repeat(2).iter_rows():
...     print(i)

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the entire dataset.

map(fn: Union[type, Callable[[T], U]], *, compute: Optional[str] = None, **ray_remote_args) → DatasetPipeline[U]

Apply the given function to each record of this dataset.

This is a blocking operation. Note that mapping individual records can be quite slow. Consider using .map_batches() for performance.

Examples

>>> # Transform python objects.
>>> ds.map(lambda x: x * 2)
>>> # Transform Arrow records.
>>> ds.map(lambda record: {"v2": record["value"] * 2})
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, batch):
...        return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map(CachedModel, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

map_batches(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, list]], Union[pandas.DataFrame, pyarrow.Table, list]]], *, batch_size: int = None, compute: Optional[str] = None, batch_format: str = 'pandas', **ray_remote_args) → DatasetPipeline[Any]

Apply the given function to batches of records of this dataset.

This is a blocking operation.

Examples

>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, item):
...        return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map_batches(
...    CachedModel,
...    batch_size=256, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable.

  • batch_size – Request a specific batch size, or leave unspecified to use entire blocks as batches.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • batch_format – Specify “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

random_shuffle(*, seed: Optional[int] = None, num_blocks: Optional[int] = None) → DatasetPipeline[T]

Randomly shuffle the elements of this dataset.

This is a blocking operation similar to repartition().

Examples

>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
>>> # Shuffle this dataset with a fixed random seed.
>>> ds.random_shuffle(seed=12345)

Time complexity: O(dataset size / parallelism)

Parameters
  • seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

  • num_blocks – The number of output blocks after the shuffle, or None to retain the number of blocks.

Returns

The shuffled dataset.

repartition(num_blocks: int) → DatasetPipeline[T]

Repartition the dataset into exactly this number of blocks.

This is a blocking operation.

Examples

>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(100).write_parquet(...)

Time complexity: O(dataset size / parallelism)

Parameters

num_blocks – The number of blocks.

Returns

The repartitioned dataset.

show(limit: int = 20) → None

Print up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to print.

sort(key: Union[None, str, List[str], Callable[[T], Any]] = None, descending: bool = False) → DatasetPipeline[T]

Sort the dataset by the specified key column or key function. (experimental support)

This is a blocking operation.

Examples

>>> # Sort using the entire record as the key.
>>> ds.sort()
>>> # Sort by a single column in descending order.
>>> ds.sort("field1", descending=True)
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["field1"] % 100)
>>> # Sort by multiple columns (not yet supported).
>>> ds.sort([("field1", "ascending"), ("field2", "descending")])

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters
  • key

    • For Arrow tables, key must be a single column name.

    • For datasets of Python objects, key can be either a lambda function that returns a comparison key to sort by, or None to sort by the original value.

  • descending – Whether to sort in descending order.

Returns

A new, sorted dataset.

take(limit: int = 20) → List[T]

Take up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to return.

Returns

A list of up to limit records from the dataset.

to_tf(*, label_column: str, output_signature: List[tf.TypeSpec], feature_columns: Optional[List[str]] = None, prefetch_blocks: int = 0, batch_size: int = 1) → tf.data.Dataset

Return a TF Dataset over this dataset.

The TF Dataset will be created from the generator returned by the iter_batches method. prefetch_blocks and batch_size arguments will be passed to that method.

This is only supported for datasets convertible to Arrow records.

Requires all datasets to have the same columns.

Note that you probably want to call .split() on this dataset if there are to be multiple TensorFlow workers consuming the data.

The elements generated must be compatible with the given output_signature argument (same as in tf.data.Dataset.from_generator).

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output tuple).

  • output_signature (List[tf.TypeSpec]) – A 2-element list of tf.TypeSpec objects corresponding to (features, label).

  • feature_columns (Optional[List[str]]) – List of columns in datasets to use. If None, all columns will be used.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size. Defaults to 1.

Returns

A tf.data.Dataset.

to_torch(*, label_column: str, feature_columns: Optional[List[str]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[List[torch.dtype]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False) → torch.utils.data.IterableDataset

Return a Torch IterableDataset over this dataset.

It is recommended to use the returned IterableDataset directly instead of passing it into a torch DataLoader.

Each element in IterableDataset will be a tuple consisting of 2 elements. The first item is a list of the feature tensors. The second item is the label tensor. Each tensor will be of shape (N, 1), where N is the batch_size used by the DataLoader.

Note that you probably want to call .split() on this dataset if there are to be multiple Torch workers consuming the data.

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output tuple).

  • feature_columns (Optional[List[str]]) – The names of the columns to use as the features. If None, then use all columns except the label columns as the features.

  • label_column_dtype (Optional[torch.dtype]) – The torch dtype to use for the label column. If None, then automatically infer the dtype.

  • feature_column_dtypes (Optional[List[torch.dtype]]) – The dtypes to use for the feature columns. The len of this list must be equal to the len of feature_columns. If None, then automatically infer the dtype.

  • batch_size (int) – How many samples per batch to yield at a time. Defaults to 1.

  • prefetch_blocks (int) – The number of blocks to prefetch ahead of the current block during the scan.

  • drop_last (bool) – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

Returns

A torch IterableDataset.

write_csv(path: str) → None

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> ds.write_csv("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters

path – The path to the destination root directory, where csv files will be written to.

write_datasource(datasource: ray.experimental.data.datasource.datasource.Datasource[T], **write_args) → None

Write the dataset to a custom datasource.

Examples

>>> ds.write_datasource(CustomDatasourceImpl(...))

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • write_args – Additional write args to pass to the datasource.

write_json(path: str) → None

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> ds.write_json("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters

path – The path to the destination root directory, where json files will be written to.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None) → None

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

The format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> ds.write_parquet("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

Custom Datasource API

class ray.data.Datasource

Interface for defining a custom ray.data.Dataset datasource.

To read a datasource into a dataset, use ray.data.read_datasource(). To write to a writable datasource, use Dataset.write_datasource().

See RangeDatasource and DummyOutputDatasource for examples of how to implement readable and writable datasources.
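
Below is a minimal read-only datasource sketch, not part of the Ray API: the class name, the values read argument, and the use of plain list blocks are illustrative, and it assumes BlockMetadata (module path as in the prepare_write signature below) accepts num_rows, size_bytes, schema, and input_files arguments.

>>> from ray.experimental.data.block import BlockMetadata
>>> class FixedListDatasource(ray.data.Datasource):
...     def prepare_read(self, parallelism, values=None):
...         # Split `values` into `parallelism` chunks; emit one ReadTask per chunk.
...         chunks = [values[i::parallelism] for i in range(parallelism)]
...         read_tasks = []
...         for chunk in chunks:
...             metadata = BlockMetadata(
...                 num_rows=len(chunk), size_bytes=None,
...                 schema=None, input_files=None)
...             # Bind `chunk` as a default arg so each task reads its own chunk.
...             read_tasks.append(
...                 ray.data.ReadTask(lambda chunk=chunk: chunk, metadata))
...         return read_tasks
>>> ds = ray.data.read_datasource(
...     FixedListDatasource(), parallelism=2, values=list(range(10)))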

PublicAPI (beta): This API is in beta and may change before becoming stable.

prepare_read(parallelism: int, **read_args) → List[ray.experimental.data.datasource.datasource.ReadTask[T]]

Return the list of tasks needed to perform a read.

Parameters
  • parallelism – The requested read parallelism. The number of read tasks should be as close to this value as possible.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

A list of read tasks that can be executed to read blocks from the datasource in parallel.

prepare_write(blocks: List[ray.types.ObjectRef[Union[List[T], pyarrow.Table]]], metadata: List[ray.experimental.data.block.BlockMetadata], **write_args) → List[WriteTask[T]]

Return the list of tasks needed to perform a write.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of write tasks that can be executed to write blocks to the datasource in parallel.

on_write_complete(write_tasks: List[WriteTask[T]], write_task_outputs: List[Any], **kwargs) → None

Callback for when a write job completes.

This can be used to “commit” a write output. This method must succeed prior to write_datasource() returning to the user. If this method fails, then on_write_failed() will be called.

Parameters
  • write_tasks – The list of the original write tasks.

  • write_task_outputs – The list of write task outputs.

  • kwargs – Forward-compatibility placeholder.

on_write_failed(write_tasks: List[WriteTask[T]], error: Exception, **kwargs) → None

Callback for when a write job fails.

This is called on a best-effort basis on write failures.

Parameters
  • write_tasks – The list of the original write tasks.

  • error – The first error encountered.

  • kwargs – Forward-compatibility placeholder.

class ray.data.ReadTask(read_fn: Callable[[], Union[List[T], pyarrow.Table]], metadata: ray.experimental.data.block.BlockMetadata)

A function used to read a block of a dataset.

Read tasks are generated by datasource.prepare_read(), and return a ray.data.Block when called. Metadata about the read operation can be retrieved via get_metadata() prior to executing the read.

Ray will execute read tasks in remote functions to parallelize execution.

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.data.WriteTask(write_fn: Callable[[], Any])

A function used to write a chunk of a dataset.

Write tasks are generated by datasource.prepare_write(), and return a datasource-specific output that is passed to on_write_complete() on write completion.

Ray will execute write tasks in remote functions to parallelize execution.

PublicAPI (beta): This API is in beta and may change before becoming stable.

Utility

ray.data.set_progress_bars(enabled: bool) → bool

Set whether progress bars are enabled.
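
Examples

An illustrative toggle that restores the previous setting afterwards:

>>> was_enabled = ray.data.set_progress_bars(False)
>>> ray.data.set_progress_bars(was_enabled)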

Returns

Whether progress bars were previously enabled.

PublicAPI: This API is stable across Ray releases.