Ray Datasets API

Creating Datasets

ray.data.range(n: int, *, parallelism: int = 200) ray.data.dataset.Dataset[int][source]

Create a dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range(10000) 
>>> ds 
Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
>>> ds.map(lambda x: x * 2).take(4) 
[0, 2, 4, 6]
Parameters
  • n – The upper bound of the range of integers.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers.

PublicAPI: This API is stable across Ray releases.

ray.data.range_table(n: int, *, parallelism: int = 200) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a tabular dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range_table(1000) 
>>> ds 
Dataset(num_blocks=200, num_rows=1000, schema={value: int64})
>>> ds.map(lambda r: {"v2": r["value"] * 2}).take(2) 
[ArrowRow({'v2': 0}), ArrowRow({'v2': 2})]

This is similar to range(), but uses Arrow tables to hold the integers in Arrow records. The dataset elements take the form {"value": N}.

Parameters
  • n – The upper bound of the range of integer records.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow records.

PublicAPI: This API is stable across Ray releases.

ray.data.range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = 200) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a Tensor dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range_tensor(1000, shape=(2, 2)) 
>>> ds 
Dataset(
    num_blocks=200,
    num_rows=1000,
    schema={__value__: <ArrowTensorType: shape=(2, 2), dtype=int64>},
)
>>> ds.map_batches(lambda arr: arr * 2).take(2) 
[array([[0, 0],
        [0, 0]]),
array([[2, 2],
        [2, 2]])]

This is similar to range_table(), but uses the ArrowTensorArray extension type. The dataset elements take the form {VALUE_COL_NAME: array(N, shape=shape)}.

Parameters
  • n – The upper bound of the range of integer records.

  • shape – The shape of each record.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow tensor records.

PublicAPI: This API is stable across Ray releases.

ray.data.read_csv(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.csv'], allow_if_no_extensions=False), **arrow_csv_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from csv files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_csv( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.csv”.

  • arrow_csv_args – Other csv read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_json(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.json'], allow_if_no_extensions=False), **arrow_json_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from json files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_json( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.json”.

  • arrow_json_args – Other json read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_parquet(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.ParquetMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultParquetMetadataProvider object>, **arrow_parquet_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from parquet files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) 
Parameters
  • paths – A single file path or directory, or a list of file paths. Multiple directories are not supported.

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • tensor_column_schema – A dict of column name –> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_parquet_bulk(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_file_args: Optional[Dict[str, Any]] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.FastFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.parquet'], allow_if_no_extensions=False), **arrow_parquet_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from a large number (e.g. >1K) of parquet files quickly.

By default, ONLY file paths should be provided as input (i.e. no directory paths), and an OSError will be raised if one or more paths point to directories. If your use-case requires directory paths, then the metadata provider should be changed to one that supports directory expansion (e.g. DefaultFileMetadataProvider).

Offers improved performance vs. read_parquet() because it does not use PyArrow’s ParquetDataset abstraction, whose latency scales linearly with the number of input files since it collects all file metadata on a single node.

Also supports a wider variety of input Parquet file types than read_parquet() because it does not try to merge and resolve a unified schema for all files.

However, unlike read_parquet(), this does not offer file metadata resolution by default, so a custom metadata provider should be provided if your use-case requires a unified dataset schema, block sizes, row counts, etc.

Examples

>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known to minimize read latency.
>>> ray.data.read_parquet_bulk( 
...     ["/path/to/file1", "/path/to/file2"])
>>> # Read a directory of files in remote storage. Caution should be taken
>>> # when providing directory paths, since the time to both check each path
>>> # type and expand its contents may result in greatly increased latency
>>> # and/or request rate throttling from cloud storage service providers.
>>> ray.data.read_parquet_bulk( 
...     "s3://bucket/path",
...     meta_provider=DefaultFileMetadataProvider())
Parameters
  • paths – A single file path or a list of file paths. If one or more directories are provided, then meta_provider should also be set to an implementation that supports directory expansion (e.g. DefaultFileMetadataProvider).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_file_args – kwargs passed to pyarrow.fs.FileSystem.open_input_file.

  • tensor_column_schema – A dict of column name –> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • meta_provider – File metadata provider. Defaults to a fast file metadata provider that skips file size collection and requires all input paths to be files. Change to DefaultFileMetadataProvider or a custom metadata provider if directory expansion and/or file metadata resolution is required.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.parquet”.

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_numpy(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.npy'], allow_if_no_extensions=False), **numpy_load_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from numpy files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_numpy("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_numpy( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • numpy_load_args – Other options to pass to np.load.

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.npy”.

Returns

Dataset holding Tensor records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_text(paths: Union[str, List[str]], *, encoding: str = 'utf-8', errors: str = 'ignore', drop_empty_lines: bool = True, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Optional[Dict[str, Any]] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None) ray.data.dataset.Dataset[str][source]

Create a dataset from lines stored in text files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_text("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_text(["/path/to/file1", "/path/to/file2"]) 
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • encoding – The encoding of the files (e.g., “utf-8” or “ascii”).

  • errors – What to do with errors on decoding. Specify either “strict”, “ignore”, or “replace”. Defaults to “ignore”.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – Kwargs passed to ray.remote in the read tasks and in the subsequent text decoding map task.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files. To filter out all file paths except those whose file extension matches e.g. “.txt”, a FileExtensionFilter("txt") can be provided.

Returns

Dataset holding lines of text read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_binary_files(paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None) ray.data.dataset.Dataset[Union[Tuple[str, bytes], bytes]][source]

Create a dataset from binary files of arbitrary contents.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_binary_files( 
...     ["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • include_paths – Whether to include the full path of the file in the dataset records. If True, each dataset record will be a tuple of the file path and the file contents.

  • filesystem – The filesystem implementation to read from.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.read_api.T], *, parallelism: int = 200, ray_remote_args: Optional[Dict[str, Any]] = None, **read_args) ray.data.dataset.Dataset[ray.data.read_api.T][source]

Read a dataset from a custom data source.
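
Examples

A minimal sketch, assuming the built-in CSVDatasource; this is roughly what ray.data.read_csv() does under the hood, and the path shown is a placeholder.

>>> import ray
>>> from ray.data.datasource import CSVDatasource
>>> ds = ray.data.read_datasource(
...     CSVDatasource(), paths="/path/to/file.csv")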

Parameters
  • datasource – The datasource to read data from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the available partitioning of the datasource.

  • read_args – Additional kwargs to pass to the datasource impl.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

Returns

Dataset holding the data read from the datasource.

PublicAPI: This API is stable across Ray releases.

ray.data.from_items(items: List[Any], *, parallelism: int = 200) ray.data.dataset.Dataset[Any][source]

Create a dataset from a list of local Python objects.

Examples

>>> import ray
>>> ds = ray.data.from_items([1, 2, 3, 4, 5]) 
>>> ds 
Dataset(num_blocks=5, num_rows=5, schema=<class 'int'>)
>>> ds.take(2) 
[1, 2]
Parameters
  • items – List of local Python objects.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the items.

PublicAPI: This API is stable across Ray releases.

ray.data.from_arrow(tables: Union[pyarrow.Table, bytes, List[Union[pyarrow.Table, bytes]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of Arrow tables.
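
Examples

A minimal sketch; the row repr shown is illustrative and may vary by Ray version.

>>> import pyarrow as pa
>>> import ray
>>> table = pa.table({"value": [1, 2, 3]})
>>> ds = ray.data.from_arrow(table)
>>> ds.take(2)
[ArrowRow({'value': 1}), ArrowRow({'value': 2})]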

Parameters

tables – An Arrow table, or a list of Arrow tables, or its streaming format in bytes.

Returns

Dataset holding Arrow records from the tables.

PublicAPI: This API is stable across Ray releases.

ray.data.from_arrow_refs(tables: Union[ray.types.ObjectRef[Union[pyarrow.Table, bytes]], List[ray.types.ObjectRef[Union[pyarrow.Table, bytes]]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a set of Arrow tables.
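
Examples

A minimal sketch, assuming the tables have been placed in the Ray object store via ray.put().

>>> import pyarrow as pa
>>> import ray
>>> table_ref = ray.put(pa.table({"value": [1, 2, 3]}))
>>> ds = ray.data.from_arrow_refs([table_ref])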

Parameters

tables – A Ray object reference to Arrow table, or list of Ray object references to Arrow tables, or its streaming format in bytes.

Returns

Dataset holding Arrow records from the tables.

DeveloperAPI: This API may change across minor Ray releases.

ray.data.from_huggingface(dataset: Union[datasets.Dataset, datasets.DatasetDict]) Union[ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow], Dict[str, ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow]]][source]

Create a dataset from a Hugging Face Datasets Dataset.

This function is not parallelized, and is intended to be used with Hugging Face Datasets that are loaded into memory (as opposed to memory-mapped).
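
Examples

A minimal sketch, assuming the Hugging Face datasets package is installed; the dataset name used here is only illustrative.

>>> import ray
>>> import datasets
>>> hf_datasets = datasets.load_dataset("tweet_eval", "emotion")
>>> # A DatasetDict yields a dict of Ray Datasets keyed by split.
>>> ray_datasets = ray.data.from_huggingface(hf_datasets)
>>> ray_datasets["train"].take(2)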

Parameters

dataset – A Hugging Face Dataset, or DatasetDict. IterableDataset is not supported.

Returns

Dataset holding Arrow records from the Hugging Face Dataset, or a dict of datasets in case dataset is a DatasetDict.

PublicAPI: This API is stable across Ray releases.

ray.data.from_spark(df: pyspark.sql.DataFrame, *, parallelism: Optional[int] = None) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a Spark dataframe.
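
Examples

A minimal sketch, assuming RayDP (Spark-on-Ray) is installed; the init_spark() arguments shown are illustrative only.

>>> import ray
>>> import raydp
>>> spark = raydp.init_spark(app_name="example",
...     num_executors=1, executor_cores=1, executor_memory="1G")
>>> df = spark.range(100)
>>> ds = ray.data.from_spark(df)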

Parameters
  • df – A Spark dataframe, which must be created by RayDP (Spark-on-Ray).

  • parallelism – The amount of parallelism to use for the dataset. If not provided, it will be equal to the number of partitions of the original Spark dataframe.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

ray.data.from_dask(df: dask.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a Dask DataFrame.
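
Examples

A minimal sketch, assuming Dask is installed.

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> import ray
>>> ddf = dd.from_pandas(
...     pd.DataFrame({"value": list(range(100))}), npartitions=4)
>>> ds = ray.data.from_dask(ddf)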

Parameters

df – A Dask DataFrame.

Returns

Dataset holding Arrow records read from the DataFrame.

PublicAPI: This API is stable across Ray releases.

ray.data.from_modin(df: modin.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a Modin dataframe.
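
Examples

A minimal sketch, assuming Modin is installed and using its Ray backend.

>>> import ray
>>> import modin.pandas as mpd
>>> df = mpd.DataFrame({"value": list(range(100))})
>>> ds = ray.data.from_modin(df)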

Parameters

df – A Modin dataframe, which must be using the Ray backend.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

ray.data.from_mars(df: mars.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a MARS dataframe.

Parameters

df – A MARS dataframe, which must be executed by MARS-on-Ray.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

ray.data.from_pandas(dfs: Union[pandas.DataFrame, List[pandas.DataFrame]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of Pandas dataframes.
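
Examples

A minimal sketch.

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
>>> ds = ray.data.from_pandas(df)
>>> ds.take(1)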

Parameters

dfs – A Pandas dataframe or a list of Pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

PublicAPI: This API is stable across Ray releases.

ray.data.from_pandas_refs(dfs: Union[ray.types.ObjectRef[pandas.DataFrame], List[ray.types.ObjectRef[pandas.DataFrame]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of Ray object references to Pandas dataframes.
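
Examples

A minimal sketch, assuming the dataframes have been placed in the Ray object store via ray.put().

>>> import pandas as pd
>>> import ray
>>> df_ref = ray.put(pd.DataFrame({"col1": [1, 2, 3]}))
>>> ds = ray.data.from_pandas_refs([df_ref])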

Parameters

dfs – A Ray object references to pandas dataframe, or a list of Ray object references to pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

DeveloperAPI: This API may change across minor Ray releases.

ray.data.from_numpy(ndarrays: Union[numpy.ndarray, List[numpy.ndarray]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of NumPy ndarrays.
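
Examples

A minimal sketch; here the outer axis of the ndarray is assumed to be the record dimension, giving a tensor dataset of three 2x2 records.

>>> import numpy as np
>>> import ray
>>> ds = ray.data.from_numpy(np.arange(12).reshape(3, 2, 2))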

Parameters

ndarrays – A NumPy ndarray or a list of NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

PublicAPI: This API is stable across Ray releases.

ray.data.from_numpy_refs(ndarrays: Union[ray.types.ObjectRef[numpy.ndarray], List[ray.types.ObjectRef[numpy.ndarray]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of NumPy ndarray futures.
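
Examples

A minimal sketch, assuming the ndarrays have been placed in the Ray object store via ray.put().

>>> import numpy as np
>>> import ray
>>> arr_ref = ray.put(np.arange(12).reshape(3, 2, 2))
>>> ds = ray.data.from_numpy_refs([arr_ref])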

Parameters

ndarrays – A Ray object reference to a NumPy ndarray or a list of Ray object references to NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

DeveloperAPI: This API may change across minor Ray releases.

Dataset API

class ray.data.Dataset(plan: ray.data._internal.plan.ExecutionPlan, epoch: int, lazy: bool, *, defer_execution: bool = False)[source]

A Dataset is a distributed data collection for data loading and processing.

Datasets are implemented as a list of ObjectRef[Block], where each block holds an ordered collection of items, representing a shard of the overall data collection. A block can be either a pyarrow.Table or a Python list, and the block also determines the unit of parallelism.

Datasets can be created in multiple ways: from synthetic data via range_*() APIs, from existing memory data via from_*() APIs, or from external storage systems such as local disk, S3, HDFS etc. via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage systems via the write_*() APIs.

Examples

>>> import ray
>>> # Create dataset from synthetic data.
>>> ds = ray.data.range(1000) 
>>> # Create dataset from in-memory data.
>>> ds = ray.data.from_items( 
...     [{"col1": i, "col2": i * 2} for i in range(1000)])
>>> # Create dataset from external storage system.
>>> ds = ray.data.read_parquet("s3://bucket/path") 
>>> # Save dataset back to external storage system.
>>> ds.write_csv("s3://bucket/output") 

Datasets support parallel processing at scale: transformations such as map_batches(), aggregations such as min()/max()/mean(), grouping via groupby(), and shuffling operations such as sort(), random_shuffle(), and repartition().

Examples

>>> import ray
>>> ds = ray.data.range(1000) 
>>> # Transform in parallel with map_batches().
>>> ds.map_batches(lambda batch: [v * 2 for v in batch]) 
>>> # Compute max.
>>> ds.max() 
>>> # Group the data.
>>> ds.groupby(lambda x: x % 3).count() 
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle() 
>>> # Sort it back in order.
>>> ds.sort() 

Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors without incurring a copy. Datasets support conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also compatible with distributed TensorFlow / PyTorch.

PublicAPI: This API is stable across Ray releases.

map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], Callable[[...], ray.data.block.U], type], *, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, **ray_remote_args) ray.data.dataset.Dataset[ray.data.block.U][source]

Apply the given function to each record of this dataset.

This is a blocking operation. Note that mapping individual records can be quite slow. Consider using map_batches() for performance.

Examples

>>> import ray
>>> # Transform python objects.
>>> ds = ray.data.range(1000) 
>>> ds.map(lambda x: x * 2) 
>>> # Transform Arrow records.
>>> ds = ray.data.from_items( 
...     [{"value": i} for i in range(1000)])
>>> ds.map(lambda record: {"v2": record["value"] * 2}) 
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> init_model = ... 
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, batch):
...        return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute=ActorPoolStrategy(2, 8) the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> from ray.data._internal.compute import ActorPoolStrategy
>>> ds.map(CachedModel, 
...        compute=ActorPoolStrategy(2, 8),
...        num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. If wanting to configure the min or max size of the autoscaling actor pool, you can provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

map_batches(fn: Union[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Callable[[...], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], type], *, batch_size: Optional[int] = 4096, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'native', fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, **ray_remote_args) Dataset[Any][source]

Apply the given function to batches of records of this dataset.

The format of the data batch provided to fn can be controlled via the batch_format argument, and the output of the UDF can be any batch type.

This is a blocking operation.

Examples

>>> import ray
>>> # Transform python objects.
>>> ds = ray.data.range(1000) 
>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch]) 
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> init_model = ... 
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, item):
...        return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute=ActorPoolStrategy(2, 8) the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> from ray.data._internal.compute import ActorPoolStrategy
>>> ds.map_batches( 
...     CachedModel, 
...     batch_size=256, 
...     compute=ActorPoolStrategy(2, 8), 
...     num_gpus=1) 

You can use map_batches to efficiently filter records.

>>> import ray
>>> ds = ray.data.range(10000)  
>>> ds.count()  
10000
>>> ds = ds.map_batches(lambda batch: [x for x in batch if x % 2 == 0])  
>>> ds.count()  
5000

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • batch_size – Request a specific batch size, or None to use entire blocks as batches. Defaults to a system-chosen batch size.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. If wanting to configure the min or max size of the autoscaling actor pool, you can provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • batch_format – Specify “native” to use the native block format (promotes tables to Pandas and tensors to NumPy), “pandas” to select pandas.DataFrame, “pyarrow” to select pyarrow.Table, or “numpy” to select numpy.ndarray for tensor datasets and Dict[str, numpy.ndarray] for tabular datasets. Default is “native”.

  • fn_args – Positional arguments to pass to fn, after the data batch. These arguments will be top-level arguments in the underlying Ray task that’s submitted.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments will be top-level arguments in the underlying Ray task that’s submitted.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. This can only be provided if fn is a callable class and the actor compute strategy is being used. These arguments will be top-level arguments in the underlying Ray actor construction task that’s submitted.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class and the actor compute strategy is being used. These arguments will be top-level arguments in the underlying Ray actor construction task that’s submitted.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

add_column(col: str, fn: Callable[[pandas.DataFrame], pandas.Series], *, compute: Optional[str] = None, **ray_remote_args) Dataset[T][source]

Add the given column to the dataset.

This is only supported for datasets convertible to pandas format. A function generating the new column values given the batch in pandas format must be specified.

Examples

>>> import ray
>>> ds = ray.data.range_table(100) 
>>> # Add a new column equal to value * 2.
>>> ds = ds.add_column( 
...     "new_col", lambda df: df["value"] * 2)
>>> # Overwrite the existing "value" with zeros.
>>> ds = ds.add_column("value", lambda df: 0) 

Time complexity: O(dataset size / parallelism)

Parameters
  • col – Name of the column to add. If the name already exists, the column will be overwritten.

  • fn – Map function generating the column values given a batch of records in pandas format.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

flat_map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], Callable[[...], ray.data.block.U], type], *, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, **ray_remote_args) ray.data.dataset.Dataset[ray.data.block.U][source]

Apply the given function to each record and then flatten results.

This is a blocking operation. Consider using .map_batches() for better performance (the batch size can be altered in map_batches).

Examples

>>> import ray
>>> ds = ray.data.range(1000) 
>>> ds.flat_map(lambda x: [x, x ** 2, x ** 3]) 

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. If wanting to configure the min or max size of the autoscaling actor pool, you can provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

filter(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], Callable[[...], ray.data.block.U], type], *, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, **ray_remote_args) ray.data.dataset.Dataset[ray.data.block.T][source]

Filter out records that do not satisfy the given predicate.

This is a blocking operation. Consider using .map_batches() for better performance (you can implement filter by dropping records).

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.filter(lambda x: x % 2 == 0) 

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The predicate to apply to each record, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. If wanting to configure the min or max size of the autoscaling actor pool, you can provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

repartition(num_blocks: int, *, shuffle: bool = False) ray.data.dataset.Dataset[ray.data.block.T][source]

Repartition the dataset into exactly this number of blocks.

This is a blocking operation. After repartitioning, all blocks in the returned dataset will have approximately the same number of rows.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(10).write_parquet(...) 

Time complexity: O(dataset size / parallelism)

Parameters
  • num_blocks – The number of blocks.

  • shuffle – Whether to perform a distributed shuffle during the repartition. When shuffle is enabled, each output block contains a subset of data rows from each input block, which requires all-to-all data movement. When shuffle is disabled, output blocks are created from adjacent input blocks, minimizing data movement.

Returns

The repartitioned dataset.

random_shuffle(*, seed: Optional[int] = None, num_blocks: Optional[int] = None) ray.data.dataset.Dataset[ray.data.block.T][source]

Randomly shuffle the elements of this dataset.

This is a blocking operation similar to repartition().

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle() 
>>> # Shuffle this dataset with a fixed random seed.
>>> ds.random_shuffle(seed=12345) 

Time complexity: O(dataset size / parallelism)

Parameters
  • seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

  • num_blocks – The number of output blocks after the shuffle, or None to retain the number of blocks.

Returns

The shuffled dataset.

randomize_block_order(*, seed: Optional[int] = None) ray.data.dataset.Dataset[ray.data.block.T][source]

Randomly shuffle the blocks of this dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> # Randomize the block order.
>>> ds.randomize_block_order() 
>>> # Randomize the block order with a fixed random seed.
>>> ds.randomize_block_order(seed=12345) 
Parameters

seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

Returns

The block-shuffled dataset.

random_sample(fraction: float, *, seed: Optional[int] = None) ray.data.dataset.Dataset[ray.data.block.T][source]

Randomly samples a fraction of the elements of this dataset.

Note that the exact number of elements returned is not guaranteed; the number of elements returned will be approximately fraction * total_rows.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.random_sample(0.1) 
>>> ds.random_sample(0.2, seed=12345) 
Parameters
  • fraction – The fraction of elements to sample.

  • seed – Seeds the Python random number generator (PRNG).

Returns

Returns a Dataset containing the sampled elements.

split(n: int, *, equal: bool = False, locality_hints: Optional[List[Any]] = None) List[ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset into n disjoint pieces.

This returns a list of sub-datasets that can be passed to Ray tasks and actors and used to read the dataset records in parallel.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> workers = ... 
>>> # Split up a dataset to process over `n` worker actors.
>>> shards = ds.split(len(workers), locality_hints=workers) 
>>> for shard, worker in zip(shards, workers): 
...     worker.consume.remote(shard) 

Time complexity: O(1)

See also: Dataset.split_at_indices, Dataset.split_proportionately

Parameters
  • n – Number of child datasets to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the i-th dataset with the i-th actor to maximize data locality.

Returns

A list of n disjoint dataset splits.

split_at_indices(indices: List[int]) List[ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset at the given indices (like np.split).

Examples

>>> import ray
>>> ds = ray.data.range(10) 
>>> d1, d2, d3 = ds.split_at_indices([2, 5]) 
>>> d1.take() 
[0, 1]
>>> d2.take() 
[2, 3, 4]
>>> d3.take() 
[5, 6, 7, 8, 9]

Time complexity: O(num splits)

See also: Dataset.split, Dataset.split_proportionately

Parameters

indices – List of sorted integers which indicate where the dataset will be split. If an index exceeds the length of the dataset, the corresponding split will be an empty dataset.

Returns

The dataset splits.

split_proportionately(proportions: List[float]) List[ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset using proportions.

A common use case for this would be splitting the dataset into train and test sets (equivalent to, e.g., scikit-learn’s train_test_split). See also ray.air.train_test_split() for a higher-level abstraction.

The indices to split at will be calculated in such a way that all splits always contain at least one element. If that is not possible, an exception will be raised.

This is equivalent to calculating the indices manually and calling Dataset.split_at_indices.

Examples

>>> import ray
>>> ds = ray.data.range(10) 
>>> d1, d2, d3 = ds.split_proportionately([0.2, 0.5]) 
>>> d1.take() 
[0, 1]
>>> d2.take() 
[2, 3, 4, 5, 6]
>>> d3.take() 
[7, 8, 9]

Time complexity: O(num splits)

See also: Dataset.split, Dataset.split_at_indices, ray.air.train_test_split()

Parameters

proportions – List of proportions to split the dataset according to. Each proportion must be greater than 0, and the proportions must sum to less than 1.

Returns

The dataset splits.

union(*other: List[ray.data.dataset.Dataset[ray.data.block.T]]) ray.data.dataset.Dataset[ray.data.block.T][source]

Combine this dataset with others of the same type.

The order of the blocks in the datasets is preserved, as is the relative ordering between the datasets passed in the argument list.

NOTE: Unioned datasets are not lineage-serializable, i.e. they cannot be used as a tunable hyperparameter in Ray Tune.

Parameters

other – List of datasets to combine with this one. The datasets must have the same schema as this dataset, otherwise the behavior is undefined.

Returns

A new dataset holding the union of their data.

groupby(key: Union[None, str, Callable[[ray.data.block.T], Any]]) GroupedDataset[T][source]

Group the dataset by the key function or column name.

This is a lazy operation.

Examples

>>> import ray
>>> # Group by a key function and aggregate.
>>> ray.data.range(100).groupby(lambda x: x % 3).count() 
>>> # Group by an Arrow table column and aggregate.
>>> ray.data.from_items([ 
...     {"A": x % 3, "B": x} for x in range(100)]).groupby( 
...     "A").count() 

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters

key – A key function or Arrow column name. If this is None, the grouping is global.

Returns

A lazy GroupedDataset that can be aggregated later.

aggregate(*aggs: ray.data.aggregate.AggregateFn) ray.data.block.U[source]

Aggregate the entire dataset as one group.

This is a blocking operation.

Examples

>>> import ray
>>> from ray.data.aggregate import Max, Mean
>>> ray.data.range(100).aggregate(Max()) 
>>> ray.data.range_table(100).aggregate( 
...    Max("value"), Mean("value")) 

Time complexity: O(dataset size / parallelism)

Parameters

aggs – Aggregations to do.

Returns

If the input dataset is a simple dataset then the output is a tuple of (agg1, agg2, ...) where each tuple element is the corresponding aggregation result. If the input dataset is an Arrow dataset then the output is an ArrowRow where each column is the corresponding aggregation result. If the dataset is empty, return None.

sum(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute sum over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).sum() 
>>> ray.data.from_items([ 
...     (i, i**2) 
...     for i in range(100)]).sum(lambda x: x[1]) 
>>> ray.data.range_table(100).sum("value") 
>>> ray.data.from_items([ 
...     {"A": i, "B": i**2} 
...     for i in range(100)]).sum(["A", "B"]) 
Parameters
  • on

    The data subset on which to compute the sum.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar sum of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise sum of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the sum; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The sum result.

For a simple dataset, the output is:

  • on=None: a scalar representing the sum of all rows,

  • on=callable: a scalar representing the sum of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (sum_1, ..., sum_n) representing the sum of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise sum of all columns,

  • on="col": a scalar representing the sum of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise sum of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

min(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute minimum over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).min() 
>>> ray.data.from_items([ 
...     (i, i**2) 
...     for i in range(100)]).min(lambda x: x[1]) 
>>> ray.data.range_table(100).min("value") 
>>> ray.data.from_items([ 
...     {"A": i, "B": i**2} 
...     for i in range(100)]).min(["A", "B"]) 
Parameters
  • on

    The data subset on which to compute the min.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar min of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise min of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the min; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The min result.

For a simple dataset, the output is:

  • on=None: a scalar representing the min of all rows,

  • on=callable: a scalar representing the min of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (min_1, ..., min_n) representing the min of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise min of all columns,

  • on="col": a scalar representing the min of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise min of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

max(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute maximum over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).max() 
>>> ray.data.from_items([ 
...     (i, i**2) 
...     for i in range(100)]).max(lambda x: x[1]) 
>>> ray.data.range_table(100).max("value") 
>>> ray.data.from_items([ 
...     {"A": i, "B": i**2} 
...     for i in range(100)]).max(["A", "B"]) 
Parameters
  • on

    The data subset on which to compute the max.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar max of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise max of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the max; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The max result.

For a simple dataset, the output is:

  • on=None: a scalar representing the max of all rows,

  • on=callable: a scalar representing the max of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (max_1, ..., max_n) representing the max of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise max of all columns,

  • on="col": a scalar representing the max of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise max of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

mean(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute mean over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).mean() 
>>> ray.data.from_items([ 
...     (i, i**2) 
...     for i in range(100)]).mean(lambda x: x[1]) 
>>> ray.data.range_table(100).mean("value") 
>>> ray.data.from_items([ 
...     {"A": i, "B": i**2} 
...     for i in range(100)]).mean(["A", "B"]) 
Parameters
  • on

    The data subset on which to compute the mean.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar mean of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise mean of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the mean; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The mean result.

For a simple dataset, the output is:

  • on=None: a scalar representing the mean of all rows,

  • on=callable: a scalar representing the mean of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (mean_1, ..., mean_n) representing the mean of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise mean of all columns,

  • on="col": a scalar representing the mean of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise mean of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

std(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ddof: int = 1, ignore_nulls: bool = True) ray.data.block.U[source]

Compute standard deviation over entire dataset.

This is a blocking operation.

Examples

>>> import ray 
>>> ray.data.range(100).std() 
>>> ray.data.from_items([ 
...     (i, i**2) 
...     for i in range(100)]).std(lambda x: x[1]) 
>>> ray.data.range_table(100).std("value", ddof=0) 
>>> ray.data.from_items([ 
...     {"A": i, "B": i**2} 
...     for i in range(100)]).std(["A", "B"]) 

NOTE: This uses Welford’s online method for an accumulator-style computation of the standard deviation. This method was chosen for its numerical stability and because it can be computed in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford’s_online_algorithm

Parameters
  • on

    The data subset on which to compute the std.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar std of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise std of all columns.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the std; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The standard deviation result.

For a simple dataset, the output is:

  • on=None: a scalar representing the std of all rows,

  • on=callable: a scalar representing the std of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (std_1, ..., std_n) representing the std of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise std of all columns,

  • on="col": a scalar representing the std of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise std of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

sort(key: Union[None, str, Callable[[ray.data.block.T], Any]] = None, descending: bool = False) ray.data.dataset.Dataset[ray.data.block.T][source]

Sort the dataset by the specified key column or key function.

This is a blocking operation.

Examples

>>> import ray 
>>> # Sort using the entire record as the key.
>>> ds = ray.data.range(100) 
>>> ds.sort() 
>>> # Sort by a single column in descending order.
>>> ds = ray.data.from_items( 
...     [{"value": i} for i in range(1000)])
>>> ds.sort("value", descending=True) 
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["value"]) 

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters
  • key

    • For Arrow tables, key must be a single column name.

    • For datasets of Python objects, key can be either a lambda function that returns a comparison key to sort by, or None to sort by the original value.

  • descending – Whether to sort in descending order.

Returns

A new, sorted dataset.

zip(other: Dataset[U]) Dataset[T, U][source]

Zip this dataset with the elements of another.

The datasets must have identical numbers of rows, block types, and block sizes (e.g., one was produced from a .map() of another). For Arrow blocks, the schema will be concatenated, and any duplicate column names will be disambiguated with _1, _2, etc. suffixes.

NOTE: Zipped datasets are not lineage-serializable, i.e. they cannot be used as a tunable hyperparameter in Ray Tune.

Time complexity: O(dataset size / parallelism)

Parameters

other – The dataset to zip with on the right hand side.

Examples

>>> import ray
>>> ds = ray.data.range(5) 
>>> ds.zip(ds).take() 
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
Returns

A Dataset with (k, v) pairs (or concatenated Arrow schema) where k comes from the first dataset and v comes from the second.

limit(limit: int) ray.data.dataset.Dataset[ray.data.block.T][source]

Truncate the dataset to the first limit records.

Examples

>>> import ray
>>> ds = ray.data.range(1000) 
>>> ds.limit(100).map(lambda x: x * 2).take() 

Time complexity: O(limit specified)

Parameters

limit – The size of the dataset to truncate to.

Returns

The truncated dataset.

take(limit: int = 20) List[ray.data.block.T][source]

Take up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to return.

Returns

A list of up to limit records from the dataset.
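
For example, taking the first five records of an integer range dataset:

>>> import ray
>>> ray.data.range(1000).take(5)
[0, 1, 2, 3, 4]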

take_all(limit: int = 100000) List[ray.data.block.T][source]

Take all the records in the dataset.

Time complexity: O(dataset size)

Parameters

limit – Raise an error if the size exceeds the specified limit.

Returns

A list of all the records in the dataset.

show(limit: int = 20) None[source]

Print up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to print.

count() int[source]

Count the number of records in the dataset.

Time complexity: O(dataset size / parallelism), O(1) for parquet

Returns

The number of records in the dataset.

schema(fetch_if_missing: bool = False) Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Parameters

fetch_if_missing – If True, synchronously fetch the schema if it’s not known. Default is False, where None is returned if the schema is not known.

Returns

The Python type or Arrow schema of the records, or None if the schema is not known and fetch_if_missing is False.
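
For example (the outputs shown are representative):

>>> import ray
>>> ray.data.range(10).schema()
<class 'int'>
>>> ray.data.range_table(10).schema()
value: int64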

num_blocks() int[source]

Return the number of blocks of this dataset.

Note that during read and transform operations, the number of blocks may be dynamically adjusted at runtime to respect memory limits.

Time complexity: O(1)

Returns

The number of blocks of this dataset.

size_bytes() int[source]

Return the in-memory size of the dataset.

Time complexity: O(1)

Returns

The in-memory size of the dataset in bytes, or None if the in-memory size is not known.

input_files() List[str][source]

Return the list of input files for the dataset.

Time complexity: O(num input files)

Returns

The list of input files used to create the dataset, or an empty list if the input files are not known.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args) None[source]

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_parquet("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used instead of arrow_parquet_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_parquet_args – Options to pass to pyarrow.parquet.write_table(), which is used to write out each block to a file.

write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **pandas_json_args) None[source]

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_json("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where json files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from pandas_json_args. This should be used instead of pandas_json_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • pandas_json_args – These args will be passed to pandas.DataFrame.to_json(), which we use under the hood to write out each Datasets block. These are dict(orient=”records”, lines=True) by default.

write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_csv_args) None[source]

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_csv("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where csv files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. This should be used instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_csv_args – Other CSV write options to pass to pyarrow.

write_numpy(path: str, *, column: str = '__value__', filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, ray_remote_args: Dict[str, Any] = None) None[source]

Write a tensor column of the dataset to npy files.

This is only supported for datasets convertible to Arrow records that contain a TensorArray column. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_numpy("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where npy files will be written to.

  • column – The name of the table column that contains the tensor to be written. The default is "__value__", the column name that Datasets uses for storing tensors in single-column tables.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

write_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.block.T], *, ray_remote_args: Optional[Dict[str, Any]] = None, **write_args) None[source]

Write the dataset to a custom datasource.

Examples

>>> import ray
>>> from ray.data.datasource import Datasource
>>> ds = ray.data.range(100) 
>>> class CustomDatasource(Datasource): 
...     # define custom data source
...     pass 
>>> ds.write_datasource(CustomDatasource(...)) 

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional write args to pass to the datasource.

iter_rows(*, prefetch_blocks: int = 0) Iterator[Union[ray.data.block.T, ray.data.row.TableRow]][source]

Return a local row iterator over the dataset.

If the dataset is a tabular dataset (Arrow/Pandas blocks), dict-like TableRow mappings are yielded for each row by the iterator. If the dataset is not tabular, the raw row is yielded.

Examples

>>> import ray
>>> for i in ray.data.range(1000000).iter_rows(): 
...     print(i) 

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the entire dataset.

iter_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = None, batch_format: str = 'native', drop_last: bool = False) Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]][source]

Return a local batched iterator over the dataset.

Examples

>>> import ray
>>> for batch in ray.data.range(1000000).iter_batches(): 
...     print(batch) 

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “native” to use the native block format (promoting tables to Pandas and tensors to NumPy), “pandas” to select pandas.DataFrame, “pyarrow” to select pyarrow.Table, or “numpy” to select numpy.ndarray for tensor datasets and Dict[str, numpy.ndarray] for tabular datasets. Default is “native”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.
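
For instance, a sketch of iterating over fixed-size pandas batches:

>>> import ray
>>> ds = ray.data.range_table(1000)
>>> for batch in ds.iter_batches(batch_size=128, batch_format="pandas"):
...     assert len(batch) <= 128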

to_torch(*, label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[Union[torch.dtype, List[torch.dtype], Dict[str, torch.dtype]]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False, unsqueeze_label_tensor: bool = True, unsqueeze_feature_tensors: bool = True) torch.utils.data.IterableDataset[source]

Return a Torch IterableDataset over this dataset.

This is only supported for datasets convertible to Arrow records.

It is recommended to use the returned IterableDataset directly instead of passing it into a torch DataLoader.

Each element in IterableDataset will be a tuple consisting of 2 elements. The first item contains the feature tensor(s), and the second item is the label tensor. Those can take on different forms, depending on the specified arguments.

For the features tensor (N is the batch_size and n, m, k are the number of features per tensor):

  • If feature_columns is a List[str], the features will be a tensor of shape (N, n), with columns corresponding to feature_columns

  • If feature_columns is a List[List[str]], the features will be a list of tensors of shape [(N, m),…,(N, k)], with columns of each tensor corresponding to the elements of feature_columns

  • If feature_columns is a Dict[str, List[str]], the features will be a dict of key-tensor pairs of shape {key1: (N, m),…, keyN: (N, k)}, with columns of each tensor corresponding to the value of feature_columns under the key.

If unsqueeze_label_tensor=True (default), the label tensor will be of shape (N, 1). Otherwise, it will be of shape (N,). If label_column is specified as None, then no column from the Dataset will be treated as the label, and the output label tensor will be None.

Note that you probably want to call .split() on this dataset if there are to be multiple Torch workers consuming the data.

Time complexity: O(1)

Parameters
  • label_column – The name of the column used as the label (second element of the output list). Can be None for prediction, in which case the second element of returned tuple will also be None.

  • feature_columns – The names of the columns to use as the features. Can be a list of lists or a dict of string-list pairs for multi-tensor output. If None, then use all columns except the label column as the features.

  • label_column_dtype – The torch dtype to use for the label column. If None, then automatically infer the dtype.

  • feature_column_dtypes – The dtypes to use for the feature tensors. This should match the format of feature_columns, or be a single dtype, in which case it will be applied to all tensors. If None, then automatically infer the dtype.

  • batch_size – How many samples per batch to yield at a time. Defaults to 1.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • drop_last – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

  • unsqueeze_label_tensor – If set to True, the label tensor will be unsqueezed (reshaped to (N, 1)). Otherwise, it will be left as is, that is (N, ). In general, regression loss functions expect an unsqueezed tensor, while classification loss functions expect a squeezed one. Defaults to True.

  • unsqueeze_feature_tensors – If set to True, the features tensors will be unsqueezed (reshaped to (N, 1)) before being concatenated into the final features tensor. Otherwise, they will be left as is, that is (N, ). Defaults to True.

Returns

A torch IterableDataset.
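
A minimal sketch, assuming torch is installed and using hypothetical column names:

>>> import ray
>>> ds = ray.data.from_items(
...     [{"x": i, "label": i % 2} for i in range(100)])
>>> torch_ds = ds.to_torch(label_column="label", batch_size=8)
>>> for features, label in torch_ds:
...     pass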

to_tf(*, output_signature: Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec], Tuple[Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec]], tf.TypeSpec]], label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, prefetch_blocks: int = 0, batch_size: int = 1, drop_last: bool = False) tf.data.Dataset[source]

Return a TF Dataset over this dataset.

The TF Dataset will be created from the generator returned by the iter_batches method. prefetch_blocks and batch_size arguments will be passed to that method.

For the features tensor (N is the batch_size and n1, …, nk are the number of features per tensor):

  • If feature_columns is a List[str], the features will be a tensor of shape (N, n), with columns corresponding to feature_columns

  • If feature_columns is a List[List[str]], the features will be a list of tensors of shape [(N, n1),…,(N, nk)], with columns of each tensor corresponding to the elements of feature_columns

  • If feature_columns is a Dict[str, List[str]], the features will be a dict of key-tensor pairs of shape {key1: (N, n1),…, keyN: (N, nk)}, with columns of each tensor corresponding to the value of feature_columns under the key.

This is only supported for datasets convertible to Arrow records.

Requires all datasets to have the same columns.

It is recommended to call .split() on this dataset if there are to be multiple TensorFlow workers consuming the data.

The elements generated must be compatible with the given output_signature argument (same as in tf.data.Dataset.from_generator).

Time complexity: O(1)

Parameters
  • output_signature – If label_column is specified, a two-element tuple containing a TensorflowFeatureTypeSpec and a tf.TypeSpec object corresponding to (features, label). Otherwise, a single TensorflowFeatureTypeSpec corresponding to the features tensor. A TensorflowFeatureTypeSpec is a tf.TypeSpec, List["tf.TypeSpec"], or Dict[str, "tf.TypeSpec"].

  • label_column – The name of the column used as the label (second element of the output tuple). If not specified, output will be just one tensor instead of a tuple.

  • feature_columns – The names of the columns to use as the features. Can be a list of lists or a dict of string-list pairs for multi-tensor output. If None, then use all columns except the label columns as the features.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size. Defaults to 1.

  • drop_last – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

Returns

A tf.data.Dataset.
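
A minimal sketch, assuming TensorFlow is installed; the column names and signature shapes are illustrative:

>>> import ray
>>> import tensorflow as tf
>>> ds = ray.data.from_items(
...     [{"x": float(i), "label": float(i % 2)} for i in range(100)])
>>> tf_ds = ds.to_tf(
...     label_column="label",
...     output_signature=(
...         tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
...         tf.TensorSpec(shape=(None, 1), dtype=tf.float32)),
...     batch_size=8)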

to_dask() dask.DataFrame[source]

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will set the Dask scheduler to Dask-on-Ray globally, via the config.

Time complexity: O(dataset size / parallelism)

Returns

A Dask DataFrame created from this dataset.

to_mars() mars.DataFrame[source]

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

to_modin() modin.DataFrame[source]

Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of Pandas dataframes (using .to_pandas_refs()). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using modin.distributed.dataframe.pandas.partitions.from_partitions().

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

to_spark(spark: pyspark.sql.SparkSession) pyspark.sql.DataFrame[source]

Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

to_pandas(limit: int = 100000) pandas.DataFrame[source]

Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow or Pandas records. An error is raised if the number of records exceeds the provided limit. Note that you can use .limit() on the dataset beforehand to truncate the dataset manually.

Time complexity: O(dataset size)

Parameters

limit – The maximum number of records to return. An error will be raised if the limit is exceeded.

Returns

A Pandas DataFrame created from this dataset, containing a limited number of records.
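
For example:

>>> import ray
>>> df = ray.data.range_table(5).to_pandas()
>>> len(df)
5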

to_pandas_refs() List[ray.types.ObjectRef[pandas.DataFrame]][source]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

to_numpy_refs(*, column: Optional[str] = None) List[ray.types.ObjectRef[numpy.ndarray]][source]

Convert this dataset into a distributed set of NumPy ndarrays.

This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Parameters
  • column – The name of the column to convert to numpy, or None to specify the entire row. If not specified for Arrow or Pandas blocks, each returned future will represent a dict of column ndarrays.

Returns

A list of remote NumPy ndarrays created from this dataset.

to_arrow_refs() List[ray.types.ObjectRef[pyarrow.Table]][source]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

to_random_access_dataset(key: str, num_workers: Optional[int] = None) ray.data.random_access_dataset.RandomAccessDataset[source]

Convert this Dataset into a distributed RandomAccessDataset (EXPERIMENTAL).

RandomAccessDataset partitions the dataset across the cluster by the given sort key, providing efficient random access to records via binary search. A number of worker actors are created, each of which has zero-copy access to the underlying sorted data blocks of the Dataset.

Note that the key must be unique in the dataset. If there are duplicate keys, an arbitrary value is returned.

This is only supported for Arrow-format datasets.

Parameters
  • key – The key column over which records can be queried.

  • num_workers – The number of actors to use to serve random access queries. By default, this is determined by multiplying the number of Ray nodes in the cluster by four. As a rule of thumb, you can expect each worker to provide ~3000 records / second via get_async(), and ~10000 records / second via multiget().
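
A minimal usage sketch, using the get_async()/multiget() query methods mentioned above (the key values are illustrative):

>>> import ray
>>> ds = ray.data.range_table(100)
>>> rad = ds.to_random_access_dataset("value", num_workers=4)
>>> ray.get(rad.get_async(37))
>>> rad.multiget([5, 10, 20])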

repeat(times: Optional[int] = None) DatasetPipeline[T][source]

Convert this into a DatasetPipeline by looping over this dataset.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset.

Note that every repeat of the dataset is considered an “epoch” for the purposes of DatasetPipeline.iter_epochs().

Examples

>>> import ray
>>> # Infinite pipeline of numbers [0, 5)
>>> ray.data.range(5).repeat().take() 
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, ...]
>>> # Can apply transformations to the pipeline.
>>> ray.data.range(5).repeat().map(lambda x: -x).take() 
[0, -1, -2, -3, -4, 0, -1, -2, -3, -4, ...]
>>> # Can shuffle each epoch (dataset) in the pipeline.
>>> ray.data.range(5).repeat().random_shuffle().take() 
[2, 3, 0, 4, 1, 4, 0, 2, 1, 3, ...]
Parameters

times – The number of times to loop over this dataset, or None to repeat indefinitely.

window(*, blocks_per_window: Optional[int] = None, bytes_per_window: Optional[int] = None) DatasetPipeline[T][source]

Convert this into a DatasetPipeline by windowing over data blocks.

Transformations prior to the call to window() are evaluated in bulk on the entire dataset. Transformations done on the returned pipeline are evaluated incrementally per window of blocks as data is read from the output of the pipeline.

Windowing execution allows for output to be read sooner without waiting for all transformations to fully execute, and can also improve efficiency if transforms use different resources (e.g., GPUs).

Without windowing:

[preprocessing......]
                      [inference.......]
                                         [write........]
Time ----------------------------------------------------------->

With windowing:

[prep1] [prep2] [prep3]
        [infer1] [infer2] [infer3]
                 [write1] [write2] [write3]
Time ----------------------------------------------------------->

Examples

>>> import ray
>>> # Create an inference pipeline.
>>> ds = ray.data.read_binary_files(dir) 
>>> infer = ... 
>>> pipe = ds.window(blocks_per_window=10).map(infer) 
DatasetPipeline(num_windows=40, num_stages=2)
>>> # The higher the stage parallelism, the shorter the pipeline.
>>> pipe = ds.window(blocks_per_window=20).map(infer) 
DatasetPipeline(num_windows=20, num_stages=2)
>>> # Outputs can be incrementally read from the pipeline.
>>> for item in pipe.iter_rows(): 
...    print(item) 
Parameters
  • blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

  • bytes_per_window – Specify the window size in bytes instead of blocks. This will be treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with blocks_per_window.

fully_executed() ray.data.dataset.Dataset[ray.data.block.T][source]

Force full evaluation of the blocks of this dataset.

This can be used to read all blocks into memory. By default, Datasets doesn’t read blocks from the datasource until the first transform.

Returns

A Dataset with all blocks fully materialized in memory.

is_fully_executed() bool[source]

Returns whether this Dataset has been fully executed.

This will return False if this Dataset is lazy and if the output of its final stage hasn’t been computed yet.

stats() str[source]

Returns a string containing execution timing information.

get_internal_block_refs() List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]][source]

Get a list of references to the underlying blocks of this dataset.

This function can be used for zero-copy access to the data. It blocks until the underlying blocks are computed.

Time complexity: O(1)

Returns

A list of references to this dataset’s blocks.

DeveloperAPI: This API may change across minor Ray releases.

experimental_lazy() ray.data.dataset.Dataset[ray.data.block.T][source]

EXPERIMENTAL: Enable lazy evaluation.

The returned dataset is a lazy dataset, where all subsequent operations on the dataset won’t be executed until the dataset is consumed (e.g. .take(), .iter_batches(), .to_torch(), .to_tf(), etc.) or execution is manually triggered via .fully_executed().
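
A brief sketch of deferring and then forcing execution:

>>> import ray
>>> ds = ray.data.range(100).experimental_lazy()
>>> ds = ds.map(lambda x: x * 2)  # not executed yet
>>> ds = ds.fully_executed()  # triggers execution of the pending stages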

has_serializable_lineage() bool[source]

Whether this dataset’s lineage is able to be serialized for storage and later deserialized, possibly on a different cluster.

Only datasets that are created from data that we know will still exist at deserialization time, e.g. data external to this Ray cluster such as persistent cloud object stores, support lineage-based serialization. All of the ray.data.read_*() APIs support lineage-based serialization.

serialize_lineage() bytes[source]

Serialize this dataset’s lineage, not the actual data or the existing data futures, to bytes that can be stored and later deserialized, possibly on a different cluster.

Note that this will drop all computed data, and that everything will be recomputed from scratch after deserialization.

Use Dataset.deserialize_lineage() to deserialize the serialized bytes returned from this method into a Dataset.

NOTE: Unioned and zipped datasets, produced by Dataset.union() and Dataset.zip(), are not lineage-serializable.

Returns

Serialized bytes containing the lineage of this dataset.

DeveloperAPI: This API may change across minor Ray releases.

static deserialize_lineage(serialized_ds: bytes) ray.data.dataset.Dataset[source]

Deserialize the provided lineage-serialized Dataset.

This assumes that the provided serialized bytes were serialized using Dataset.serialize_lineage().

Parameters

serialized_ds – The serialized Dataset that we wish to deserialize.

Returns

A deserialized Dataset instance.

DeveloperAPI: This API may change across minor Ray releases.
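
A roundtrip sketch, assuming the dataset comes from a lineage-serializable source such as a ray.data.read_*() call (the S3 path is hypothetical):

>>> import ray
>>> ds = ray.data.read_parquet("s3://bucket/path")
>>> serialized = ds.serialize_lineage()
>>> ds2 = ray.data.Dataset.deserialize_lineage(serialized)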

Block API

class ray.data.block.BlockExecStats[source]

Execution stats for this block.

wall_time_s

The wall-clock time it took to compute this block.

cpu_time_s

The CPU time it took to compute this block.

node_id

A unique id for the node that computed this block.

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.block.BlockMetadata(num_rows: Optional[int], size_bytes: Optional[int], schema: Optional[Union[type, pyarrow.lib.Schema]], input_files: Optional[List[str]], exec_stats: Optional[ray.data.block.BlockExecStats])[source]

Metadata about the block.

num_rows

The number of rows contained in this block, or None.

Type

Optional[int]

size_bytes

The approximate size in bytes of this block, or None.

Type

Optional[int]

schema

The pyarrow schema or types of the block elements, or None.

Type

Optional[Union[type, pyarrow.lib.Schema]]

input_files

The list of file paths used to generate this block, or the empty list if indeterminate.

Type

Optional[List[str]]

exec_stats

Execution stats for this block.

Type

Optional[ray.data.block.BlockExecStats]

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.block.BlockAccessor(*args, **kwds)[source]

Provides accessor methods for a specific block.

Ideally, we wouldn’t need separate accessor classes for blocks. However, this is needed if we want to support storing pyarrow.Table directly as a top-level Ray object, without a wrapping class (issue #17186).

There are three types of block accessors: SimpleBlockAccessor, which operates over a plain Python list; ArrowBlockAccessor, for pyarrow.Table blocks; and PandasBlockAccessor, for pandas.DataFrame blocks.

DeveloperAPI: This API may change across minor Ray releases.
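
A small sketch using the for_block() factory listed below on a plain Python list block:

>>> from ray.data.block import BlockAccessor
>>> block = [1, 2, 3]  # a simple (list-based) block
>>> acc = BlockAccessor.for_block(block)
>>> acc.num_rows()
3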

num_rows() int[source]

Return the number of rows contained in this block.

iter_rows() Iterator[ray.data.block.T][source]

Iterate over the rows of this block.

slice(start: int, end: int, copy: bool) Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Return a slice of this block.

Parameters
  • start – The starting index of the slice.

  • end – The ending index of the slice.

  • copy – Whether to perform a data copy for the slice.

Returns

The sliced block result.

random_shuffle(random_seed: Optional[int]) Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Randomly shuffle this block.

to_pandas() pandas.DataFrame[source]

Convert this block into a Pandas dataframe.

to_numpy(columns: Optional[Union[str, List[str]]] = None) Union[numpy.ndarray, Dict[str, numpy.ndarray]][source]

Convert this block (or columns of block) into a NumPy ndarray.

Parameters

columns – Name of columns to convert, or None if converting all columns.

to_arrow() pyarrow.Table[source]

Convert this block into an Arrow table.

to_block() Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Return the base block that this accessor wraps.

to_native() Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Return the native data format for this accessor.

to_batch_format(batch_format: str) Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]][source]

Convert this block into the provided batch format.

Parameters

batch_format – The batch format to convert this block to.

Returns

This block formatted as the provided batch format.

size_bytes() int[source]

Return the approximate size in bytes of this block.

schema() Union[type, pyarrow.lib.Schema][source]

Return the Python type or pyarrow schema of this block.

get_metadata(input_files: List[str], exec_stats: Optional[ray.data.block.BlockExecStats]) ray.data.block.BlockMetadata[source]

Create a metadata object from this block.

zip(other: Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]) Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Zip this block with another block of the same type and size.

static builder() BlockBuilder[T][source]

Create a builder for this block type.

static batch_to_block(batch: Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]) Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Create a block from user-facing data formats.

static for_block(block: Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]) BlockAccessor[T][source]

Create a block accessor for the given block.

sample(n_samples: int, key: Any) Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes][source]

Return a random sample of items from this block.

sort_and_partition(boundaries: List[ray.data.block.T], key: Any, descending: bool) List[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]][source]

Return a list of sorted partitions of this block.

combine(key: Union[None, str, Callable[[ray.data.block.T], Any]], agg: AggregateFn) Union[List[ray.data.block.U], pyarrow.Table, pandas.DataFrame, bytes][source]

Combine rows with the same key into an accumulator.

static merge_sorted_blocks(blocks: List[Block[T]], key: Any, descending: bool) Tuple[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes], ray.data.block.BlockMetadata][source]

Return a sorted block by merging a list of sorted blocks.

static aggregate_combined_blocks(blocks: List[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], key: Union[None, str, Callable[[ray.data.block.T], Any]], agg: AggregateFn) Tuple[Union[List[ray.data.block.U], pyarrow.Table, pandas.DataFrame, bytes], ray.data.block.BlockMetadata][source]

Aggregate partially combined and sorted blocks.

DatasetContext API

class ray.data.context.DatasetContext(block_owner: ray.actor.ActorHandle, block_splitting_enabled: bool, target_max_block_size: int, enable_pandas_block: bool, optimize_fuse_stages: bool, optimize_fuse_read_stages: bool, optimize_fuse_shuffle_stages: bool, optimize_reorder_stages: bool, actor_prefetcher_enabled: bool, use_push_based_shuffle: bool, pipeline_push_based_shuffle_reduce_tasks: bool, scheduling_strategy: Union[None, str, ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy, ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy], use_polars: bool)[source]

Singleton for shared Dataset resources and configurations.

This object is automatically propagated to workers and can be retrieved from the driver and remote workers via DatasetContext.get_current().

DeveloperAPI: This API may change across minor Ray releases.

static get_current() ray.data.context.DatasetContext[source]

Get or create a singleton context.

If the context has not yet been created in this process, it will be initialized with default settings.
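
For example, to read or tweak a context flag from the driver (use_push_based_shuffle is one of the fields listed in the constructor above):

>>> from ray.data.context import DatasetContext
>>> ctx = DatasetContext.get_current()
>>> ctx.use_push_based_shuffle = True  # affects subsequently created datasets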

DatasetPipeline API

class ray.data.dataset_pipeline.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.data.dataset.Dataset[ray.data.block.T]]], base_datasets_can_be_cleared: bool, stages: Optional[List[Callable[[ray.data.dataset.Dataset[Any]], ray.data.dataset.Dataset[Any]]]] = None, length: Optional[int] = None, progress_bars: bool = True, _executed: Optional[List[bool]] = None)[source]

Implements a pipeline of Datasets.

DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by either repeating a Dataset (ds.repeat(times=None)), by turning a single Dataset into a pipeline (ds.window(blocks_per_window=10)), or defined explicitly using DatasetPipeline.from_iterable().

DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).

PublicAPI: This API is stable across Ray releases.

iter_rows(*, prefetch_blocks: int = 0) Iterator[Union[ray.data.block.T, ray.data.row.TableRow]][source]

Return a local row iterator over the data in the pipeline.

If the dataset is a tabular dataset (Arrow/Pandas blocks), dict-like TableRow mappings are yielded for each row by the iterator. If the dataset is not tabular, the raw row is yielded.

Examples

>>> import ray
>>> for i in ray.data.range(1000000).repeat(5).iter_rows(): 
...     print(i) 

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the records in the pipeline.

iter_batches(*, prefetch_blocks: int = 0, batch_size: int = None, batch_format: str = 'native', drop_last: bool = False) Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]][source]

Return a local batched iterator over the data in the pipeline.

Examples

>>> import ray
>>> ds = ray.data.range(1000000).repeat(5) 
>>> for pandas_df in ds.iter_batches(): 
...     print(pandas_df) 

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size, or None to let the system pick.

  • batch_format – The format in which to return each batch. Specify “native” to use the current block format (promoting Arrow to pandas automatically), “pandas” to select pandas.DataFrame or “pyarrow” to select pyarrow.Table. Default is “native”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

Returns

An iterator over record batches.

split(n: int, *, equal: bool = False, locality_hints: Optional[List[Any]] = None) List[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]][source]

Split the pipeline into n disjoint pipeline shards.

This returns a list of sub-pipelines that can be passed to Ray tasks and actors and used to read the pipeline records in parallel.

Examples

>>> import ray
>>> pipe = ray.data.range(10).repeat(50) 
>>> workers = ... 
>>> # Split up a pipeline to process over `n` worker actors.
>>> shards = pipe.split( 
...     len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers): 
...     worker.consume.remote(shard) 

Time complexity: O(1)

Implementation detail: this launches a coordinator actor that is used to execute the pipeline and push data blocks to each pipeline shard. Reading from an individual shard will be blocked if other shards are falling behind. A warning will be printed if a shard has been blocked on read for more than 10 seconds.

Parameters
  • n – Number of child pipelines to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith pipeline shard with the ith actor to maximize data locality.

Returns

A list of n disjoint pipeline splits.

split_at_indices(indices: List[int]) List[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]][source]

Split the datasets within the pipeline at the given indices (like np.split).

This will split each dataset contained within this pipeline, thereby producing len(indices) + 1 pipelines with the first pipeline containing the [0, indices[0]) slice from each dataset, the second pipeline containing the [indices[0], indices[1]) slice from each dataset, and so on, with the final pipeline containing the [indices[-1], self.count()) slice from each dataset.

Examples

>>> import ray
>>> p1, p2, p3 = ray.data.range( 
...     8).repeat(2).split_at_indices([2, 5]) 
>>> p1.take() 
[0, 1, 0, 1]
>>> p2.take() 
[2, 3, 4, 2, 3, 4]
>>> p3.take() 
[5, 6, 7, 5, 6, 7]

Time complexity: O(num splits)

See also: DatasetPipeline.split

Parameters

indices – List of sorted integers which indicate where the pipeline will be split. If an index exceeds the length of the pipeline, an empty pipeline will be returned.

Returns

The pipeline splits.

rewindow(*, blocks_per_window: int, preserve_epoch: bool = True) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Change the windowing (blocks per dataset) of this pipeline.

Changes the windowing of this pipeline to the specified size. For example, if the current pipeline has two blocks per dataset, and rewindow(blocks_per_window=4) is requested, adjacent datasets will be merged until each dataset is 4 blocks. Conversely, if a smaller blocks_per_window is requested, the datasets will be split into smaller windows.

Parameters
  • blocks_per_window – The new target blocks per window.

  • preserve_epoch – Whether to preserve epoch boundaries. If set to False, then windows can contain data from two adjacent epochs.
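
A brief sketch of merging two-block windows into four-block windows:

>>> import ray
>>> pipe = ray.data.range(100).window(blocks_per_window=2)
>>> pipe = pipe.rewindow(blocks_per_window=4)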

repeat(times: Optional[int] = None) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Repeat this pipeline a given number of times, or indefinitely.

This operation is only allowed for pipelines of a finite length. An error will be raised for pipelines of infinite length.

Note that every repeat of the pipeline is considered an “epoch” for the purposes of iter_epochs(). If there are multiple repeat calls, the latest repeat takes precedence for the purpose of defining epochs.

Parameters

times – The number of times to loop over this pipeline, or None to repeat indefinitely.

schema(fetch_if_missing: bool = False) Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset pipeline.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Note: This is intended to be a method for peeking schema before the execution of DatasetPipeline. If execution has already started, it will simply return the cached schema from the previous call.

Time complexity: O(1)

Parameters

fetch_if_missing – If True, synchronously fetch the schema if it’s not known. Default is False, where None is returned if the schema is not known.

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.

count() int[source]

Count the number of records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The number of records in the dataset pipeline.

sum() int[source]

Sum the records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset pipeline.

show_windows(limit_per_dataset: int = 10) None[source]

Print up to the given number of records from each window/dataset.

This is helpful as a debugging tool for understanding the structure of dataset pipelines.

Parameters

limit_per_dataset – Rows to print per window/dataset.

iter_epochs(max_epoch: int = - 1) Iterator[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]][source]

Split this pipeline up by epoch.

This allows reading of data per-epoch for repeated Datasets, which is useful for ML training. For example, ray.data.range(10).repeat(50) generates a pipeline with 500 rows total split across 50 epochs. This method allows iterating over the data individually per epoch (repetition) of the original data.

Parameters

max_epoch – If greater than zero, stop after the given number of epochs.

Examples

>>> import ray
>>> epochs = ray.data.range(10).repeat(50).iter_epochs()  
>>> for i, epoch in enumerate(epochs): 
...     print("Epoch", i) 
...     for row in epoch.iter_rows(): 
...         print(row) 
Returns

Iterator over epoch objects, where each epoch is a DatasetPipeline containing data from that epoch only.

iter_datasets() Iterator[ray.data.dataset.Dataset[ray.data.block.T]][source]

Iterate over the output datasets of this pipeline.

Returns

Iterator over the datasets outputted from this pipeline.

DeveloperAPI: This API may change across minor Ray releases.

foreach_window(fn: Callable[[ray.data.dataset.Dataset[ray.data.block.T]], ray.data.dataset.Dataset[ray.data.block.U]]) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply a transform to each dataset/window in this pipeline.

Parameters

fn – The function to transform each dataset with.

Returns

The transformed DatasetPipeline.

DeveloperAPI: This API may change across minor Ray releases.
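
A short sketch applying a per-window transform:

>>> import ray
>>> pipe = ray.data.range(100).window(blocks_per_window=10)
>>> pipe = pipe.foreach_window(lambda ds: ds.map(lambda x: x * 2))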

stats(exclude_first_window: bool = True) str[source]

Returns a string containing execution timing information.

Parameters

exclude_first_window – Whether to exclude the first window from the pipeline time breakdown. This is generally a good idea since there is always a stall waiting for the first window to be initially computed, which can be misleading in the stats.

static from_iterable(iterable: Iterable[Callable[[], ray.data.dataset.Dataset[ray.data.block.T]]]) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Create a pipeline from a sequence of Dataset-producing functions.

Parameters

iterable – A finite or infinite-length sequence of functions that each produce a Dataset when called.

add_column(col: str, fn: Callable[[pandas.DataFrame], pandas.Series], *, compute: Optional[str] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.add_column to each dataset/window in this pipeline.

filter(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], Callable[[...], ray.data.block.U], type], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.filter to each dataset/window in this pipeline.

flat_map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], Callable[[...], ray.data.block.U], type], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.flat_map to each dataset/window in this pipeline.

map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], Callable[[...], ray.data.block.U], type], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.map to each dataset/window in this pipeline.

map_batches(fn: Union[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Callable[[...], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], type], *, batch_size: Optional[int] = 4096, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'native', fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.map_batches to each dataset/window in this pipeline.

random_shuffle_each_window(*, seed: Optional[int] = None, num_blocks: Optional[int] = None) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.random_shuffle to each dataset/window in this pipeline.

randomize_block_order_each_window(*, seed: Optional[int] = None) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.randomize_block_order to each dataset/window in this pipeline.

repartition_each_window(num_blocks: int, *, shuffle: bool = False) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.repartition to each dataset/window in this pipeline.

show(limit: int = 20) None

Call Dataset.show over the stream of output batches from the pipeline.

sort_each_window(key: Union[None, str, Callable[[ray.data.block.T], Any]] = None, descending: bool = False) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U]

Apply Dataset.sort to each dataset/window in this pipeline.

take(limit: int = 20) List[ray.data.block.T]

Call Dataset.take over the stream of output batches from the pipeline.

take_all(limit: int = 100000) List[ray.data.block.T]

Call Dataset.take_all over the stream of output batches from the pipeline.

to_tf(*, output_signature: Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec], Tuple[Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec]], tf.TypeSpec]], label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, prefetch_blocks: int = 0, batch_size: int = 1, drop_last: bool = False) tf.data.Dataset

Call Dataset.to_tf over the stream of output batches from the pipeline.

to_torch(*, label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[Union[torch.dtype, List[torch.dtype], Dict[str, torch.dtype]]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False, unsqueeze_label_tensor: bool = True, unsqueeze_feature_tensors: bool = True) torch.utils.data.IterableDataset

Call Dataset.to_torch over the stream of output batches from the pipeline.

write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_csv_args) None

Call Dataset.write_csv on each output dataset of this pipeline.

write_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.block.T], *, ray_remote_args: Dict[str, Any] = None, **write_args) None

Call Dataset.write_datasource on each output dataset of this pipeline.

write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **pandas_json_args) None

Call Dataset.write_json on each output dataset of this pipeline.

write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args) None

Call Dataset.write_parquet on each output dataset of this pipeline.

GroupedDataset API

class ray.data.grouped_dataset.GroupedDataset(dataset: ray.data.dataset.Dataset[ray.data.block.T], key: Union[None, str, Callable[[ray.data.block.T], Any]])[source]

Represents a grouped dataset created by calling Dataset.groupby().

The actual groupby is deferred until an aggregation is applied.

PublicAPI: This API is stable across Ray releases.

aggregate(*aggs: ray.data.aggregate.AggregateFn) ray.data.dataset.Dataset[ray.data.block.U][source]

Implements an accumulator-based aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> from ray.data.aggregate import AggregateFn
>>> ds = ray.data.range(100) 
>>> grouped_ds = ds.groupby(lambda x: x % 3) 
>>> grouped_ds.aggregate(AggregateFn( 
...     init=lambda k: [], 
...     accumulate=lambda a, r: a + [r], 
...     merge=lambda a1, a2: a1 + a2, 
...     finalize=lambda a: a 
... )) 
Parameters

aggs – Aggregations to do.

Returns

If the input dataset is a simple dataset then the output is a simple dataset of (k, v_1, ..., v_n) tuples where k is the groupby key and v_i is the result of the ith given aggregation. If the input dataset is an Arrow dataset then the output is an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations. If groupby key is None then the key part of return is omitted.

map_groups(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'native', **ray_remote_args) Dataset[Any][source]

Apply the given function to each group of records of this dataset.

While map_groups() is very flexible, note that it comes with downsides:
  • It may be slower than using more specific methods such as min(), max().

  • It requires that each group fits in memory on a single node.

In general, prefer to use aggregate() instead of map_groups().

This is a blocking operation.

Examples

>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out). Note that median is not an
>>> # associative function so cannot be computed with aggregate().
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> ds = ray.data.range(100) 
>>> ds.groupby(lambda x: x % 3).map_groups( 
...     lambda x: [np.median(x)])
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df) 
>>> grouped = ds.groupby("A") 
>>> grouped.map_groups( 
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... ) 
Parameters
  • fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.

  • batch_format – Specify “native” to use the native block format (promotes Arrow to pandas), “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

Returns

The return type is determined by the return type of fn, and the return value is combined from results of all groups.

count() ray.data.dataset.Dataset[ray.data.block.U][source]

Compute count aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).count() 
>>> ray.data.from_items([ 
...     {"A": x % 3, "B": x} for x in range(100)]).groupby( 
...     "A").count() 
Returns

A simple dataset of (k, v) pairs or an Arrow dataset of [k, v] columns where k is the groupby key and v is the number of rows with that key. If the groupby key is None, then the key part of the return is omitted.

sum(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped sum aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).sum() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .sum(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").sum() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .sum(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the sum.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a sum of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise sum of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the sum; if False, if a null value is encountered, the output will be null. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The sum result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, sum) tuples where k is the groupby key and sum is sum of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, sum_1, ..., sum_n) tuples where k is the groupby key and sum_i is sum of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise sum column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, then the key part of the return is omitted.

min(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped min aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).min() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .min(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").min() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .min(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the min.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a min of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise min of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the min; if False, if a null value is encountered, the output will be null. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The min result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, min) tuples where k is the groupby key and min is min of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, min_1, ..., min_n) tuples where k is the groupby key and min_i is min of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise min column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, then the key part of the return is omitted.

max(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped max aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).max() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .max(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").max() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .max(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the max.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a max of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise max of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the max; if False, if a null value is encountered, the output will be null. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The max result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, max) tuples where k is the groupby key and max is max of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, max_1, ..., max_n) tuples where k is the groupby key and max_i is max of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise max column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, then the key part of the return is omitted.

mean(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped mean aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).mean() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .mean(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").mean() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .mean(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the mean.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a mean of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise mean of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the mean; if False, if a null value is encountered, the output will be null. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The mean result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, mean) tuples where k is the groupby key and mean is mean of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, mean_1, ..., mean_n) tuples where k is the groupby key and mean_i is mean of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise mean column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, then the key part of the return is omitted.

std(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ddof: int = 1, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped standard deviation aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).std() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .std(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").std(ddof=0) 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .std(["B", "C"]) 

NOTE: This uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen due to its numerical stability and because it can be computed in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

Parameters
  • on

    The data subset on which to compute the std.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a std of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise std of all columns.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the std; if False, if a null value is encountered, the output will be null. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The standard deviation result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, std) tuples where k is the groupby key and std is std of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, std_1, ..., std_n) tuples where k is the groupby key and std_i is std of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise std column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, then the key part of the return is omitted.

Aggregate API

class ray.data.aggregate.AggregateFn(init: Callable[[ray.data.block.KeyType], ray.data.block.AggType], merge: Callable[[ray.data.block.AggType, ray.data.block.AggType], ray.data.block.AggType], accumulate_row: Callable[[ray.data.block.AggType, ray.data.block.T], ray.data.block.AggType] = None, accumulate_block: Callable[[ray.data.block.AggType, Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], ray.data.block.AggType] = None, finalize: Callable[[ray.data.block.AggType], ray.data.block.U] = <function AggregateFn.<lambda>>, name: Optional[str] = None)[source]

PublicAPI: This API is stable across Ray releases.
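
As a hedged illustration of the constructor arguments above, the following sketch defines a custom "sum of squares" aggregation (the name sum_of_squares is hypothetical) and applies it via GroupedDataset.aggregate():

>>> import ray
>>> from ray.data.aggregate import AggregateFn
>>> sum_of_squares = AggregateFn(
...     init=lambda k: 0,
...     accumulate_row=lambda acc, row: acc + row * row,
...     merge=lambda a1, a2: a1 + a2,
...     finalize=lambda acc: acc,
...     name="sum_of_squares",
... )
>>> ray.data.range(10).groupby(lambda x: x % 2).aggregate(sum_of_squares)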

class ray.data.aggregate.Count[source]

Defines count aggregation. PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Sum(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines sum aggregation. PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Max(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines max aggregation. PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Mean(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines mean aggregation. PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Std(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ddof: int = 1, ignore_nulls: bool = True)[source]

Defines standard deviation aggregation.

Uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen due to its numerical stability and because it can be computed in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.AbsMax(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines absolute max aggregation. PublicAPI: This API is stable across Ray releases.
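
These built-in aggregations can be combined in a single pass through GroupedDataset.aggregate(); a small sketch (the column names "A" and "B" are illustrative):

>>> import ray
>>> from ray.data.aggregate import Count, Mean, Std, Sum
>>> ds = ray.data.from_items([{"A": i % 3, "B": i} for i in range(100)])
>>> ds.groupby("A").aggregate(Count(), Sum("B"), Mean("B"), Std("B", ddof=0))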

RandomAccessDataset API

class ray.data.random_access_dataset.RandomAccessDataset(dataset: Dataset[T], key: str, num_workers: int)[source]

A class that provides distributed, random access to a Dataset.

See: Dataset.to_random_access_dataset().

PublicAPI (beta): This API is in beta and may change before becoming stable.

get_async(key: Any) ray.types.ObjectRef[Optional[ray.data.block.T]][source]

Asynchronously finds the record for a single key.

Parameters

key – The key of the record to find.

Returns

ObjectRef containing the record (in pydict form), or None if not found.

multiget(keys: List[Any]) List[Optional[ray.data.block.T]][source]

Synchronously find the records for a list of keys.

Parameters

keys – List of keys to find the records for.

Returns

List of found records (in pydict form), or None for missing records.
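
A short usage sketch, assuming a tabular dataset keyed on its "value" column (num_workers=2 is an arbitrary choice):

>>> import ray
>>> ds = ray.data.range_table(100)
>>> rad = ds.to_random_access_dataset(key="value", num_workers=2)
>>> rad.multiget([3, 5])       # records in pydict form, or None if missing
>>> ray.get(rad.get_async(7))  # resolve the ObjectRef returned by get_async()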

stats() str[source]

Returns a string containing access timing information.

Tensor Column Extension API

class ray.data.extensions.tensor_extension.TensorDtype[source]

Pandas extension type for a column of fixed-shape, homogeneous-typed tensors.

See: https://github.com/pandas-dev/pandas/blob/master/pandas/core/dtypes/base.py for up-to-date interface documentation and the subclassing contract. The docstrings of the below properties and methods were copied from the base ExtensionDtype.

Examples

>>> # Create a DataFrame with a list of ndarrays as a column.
>>> import pandas as pd
>>> import numpy as np
>>> import ray
>>> df = pd.DataFrame({
...     "one": [1, 2, 3],
...     "two": list(np.arange(24).reshape((3, 2, 2, 2)))})
>>> # Note the opaque np.object dtype for this column.
>>> df.dtypes 
one     int64
two    object
dtype: object
>>> # Cast column to our TensorDtype extension type.
>>> from ray.data.extensions import TensorDtype
>>> df["two"] = df["two"].astype(TensorDtype())
>>> # Note that the column dtype is now TensorDtype instead of
>>> # np.object.
>>> df.dtypes 
one          int64
two    TensorDtype
dtype: object
>>> # Pandas is now aware of this tensor column, and we can do the
>>> # typical DataFrame operations on this column.
>>> col = 2 * (df["two"] + 10)
>>> # The ndarrays underlying the tensor column will be manipulated,
>>> # but the column itself will continue to be a Pandas type.
>>> type(col) 
pandas.core.series.Series
>>> col 
0   [[[ 2  4]
      [ 6  8]]
     [[10 12]
       [14 16]]]
1   [[[18 20]
      [22 24]]
     [[26 28]
      [30 32]]]
2   [[[34 36]
      [38 40]]
     [[42 44]
      [46 48]]]
Name: two, dtype: TensorDtype
>>> # Once you do an aggregation on that column that returns a single
>>> # row's value, you get back our TensorArrayElement type.
>>> tensor = col.mean()
>>> type(tensor) 
ray.data.extensions.tensor_extension.TensorArrayElement
>>> tensor 
array([[[18., 20.],
        [22., 24.]],
       [[26., 28.],
        [30., 32.]]])
>>> # This is a light wrapper around a NumPy ndarray, and can easily
>>> # be converted to an ndarray.
>>> type(tensor.to_numpy()) 
numpy.ndarray
>>> # In addition to doing Pandas operations on the tensor column,
>>> # you can now put the DataFrame into a Dataset.
>>> ds = ray.data.from_pandas(df) 
>>> # Internally, this column is represented by the corresponding
>>> # Arrow tensor extension type.
>>> ds.schema() 
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> # You can write the dataset to Parquet.
>>> ds.write_parquet("/some/path") 
>>> # And you can read it back.
>>> read_ds = ray.data.read_parquet("/some/path") 
>>> read_ds.schema() 
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> read_df = ray.get(read_ds.to_pandas_refs())[0] 
>>> read_df.dtypes 
one          int64
two    TensorDtype
dtype: object
>>> # The tensor extension type is preserved along the
>>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
>>> # conversion chain.
>>> read_df.equals(df) 
True

PublicAPI (beta): This API is in beta and may change before becoming stable.

property type

The scalar type for the array, e.g. int. It's expected that ExtensionArray[item] returns an instance of ExtensionDtype.type for scalar item, assuming that value is valid (not NA). NA values do not need to be instances of type.

property name: str

A string identifying the data type. Will be used for display in, e.g., Series.dtype.

classmethod construct_from_string(string: str)[source]

Construct this type from a string.

This is useful mainly for data types that accept parameters. For example, a period dtype accepts a frequency parameter that can be set as period[H] (where H means hourly frequency).

By default, in the abstract class, just the name of the type is expected. But subclasses can overwrite this method to accept parameters.

Parameters

string (str) – The name of the type, for example category.

Returns

Instance of the dtype.

Return type

ExtensionDtype

Raises

TypeError – If a class cannot be constructed from this ‘string’.

Examples

For extension dtypes with arguments the following may be an adequate implementation.

>>> import re
>>> @classmethod
... def construct_from_string(cls, string):
...     pattern = re.compile(r"^my_type\[(?P<arg_name>.+)\]$")
...     match = pattern.match(string)
...     if match:
...         return cls(**match.groupdict())
...     else:
...         raise TypeError(
...             f"Cannot construct a '{cls.__name__}' from '{string}'"
...         )

classmethod construct_array_type()[source]

Return the array type associated with this dtype.

Returns

Return type

type

class ray.data.extensions.tensor_extension.TensorArray(values: Union[numpy.ndarray, pandas.core.dtypes.generic.ABCSeries, Sequence[Union[numpy.ndarray, ray.air.util.tensor_extensions.pandas.TensorArrayElement]], ray.air.util.tensor_extensions.pandas.TensorArrayElement, Any])[source]

Pandas ExtensionArray representing a tensor column, i.e. a column consisting of ndarrays as elements. All tensors in a column must have the same shape.

Examples

>>> # Create a DataFrame with a list of ndarrays as a column.
>>> import pandas as pd
>>> import numpy as np
>>> import ray
>>> from ray.data.extensions import TensorArray
>>> df = pd.DataFrame({
...     "one": [1, 2, 3],
...     "two": TensorArray(np.arange(24).reshape((3, 2, 2, 2)))})
>>> # Note that the column dtype is TensorDtype.
>>> df.dtypes 
one          int64
two    TensorDtype
dtype: object
>>> # Pandas is aware of this tensor column, and we can do the
>>> # typical DataFrame operations on this column.
>>> col = 2 * (df["two"] + 10)
>>> # The ndarrays underlying the tensor column will be manipulated,
>>> # but the column itself will continue to be a Pandas type.
>>> type(col) 
pandas.core.series.Series
>>> col 
0   [[[ 2  4]
      [ 6  8]]
     [[10 12]
       [14 16]]]
1   [[[18 20]
      [22 24]]
     [[26 28]
      [30 32]]]
2   [[[34 36]
      [38 40]]
     [[42 44]
      [46 48]]]
Name: two, dtype: TensorDtype
>>> # Once you do an aggregation on that column that returns a single
>>> # row's value, you get back our TensorArrayElement type.
>>> tensor = col.mean() 
>>> type(tensor) 
ray.data.extensions.tensor_extension.TensorArrayElement
>>> tensor 
array([[[18., 20.],
        [22., 24.]],
       [[26., 28.],
        [30., 32.]]])
>>> # This is a light wrapper around a NumPy ndarray, and can easily
>>> # be converted to an ndarray.
>>> type(tensor.to_numpy()) 
numpy.ndarray
>>> # In addition to doing Pandas operations on the tensor column,
>>> # you can now put the DataFrame into a Dataset.
>>> ds = ray.data.from_pandas(df) 
>>> # Internally, this column is represented by the corresponding
>>> # Arrow tensor extension type.
>>> ds.schema() 
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> # You can write the dataset to Parquet.
>>> ds.write_parquet("/some/path") 
>>> # And you can read it back.
>>> read_ds = ray.data.read_parquet("/some/path") 
>>> read_ds.schema() 
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>
>>> read_df = ray.get(read_ds.to_pandas_refs())[0] 
>>> read_df.dtypes 
one          int64
two    TensorDtype
dtype: object
>>> # The tensor extension type is preserved along the
>>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
>>> # conversion chain.
>>> read_df.equals(df) 
True

PublicAPI (beta): This API is in beta and may change before becoming stable.

property dtype: pandas.core.dtypes.base.ExtensionDtype

An instance of ‘ExtensionDtype’.

property nbytes: int

The number of bytes needed to store this object in memory.

isna() ray.air.util.tensor_extensions.pandas.TensorArray[source]

A 1-D array indicating if each value is missing.

Returns

na_values – In most cases, this should return a NumPy ndarray. For exceptional cases like SparseArray, where returning an ndarray would be expensive, an ExtensionArray may be returned.

Return type

Union[np.ndarray, ExtensionArray]

Notes

If returning an ExtensionArray, then

  • na_values._is_boolean should be True

  • na_values should implement ExtensionArray._reduce()

  • na_values.any and na_values.all should be implemented

take(indices: Sequence[int], allow_fill: bool = False, fill_value: Optional[Any] = None) ray.air.util.tensor_extensions.pandas.TensorArray[source]

Take elements from an array.

Parameters
  • indices (sequence of int) – Indices to be taken.

  • allow_fill (bool, default False) –

    How to handle negative values in indices.

    • False: negative values in indices indicate positional indices from the right (the default). This is similar to numpy.take().

    • True: negative values in indices indicate missing values. These values are set to fill_value. Any other negative values raise a ValueError.

  • fill_value (any, optional) –

    Fill value to use for NA-indices when allow_fill is True. This may be None, in which case the default NA value for the type, self.dtype.na_value, is used.

    For many ExtensionArrays, there will be two representations of fill_value: a user-facing “boxed” scalar, and a low-level physical NA value. fill_value should be the user-facing version, and the implementation should handle translating that to the physical version for processing the take if necessary.

Returns

Return type

ExtensionArray

Raises
  • IndexError – When the indices are out of bounds for the array.

  • ValueError – When indices contains negative values other than -1 and allow_fill is True.

See also

numpy.take

Take elements from an array along an axis.

api.extensions.take

Take elements from an array.

Notes

ExtensionArray.take is called by Series.__getitem__, .loc, iloc, when indices is a sequence of values. Additionally, it’s called by Series.reindex(), or any other method that causes realignment, with a fill_value.

Examples

Here’s an example implementation, which relies on casting the extension array to object dtype. This uses the helper method pandas.api.extensions.take().

def take(self, indices, allow_fill=False, fill_value=None):
    from pandas.core.algorithms import take

    # If the ExtensionArray is backed by an ndarray, then
    # just pass that here instead of coercing to object.
    data = self.astype(object)

    if allow_fill and fill_value is None:
        fill_value = self.dtype.na_value

    # fill value should always be translated from the scalar
    # type for the array, to the physical storage type for
    # the data, before passing to take.

    result = take(data, indices, fill_value=fill_value,
                  allow_fill=allow_fill)
    return self._from_sequence(result, dtype=self.dtype)

copy() ray.air.util.tensor_extensions.pandas.TensorArray[source]

Return a copy of the array.

Returns

Return type

ExtensionArray

to_numpy(dtype: Optional[numpy.dtype] = None, copy: bool = False, na_value: Any = NoDefault.no_default)[source]

Convert to a NumPy ndarray.

New in version 1.0.0.

This is similar to numpy.asarray(), but may provide additional control over how the conversion is done.

Parameters
  • dtype (str or numpy.dtype, optional) – The dtype to pass to numpy.asarray().

  • copy (bool, default False) – Whether to ensure that the returned value is not a view on another array. Note that copy=False does not ensure that to_numpy() is no-copy. Rather, copy=True ensures that a copy is made, even if not strictly necessary.

  • na_value (Any, optional) – The value to use for missing values. The default value depends on dtype and the type of the array.

Returns

Return type

numpy.ndarray

property numpy_dtype

Get the dtype of the tensor. Returns the NumPy dtype of the backing ndarray.

property numpy_ndim

Get the number of tensor dimensions. Returns an integer for the number of dimensions.

property numpy_shape

Get the shape of the tensor. Returns a tuple of integers for the NumPy shape of the backing ndarray.

property numpy_size

Get the size of the tensor. Returns an integer for the number of elements in the tensor.

astype(dtype, copy=True)[source]

Cast to a NumPy array with ‘dtype’.

Parameters
  • dtype (str or dtype) – Typecode or data-type to which the array is cast.

  • copy (bool, default True) – Whether to copy the data, even if not necessary. If False, a copy is made only if the old dtype does not match the new dtype.

Returns

array – NumPy ndarray with ‘dtype’ for its dtype.

Return type

ndarray

any(axis=None, out=None, keepdims=False)[source]

Test whether any array element along a given axis evaluates to True.

See numpy.any() documentation for more information https://numpy.org/doc/stable/reference/generated/numpy.any.html#numpy.any

Parameters
  • axis – Axis or axes along which a logical OR reduction is performed.

  • out – Alternate output array in which to place the result.

  • keepdims – If this is set to True, the axes which are reduced are left in the result as dimensions with size one.

Returns

A single boolean if axis is None; otherwise, a TensorArray.

all(axis=None, out=None, keepdims=False)[source]

Test whether all array elements along a given axis evaluate to True.

Parameters
  • axis – Axis or axes along which a logical AND reduction is performed.

  • out – Alternate output array in which to place the result.

  • keepdims – If this is set to True, the axes which are reduced are left in the result as dimensions with size one.

Returns

A single boolean if axis is None; otherwise, a TensorArray.

class ray.data.extensions.tensor_extension.ArrowTensorType(shape: Tuple[int, ...], dtype: pyarrow.lib.DataType)[source]

Arrow ExtensionType for an array of fixed-shaped, homogeneous-typed tensors.

This is the Arrow side of TensorDtype.

See Arrow extension type docs: https://arrow.apache.org/docs/python/extending_types.html#defining-extension-types-user-defined-types

PublicAPI (beta): This API is in beta and may change before becoming stable.

property shape

Shape of contained tensors.

to_pandas_dtype()[source]

Convert Arrow extension type to corresponding Pandas dtype.

Returns

An instance of pd.api.extensions.ExtensionDtype.

class ray.data.extensions.tensor_extension.ArrowTensorArray[source]

An array of fixed-shape, homogeneous-typed tensors.

This is the Arrow side of TensorArray.

See Arrow docs for customizing extension arrays: https://arrow.apache.org/docs/python/extending_types.html#custom-extension-array-class

PublicAPI (beta): This API is in beta and may change before becoming stable.

OFFSET_DTYPE

alias of numpy.int32

to_pylist(self)[source]

Convert to a list of native Python objects.

Returns

lst

Return type

list

classmethod from_numpy(arr)[source]

Convert an ndarray or an iterable of fixed-shape ndarrays to an array of fixed-shape, homogeneous-typed tensors.

Parameters

arr – An ndarray or an iterable of fixed-shape ndarrays.

Returns

An ArrowTensorArray containing len(arr) tensors of fixed shape.

to_numpy(zero_copy_only: bool = True)[source]

Convert the entire array of tensors into a single ndarray.

Parameters

zero_copy_only – If True, an exception will be raised if the conversion to a NumPy array would require copying the underlying data (e.g. in presence of nulls, or for non-primitive types). This argument is currently ignored, so zero-copy isn’t enforced even if this argument is true.

Returns

A single ndarray representing the entire array of tensors.
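
A brief sketch of round-tripping between NumPy and ArrowTensorArray (the shapes in the comments follow from the input array):

>>> import numpy as np
>>> from ray.data.extensions.tensor_extension import ArrowTensorArray
>>> arr = np.arange(12).reshape((3, 2, 2))  # 3 tensors of shape (2, 2)
>>> ata = ArrowTensorArray.from_numpy(arr)
>>> ata.to_numpy().shape                    # (3, 2, 2)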

Custom Datasource API

class ray.data.Datasource(*args, **kwds)[source]

Interface for defining a custom ray.data.Dataset datasource.

To read a datasource into a dataset, use ray.data.read_datasource(). To write to a writable datasource, use Dataset.write_datasource().

See RangeDatasource and DummyOutputDatasource for examples of how to implement readable and writable datasources.

Datasource instances must be serializable, since prepare_read() and do_write() are called in remote tasks.

DeveloperAPI: This API may change across minor Ray releases.

prepare_read(parallelism: int, **read_args) List[ray.data.datasource.datasource.ReadTask[ray.data.block.T]][source]

Return the list of tasks needed to perform a read.

Parameters
  • parallelism – The requested read parallelism. The number of read tasks should be as close to this value as possible.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

A list of read tasks that can be executed to read blocks from the datasource in parallel.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], ray_remote_args: Dict[str, Any], **write_args) List[ray.types.ObjectRef[Any]][source]

Launch Ray tasks for writing blocks out to the datasource.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of the output of the write tasks.

on_write_complete(write_results: List[Any], **kwargs) None[source]

Callback for when a write job completes.

This can be used to “commit” a write output. This method must succeed prior to write_datasource() returning to the user. If this method fails, then on_write_failed() will be called.

Parameters
  • write_results – The list of the write task results.

  • kwargs – Forward-compatibility placeholder.

on_write_failed(write_results: List[ray.types.ObjectRef[Any]], error: Exception, **kwargs) None[source]

Callback for when a write job fails.

This is called on a best-effort basis on write failures.

Parameters
  • write_results – The list of the write task result futures.

  • error – The first error encountered.

  • kwargs – Forward-compatibility placeholder.

class ray.data.ReadTask(read_fn: Callable[[], Iterable[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: BlockMetadata)[source]

A function used to read blocks from the dataset.

Read tasks are generated by datasource.prepare_read(), and return a list of ray.data.Block when called. Initial metadata about the read operation can be retrieved via get_metadata() prior to executing the read. Final metadata is returned after the read along with the blocks.

Ray will execute read tasks in remote functions to parallelize execution. Note that the number of blocks returned can vary at runtime. For example, if a task is reading a single large file it can return multiple blocks to avoid running out of memory during the read.

The initial metadata should reflect all the blocks returned by the read, e.g., if the metadata says num_rows=1000, the read can return a single block of 1000 rows, or multiple blocks with 1000 rows altogether.

The final metadata (returned with the actual block) reflects the exact contents of the block itself.

DeveloperAPI: This API may change across minor Ray releases.
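
To make the read path concrete, here is a minimal, hedged sketch of a read-only datasource built from prepare_read() and ReadTask. The class name TripledRangeDatasource is hypothetical, and the BlockMetadata fields shown (num_rows, size_bytes, schema, input_files, exec_stats) are an assumption about this Ray version; adjust them to match the fields your installation defines.

from typing import List

import ray
from ray.data import Datasource, ReadTask
from ray.data.block import BlockMetadata


class TripledRangeDatasource(Datasource):
    """Hypothetical datasource yielding 3 * i for i in [0, n)."""

    def prepare_read(self, parallelism: int, n: int) -> List[ReadTask]:
        read_tasks = []
        block_size = max(1, n // parallelism)
        for start in range(0, n, block_size):
            end = min(start + block_size, n)
            # Assumed BlockMetadata fields; check your Ray version if they differ.
            metadata = BlockMetadata(
                num_rows=end - start,
                size_bytes=None,
                schema=int,
                input_files=None,
                exec_stats=None,
            )
            # Bind the loop bounds as defaults so each task reads its own slice.
            read_fn = lambda s=start, e=end: [[3 * i for i in range(s, e)]]
            read_tasks.append(ReadTask(read_fn, metadata))
        return read_tasks


ds = ray.data.read_datasource(TripledRangeDatasource(), parallelism=4, n=16)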

Datasource Partitioning API

class ray.data.datasource.PartitionStyle(value)[source]

Supported dataset partition styles.

Inherits from str to simplify plain text serialization/deserialization.

Examples

>>> # Serialize to JSON text.
>>> import json
>>> json.dumps(PartitionStyle.HIVE)
'"hive"'
>>> # Deserialize from JSON text.
>>> PartitionStyle(json.loads('"hive"'))  
<PartitionStyle.HIVE: 'hive'>

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.datasource.PathPartitionScheme(style: ray.data.datasource.partitioning.PartitionStyle, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None)[source]

Partition scheme used to describe path-based partitions.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

DeveloperAPI: This API may change across minor Ray releases.

property style: ray.data.datasource.partitioning.PartitionStyle

Gets the path partitioning style.

property base_dir: str

Gets the original base directory supplied during object construction.

property normalized_base_dir: str

Returns the base directory normalized for compatibility with a filesystem.

property field_names: Optional[List[str]]

Gets the partition key field names.

property filesystem: Optional[pyarrow.fs.FileSystem]

Gets the original filesystem supplied during object construction.

property resolved_filesystem: pyarrow.fs.FileSystem

Returns the filesystem resolved for compatibility with a base directory.

class ray.data.datasource.PathPartitionEncoder(path_partition_scheme: ray.data.datasource.partitioning.PathPartitionScheme)[source]

Callable that generates directory path strings for path-based partition formats.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

Two path partition formats are currently supported - HIVE and DIRECTORY.

For HIVE Partitioning, all partition directories will be generated using a “{key1}={value1}/{key2}={value2}” naming convention under the base directory. An accompanying ordered list of partition key field names must also be provided, where the order and length of all partition values must match the order and length of field names.

For DIRECTORY Partitioning, all directories will be generated from partition values using a “{value1}/{value2}” naming convention under the base directory.

DeveloperAPI: This API may change across minor Ray releases.

static of(style: ray.data.datasource.partitioning.PartitionStyle = PartitionStyle.HIVE, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None) PathPartitionEncoder[source]

Creates a new partition path encoder.

Parameters
  • style – The partition style - may be either HIVE or DIRECTORY.

  • base_dir – “/”-delimited base directory that all partition paths will be generated under (exclusive).

  • field_names – The partition key field names (i.e. column names for tabular datasets). Required for HIVE partition paths, optional for DIRECTORY partition paths. When non-empty, the order and length of partition key field names must match the order and length of partition values.

  • filesystem – Filesystem that will be used for partition path file I/O.

Returns

The new partition path encoder.

property scheme: ray.data.datasource.partitioning.PathPartitionScheme

Returns the path partition scheme for this encoder.

class ray.data.datasource.PathPartitionParser(path_partition_scheme: ray.data.datasource.partitioning.PathPartitionScheme)[source]

Partition parser for path-based partition formats.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

Two path partition formats are currently supported - HIVE and DIRECTORY.

For HIVE Partitioning, all partition directories under the base directory will be discovered based on “{key1}={value1}/{key2}={value2}” naming conventions. Key/value pairs do not need to be presented in the same order across all paths. Directory names nested under the base directory that don’t follow this naming condition will be considered unpartitioned. If a partition filter is defined, then it will be called with an empty input dictionary for each unpartitioned file.

For DIRECTORY Partitioning, all directories under the base directory will be interpreted as partition values of the form “{value1}/{value2}”. An accompanying ordered list of partition field names must also be provided, where the order and length of all partition values must match the order and length of field names. Files stored directly in the base directory will be considered unpartitioned. If a partition filter is defined, then it will be called with an empty input dictionary for each unpartitioned file. For example, if the base directory is “foo” then “foo.csv” and “foo/bar.csv” would be considered unpartitioned files but “foo/bar/baz.csv” would be associated with partition “bar”. If the base directory is undefined, then “foo.csv” would be unpartitioned, “foo/bar.csv” would be associated with partition “foo”, and “foo/bar/baz.csv” would be associated with partition (“foo”, “bar”).

DeveloperAPI: This API may change across minor Ray releases.

static of(style: ray.data.datasource.partitioning.PartitionStyle = PartitionStyle.HIVE, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None) PathPartitionParser[source]

Creates a path-based partition parser using a flattened argument list.

Parameters
  • style – The partition style - may be either HIVE or DIRECTORY.

  • base_dir – “/”-delimited base directory to start searching for partitions (exclusive). File paths outside of this directory will be considered unpartitioned. Specify None or an empty string to search for partitions in all file path directories.

  • field_names – The partition key names. Required for DIRECTORY partitioning. Optional for HIVE partitioning. When non-empty, the order and length of partition key field names must match the order and length of partition directories discovered. Partition key field names are not required to exist in the dataset schema.

  • filesystem – Filesystem that will be used for partition path file I/O.

Returns

The new path-based partition parser.

property scheme: ray.data.datasource.partitioning.PathPartitionScheme

Returns the path partition scheme for this parser.

class ray.data.datasource.PathPartitionFilter(path_partition_parser: ray.data.datasource.partitioning.PathPartitionParser, filter_fn: Callable[[Dict[str, str]], bool])[source]

Partition filter for path-based partition formats.

Used to explicitly keep or reject files based on a custom filter function that takes partition keys and values parsed from the file’s path as input.

PublicAPI (beta): This API is in beta and may change before becoming stable.
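
A hedged sketch of building a filter that keeps only HIVE-partitioned paths where country=US (the bucket path and the "country" key are hypothetical); the resulting object can then be passed as the partition_filter argument of file-based read APIs such as ray.data.read_csv():

>>> from ray.data.datasource import (
...     PartitionStyle, PathPartitionFilter, PathPartitionParser)
>>> parser = PathPartitionParser.of(
...     style=PartitionStyle.HIVE,
...     base_dir="s3://bucket/path")
>>> keep_us = PathPartitionFilter(
...     parser, lambda kv: kv.get("country") == "US")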

Built-in Datasources

class ray.data.datasource.BinaryDatasource(*args, **kwds)[source]

Binary datasource, for reading and writing binary files.

Examples

>>> import ray
>>> from ray.data.datasource import BinaryDatasource
>>> source = BinaryDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[b"file_data", ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.CSVDatasource(*args, **kwds)[source]

CSV datasource, for reading and writing CSV files.

Examples

>>> import ray
>>> from ray.data.datasource import CSVDatasource
>>> source = CSVDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.FileBasedDatasource(*args, **kwds)[source]

File-based datasource, for reading and writing files.

This class should not be used directly, and should instead be subclassed and tailored to particular file formats. Classes deriving from this class must implement _read_file().

If _FILE_EXTENSION is defined, then by default only files with this extension will be read. If it is None, no default filter is used.

Current subclasses:

JSONDatasource, CSVDatasource, NumpyDatasource, BinaryDatasource

DeveloperAPI: This API may change across minor Ray releases.

prepare_read(parallelism: int, paths: Union[str, List[str]], filesystem: Optional[pyarrow.fs.FileSystem] = None, schema: Optional[Union[type, pyarrow.lib.Schema]] = None, open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: ray.data.datasource.partitioning.PathPartitionFilter = None, _block_udf: Optional[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]] = None, **reader_args) List[ray.data.datasource.datasource.ReadTask][source]

Creates and returns read tasks for a file-based datasource.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], path: str, dataset_uuid: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, write_args_fn: Callable[[], Dict[str, Any]] = <function FileBasedDatasource.<lambda>>, _block_udf: Optional[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]] = None, ray_remote_args: Dict[str, Any] = None, **write_args) List[ray.types.ObjectRef[Any]][source]

Creates and returns write tasks for a file-based datasource.
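
As a hedged sketch of the subclassing contract described above (the class name TextLineDatasource is hypothetical, and the exact _read_file() signature may vary across Ray versions):

from ray.data.datasource import FileBasedDatasource


class TextLineDatasource(FileBasedDatasource):
    """Hypothetical datasource reading one record per line of a text file."""

    _FILE_EXTENSION = "txt"  # by default, only *.txt files are read

    def _read_file(self, f, path: str, **reader_args):
        # `f` is the opened pyarrow file stream; return one block of records.
        return [line for line in f.readall().decode("utf-8").splitlines()]

Such a datasource could then be read with ray.data.read_datasource(TextLineDatasource(), paths="/path/to/dir").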

class ray.data.datasource.JSONDatasource(*args, **kwds)[source]

JSON datasource, for reading and writing JSON files.

Examples

>>> import ray
>>> from ray.data.datasource import JSONDatasource
>>> source = JSONDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.NumpyDatasource(*args, **kwds)[source]

Numpy datasource, for reading and writing Numpy files.

Examples

>>> import ray
>>> from ray.data.datasource import NumpyDatasource
>>> source = NumpyDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[array([0., 1., 2.]), ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.ParquetDatasource(*args, **kwds)[source]

Parquet datasource, for reading and writing Parquet files.

The primary difference from ParquetBaseDatasource is that this uses PyArrow’s ParquetDataset abstraction for dataset reads, and thus offers automatic Arrow dataset schema inference and row count collection at the cost of some potential performance and/or compatibility penalties.

Examples

>>> import ray
>>> from ray.data.datasource import ParquetDatasource
>>> source = ParquetDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

prepare_read(parallelism: int, paths: Union[str, List[str]], filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, schema: Optional[Union[type, pyarrow.lib.Schema]] = None, meta_provider: ray.data.datasource.file_meta_provider.ParquetMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultParquetMetadataProvider object>, _block_udf: Optional[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]] = None, **reader_args) List[ray.data.datasource.datasource.ReadTask][source]

Creates and returns read tasks for a Parquet file-based datasource.

class ray.data.datasource.RangeDatasource(*args, **kwds)[source]

An example datasource that generates ranges of numbers from [0..n).

Examples

>>> import ray
>>> from ray.data.datasource import RangeDatasource
>>> source = RangeDatasource() 
>>> ray.data.read_datasource(source, n=10).take() 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

PublicAPI: This API is stable across Ray releases.

prepare_read(parallelism: int, n: int, block_format: str = 'list', tensor_shape: Tuple = (1,)) List[ray.data.datasource.datasource.ReadTask][source]

Return the list of tasks needed to perform a read.

Parameters
  • parallelism – The requested read parallelism. The number of read tasks should be as close to this value as possible.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

A list of read tasks that can be executed to read blocks from the datasource in parallel.

class ray.data.datasource.SimpleTensorFlowDatasource(*args, **kwds)[source]

A datasource that lets you use TensorFlow datasets with Ray Data.

Warning

SimpleTensorFlowDatasource doesn't support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.

Example

>>> import ray.data
>>> from ray.data.datasource import SimpleTensorFlowDatasource
>>> import tensorflow_datasets as tfds  
>>>
>>> def dataset_factory():
...     return tfds.load("cifar10", split=["train"], as_supervised=True)[0]
...
>>> dataset = ray.data.read_datasource(  
...     SimpleTensorFlowDatasource(),
...     parallelism=1,
...     dataset_factory=dataset_factory
... )
>>> features, label = dataset.take(1)[0]  
>>> features.shape  
TensorShape([32, 32, 3])
>>> label  
<tf.Tensor: shape=(), dtype=int64, numpy=7>

PublicAPI: This API is stable across Ray releases.

prepare_read(parallelism: int, dataset_factory: Callable[[], tf.data.Dataset]) List[ray.data.datasource.datasource.ReadTask][source]

Return a read task that loads a TensorFlow dataset.

Parameters
  • parallelism – This argument isn’t used.

  • dataset_factory – A no-argument function that returns the TensorFlow dataset to be read.

class ray.data.datasource.SimpleTorchDatasource(*args, **kwds)[source]

A datasource that lets you use Torch datasets with Ray Data.

Warning

SimpleTorchDatasource doesn’t support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.

Example

>>> import ray
>>> from ray.data.datasource import SimpleTorchDatasource
>>> import torchvision
>>>
>>> dataset_factory = lambda: torchvision.datasets.MNIST("data", download=True)
>>> dataset = ray.data.read_datasource(  
...     SimpleTorchDatasource(), parallelism=1, dataset_factory=dataset_factory
... )
>>> dataset.take(1)  
(<PIL.Image.Image image mode=L size=28x28 at 0x1142CCA60>, 5)

PublicAPI: This API is stable across Ray releases.

prepare_read(parallelism: int, dataset_factory: Callable[[], torch.utils.data.Dataset]) List[ray.data.datasource.datasource.ReadTask][source]

Return a read task that loads a Torch dataset.

Parameters
  • parallelism – This argument isn’t used.

  • dataset_factory – A no-argument function that returns the Torch dataset to be read.

Table Row API

class ray.data.row.TableRow(row: Any)[source]

A dict-like row of a tabular Dataset.

This implements the dictionary mapping interface, but provides more efficient access with less data copying than converting Arrow Tables or Pandas DataFrames into per-row dicts. This class must be subclassed, with subclasses implementing __getitem__, __iter__, and __len__.

Concrete subclasses include ray.data._internal.arrow_block.ArrowRow and ray.data._internal.pandas_block.PandasRow.

PublicAPI: This API is stable across Ray releases.

as_pydict() dict[source]

Convert to a normal Python dict. This will create a new copy of the row.
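
For example (a small sketch using a tabular dataset; the exact row repr may vary by block format):

>>> import ray
>>> row = ray.data.range_table(3).take(1)[0]  # e.g. ArrowRow({'value': 0})
>>> row.as_pydict()
{'value': 0}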

Utility

ray.data.set_progress_bars(enabled: bool) bool[source]

Set whether progress bars are enabled.

The default behavior is controlled by the RAY_DATA_DISABLE_PROGRESS_BARS environment variable. By default, it is set to “0”. Setting it to “1” will disable progress bars, unless they are reenabled by this method.
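
A minimal usage sketch (the return value reflects whatever the previous setting was):

>>> import ray
>>> prev = ray.data.set_progress_bars(False)  # disable progress bars
>>> # ... run Dataset operations without progress output ...
>>> ray.data.set_progress_bars(prev)          # restore the previous setting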

Returns

Whether progress bars were previously enabled.

PublicAPI: This API is stable across Ray releases.