Dataset API Reference

Creating a Dataset

ray.data.range(n: int, *, parallelism: int = 200) → ray.data.dataset.Dataset[int][source]

Create a dataset from a range of integers [0..n).

Examples

>>> ray.data.range(10000).map(lambda x: x * 2).show()
Parameters
  • n – The upper bound of the range of integers.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.range_arrow(n: int, *, parallelism: int = 200) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from a range of integers [0..n).

Examples

>>> ds = ray.data.range_arrow(1000)
>>> ds.map(lambda r: {"v2": r["value"] * 2}).show()

This is similar to range(), but uses Arrow tables to hold the integers in Arrow records. The dataset elements take the form {"value": N}.

Parameters
  • n – The upper bound of the range of integer records.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow records.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = 200) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a Tensor dataset from a range of integers [0..n).

Examples

>>> ds = ray.data.range_tensor(1000, shape=(3, 10))
>>> ds.map_batches(lambda arr: arr * 2, batch_format="pandas").show()

This is similar to range_arrow(), but uses the ArrowTensorArray extension type. The dataset elements take the form {"value": array(N, shape=shape)}.

Parameters
  • n – The upper bound of the range of integer records.

  • shape – The shape of each record.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow tensor records.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_csv(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **arrow_csv_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from csv files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • arrow_csv_args – Other csv read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_json(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **arrow_json_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from json files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_json(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • arrow_json_args – Other json read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_parquet(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, _tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, …]]]] = None, **arrow_parquet_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from parquet files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • _tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_numpy(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **numpy_load_args) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create an Arrow dataset from numpy files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_numpy("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple directories.
>>> ray.data.read_numpy(["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • numpy_load_args – Other options to pass to np.load.

Returns

Dataset holding Tensor records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_text(paths: Union[str, List[str]], *, encoding: str = 'utf-8', filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None) → ray.data.dataset.Dataset[str][source]

Create a dataset from lines stored in text files.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_text("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_text(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • encoding – The encoding of the files (e.g., “utf-8” or “ascii”).

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

Returns

Dataset holding lines of text read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_binary_files(paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None) → ray.data.dataset.Dataset[Union[Tuple[str, bytes], bytes]][source]

Create a dataset from binary files of arbitrary contents.

Examples

>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path")
>>> # Read multiple local files.
>>> ray.data.read_binary_files(["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • include_paths – Whether to include the full path of the file in the dataset records. When specified, the dataset records will be a tuple of the file path and the file contents.

  • filesystem – The filesystem implementation to read from.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.read_datasource(datasource: ray.data.datasource.datasource.Datasource[T], *, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, _spread_resource_prefix: Optional[str] = None, **read_args) → ray.data.dataset.Dataset[T][source]

Read a dataset from a custom data source.
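
Examples

A minimal sketch mirroring the write_datasource() example later in this reference; CustomDatasourceImpl stands in for any user-defined Datasource implementation.

>>> ds = ray.data.read_datasource(CustomDatasourceImpl(...), parallelism=100)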

Parameters
  • datasource – The datasource to read data from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the available partitioning of the datasource.

  • read_args – Additional kwargs to pass to the datasource impl.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

Returns

Dataset holding the data read from the datasource.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_items(items: List[Any], *, parallelism: int = 200) → ray.data.dataset.Dataset[Any][source]

Create a dataset from a list of local Python objects.

Examples

>>> ray.data.from_items([1, 2, 3, 4, 5])
Parameters
  • items – List of local Python objects.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the items.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_arrow(tables: Union[pyarrow.Table, bytes, List[Union[pyarrow.Table, bytes]]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a list of Arrow tables.
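
Examples

An illustrative sketch; the table contents are arbitrary.

>>> import pyarrow as pa
>>> ray.data.from_arrow(pa.table({"value": [1, 2, 3]}))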

Parameters

tables – An Arrow table, a list of Arrow tables, or an Arrow table serialized to bytes in its streaming format.

Returns

Dataset holding Arrow records from the tables.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_arrow_refs(tables: Union[ray.types.ObjectRef[Union[pyarrow.Table, bytes]], List[ray.types.ObjectRef[Union[pyarrow.Table, bytes]]]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a set of Arrow tables.
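
Examples

An illustrative sketch; the table is arbitrary and is placed in the object store with ray.put().

>>> import pyarrow as pa
>>> ref = ray.put(pa.table({"value": [1, 2, 3]}))
>>> ray.data.from_arrow_refs([ref])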

Parameters

tables – A Ray object reference to an Arrow table (or to its serialized streaming-format bytes), or a list of such object references.

Returns

Dataset holding Arrow records from the tables.

DeveloperAPI: This API may change across minor Ray releases.

ray.data.from_spark(df: pyspark.sql.DataFrame, *, parallelism: Optional[int] = None) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a Spark dataframe.
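
Examples

A hedged sketch; raydp.init_spark() is the RayDP entry point, and the resource arguments shown are illustrative.

>>> import raydp
>>> spark = raydp.init_spark("example", num_executors=2,
...                          executor_cores=2, executor_memory="1GB")
>>> df = spark.range(100)
>>> ray.data.from_spark(df)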

Parameters
  • df – A Spark dataframe, which must be created by RayDP (Spark-on-Ray).

  • parallelism – The amount of parallelism to use for the dataset. If not provided, it will be equal to the number of partitions of the original Spark dataframe.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_dask(df: dask.DataFrame) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a Dask DataFrame.
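
Examples

An illustrative sketch; the Dask DataFrame is built from an arbitrary pandas frame.

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({"value": range(100)}), npartitions=4)
>>> ray.data.from_dask(ddf)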

Parameters

df – A Dask DataFrame.

Returns

Dataset holding Arrow records read from the DataFrame.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_modin(df: modin.DataFrame) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a Modin dataframe.
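
Examples

An illustrative sketch; Modin must be using its Ray backend (typically the default when Ray is installed).

>>> import modin.pandas as mpd
>>> ray.data.from_modin(mpd.DataFrame({"value": range(100)}))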

Parameters

df – A Modin dataframe, which must be using the Ray backend.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_mars(df: mars.DataFrame) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a MARS dataframe.

Parameters

df – A MARS dataframe, which must be executed by MARS-on-Ray.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_pandas(dfs: Union[pandas.DataFrame, List[pandas.DataFrame]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a list of Pandas dataframes.
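
Examples

An illustrative sketch with a single arbitrary dataframe.

>>> import pandas as pd
>>> ray.data.from_pandas(pd.DataFrame({"value": range(100)}))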

Parameters

dfs – A Pandas dataframe or a list of Pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.data.from_pandas_refs(dfs: Union[ray.types.ObjectRef[pandas.DataFrame], List[ray.types.ObjectRef[pandas.DataFrame]]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a list of Ray object references to Pandas dataframes.
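
Examples

An illustrative sketch; the dataframe is placed in the object store with ray.put().

>>> import pandas as pd
>>> ref = ray.put(pd.DataFrame({"value": range(100)}))
>>> ray.data.from_pandas_refs([ref])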

Parameters

dfs – A Ray object reference to a pandas dataframe, or a list of Ray object references to pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

DeveloperAPI: This API may change across minor Ray releases.

ray.data.from_numpy(ndarrays: List[ray.types.ObjectRef[numpy.ndarray]]) → ray.data.dataset.Dataset[ray.data.impl.arrow_block.ArrowRow][source]

Create a dataset from a set of NumPy ndarrays.
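
Examples

An illustrative sketch; each ndarray is placed in the object store with ray.put().

>>> import numpy as np
>>> refs = [ray.put(np.arange(10)) for _ in range(4)]
>>> ray.data.from_numpy(refs)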

Parameters

ndarrays – A list of Ray object references to NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

Dataset API

class ray.data.Dataset(blocks: ray.data.impl.block_list.BlockList, epoch: int, stats: ray.data.impl.stats.DatasetStats)[source]

Implements a distributed Arrow dataset.

Datasets are implemented as a list of ObjectRef[Block], where each block determines the unit of parallelism. The default block type is pyarrow.Table; Arrow-incompatible objects are held in Python list blocks.

Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors just like any other object. Datasets support conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also compatible with distributed TensorFlow / PyTorch.

Datasets support parallel transformations such as .map(), .map_batches(), and simple repartitioning, as well as experimental aggregations via .groupby() and .aggregate(); joins are not currently supported.

PublicAPI (beta): This API is in beta and may change before becoming stable.

map(fn: Union[type, Callable[[T], U]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset.Dataset[U][source]

Apply the given function to each record of this dataset.

This is a blocking operation. Note that mapping individual records can be quite slow. Consider using .map_batches() for performance.

Examples

>>> # Transform python objects.
>>> ds.map(lambda x: x * 2)
>>> # Transform Arrow records.
>>> ds.map(lambda record: {"v2": record["value"] * 2})
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, batch):
...        return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map(CachedModel, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

map_batches(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]]], *, batch_size: Optional[int] = 4096, compute: Optional[str] = None, batch_format: str = 'native', **ray_remote_args) → Dataset[Any][source]

Apply the given function to batches of records of this dataset.

This is a blocking operation.

Examples

>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, item):
...        return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map_batches(
...    CachedModel,
...    batch_size=256, compute="actors", num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable.

  • batch_size – Request a specific batch size, or None to use entire blocks as batches. Defaults to 4096.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • batch_format – Specify “native” to use the native block format, “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

flat_map(fn: Union[type, Callable[[T], Iterable[U]]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset.Dataset[U][source]

Apply the given function to each record and then flatten results.

This is a blocking operation. Consider using .map_batches() for better performance (the batch size can be altered in map_batches).

Examples

>>> ds.flat_map(lambda x: [x, x ** 2, x ** 3])

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

filter(fn: Union[type, Callable[[T], bool]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset.Dataset[T][source]

Filter out records that do not satisfy the given predicate.

This is a blocking operation. Consider using .map_batches() for better performance (you can implement filter by dropping records).

Examples

>>> ds.filter(lambda x: x % 2 == 0)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The predicate to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling Ray actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

repartition(num_blocks: int, *, shuffle: bool = False) → ray.data.dataset.Dataset[T][source]

Repartition the dataset into exactly this number of blocks.

This is a blocking operation. After repartitioning, all blocks in the returned dataset will have approximately the same number of rows.

Examples

>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(10).write_parquet(...)

Time complexity: O(dataset size / parallelism)

Parameters
  • num_blocks – The number of blocks.

  • shuffle – Whether to perform a distributed shuffle during the repartition. When shuffle is enabled, each output block contains a subset of data rows from each input block, which requires all-to-all data movement. When shuffle is disabled, output blocks are created from adjacent input blocks, minimizing data movement.

Returns

The repartitioned dataset.

random_shuffle(*, seed: Optional[int] = None, num_blocks: Optional[int] = None, _move: Optional[bool] = False, _spread_resource_prefix: Optional[str] = None) → ray.data.dataset.Dataset[T][source]

Randomly shuffle the elements of this dataset.

This is a blocking operation similar to repartition().

Examples

>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
>>> # Shuffle this dataset with a fixed random seed.
>>> ds.random_shuffle(seed=12345)

Time complexity: O(dataset size / parallelism)

Parameters
  • seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

  • num_blocks – The number of output blocks after the shuffle, or None to retain the number of blocks.

Returns

The shuffled dataset.

split(n: int, *, equal: bool = False, locality_hints: List[Any] = None) → List[ray.data.dataset.Dataset[T]][source]

Split the dataset into n disjoint pieces.

This returns a list of sub-datasets that can be passed to Ray tasks and actors and used to read the dataset records in parallel.

Examples

>>> # Split up a dataset to process over `n` worker actors.
>>> shards = ds.split(len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)

Time complexity: O(1)

See also: Dataset.split_at_indices

Parameters
  • n – Number of child datasets to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith dataset with the ith actor to maximize data locality.

Returns

A list of n disjoint dataset splits.

split_at_indices(indices: List[int]) → List[ray.data.dataset.Dataset[T]][source]

Split the dataset at the given indices (like np.split).

Examples

>>> d1, d2, d3 = ray.data.range(10).split_at_indices([2, 5])
>>> d1.take()
[0, 1]
>>> d2.take()
[2, 3, 4]
>>> d3.take()
[5, 6, 7, 8, 9]

Time complexity: O(num splits)

See also: Dataset.split

Parameters

indices – List of sorted integers which indicate where the dataset will be split. If an index exceeds the length of the dataset, an empty dataset will be returned.

Returns

The dataset splits.

union(*other: List[Dataset[T]]) → ray.data.dataset.Dataset[T][source]

Combine this dataset with others of the same type.

The order of the blocks in the datasets is preserved, as is the relative ordering between the datasets passed in the argument list.
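
Examples

An illustrative sketch combining two small integer datasets.

>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds1.union(ds2).count()
10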

Parameters

other – List of datasets to combine with this one. The datasets must have the same schema as this dataset, otherwise the behavior is undefined.

Returns

A new dataset holding the union of their data.

groupby(key: GroupKeyT) → GroupedDataset[T][source]

Group the dataset by the key function or column name (Experimental).

This is a lazy operation.

Examples

>>> # Group by a key function and aggregate.
>>> ray.data.range(100).groupby(lambda x: x % 3).count()
>>> # Group by an Arrow table column and aggregate.
>>> ray.data.from_items([
...     {"A": x % 3, "B": x} for x in range(100)]).groupby(
...     "A").count()

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters

key – A key function or Arrow column name.

Returns

A lazy GroupedDataset that can be aggregated later.

aggregate(*aggs: ray.data.aggregate.AggregateFn) → U[source]

Aggregate the entire dataset as one group.

This is a blocking operation.

Examples

>>> from ray.data.aggregate import Max, Mean
>>> ray.data.range(100).aggregate(Max())
>>> ray.data.range_arrow(100).aggregate(
...     Max("value"), Mean("value"))

Time complexity: O(dataset size / parallelism)

Parameters

aggs – Aggregations to do.

Returns

If the input dataset is a simple dataset then the output is a tuple of (agg1, agg2, ...) where each tuple element is the corresponding aggregation result. If the input dataset is an Arrow dataset then the output is an ArrowRow where each column is the corresponding aggregation result. If the dataset is empty, return None.

sum(on: Optional[AggregateOnTs] = None) → U[source]

Compute sum over entire dataset.

This is a blocking operation.

Examples

>>> ray.data.range(100).sum()
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).sum(lambda x: x[1])
>>> ray.data.range_arrow(100).sum("value")
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).sum(["A", "B"])
Parameters

on

The data subset on which to compute the sum.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar sum of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise sum of all columns.

Returns

The sum result.

For a simple dataset, the output is:

  • on=None: a scalar representing the sum of all rows,

  • on=callable: a scalar representing the sum of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (sum_1, ..., sum_n) representing the sum of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise sum of all columns,

  • on="col": a scalar representing the sum of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise sum of the provided columns.

If the dataset is empty, then the output is 0.

min(on: Optional[AggregateOnTs] = None) → U[source]

Compute minimum over entire dataset.

This is a blocking operation.

Examples

>>> ray.data.range(100).min()
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).min(lambda x: x[1])
>>> ray.data.range_arrow(100).min("value")
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).min(["A", "B"])
Parameters

on

The data subset on which to compute the min.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar min of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise min of all columns.

Returns

The min result.

For a simple dataset, the output is:

  • on=None: a scalar representing the min of all rows,

  • on=callable: a scalar representing the min of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (min_1, ..., min_n) representing the min of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise min of all columns,

  • on="col": a scalar representing the min of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise min of the provided columns.

If the dataset is empty, then a ValueError is raised.

max(on: Optional[AggregateOnTs] = None) → U[source]

Compute maximum over entire dataset.

This is a blocking operation.

Examples

>>> ray.data.range(100).max()
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).max(lambda x: x[1])
>>> ray.data.range_arrow(100).max("value")
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).max(["A", "B"])
Parameters

on

The data subset on which to compute the max.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar max of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise max of all columns.

Returns

The max result.

For a simple dataset, the output is:

  • on=None: a scalar representing the max of all rows,

  • on=callable: a scalar representing the max of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (max_1, ..., max_n) representing the max of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise max of all columns,

  • on="col": a scalar representing the max of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise max of the provided columns.

If the dataset is empty, then a ValueError is raised.

mean(on: Optional[AggregateOnTs] = None) → U[source]

Compute mean over entire dataset.

This is a blocking operation.

Examples

>>> ray.data.range(100).mean()
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).mean(lambda x: x[1])
>>> ray.data.range_arrow(100).mean("value")
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).mean(["A", "B"])
Parameters

on

The data subset on which to compute the mean.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar mean of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise mean of all columns.

Returns

The mean result.

For a simple dataset, the output is:

  • on=None: a scalar representing the mean of all rows,

  • on=callable: a scalar representing the mean of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (mean_1, ..., mean_n) representing the mean of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise mean of all columns,

  • on="col": a scalar representing the mean of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise mean of the provided columns.

If the dataset is empty, then a ValueError is raised.

std(on: Optional[AggregateOnTs] = None, ddof: int = 1) → U[source]

Compute standard deviation over entire dataset.

This is a blocking operation.

Examples

>>> ray.data.range(100).std()
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).std(lambda x: x[1])
>>> ray.data.range_arrow(100).std("value", ddof=0)
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).std(["A", "B"])

NOTE: This uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen due to its numerical stability and the fact that it can be computed in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

Parameters
  • on

    The data subset on which to compute the std.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar std of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise std of all columns.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

Returns

The standard deviation result.

For a simple dataset, the output is:

  • on=None: a scalar representing the std of all rows,

  • on=callable: a scalar representing the std of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (std_1, ..., std_n) representing the std of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise std of all columns,

  • on="col": a scalar representing the std of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise std of the provided columns.

If the dataset is empty, then a ValueError is raised.

sort(key: Union[None, str, List[str], Callable[[T], Any]] = None, descending: bool = False) → ray.data.dataset.Dataset[T][source]

Sort the dataset by the specified key column or key function. (experimental support)

This is a blocking operation.

Examples

>>> # Sort using the entire record as the key.
>>> ds.sort()
>>> # Sort by a single column in descending order.
>>> ds.sort("field1", descending=True)
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["field1"] % 100)
>>> # Sort by multiple columns (not yet supported).
>>> ds.sort([("field1", "ascending"), ("field2", "descending")])

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters
  • key

    • For Arrow tables, key must be a single column name.

    • For datasets of Python objects, key can be either a lambda function that returns a comparison key to sort by, or None to sort by the original value.

  • descending – Whether to sort in descending order.

Returns

A new, sorted dataset.

zip(other: Dataset[U]) → Dataset[T, U][source]

Zip this dataset with the elements of another.

The datasets must have identical numbers of rows, block types, and block sizes (e.g., one was produced from a .map() of another). For Arrow blocks, the schemas will be concatenated, and any duplicate column names will be disambiguated with _1, _2, etc. suffixes.

Time complexity: O(dataset size / parallelism)

Parameters

other – The dataset to zip with on the right hand side.

Examples

>>> ds = ray.data.range(5)
>>> ds.zip(ds).take()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
Returns

A Dataset with (k, v) pairs (or concatenated Arrow schema) where k comes from the first dataset and v comes from the second.

limit(limit: int) → ray.data.dataset.Dataset[T][source]

Limit the dataset to the first limit records.

Examples

>>> ds.limit(100).map(lambda x: x * 2).take()

Time complexity: O(limit specified)

Parameters

limit – The size of the dataset to truncate to.

Returns

The truncated dataset.

take(limit: int = 20) → List[T][source]

Take up to the given number of records from the dataset.
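
Examples

An illustrative sketch on a small integer dataset.

>>> ray.data.range(100).take(5)
[0, 1, 2, 3, 4]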

Time complexity: O(limit specified)

Parameters

limit – The max number of records to return.

Returns

A list of up to limit records from the dataset.

take_all(limit: int = 100000) → List[T][source]

Take all the records in the dataset.

Time complexity: O(dataset size)

Parameters

limit – Raise an error if the size exceeds the specified limit.

Returns

A list of all the records in the dataset.

show(limit: int = 20) → None[source]

Print up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to print.

count() → int[source]

Count the number of records in the dataset.

Time complexity: O(dataset size / parallelism), O(1) for parquet

Returns

The number of records in the dataset.

schema(fetch_if_missing: bool = False) → Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Parameters

fetch_if_missing – If True, synchronously fetch the schema if it’s not known. Default is False, where None is returned if the schema is not known.

Returns

The Python type or Arrow schema of the records, or None if the schema is not known and fetch_if_missing is False.

num_blocks() → int[source]

Return the number of blocks of this dataset.

Note that during read and transform operations, the number of blocks may be dynamically adjusted to respect memory limits, increasing the number of blocks at runtime.

Time complexity: O(1)

Returns

The number of blocks of this dataset.

size_bytes() → int[source]

Return the in-memory size of the dataset.

Time complexity: O(1)

Returns

The in-memory size of the dataset in bytes, or None if the in-memory size is not known.

input_files() → List[str][source]

Return the list of input files for the dataset.

Time complexity: O(num input files)

Returns

The list of input files used to create the dataset, or an empty list if the input files is not known.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_parquet_args) → None[source]

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> ds.write_parquet("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used instead of arrow_parquet_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • arrow_parquet_args – Options to pass to pyarrow.parquet.write_table(), which is used to write out each block to a file.

write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **pandas_json_args) → None[source]

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> ds.write_json("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where json files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from pandas_json_args. This should be used instead of pandas_json_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • pandas_json_args – These args will be passed to pandas.DataFrame.to_json(), which is used under the hood to write out each Dataset block. These are dict(orient="records", lines=True) by default.

write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_csv_args) → None[source]

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> ds.write_csv("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where csv files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. This should be used instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • arrow_csv_args – Other CSV write options to pass to pyarrow.

write_numpy(path: str, *, column: str = 'value', filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>) → None[source]

Write a tensor column of the dataset to npy files.

This is only supported for datasets convertible to Arrow records that contain a TensorArray column. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset.

Examples

>>> ds.write_numpy("s3://bucket/path")

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where npy files will be written to.

  • column – The name of the table column that contains the tensor to be written. This defaults to “value”.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

write_datasource(datasource: ray.data.datasource.datasource.Datasource[T], **write_args) → None[source]

Write the dataset to a custom datasource.

Examples

>>> ds.write_datasource(CustomDatasourceImpl(...))

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • write_args – Additional write args to pass to the datasource.

iter_rows(*, prefetch_blocks: int = 0) → Iterator[T][source]

Return a local row iterator over the dataset.

Examples

>>> for i in ray.data.range(1000000).iter_rows():
...     print(i)

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the entire dataset.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'native', drop_last: bool = False) → Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]][source]

Return a local batched iterator over the dataset.

Examples

>>> for batch in ray.data.range(1000000).iter_batches():
...     print(batch)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “native” to use the current block format, “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “native”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

to_torch(*, label_column: Optional[str] = None, feature_columns: Union[None, List[str], List[List[str]], Dict[str, List[str]]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Union[None, torch.dtype, List[torch.dtype], Dict[str, torch.dtype]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False, unsqueeze_label_tensor: bool = True) → torch.utils.data.IterableDataset[source]

Return a Torch IterableDataset over this dataset.

This is only supported for datasets convertible to Arrow records.

It is recommended to use the returned IterableDataset directly instead of passing it into a torch DataLoader.

Each element in IterableDataset will be a tuple consisting of 2 elements. The first item contains the feature tensor(s), and the second item is the label tensor. Those can take on different forms, depending on the specified arguments.

For the features tensor (N is the batch_size and n, m, k are the number of features per tensor):

  • If feature_columns is a List[str], the features will be a tensor of shape (N, n), with columns corresponding to feature_columns

  • If feature_columns is a List[List[str]], the features will be a list of tensors of shape [(N, m),…,(N, k)], with columns of each tensor corresponding to the elements of feature_columns

  • If feature_columns is a Dict[str, List[str]], the features will be a dict of key-tensor pairs of shape {key1: (N, m),…, keyN: (N, k)}, with columns of each tensor corresponding to the value of feature_columns under the key.

If unsqueeze_label_tensor=True (default), the label tensor will be of shape (N, 1). Otherwise, it will be of shape (N,). If label_column is specified as None, then no column from the Dataset will be treated as the label, and the output label tensor will be None.

Note that you probably want to call .split() on this dataset if there are to be multiple Torch workers consuming the data.
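
Examples

A hedged sketch; the column names "x" and "y" are illustrative, and the dataset is assumed to be Arrow-convertible (here built via from_pandas).

>>> import pandas as pd
>>> ds = ray.data.from_pandas(pd.DataFrame({
...     "x": range(100), "y": [i % 2 for i in range(100)]}))
>>> torch_ds = ds.to_torch(label_column="y", batch_size=8)
>>> for features, label in torch_ds:
...     print(features.shape, label.shape)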

Time complexity: O(1)

Parameters
  • label_column (Optional[str]) – The name of the column used as the label (second element of the output list). Can be None for prediction, in which case the second element of returned tuple will also be None.

  • feature_columns (Union[None, List[str], List[List[str]], Dict[str, List[str]]]) – The names of the columns to use as the features. Can be a list of lists or a dict of string-list pairs for multi-tensor output. If None, then use all columns except the label columns as the features.

  • label_column_dtype (Optional[torch.dtype]) – The torch dtype to use for the label column. If None, then automatically infer the dtype.

  • feature_column_dtypes (Union[None, torch.dtype, List[torch.dtype], Dict[str, torch.dtype]]) – The dtypes to use for the feature tensors. This should match the format of feature_columns, or be a single dtype, in which case it will be applied to all tensors. If None, then automatically infer the dtype.

  • batch_size (int) – How many samples per batch to yield at a time. Defaults to 1.

  • prefetch_blocks (int) – The number of blocks to prefetch ahead of the current block during the scan.

  • drop_last (bool) – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

  • unsqueeze_label_tensor (bool) – If set to True, the label tensor will be unsqueezed (reshaped to (N, 1)). Otherwise, it will be left as is, that is (N, ). In general, regression loss functions expect an unsqueezed tensor, while classification loss functions expect a squeezed one. Defaults to True.

Returns

A torch IterableDataset.

to_tf(*, label_column: str, output_signature: Tuple[tf.TypeSpec, tf.TypeSpec], feature_columns: Optional[List[str]] = None, prefetch_blocks: int = 0, batch_size: int = 1) → tf.data.Dataset[source]

Return a TF Dataset over this dataset.

The TF Dataset will be created from the generator returned by the iter_batches method. prefetch_blocks and batch_size arguments will be passed to that method.

This is only supported for datasets convertible to Arrow records.

Requires all datasets to have the same columns.

It is recommended to call .split() on this dataset if there are to be multiple TensorFlow workers consuming the data.

The elements generated must be compatible with the given output_signature argument (same as in tf.data.Dataset.from_generator).
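
Examples

A hedged sketch; it assumes ds is an Arrow dataset with a single numeric feature column "x" and a label column "y", and the TensorSpec shapes shown are illustrative.

>>> import tensorflow as tf
>>> tf_ds = ds.to_tf(
...     label_column="y",
...     output_signature=(
...         tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
...         tf.TensorSpec(shape=(None,), dtype=tf.int64)),
...     batch_size=32)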

Time complexity: O(1)

Parameters
  • label_column (str) – The name of the column used as the label (second element of the output tuple).

  • output_signature (Tuple[tf.TypeSpec, tf.TypeSpec]) – A 2-element tuple of tf.TypeSpec objects corresponding to (features, label).

  • feature_columns (Optional[List[str]]) – List of columns in datasets to use. If None, all columns will be used.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size. Defaults to 1.

Returns

A tf.data.Dataset.

to_dask() → dask.DataFrame[source]

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will set the Dask scheduler to Dask-on-Ray globally, via the config.

Time complexity: O(dataset size / parallelism)

Returns

A Dask DataFrame created from this dataset.

to_mars() → mars.DataFrame[source]

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

to_modin() → modin.DataFrame[source]

Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of Pandas dataframes (using .to_pandas_refs()). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using modin.distributed.dataframe.pandas.partitions.from_partitions().

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

to_spark(spark: pyspark.sql.SparkSession) → pyspark.sql.DataFrame[source]

Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

to_pandas(limit: int = 100000) → pandas.DataFrame[source]

Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow or Pandas records. An error is raised if the number of records exceeds the provided limit. Note that you can use .limit() on the dataset beforehand to truncate the dataset manually.
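
Examples

An illustrative sketch on a small Arrow dataset.

>>> df = ray.data.range_arrow(100).to_pandas()
>>> df["value"].sum()
4950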

Time complexity: O(dataset size)

Parameters

limit – The maximum number of records to return. An error will be raised if the limit is exceeded.

Returns

A Pandas DataFrame created from this dataset, containing a limited number of records.

to_pandas_refs() → List[ray.types.ObjectRef[pandas.DataFrame]][source]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

to_numpy_refs(*, column: Optional[str] = None) → List[ray.types.ObjectRef[numpy.ndarray]][source]

Convert this dataset into a distributed set of NumPy ndarrays.

This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Parameters

column – The name of the column to convert to numpy, or None to specify the entire row. Required for Arrow tables.

Returns

A list of remote NumPy ndarrays created from this dataset.

to_arrow_refs() → List[ray.types.ObjectRef[pyarrow.Table]][source]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

repeat(times: int = None) → DatasetPipeline[T][source]

Convert this into a DatasetPipeline by looping over this dataset.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset.

Note that every repeat of the dataset is considered an “epoch” for the purposes of DatasetPipeline.iter_epochs().

Examples

>>> # Infinite pipeline of numbers [0, 5)
>>> ray.data.range(5).repeat().take()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, ...]
>>> # Can apply transformations to the pipeline.
>>> ray.data.range(5).repeat().map(lambda x: -x).take()
[0, -1, -2, -3, -4, 0, -1, -2, -3, -4, ...]
>>> # Can shuffle each epoch (dataset) in the pipeline.
>>> ray.data.range(5).repeat().random_shuffle().take()
[2, 3, 0, 4, 1, 4, 0, 2, 1, 3, ...]
Parameters

times – The number of times to loop over this dataset, or None to repeat indefinitely.

window(*, blocks_per_window: int = 10) → DatasetPipeline[T][source]

Convert this into a DatasetPipeline by windowing over data blocks.

Transformations prior to the call to window() are evaluated in bulk on the entire dataset. Transformations done on the returned pipeline are evaluated incrementally per window of blocks as data is read from the output of the pipeline.

Windowing execution allows for output to be read sooner without waiting for all transformations to fully execute, and can also improve efficiency if transforms use different resources (e.g., GPUs).

Without windowing:

[preprocessing......]
                      [inference.......]
                                         [write........]
Time ----------------------------------------------------------->

With windowing:

[prep1] [prep2] [prep3]
        [infer1] [infer2] [infer3]
                 [write1] [write2] [write3]
Time ----------------------------------------------------------->

Examples

>>> # Create an inference pipeline.
>>> ds = ray.data.read_binary_files(dir)
>>> pipe = ds.window(blocks_per_window=10).map(infer)
DatasetPipeline(num_windows=40, num_stages=2)
>>> # The higher the stage parallelism, the shorter the pipeline.
>>> pipe = ds.window(blocks_per_window=20).map(infer)
DatasetPipeline(num_windows=20, num_stages=2)
>>> # Outputs can be incrementally read from the pipeline.
>>> for item in pipe.iter_rows():
...    print(item)
Parameters

blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

get_internal_block_refs() → List[ray.types.ObjectRef[Union[List[T], pyarrow.Table, pandas.DataFrame, bytes]]][source]

Get a list of references to the underlying blocks of this dataset.

This function can be used for zero-copy access to the data. It blocks until the underlying blocks are computed.

Time complexity: O(1)

Returns

A list of references to this dataset’s blocks.

DeveloperAPI: This API may change across minor Ray releases.
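For illustration, a minimal sketch (for a simple dataset the blocks are plain Python lists; for Arrow datasets they are pyarrow.Table objects):

>>> import ray
>>> block_refs = ray.data.range(100).get_internal_block_refs()
>>> blocks = ray.get(block_refs)
>>> sum(len(block) for block in blocks)
100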

stats() → str[source]

Returns a string containing execution timing information.

DeveloperAPI: This API may change across minor Ray releases.
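For illustration, a minimal sketch that prints the timing summary after a dataset has been executed:

>>> import ray
>>> ds = ray.data.range(1000).map(lambda x: x + 1)
>>> print(ds.stats())  # per-stage execution timing summary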

DatasetPipeline API

class ray.data.dataset_pipeline.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.data.dataset.Dataset[T]]], stages: List[Callable[[ray.data.dataset.Dataset[Any]], ray.data.dataset.Dataset[Any]]] = None, length: int = None, progress_bars: bool = True, _executed: List[bool] = None)[source]

Implements a pipeline of Datasets.

Unlike Datasets, which execute all transformations synchronously, DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by either repeating a Dataset (ds.repeat(times=None)), by turning a single Dataset into a pipeline (ds.window(blocks_per_window=10)), or defined explicitly using DatasetPipeline.from_iterable().

DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).

PublicAPI (beta): This API is in beta and may change before becoming stable.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'pandas', drop_last: bool = False) → Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]][source]

Return a local batched iterator over the data in the pipeline.

Examples

>>> for pandas_df in ray.data.range(1000000).iter_batches():
...     print(pandas_df)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “pandas”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

split(n: int, *, equal: bool = False, locality_hints: List[Any] = None) → List[ray.data.dataset_pipeline.DatasetPipeline[T]][source]

Split the pipeline into n disjoint pipeline shards.

This returns a list of sub-pipelines that can be passed to Ray tasks and actors and used to read the pipeline records in parallel.

Examples

>>> # Split up a pipeline to process over `n` worker actors.
>>> shards = pipe.split(len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)

Time complexity: O(1)

Implementation detail: this launches a coordinator actor that is used to execute the pipeline and push data blocks to each pipeline shard. Reading from an individual shard will be blocked if other shards are falling behind. A warning will be printed if a shard has been blocked on read for more than 10 seconds.

Parameters
  • n – Number of child pipelines to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith pipeline shard with the ith actor to maximize data locality.

Returns

A list of n disjoint pipeline splits.

split_at_indices(indices: List[int]) → List[ray.data.dataset_pipeline.DatasetPipeline[T]][source]

Split the datasets within the pipeline at the given indices (like np.split).

This will split each dataset contained within this pipeline, thereby producing len(indices) + 1 pipelines, with the first pipeline containing the [0, indices[0]) slice from each dataset, the second pipeline containing the [indices[0], indices[1]) slice from each dataset, and so on, with the final pipeline containing the [indices[-1], self.count()) slice from each dataset.

Examples

>>> p1, p2, p3 = ray.data.range(
        8).repeat(2).split_at_indices([2, 5])
>>> p1.take()
[0, 1, 0, 1]
>>> p2.take()
[2, 3, 4, 2, 3, 4]
>>> p3.take()
[5, 6, 7, 5, 6, 7]

Time complexity: O(num splits)

See also: DatasetPipeline.split

Parameters

indices – List of sorted integers which indicate where the pipeline will be split. If an index exceeds the length of the pipeline, an empty pipeline will be returned.

Returns

The pipeline splits.

rewindow(*, blocks_per_window: int, preserve_epoch: bool = True) → ray.data.dataset_pipeline.DatasetPipeline[T][source]

Change the windowing (blocks per dataset) of this pipeline.

Changes the windowing of this pipeline to the specified size. For example, if the current pipeline has two blocks per dataset, and .rewindow(blocks_per_window=4) is requested, adjacent datasets will be merged until each dataset is 4 blocks. Conversely, if a smaller blocks_per_window than the current size is requested, each dataset will be split into smaller windows.

Parameters
  • blocks_per_window – The new target blocks per window.

  • preserve_epoch – Whether to preserve epoch boundaries. If set to False, then windows can contain data from two adjacent epochs.
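For illustration, a rough sketch (rewindow is typically applied right after creating the pipeline, before adding transforms):

>>> import ray
>>> pipe = ray.data.range(8).window(blocks_per_window=1)
>>> pipe = pipe.rewindow(blocks_per_window=2)  # merge adjacent windows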

repeat(times: int = None) → ray.data.dataset_pipeline.DatasetPipeline[T][source]

Repeat this pipeline a given number of times, or indefinitely.

This operation is only allowed for pipelines of a finite length. An error will be raised for pipelines of infinite length.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the repeated pipeline are evaluated on each loop of the pipeline over the base pipeline.

Note that every repeat of the pipeline is considered an “epoch” for the purposes of iter_epochs(). If there are multiple repeat calls, the latest repeat takes precedence for the purpose of defining epochs.

Parameters

times – The number of times to loop over this pipeline, or None to repeat indefinitely.
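For illustration, a minimal sketch that windows a finite dataset and then loops over the resulting pipeline twice:

>>> import ray
>>> pipe = ray.data.range(4).window(blocks_per_window=2).repeat(2)
>>> pipe.take()
[0, 1, 2, 3, 0, 1, 2, 3]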

schema() → Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset pipeline.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.

count() → int[source]

Count the number of records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The number of records in the dataset pipeline.

sum() → int[source]

Sum the records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset pipeline.

show_windows(limit_per_dataset: int = 10) → None[source]

Print up to the given number of records from each window/dataset.

This is helpful as a debugging tool for understanding the structure of dataset pipelines.

Parameters

limit_per_dataset – Rows to print per window/dataset.
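For illustration, a minimal sketch:

>>> import ray
>>> ray.data.range(6).window(blocks_per_window=2).show_windows()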

iter_epochs() → Iterator[ray.data.dataset_pipeline.DatasetPipeline[T]][source]

Split this pipeline up by epoch.

This allows reading of data per-epoch for repeated Datasets, which is useful for ML training. For example, ray.data.range(10).repeat(50) generates a pipeline with 500 rows total split across 50 epochs. This method allows iterating over the data individually per epoch (repetition) of the original data.

Examples

>>> epochs = ray.data.range(10).repeat(50).iter_epochs()
>>> for i, epoch in enumerate(epochs):
...     print("Epoch", i)
...     for row in epoch.iter_rows():
...         print(row)
Returns

Iterator over epoch objects, where each epoch is a DatasetPipeline containing data from that epoch only.

iter_datasets() → Iterator[ray.data.dataset.Dataset[T]][source]

Iterate over the output datasets of this pipeline.

Returns

Iterator over the datasets output from this pipeline.

DeveloperAPI: This API may change across minor Ray releases.
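For illustration, a minimal sketch that prints the contents of each window:

>>> import ray
>>> pipe = ray.data.range(4).window(blocks_per_window=2)
>>> for ds in pipe.iter_datasets():
...     print(ds.take())
[0, 1]
[2, 3]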

foreach_window(fn: Callable[[ray.data.dataset.Dataset[T]], ray.data.dataset.Dataset[U]]) → ray.data.dataset_pipeline.DatasetPipeline[U][source]

Apply a transform to each dataset/window in this pipeline.

Parameters

fn – The function to transform each dataset with.

Returns

The transformed DatasetPipeline.

DeveloperAPI: This API may change across minor Ray releases.
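For illustration, a minimal sketch that applies a per-window transform:

>>> import ray
>>> pipe = ray.data.range(3).repeat(2)
>>> pipe = pipe.foreach_window(lambda ds: ds.map(lambda x: x * 10))
>>> pipe.take()
[0, 10, 20, 0, 10, 20]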

stats(exclude_first_window: bool = True) → str[source]

Returns a string containing execution timing information.

Parameters

exclude_first_window – Whether to exclude the first window from the pipeline time breakdown. This is generally a good idea since there is always a stall waiting for the first window to be initially computed, which can be misleading in the stats.

DeveloperAPI: This API may change across minor Ray releases.

static from_iterable(iterable: Iterable[Callable[[], ray.data.dataset.Dataset[T]]]) → ray.data.dataset_pipeline.DatasetPipeline[T][source]

Create a pipeline from a sequence of Dataset-producing functions.

Parameters

iterable – A finite or infinite-length sequence of functions that each produce a Dataset when called.
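For illustration, a minimal sketch that builds a two-window pipeline from Dataset-producing callables:

>>> import ray
>>> from ray.data.dataset_pipeline import DatasetPipeline
>>> pipe = DatasetPipeline.from_iterable(
...     [lambda: ray.data.range(2), lambda: ray.data.range(3)])
>>> pipe.take()
[0, 1, 0, 1, 2]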

filter(fn: Union[type, Callable[[T], bool]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.filter to each dataset/window in this pipeline.

flat_map(fn: Union[type, Callable[[T], Iterable[U]]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.flat_map to each dataset/window in this pipeline.

iter_rows(*, prefetch_blocks: int = 0) → Iterator[T]

Call Dataset.iter_rows over the stream of output batches from the pipeline.

map(fn: Union[type, Callable[[T], U]], *, compute: Optional[str] = None, **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.map to each dataset/window in this pipeline.

map_batches(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, list]]], *, batch_size: Optional[int] = 4096, compute: Optional[str] = None, batch_format: str = 'native', **ray_remote_args) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.map_batches to each dataset/window in this pipeline.

random_shuffle_each_window(*, seed: Optional[int] = None, num_blocks: Optional[int] = None, _move: Optional[bool] = False, _spread_resource_prefix: Optional[str] = None) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.random_shuffle to each dataset/window in this pipeline.

repartition_each_window(num_blocks: int, *, shuffle: bool = False) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.repartition to each dataset/window in this pipeline.

show(limit: int = 20) → None

Call Dataset.show over the stream of output batches from the pipeline.

sort_each_window(key: Union[None, str, List[str], Callable[[T], Any]] = None, descending: bool = False) → ray.data.dataset_pipeline.DatasetPipeline[U]

Apply Dataset.sort to each dataset/window in this pipeline.

take(limit: int = 20) → List[T]

Call Dataset.take over the stream of output batches from the pipeline.

take_all(limit: int = 100000) → List[T]

Call Dataset.take_all over the stream of output batches from the pipeline.

to_tf(*, label_column: str, output_signature: Tuple[tf.TypeSpec, tf.TypeSpec], feature_columns: Optional[List[str]] = None, prefetch_blocks: int = 0, batch_size: int = 1) → tf.data.Dataset

Call Dataset.to_tf over the stream of output batches from the pipeline.

to_torch(*, label_column: Optional[str] = None, feature_columns: Union[None, List[str], List[List[str]], Dict[str, List[str]]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Union[None, torch.dtype, List[torch.dtype], Dict[str, torch.dtype]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False, unsqueeze_label_tensor: bool = True) → torch.utils.data.IterableDataset

Call Dataset.to_torch over the stream of output batches from the pipeline.

write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_csv_args) → None

Call Dataset.write_csv on each output dataset of this pipeline.

write_datasource(datasource: ray.data.datasource.datasource.Datasource[T], **write_args) → None

Call Dataset.write_datasource on each output dataset of this pipeline.

write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **pandas_json_args) → None

Call Dataset.write_json on each output dataset of this pipeline.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, **arrow_parquet_args) → None

Call Dataset.write_parquet on each output dataset of this pipeline.

GroupedDataset API

class ray.data.grouped_dataset.GroupedDataset(dataset: ray.data.dataset.Dataset[T], key: Union[Callable[[T], KeyType], str, List[Union[Callable[[T], KeyType], str]], None])[source]

Represents a grouped dataset created by calling Dataset.groupby().

The actual groupby is deferred until an aggregation is applied.

PublicAPI (beta): This API is in beta and may change before becoming stable.

aggregate(*aggs: ray.data.aggregate.AggregateFn) → ray.data.dataset.Dataset[U][source]

Implements an accumulator-based aggregation.

This is a blocking operation.

Examples

>>> grouped_ds.aggregate(AggregateFn(
...     init=lambda k: [],
...     accumulate=lambda a, r: a + [r],
...     merge=lambda a1, a2: a1 + a2,
...     finalize=lambda a: a
... ))
Parameters

aggs – Aggregations to do.

Returns

If the input dataset is a simple dataset, then the output is a simple dataset of (k, v_1, ..., v_n) tuples where k is the groupby key and v_i is the result of the ith given aggregation. If the input dataset is an Arrow dataset, then the output is an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations. If the groupby key is None, the key part of the return is omitted.

count() → ray.data.dataset.Dataset[U][source]

Compute count aggregation.

This is a blocking operation.

Examples

>>> ray.data.range(100).groupby(lambda x: x % 3).count()
>>> ray.data.from_items([
...     {"A": x % 3, "B": x} for x in range(100)]).groupby(
...     "A").count()
Returns

A simple dataset of (k, v) pairs or an Arrow dataset of [k, v] columns where k is the groupby key and v is the number of rows with that key. If the groupby key is None, the key part of the return is omitted.

sum(on: Union[Callable[[T], Any], str, List[Union[Callable[[T], Any], str]], None] = None) → ray.data.dataset.Dataset[U][source]

Compute grouped sum aggregation.

This is a blocking operation.

Examples

>>> ray.data.range(100).groupby(lambda x: x % 3).sum()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .sum(lambda x: x[2])
>>> ray.data.range_arrow(100).groupby("value").sum()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .sum(["B", "C"])
Parameters

on

The data subset on which to compute the sum.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to take a sum of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise sum of all columns.

Returns

The sum result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, sum) tuples where k is the groupby key and sum is sum of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, sum_1, ..., sum_n) tuples where k is the groupby key and sum_i is sum of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise sum column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return is omitted.

min(on: Union[Callable[[T], Any], str, List[Union[Callable[[T], Any], str]], None] = None) → ray.data.dataset.Dataset[U][source]

Compute grouped min aggregation.

This is a blocking operation.

Examples

>>> ray.data.range(100).groupby(lambda x: x % 3).min()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .min(lambda x: x[2])
>>> ray.data.range_arrow(100).groupby("value").min()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .min(["B", "C"])
Parameters

on

The data subset on which to compute the min.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to take a min of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise min of all columns.

Returns

The min result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, min) tuples where k is the groupby key and min is min of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, min_1, ..., min_n) tuples where k is the groupby key and min_i is min of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise min column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return is omitted.

max(on: Union[Callable[[T], Any], str, List[Union[Callable[[T], Any], str]], None] = None) → ray.data.dataset.Dataset[U][source]

Compute grouped max aggregation.

This is a blocking operation.

Examples

>>> ray.data.range(100).groupby(lambda x: x % 3).max()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .max(lambda x: x[2])
>>> ray.data.range_arrow(100).groupby("value").max()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .max(["B", "C"])
Parameters

on

The data subset on which to compute the max.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to take a max of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise max of all columns.

Returns

The max result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, max) tuples where k is the groupby key and max is max of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, max_1, ..., max_n) tuples where k is the groupby key and max_i is max of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise max column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return is omitted.

mean(on: Union[Callable[[T], Any], str, List[Union[Callable[[T], Any], str]], None] = None) → ray.data.dataset.Dataset[U][source]

Compute grouped mean aggregation.

This is a blocking operation.

Examples

>>> ray.data.range(100).groupby(lambda x: x % 3).mean()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .mean(lambda x: x[2])
>>> ray.data.range_arrow(100).groupby("value").mean()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .mean(["B", "C"])
Parameters

on

The data subset on which to compute the mean.

  • For a simple dataset: it can be a callable or a list thereof, and the default is to take a mean of all rows.

  • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise mean of all columns.

Returns

The mean result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, mean) tuples where k is the groupby key and mean is mean of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, mean_1, ..., mean_n) tuples where k is the groupby key and mean_i is mean of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise mean column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return is omitted.

std(on: Union[Callable[[T], Any], str, List[Union[Callable[[T], Any], str]], None] = None, ddof: int = 1) → ray.data.dataset.Dataset[U][source]

Compute grouped standard deviation aggregation.

This is a blocking operation.

Examples

>>> ray.data.range(100).groupby(lambda x: x % 3).std()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .std(lambda x: x[2])
>>> ray.data.range_arrow(100).groupby("value").std(ddof=0)
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .std(["B", "C"])

NOTE: This uses Welford’s online method for an accumulator-style computation of the standard deviation. This method was chosen for its numerical stability and because it can be computed in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

Parameters
  • on

    The data subset on which to compute the std.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a std of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise std of all columns.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

Returns

The standard deviation result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, std) tuples where k is the groupby key and std is std of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, std_1, ..., std_n) tuples where k is the groupby key and std_i is std of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise std column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return is omitted.

Tensor Column Extension API

class ray.data.extensions.tensor_extension.TensorDtype[source]

Pandas extension type for a column of fixed-shape, homogeneous-typed tensors.

See: https://github.com/pandas-dev/pandas/blob/master/pandas/core/dtypes/base.py for up-to-date interface documentation and the subclassing contract. The docstrings of the below properties and methods were copied from the base ExtensionDtype.

Examples

>>> # Create a DataFrame with a list of ndarrays as a column.
>>> df = pd.DataFrame({
        "one": [1, 2, 3],
        "two": list(np.arange(24).reshape((3, 2, 2, 2)))})
>>> # Note the opaque np.object dtype for this column.
>>> df.dtypes
one     int64
two    object
dtype: object
>>> # Cast column to our TensorDtype extension type.
>>> df["two"] = df["two"].astype(TensorDtype())
>>> # Note that the column dtype is now TensorDtype instead of
>>> # np.object.
>>> df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # Pandas is now aware of this tensor column, and we can do the
>>> # typical DataFrame operations on this column.
>>> col = 2 * (df["two"] + 10)
>>> # The ndarrays underlying the tensor column will be manipulated,
>>> # but the column itself will continue to be a Pandas type.
>>> type(col)
pandas.core.series.Series
>>> col
0   [[[ 2  4]
      [ 6  8]]
     [[10 12]
       [14 16]]]
1   [[[18 20]
      [22 24]]
     [[26 28]
      [30 32]]]
2   [[[34 36]
      [38 40]]
     [[42 44]
      [46 48]]]
Name: two, dtype: TensorDtype
>>> # Once you do an aggregation on that column that returns a single
>>> # row's value, you get back our TensorArrayElement type.
>>> tensor = col.mean()
>>> type(tensor)
ray.data.extensions.tensor_extension.TensorArrayElement
>>> tensor
array([[[18., 20.],
        [22., 24.]],
       [[26., 28.],
        [30., 32.]]])
>>> # This is a light wrapper around a NumPy ndarray, and can easily
>>> # be converted to an ndarray.
>>> type(tensor.to_numpy())
numpy.ndarray
>>> # In addition to doing Pandas operations on the tensor column,
>>> # you can now put the DataFrame into a Dataset.
>>> ds = ray.data.from_pandas(df)
>>> # Internally, this column is represented by the corresponding
>>> # Arrow tensor extension type.
>>> ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> # You can write the dataset to Parquet.
>>> ds.write_parquet("/some/path")
>>> # And you can read it back.
>>> read_ds = ray.data.read_parquet("/some/path")
>>> read_ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> read_df = ray.get(read_ds.to_pandas_refs())[0]
>>> read_df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # The tensor extension type is preserved along the
>>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
>>> # conversion chain.
>>> read_df.equals(df)
True

PublicAPI (beta): This API is in beta and may change before becoming stable.

property type

The scalar type for the array, e.g. int. It’s expected that ExtensionArray[item] returns an instance of ExtensionDtype.type for scalar item, assuming that value is valid (not NA). NA values do not need to be instances of type.

property name

A string identifying the data type. Will be used for display in, e.g. Series.dtype

classmethod construct_from_string(string: str)[source]

Construct this type from a string.

This is useful mainly for data types that accept parameters. For example, a period dtype accepts a frequency parameter that can be set as period[H] (where H means hourly frequency).

By default, in the abstract class, just the name of the type is expected. But subclasses can overwrite this method to accept parameters.

Parameters

string (str) – The name of the type, for example category.

Returns

Instance of the dtype.

Return type

ExtensionDtype

Raises

TypeError – If a class cannot be constructed from this ‘string’.

Examples

For extension dtypes with arguments the following may be an adequate implementation.

>>> @classmethod
... def construct_from_string(cls, string):
...     import re  # needed for the pattern match below
...     pattern = re.compile(r"^my_type\[(?P<arg_name>.+)\]$")
...     match = pattern.match(string)
...     if match:
...         return cls(**match.groupdict())
...     else:
...         raise TypeError(
...             f"Cannot construct a '{cls.__name__}' from '{string}'"
...         )
classmethod construct_array_type()[source]

Return the array type associated with this dtype.

Returns

Return type

type

class ray.data.extensions.tensor_extension.TensorArray(values: Union[numpy.ndarray, pandas.core.dtypes.generic.ABCSeries, Sequence[Union[numpy.ndarray, ray.data.extensions.tensor_extension.TensorArrayElement]], ray.data.extensions.tensor_extension.TensorArrayElement, Any])[source]

Pandas ExtensionArray representing a tensor column, i.e. a column consisting of ndarrays as elements. All tensors in a column must have the same shape.

Examples

>>> # Create a DataFrame with a list of ndarrays as a column.
>>> df = pd.DataFrame({
        "one": [1, 2, 3],
        "two": TensorArray(np.arange(24).reshape((3, 2, 2, 2)))})
>>> # Note that the column dtype is TensorDtype.
>>> df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # Pandas is aware of this tensor column, and we can do the
>>> # typical DataFrame operations on this column.
>>> col = 2 * (df["two"] + 10)
>>> # The ndarrays underlying the tensor column will be manipulated,
>>> # but the column itself will continue to be a Pandas type.
>>> type(col)
pandas.core.series.Series
>>> col
0   [[[ 2  4]
      [ 6  8]]
     [[10 12]
       [14 16]]]
1   [[[18 20]
      [22 24]]
     [[26 28]
      [30 32]]]
2   [[[34 36]
      [38 40]]
     [[42 44]
      [46 48]]]
Name: two, dtype: TensorDtype
>>> # Once you do an aggregation on that column that returns a single
>>> # row's value, you get back our TensorArrayElement type.
>>> tensor = col.mean()
>>> type(tensor)
ray.data.extensions.tensor_extension.TensorArrayElement
>>> tensor
array([[[18., 20.],
        [22., 24.]],
       [[26., 28.],
        [30., 32.]]])
>>> # This is a light wrapper around a NumPy ndarray, and can easily
>>> # be converted to an ndarray.
>>> type(tensor.to_numpy())
numpy.ndarray
>>> # In addition to doing Pandas operations on the tensor column,
>>> # you can now put the DataFrame into a Dataset.
>>> ds = ray.data.from_pandas(df)
>>> # Internally, this column is represented by the corresponding
>>> # Arrow tensor extension type.
>>> ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> # You can write the dataset to Parquet.
>>> ds.write_parquet("/some/path")
>>> # And you can read it back.
>>> read_ds = ray.data.read_parquet("/some/path")
>>> read_ds.schema()
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> read_df = ray.get(read_ds.to_pandas_refs())[0]
>>> read_df.dtypes
one          int64
two    TensorDtype
dtype: object
>>> # The tensor extension type is preserved along the
>>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
>>> # conversion chain.
>>> read_df.equals(df)
True

PublicAPI (beta): This API is in beta and may change before becoming stable.

property dtype

An instance of ‘ExtensionDtype’.

property nbytes

The number of bytes needed to store this object in memory.

isna() → ray.data.extensions.tensor_extension.TensorArray[source]

A 1-D array indicating if each value is missing.

Returns

na_values – In most cases, this should return a NumPy ndarray. For exceptional cases like SparseArray, where returning an ndarray would be expensive, an ExtensionArray may be returned.

Return type

Union[np.ndarray, ExtensionArray]

Notes

If returning an ExtensionArray, then

  • na_values._is_boolean should be True

  • na_values should implement ExtensionArray._reduce()

  • na_values.any and na_values.all should be implemented

take(indices: Sequence[int], allow_fill: bool = False, fill_value: Any = None) → ray.data.extensions.tensor_extension.TensorArray[source]

Take elements from an array.

Parameters
  • indices (sequence of int) – Indices to be taken.

  • allow_fill (bool, default False) –

    How to handle negative values in indices.

    • False: negative values in indices indicate positional indices from the right (the default). This is similar to numpy.take().

    • True: negative values in indices indicate missing values. These values are set to fill_value. Any other negative values raise a ValueError.

  • fill_value (any, optional) –

    Fill value to use for NA-indices when allow_fill is True. This may be None, in which case the default NA value for the type, self.dtype.na_value, is used.

    For many ExtensionArrays, there will be two representations of fill_value: a user-facing “boxed” scalar, and a low-level physical NA value. fill_value should be the user-facing version, and the implementation should handle translating that to the physical version for processing the take if necessary.

Returns

Return type

ExtensionArray

Raises
  • IndexError – When the indices are out of bounds for the array.

  • ValueError – When indices contains negative values other than -1 and allow_fill is True.

See also

numpy.take()

Take elements from an array along an axis.

api.extensions.take()

Take elements from an array.

Notes

ExtensionArray.take is called by Series.__getitem__, .loc, iloc, when indices is a sequence of values. Additionally, it’s called by Series.reindex(), or any other method that causes realignment, with a fill_value.

Examples

Here’s an example implementation, which relies on casting the extension array to object dtype. This uses the helper method pandas.api.extensions.take().

def take(self, indices, allow_fill=False, fill_value=None):
    from pandas.core.algorithms import take

    # If the ExtensionArray is backed by an ndarray, then
    # just pass that here instead of coercing to object.
    data = self.astype(object)

    if allow_fill and fill_value is None:
        fill_value = self.dtype.na_value

    # fill value should always be translated from the scalar
    # type for the array, to the physical storage type for
    # the data, before passing to take.

    result = take(data, indices, fill_value=fill_value,
                  allow_fill=allow_fill)
    return self._from_sequence(result, dtype=self.dtype)

copy() → ray.data.extensions.tensor_extension.TensorArray[source]

Return a copy of the array.

Returns

Return type

ExtensionArray

to_numpy(dtype: numpy.dtype = None, copy: bool = False, na_value: Any = <no_default>)[source]

Convert to a NumPy ndarray.

New in version 1.0.0.

This is similar to numpy.asarray(), but may provide additional control over how the conversion is done.

Parameters
  • dtype (str or numpy.dtype, optional) – The dtype to pass to numpy.asarray().

  • copy (bool, default False) – Whether to ensure that the returned value is not a view on another array. Note that copy=False does not ensure that to_numpy() is no-copy. Rather, copy=True ensures that a copy is made, even if not strictly necessary.

  • na_value (Any, optional) – The value to use for missing values. The default value depends on dtype and the type of the array.

Returns

Return type

numpy.ndarray

property numpy_dtype

Get the dtype of the tensor.

Returns

The numpy dtype of the backing ndarray.

property numpy_ndim

Get the number of tensor dimensions.

Returns

An integer for the number of dimensions.

property numpy_shape

Get the shape of the tensor.

Returns

A tuple of integers for the numpy shape of the backing ndarray.

astype(dtype, copy=True)[source]

Cast to a NumPy array with ‘dtype’.

Parameters
  • dtype (str or dtype) – Typecode or data-type to which the array is cast.

  • copy (bool, default True) – Whether to copy the data, even if not necessary. If False, a copy is made only if the old dtype does not match the new dtype.

Returns

array – NumPy ndarray with ‘dtype’ for its dtype.

Return type

ndarray

any(axis=None, out=None, keepdims=False)[source]

Test whether any array element along a given axis evaluates to True.

See numpy.any() documentation for more information https://numpy.org/doc/stable/reference/generated/numpy.any.html#numpy.any

Parameters
  • axis – Axis or axes along which a logical OR reduction is performed.

  • out – Alternate output array in which to place the result.

  • keepdims – If this is set to True, the axes which are reduced are left in the result as dimensions with size one.

Returns

A single boolean if axis is None; otherwise a TensorArray.

all(axis=None, out=None, keepdims=False)[source]

Test whether all array elements along a given axis evaluate to True.

Parameters
  • axis – Axis or axes along which a logical AND reduction is performed.

  • out – Alternate output array in which to place the result.

  • keepdims – If this is set to True, the axes which are reduced are left in the result as dimensions with size one.

Returns

A single boolean if axis is None; otherwise a TensorArray.

class ray.data.extensions.tensor_extension.ArrowTensorType(shape: Tuple[int, …], dtype: pyarrow.lib.DataType)[source]

Arrow ExtensionType for an array of fixed-shaped, homogeneous-typed tensors.

This is the Arrow side of TensorDtype.

See Arrow extension type docs: https://arrow.apache.org/docs/python/extending_types.html#defining-extension-types-user-defined-types

PublicAPI (beta): This API is in beta and may change before becoming stable.

property shape

Shape of contained tensors.

to_pandas_dtype()[source]

Convert Arrow extension type to corresponding Pandas dtype.

Returns

An instance of pd.api.extensions.ExtensionDtype.

class ray.data.extensions.tensor_extension.ArrowTensorArray[source]

An array of fixed-shape, homogeneous-typed tensors.

This is the Arrow side of TensorArray.

See Arrow docs for customizing extension arrays: https://arrow.apache.org/docs/python/extending_types.html#custom-extension-array-class

PublicAPI (beta): This API is in beta and may change before becoming stable.

OFFSET_DTYPE

alias of numpy.int32

to_pylist(self)[source]

Convert to a list of native Python objects.

Returns

lst

Return type

list

classmethod from_numpy(arr)[source]

Convert an ndarray or an iterable of fixed-shape ndarrays to an array of fixed-shape, homogeneous-typed tensors.

Parameters

arr – An ndarray or an iterable of fixed-shape ndarrays.

Returns

An ArrowTensorArray containing len(arr) tensors of fixed shape.
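For illustration, a minimal usage sketch:

>>> import numpy as np
>>> from ray.data.extensions.tensor_extension import ArrowTensorArray
>>> ata = ArrowTensorArray.from_numpy(np.arange(24).reshape((3, 2, 2, 2)))
>>> len(ata)
3
>>> ata.to_numpy().shape
(3, 2, 2, 2)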

to_numpy(zero_copy_only: bool = True)[source]

Convert the entire array of tensors into a single ndarray.

Parameters

zero_copy_only – If True, an exception will be raised if the conversion to a NumPy array would require copying the underlying data (e.g. in presence of nulls, or for non-primitive types). This argument is currently ignored, so zero-copy isn’t enforced even if this argument is true.

Returns

A single ndarray representing the entire array of tensors.

Custom Datasource API

class ray.data.Datasource[source]

Interface for defining a custom ray.data.Dataset datasource.

To read a datasource into a dataset, use ray.data.read_datasource(). To write to a writable datasource, use Dataset.write_datasource().

See RangeDatasource and DummyOutputDatasource for examples of how to implement readable and writable datasources.

DeveloperAPI: This API may change across minor Ray releases.

prepare_read(parallelism: int, **read_args) → List[ray.data.datasource.datasource.ReadTask[T]][source]

Return the list of tasks needed to perform a read.

Parameters
  • parallelism – The requested read parallelism. The number of read tasks should be as close to this value as possible.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

A list of read tasks that can be executed to read blocks from the datasource in parallel.
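For illustration, here is a rough sketch of a read-only datasource, not taken from the Ray source: the class name is made up, and the BlockMetadata constructor fields (num_rows, size_bytes, schema, input_files) and the shape of the returned blocks are assumptions to verify against your Ray version.

import ray
from ray.data.block import BlockMetadata


class IntRangeDatasource(ray.data.Datasource):
    """Illustrative datasource that produces the integers [0, n)."""

    def prepare_read(self, parallelism, n=1000):
        read_tasks = []
        chunk = max(1, n // parallelism)
        for start in range(0, n, chunk):
            end = min(start + chunk, n)
            # Initial metadata describing this block; field names are assumed.
            metadata = BlockMetadata(
                num_rows=end - start,
                size_bytes=None,
                schema=int,
                input_files=None)
            # Bind start/end as defaults so each task reads its own slice;
            # each call returns a list containing one simple (list) block.
            read_tasks.append(
                ray.data.ReadTask(
                    lambda start=start, end=end: [list(range(start, end))],
                    metadata))
        return read_tasks

Such a datasource would then be read with ray.data.read_datasource(IntRangeDatasource(), parallelism=4, n=100), with n forwarded to prepare_read via read_args.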

do_write(blocks: List[ray.types.ObjectRef[Union[List[T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], **write_args) → List[ray.types.ObjectRef[Any]][source]

Launch Ray tasks for writing blocks out to the datasource.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of the output of the write tasks.

on_write_complete(write_results: List[Any], **kwargs) → None[source]

Callback for when a write job completes.

This can be used to “commit” a write output. This method must succeed prior to write_datasource() returning to the user. If this method fails, then on_write_failed() will be called.

Parameters
  • write_results – The list of the write task results.

  • kwargs – Forward-compatibility placeholder.

on_write_failed(write_results: List[ray.types.ObjectRef[Any]], error: Exception, **kwargs) → None[source]

Callback for when a write job fails.

This is called on a best-effort basis on write failures.

Parameters
  • write_results – The list of the write task result futures.

  • error – The first error encountered.

  • kwargs – Forward-compatibility placeholder.

class ray.data.ReadTask(read_fn: Callable[[], Iterable[Union[List[T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: BlockMetadata)[source]

A function used to read blocks from the dataset.

Read tasks are generated by datasource.prepare_read(), and return a list of ray.data.Block when called. Initial metadata about the read operation can be retrieved via get_metadata() prior to executing the read. Final metadata is returned after the read along with the blocks.

Ray will execute read tasks in remote functions to parallelize execution. Note that the number of blocks returned can vary at runtime. For example, if a task is reading a single large file it can return multiple blocks to avoid running out of memory during the read.

The initial metadata should reflect all the blocks returned by the read, e.g., if the metadata says num_rows=1000, the read can return a single block of 1000 rows, or multiple blocks with 1000 rows altogether.

The final metadata (returned with the actual block) reflects the exact contents of the block itself.

DeveloperAPI: This API may change across minor Ray releases.

Utility

ray.data.set_progress_bars(enabled: bool) → bool[source]

Set whether progress bars are enabled.

The default behavior is controlled by the RAY_DATA_DISABLE_PROGRESS_BARS environment variable. By default, it is set to “0”. Setting it to “1” will disable progress bars, unless they are reenabled by this method.

Returns

Whether progress bars were previously enabled.

PublicAPI: This API is stable across Ray releases.