Input/Output#

Synthetic Data#

ray.data.range(n: int, *, parallelism: int = -1) ray.data.dataset.Dataset[int][source]#

Create a dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range(10000) 
>>> ds 
Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
>>> ds.map(lambda x: x * 2).take(4) 
[0, 2, 4, 6]
Parameters
  • n – The upper bound of the range of integers.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers.

PublicAPI: This API is stable across Ray releases.

ray.data.range_table(n: int, *, parallelism: int = -1) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a tabular dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range_table(1000) 
>>> ds 
Dataset(num_blocks=200, num_rows=1000, schema={value: int64})
>>> ds.map(lambda r: {"v2": r["value"] * 2}).take(2) 
[ArrowRow({'v2': 0}), ArrowRow({'v2': 2})]

This is similar to range(), but uses Arrow tables to hold the integers in Arrow records. The dataset elements take the form {"value": N}.

Parameters
  • n – The upper bound of the range of integer records.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow records.

PublicAPI: This API is stable across Ray releases.

ray.data.range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a Tensor dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range_tensor(1000, shape=(2, 2)) 
>>> ds 
Dataset(
    num_blocks=200,
    num_rows=1000,
    schema={__value__: <ArrowTensorType: shape=(2, 2), dtype=int64>},
)
>>> ds.map_batches(lambda arr: arr * 2).take(2) 
[array([[0, 0],
        [0, 0]]),
array([[2, 2],
        [2, 2]])]

This is similar to range_table(), but uses the ArrowTensorArray extension type. The dataset elements take the form {VALUE_COL_NAME: array(N, shape=shape)}.

Parameters
  • n – The upper bound of the range of integer records.

  • shape – The shape of each record.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow tensor records.

PublicAPI: This API is stable across Ray releases.

Python Objects#

ray.data.from_items(items: List[Any], *, parallelism: int = -1) ray.data.dataset.Dataset[Any][source]#

Create a dataset from a list of local Python objects.

Examples

>>> import ray
>>> ds = ray.data.from_items([1, 2, 3, 4, 5]) 
>>> ds 
Dataset(num_blocks=5, num_rows=5, schema=<class 'int'>)
>>> ds.take(2) 
[1, 2]
Parameters
  • items – List of local Python objects.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the items.

PublicAPI: This API is stable across Ray releases.

Parquet#

ray.data.read_parquet(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.ParquetMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultParquetMetadataProvider object>, **arrow_parquet_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create an Arrow dataset from parquet files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) 
>>> # Specify a schema for the parquet file.
>>> import pyarrow as pa
>>> fields = [("sepal.length", pa.float64()),
...           ("sepal.width", pa.float64()),
...           ("petal.length", pa.float64()),
...           ("petal.width", pa.float64()),
...           ("variety", pa.string())]
>>> ray.data.read_parquet("example://iris.parquet",
...     schema=pa.schema(fields))
Dataset(num_blocks=..., num_rows=150, schema={sepal.length: double, ...})

For further arguments that you can pass to PyArrow as keyword arguments, see https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html

Parameters
  • paths – A single file path or directory, or a list of file paths. Multiple directories are not supported.

  • filesystem – The filesystem implementation to read from. These are specified in https://arrow.apache.org/docs/python/api/filesystems.html#filesystem-implementations.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • arrow_parquet_args – Other parquet read options to pass to pyarrow, see https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_parquet_bulk(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_file_args: Optional[Dict[str, Any]] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.FastFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.parquet'], allow_if_no_extensions=False), **arrow_parquet_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create an Arrow dataset from a large number (e.g. >1K) of parquet files quickly.

By default, ONLY file paths should be provided as input (i.e. no directory paths), and an OSError will be raised if one or more paths point to directories. If your use-case requires directory paths, then the metadata provider should be changed to one that supports directory expansion (e.g. DefaultFileMetadataProvider).

Offers improved performance over read_parquet() by not using PyArrow's ParquetDataset abstraction, whose latency scales linearly with the number of input files because it collects all file metadata on a single node.

Also supports a wider variety of input Parquet file types than read_parquet() due to not trying to merge and resolve a unified schema for all files.

However, unlike read_parquet(), this does not offer file metadata resolution by default, so a custom metadata provider should be provided if your use-case requires a unified dataset schema, block sizes, row counts, etc.

Examples

>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known to minimize read latency.
>>> ray.data.read_parquet_bulk( 
...     ["/path/to/file1", "/path/to/file2"])
>>> # Read a directory of files in remote storage. Caution should be taken
>>> # when providing directory paths, since the time to both check each path
>>> # type and expand its contents may result in greatly increased latency
>>> # and/or request rate throttling from cloud storage service providers.
>>> ray.data.read_parquet_bulk( 
...     "s3://bucket/path",
...     meta_provider=DefaultFileMetadataProvider())
Parameters
  • paths – A single file path or a list of file paths. If one or more directories are provided, then meta_provider should also be set to an implementation that supports directory expansion (e.g. DefaultFileMetadataProvider).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_file_args – kwargs passed to pyarrow.fs.FileSystem.open_input_file.

  • tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • meta_provider – File metadata provider. Defaults to a fast file metadata provider that skips file size collection and requires all input paths to be files. Change to DefaultFileMetadataProvider or a custom metadata provider if directory expansion and/or file metadata resolution is required.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match ".parquet".

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Dataset.write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args) None[source]

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_parquet("s3://bucket/path") 
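
Write options can also be forwarded to pyarrow.parquet.write_table() via arrow_parquet_args. A minimal sketch (the output path and compression codec here are illustrative):

>>> ds.write_parquet( 
...     "s3://bucket/path",
...     compression="gzip")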

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used instead of arrow_parquet_args if any of your write arguments cannot be pickled, or if you'd like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_parquet_args – Options to pass to pyarrow.parquet.write_table(), which is used to write out each block to a file.

CSV#

ray.data.read_csv(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None, partitioning: ray.data.datasource.partitioning.Partitioning = Partitioning(style='hive', base_dir='', field_names=None, filesystem=None), **arrow_csv_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create an Arrow dataset from csv files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_csv( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
>>> # Read files that use a different delimiter. For more uses of ParseOptions see
>>> # https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html
>>> from pyarrow import csv
>>> parse_options = csv.ParseOptions(delimiter="\t")
>>> ray.data.read_csv( 
...     "example://iris.tsv",
...     parse_options=parse_options)
>>> # Convert a date column with a custom format from a CSV file.
>>> # For more uses of ConvertOptions see
>>> # https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html
>>> from pyarrow import csv
>>> convert_options = csv.ConvertOptions(
...     timestamp_parsers=["%m/%d/%Y"])
>>> ray.data.read_csv( 
...     "example://dow_jones_index.csv",
...     convert_options=convert_options)

By default, read_csv parses Hive-style partitions from file paths. If your data adheres to a different partitioning scheme, set the partitioning parameter.

>>> ds = ray.data.read_csv("example://year=2022/month=09/sales.csv")  
>>> ds.take(1)  
[{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]

By default, read_csv reads all files from file paths. If you want to filter files by file extensions, set the partition_filter parameter.

>>> # Read only *.csv files from multiple directories.
>>> from ray.data.datasource import FileExtensionFilter
>>> ray.data.read_csv( 
...     ["s3://bucket/path1", "s3://bucket/path2"],
...     partition_filter=FileExtensionFilter("csv"))
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files. If wishing to filter out all file paths except those whose file extension matches e.g. ".csv", a FileExtensionFilter("csv") can be provided.

  • partitioning –

    A Partitioning object that describes how paths are organized. By default, this function parses Hive-style partitions.

  • arrow_csv_args – Other csv read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Dataset.write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_csv_args) None[source]

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_csv("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where csv files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. This should be used instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you'd like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_csv_args – Other CSV write options to pass to pyarrow.

JSON#

ray.data.read_json(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.json'], allow_if_no_extensions=False), partitioning: ray.data.datasource.partitioning.Partitioning = Partitioning(style='hive', base_dir='', field_names=None, filesystem=None), **arrow_json_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create an Arrow dataset from json files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_json( 
...     ["s3://bucket/path1", "s3://bucket/path2"])

By default, read_json parses Hive-style partitions from file paths. If your data adheres to a different partitioning scheme, set the partitioning parameter.

>>> ds = ray.data.read_json("example://year=2022/month=09/sales.json")  
>>> ds.take(1)  
[{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match ".json".

  • arrow_json_args – Other json read options to pass to pyarrow.

  • partitioning –

    A Partitioning object that describes how paths are organized. By default, this function parses Hive-style partitions.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Dataset.write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **pandas_json_args) None[source]

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_json("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where json files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from pandas_json_args. This should be used instead of pandas_json_args if any of your write arguments cannot be pickled, or if you'd like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • pandas_json_args – These args will be passed to pandas.DataFrame.to_json(), which we use under the hood to write out each Datasets block. These are dict(orient="records", lines=True) by default.

Text#

ray.data.read_text(paths: Union[str, List[str]], *, encoding: str = 'utf-8', errors: str = 'ignore', drop_empty_lines: bool = True, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Optional[Dict[str, Any]] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None, partitioning: ray.data.datasource.partitioning.Partitioning = None) ray.data.dataset.Dataset[str][source]#

Create a dataset from lines stored in text files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_text("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_text(["/path/to/file1", "/path/to/file2"]) 
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • encoding – The encoding of the files (e.g., "utf-8" or "ascii").

  • errors – What to do with errors on decoding. Specify either "strict", "ignore", or "replace". Defaults to "ignore".

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – Kwargs passed to ray.remote in the read tasks and in the subsequent text decoding map task.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files. If wishing to filter out all file paths except those whose file extension matches e.g. ".txt", a FileExtensionFilter("txt") can be provided.

  • partitioning – A Partitioning object that describes how paths are organized. Defaults to None.

Returns

Dataset holding lines of text read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Images (experimental)#

ray.data.read_images(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif'], allow_if_no_extensions=False), partitioning: ray.data.datasource.partitioning.Partitioning = None, size: Optional[Tuple[int, int]] = None, mode: Optional[str] = None, include_paths: bool = False)[source]#

Read images from the specified paths.

Examples

>>> import ray
>>> path = "s3://air-example-data-2/movie-image-small-filesize-1GB"
>>> ds = ray.data.read_images(path)  
>>> ds  
Dataset(num_blocks=200, num_rows=41979, schema={image: ArrowVariableShapedTensorType(dtype=uint8, ndim=3)})

If you need image file paths, set include_paths=True.

>>> ds = ray.data.read_images(path, include_paths=True)  
>>> ds  
Dataset(num_blocks=200, num_rows=41979, schema={image: ArrowVariableShapedTensorType(dtype=uint8, ndim=3), path: string})
>>> ds.take(1)[0]["path"]  
'air-example-data-2/movie-image-small-filesize-1GB/0.jpg'

If your images are arranged like:

root/dog/xxx.png
root/dog/xxy.png

root/cat/123.png
root/cat/nsdf3.png

Then you can include the labels by specifying a Partitioning.

>>> import ray
>>> from ray.data.datasource.partitioning import Partitioning
>>> root = "example://tiny-imagenet-200/train"
>>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
>>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning)  
>>> ds  
Dataset(num_blocks=176, num_rows=94946, schema={image: TensorDtype(shape=(224, 224, 3), dtype=uint8), class: object})
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match *.png, *.jpg, *.jpeg, *.tiff, *.bmp, or *.gif.

  • partitioning – A Partitioning object that describes how paths are organized. Defaults to None.

  • size – The desired height and width of loaded images. If unspecified, images retain their original shape.

  • mode – A Pillow mode describing the desired type and depth of pixels. If unspecified, image modes are inferred by Pillow.

  • include_paths – If True, include the path to each image. File paths are stored in the 'path' column.

Returns

A Dataset containing tensors that represent the images at the specified paths. For information on working with tensors, read the tensor data guide.

Raises
  • ValueError – if size contains non-positive numbers.

  • ValueError – if mode is unsupported.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

Binary#

ray.data.read_binary_files(paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None, partitioning: ray.data.datasource.partitioning.Partitioning = None) ray.data.dataset.Dataset[Union[Tuple[str, bytes], bytes]][source]#

Create a dataset from binary files of arbitrary contents.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_binary_files( 
...     ["/path/to/file1", "/path/to/file2"])
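
If you need the file paths alongside the file contents, set include_paths=True; each record is then a tuple of the file path and the file bytes (a minimal sketch with illustrative paths):

>>> ray.data.read_binary_files( 
...     "s3://bucket/path", include_paths=True)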
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • include_paths – Whether to include the full path of the file in the dataset records. When specified, the dataset records will be a tuple of the file path and the file contents.

  • filesystem – The filesystem implementation to read from.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files.

  • partitioning – A Partitioning object that describes how paths are organized. Defaults to None.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

TFRecords#

ray.data.read_tfrecords(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None) ray.data.dataset.Dataset[ray.data._internal.pandas_block.PandasRow][source]#

Create a dataset from TFRecord files that contain tf.train.Example messages.

Warning

This function exclusively supports tf.train.Example messages. If a file contains a message that isn't of type tf.train.Example, then this function errors.

Examples

>>> import os
>>> import tempfile
>>> import tensorflow as tf
>>> features = tf.train.Features(
...     feature={
...         "length": tf.train.Feature(float_list=tf.train.FloatList(value=[5.1])),
...         "width": tf.train.Feature(float_list=tf.train.FloatList(value=[3.5])),
...         "species": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"setosa"])),
...     }
... )
>>> example = tf.train.Example(features=features)
>>> path = os.path.join(tempfile.gettempdir(), "data.tfrecords")
>>> with tf.io.TFRecordWriter(path=path) as writer:
...     writer.write(example.SerializeToString())

This function reads tf.train.Example messages into a tabular Dataset.

>>> import ray
>>> ds = ray.data.read_tfrecords(path)
>>> ds.to_pandas()  
   length  width    species
0     5.1    3.5  b'setosa'
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files in the dataset.

  • arrow_open_stream_args – Key-word arguments passed to pyarrow.fs.FileSystem.open_input_stream.

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match "*.tfrecords*".

Returns

A Dataset that contains the example features.

Raises

ValueError – If a file contains a message that isn't a tf.train.Example.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.data.Dataset.write_tfrecords(self, path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, ray_remote_args: Dict[str, Any] = None) None

Write the dataset to TFRecord files.

The TFRecord files will contain tf.train.Example records, with one Example record for each row in the dataset.

Warning

tf.train.Feature only natively stores ints, floats, and bytes, so this function only supports datasets with these data types, and will error if the dataset contains unsupported types.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.tfrecords, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.from_items([
...     { "name": "foo", "score": 42 },
...     { "name": "bar", "score": 43 },
... ])
>>> ds.write_tfrecords("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where tfrecords files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

Pandas#

ray.data.from_pandas(dfs: Union[pandas.DataFrame, List[pandas.DataFrame]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a list of Pandas dataframes.
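
Examples

A minimal sketch (the dataframe contents are illustrative):

>>> import ray
>>> import pandas as pd
>>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
>>> ds = ray.data.from_pandas(df) 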

Parameters

dfs – A Pandas dataframe or a list of Pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

PublicAPI: This API is stable across Ray releases.

ray.data.from_pandas_refs(dfs: Union[ray.types.ObjectRef[pandas.DataFrame], List[ray.types.ObjectRef[pandas.DataFrame]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a list of Ray object references to Pandas dataframes.
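
Examples

A minimal sketch, assuming a running Ray cluster or a local ray.init() (the dataframe contents are illustrative):

>>> import ray
>>> import pandas as pd
>>> df_ref = ray.put(pd.DataFrame({"a": [1, 2, 3]}))
>>> ds = ray.data.from_pandas_refs([df_ref]) 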

Parameters

dfs – A Ray object references to pandas dataframe, or a list of Ray object references to pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

DeveloperAPI: This API may change across minor Ray releases.

Dataset.to_pandas(limit: int = 100000) pandas.DataFrame[source]

Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow or Pandas records. An error is raised if the number of records exceeds the provided limit. Note that you can use .limit() on the dataset beforehand to truncate the dataset manually.
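
Examples

A minimal sketch using a small synthetic dataset:

>>> import ray
>>> ds = ray.data.range_table(1000) 
>>> df = ds.to_pandas() 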

Time complexity: O(dataset size)

Parameters

limit – The maximum number of records to return. An error will be raised if the limit is exceeded.

Returns

A Pandas DataFrame created from this dataset, containing a limited number of records.

Dataset.to_pandas_refs() List[ray.types.ObjectRef[pandas.DataFrame]][source]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

NumPy#

ray.data.read_numpy(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.npy'], allow_if_no_extensions=False), partitioning: ray.data.datasource.partitioning.Partitioning = None, **numpy_load_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create an Arrow dataset from numpy files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_numpy("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_numpy( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • numpy_load_args – Other options to pass to np.load.

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match ".npy".

  • partitioning – A Partitioning object that describes how paths are organized. Defaults to None.

Returns

Dataset holding Tensor records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.from_numpy(ndarrays: Union[numpy.ndarray, List[numpy.ndarray]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a list of NumPy ndarrays.
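
Examples

A minimal sketch (the array contents are illustrative):

>>> import ray
>>> import numpy as np
>>> ds = ray.data.from_numpy(np.arange(12).reshape(3, 4)) 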

Parameters

ndarrays – A NumPy ndarray or a list of NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

PublicAPI: This API is stable across Ray releases.

ray.data.from_numpy_refs(ndarrays: Union[ray.types.ObjectRef[numpy.ndarray], List[ray.types.ObjectRef[numpy.ndarray]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a list of NumPy ndarray futures.

Parameters

ndarrays – A Ray object reference to a NumPy ndarray or a list of Ray object references to NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

DeveloperAPI: This API may change across minor Ray releases.

Dataset.write_numpy(path: str, *, column: str = '__value__', filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, ray_remote_args: Dict[str, Any] = None) None[source]

Write a tensor column of the dataset to npy files.

This is only supported for datasets convertible to Arrow records that contain a TensorArray column. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_numpy("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where npy files will be written to.

  • column – The name of the table column that contains the tensor to be written. The default is "__value__", the column name that Datasets uses for storing tensors in single-column tables.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

Dataset.to_numpy_refs(*, column: Optional[str] = None) List[ray.types.ObjectRef[numpy.ndarray]][source]

Convert this dataset into a distributed set of NumPy ndarrays.

This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().
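
Examples

A minimal sketch using a synthetic tensor dataset; "__value__" is the single tensor column produced by range_tensor():

>>> import ray
>>> ds = ray.data.range_tensor(100) 
>>> refs = ds.to_numpy_refs(column="__value__") 
>>> arrs = ray.get(refs) 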

Time complexity: O(dataset size / parallelism)

Parameters
  • column – The name of the column to convert to numpy, or None to specify the entire row. If not specified for Arrow or Pandas blocks, each returned future will represent a dict of column ndarrays.

Returns

A list of remote NumPy ndarrays created from this dataset.

Arrow#

ray.data.from_arrow(tables: Union[pyarrow.Table, bytes, List[Union[pyarrow.Table, bytes]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a list of Arrow tables.
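
Examples

A minimal sketch (the table contents are illustrative):

>>> import ray
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
>>> ds = ray.data.from_arrow(table) 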

Parameters

tables – An Arrow table, or a list of Arrow tables, or its streaming format in bytes.

Returns

Dataset holding Arrow records from the tables.

PublicAPI: This API is stable across Ray releases.

ray.data.from_arrow_refs(tables: Union[ray.types.ObjectRef[Union[pyarrow.Table, bytes]], List[ray.types.ObjectRef[Union[pyarrow.Table, bytes]]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a set of Arrow tables.

Parameters

tables – A Ray object reference to Arrow table, or list of Ray object references to Arrow tables, or its streaming format in bytes.

Returns

Dataset holding Arrow records from the tables.

DeveloperAPI: This API may change across minor Ray releases.

Dataset.to_arrow_refs() List[ray.types.ObjectRef[pyarrow.Table]][source]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

MongoDB#

ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: Optional[List[Dict]] = None, schema: Optional[pymongoarrow.api.Schema] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, **mongo_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create an Arrow dataset from MongoDB.

The data to read is specified via the uri, database and collection of the MongoDB deployment. The dataset is created from the results of executing pipeline against the collection. If pipeline is None, the entire collection will be read.

You can find more details about these MongoDB concepts here:

  • URI: https://www.mongodb.com/docs/manual/reference/connection-string/

  • Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/

  • Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/

To read the MongoDB in parallel, the execution of the pipeline is run on partitions of the collection, with one Ray read task handling each partition. Partitions are created in an attempt to evenly distribute the documents into the specified number of partitions. The number of partitions is determined by parallelism, which can be requested from this interface or automatically chosen if unspecified (see the parallelism arg below).

Examples

>>> import ray
>>> from pymongoarrow.api import Schema 
>>> import pyarrow as pa 
>>> ds = ray.data.read_mongo( 
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
...     database="my_db",
...     collection="my_collection",
...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}],
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
...     parallelism=10,
... )
Parameters
  • uri – The URI of the source MongoDB where the dataset will be read from. For the URI format, see details in https://www.mongodb.com/docs/manual/reference/connection-string/.

  • database – The name of the database hosted in the MongoDB. This database must exist otherwise ValueError will be raised.

  • collection – The name of the collection in the database. This collection must exist otherwise ValueError will be raised.

  • pipeline – A MongoDB pipeline, which will be executed on the given collection with results used to create Dataset. If None, the entire collection will be read.

  • schema – The schema used to read the collection. If None, it'll be inferred from the results of pipeline.

  • parallelism – The requested parallelism of the read. If -1, it will be automatically chosen based on the available cluster resources and estimated in-memory data size.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • mongo_args – kwargs passed to aggregate_arrow_all() in pymongoarrow in producing Arrow-formatted results.

Returns

Dataset holding Arrow records from the results of executing the pipeline on the specified MongoDB collection.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.data.Dataset.write_mongo(self, uri: str, database: str, collection: str, ray_remote_args: Optional[Dict[str, Any]] = None) None

Write the dataset to a MongoDB datasource.

This is only supported for datasets convertible to Arrow records. To control the number of parallel write tasks, use .repartition() before calling this method.

Note

Currently, this supports only a subset of PyArrow's types, due to the limitations of pymongoarrow, which is used under the hood. Writing unsupported types will fail on type checking. See all the supported types at: https://mongo-arrow.readthedocs.io/en/latest/supported_types.html.

Note

The records will be inserted into MongoDB as new documents. If a record has an _id field, this _id must not already exist in MongoDB; otherwise the write will be rejected and fail (hence preexisting documents are protected from being mutated). It's fine for a record to omit the _id field; MongoDB will auto-generate one at insertion.

Examples

>>> import ray
>>> import pandas as pd
>>> docs = [{"title": "MongoDB Datasource test"} for key in range(4)]
>>> ds = ray.data.from_pandas(pd.DataFrame(docs))
>>> ds.write_mongo( 
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
...     database="my_db",
...     collection="my_collection",
... ) 
Parameters
  • uri – The URI to the destination MongoDB where the dataset will be written to. For the URI format, see details in https://www.mongodb.com/docs/manual/reference/connection-string/.

  • database – The name of the database. This database must exist otherwise ValueError will be raised.

  • collection – The name of the collection in the database. This collection must exist otherwise ValueError will be raised.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

Dask#

ray.data.from_dask(df: dask.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a Dask DataFrame.
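
Examples

A minimal sketch, assuming Dask is installed alongside Ray (the dataframe contents are illustrative):

>>> import ray
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({"a": range(100)}), npartitions=4) 
>>> ds = ray.data.from_dask(ddf) 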

Parameters

df – A Dask DataFrame.

Returns

Dataset holding Arrow records read from the DataFrame.

PublicAPI: This API is stable across Ray releases.

Dataset.to_dask(meta: Optional[Union[pandas.DataFrame, pandas.Series, Dict[str, Any], Iterable[Any], Tuple[Any]]] = None) dask.DataFrame[source]

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will set the Dask scheduler to Dask-on-Ray globally, via the config.

Time complexity: O(dataset size / parallelism)

Parameters

meta – An empty pandas DataFrame or Series that matches the dtypes and column names of the Dataset. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. By default, this will be inferred from the underlying Dataset schema, with this argument supplying an optional override.

Returns

A Dask DataFrame created from this dataset.

Spark#

ray.data.from_spark(df: pyspark.sql.DataFrame, *, parallelism: Optional[int] = None) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a Spark dataframe.
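
Examples

A minimal sketch, assuming RayDP (Spark-on-Ray) is installed and used to create the SparkSession; the session settings and dataframe contents are illustrative:

>>> import ray
>>> import raydp
>>> spark = raydp.init_spark("example", num_executors=1, 
...     executor_cores=1, executor_memory="1GB")
>>> df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"]) 
>>> ds = ray.data.from_spark(df) 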

Parameters
  • df – A Spark dataframe, which must be created by RayDP (Spark-on-Ray).

  • parallelism – The amount of parallelism to use for the dataset. If not provided, it will be equal to the number of partitions of the original Spark dataframe.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

Dataset.to_spark(spark: pyspark.sql.SparkSession) pyspark.sql.DataFrame[source]

Convert this dataset into a Spark dataframe.
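
Examples

A minimal sketch, assuming a SparkSession created by RayDP (Spark-on-Ray) as in from_spark() above:

>>> ds = ray.data.range_table(100) 
>>> df = ds.to_spark(spark) 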

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

Modin#

ray.data.from_modin(df: modin.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a Modin dataframe.
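
Examples

A minimal sketch, assuming Modin is installed and configured to use the Ray engine (the dataframe contents are illustrative):

>>> import ray
>>> import modin.pandas as mpd
>>> mdf = mpd.DataFrame({"a": [1, 2, 3]}) 
>>> ds = ray.data.from_modin(mdf) 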

Parameters

df – A Modin dataframe, which must be using the Ray backend.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

Dataset.to_modin() modin.DataFrame[source]

Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of Pandas dataframes (using .to_pandas_refs()). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using modin.distributed.dataframe.pandas.partitions.from_partitions().

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

Mars#

ray.data.from_mars(df: mars.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]#

Create a dataset from a MARS dataframe.

Parameters

df – A MARS dataframe, which must be executed by MARS-on-Ray.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

Dataset.to_mars() mars.DataFrame[source]

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

Torch#

ray.data.from_torch(dataset: torch.utils.data.Dataset) ray.data.dataset.Dataset[source]#

Create a dataset from a Torch dataset.

This function is inefficient. Use it to read small datasets or prototype.

Warning

If your dataset is large, this function may execute slowly or raise an out-of-memory error. To avoid issues, read the underlying data with a function like read_images().

Note

This function isn't parallelized. It loads the entire dataset into the head node's memory before moving the data to the distributed object store.

Examples

>>> import ray
>>> from torchvision import datasets
>>> dataset = datasets.MNIST("data", download=True)  
>>> dataset = ray.data.from_torch(dataset)  
>>> dataset  
Dataset(num_blocks=200, num_rows=60000, schema=<class 'tuple'>)
>>> dataset.take(1)  
[(<PIL.Image.Image image mode=L size=28x28 at 0x...>, 5)]
Parameters

dataset – A Torch dataset.

Returns

A Dataset that contains the samples stored in the Torch dataset.

PublicAPI: This API is stable across Ray releases.

HuggingFace#

ray.data.from_huggingface(dataset: Union[datasets.Dataset, datasets.DatasetDict]) Union[ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow], Dict[str, ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow]]][source]#

Create a dataset from a Hugging Face Datasets Dataset.

This function is not parallelized, and is intended to be used with Hugging Face Datasets that are loaded into memory (as opposed to memory-mapped).
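
Examples

A minimal sketch; the dataset name is illustrative, and passing a DatasetDict yields a dict of Ray Datasets, one per split:

>>> import ray
>>> import datasets
>>> hf_ds = datasets.load_dataset("tweet_eval", "emotion") 
>>> ray_datasets = ray.data.from_huggingface(hf_ds) 
>>> ray_datasets["train"] 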

Parameters

dataset – A Hugging Face Dataset, or DatasetDict. IterableDataset is not supported.

Returns

Dataset holding Arrow records from the Hugging Face Dataset, or a dict of datasets in case dataset is a DatasetDict.

PublicAPI: This API is stable across Ray releases.

TensorFlow#

ray.data.from_tf(dataset: tf.data.Dataset) ray.data.dataset.Dataset[source]#

Create a dataset from a TensorFlow dataset.

This function is inefficient. Use it to read small datasets or prototype.

Warning

If your dataset is large, this function may execute slowly or raise an out-of-memory error. To avoid issues, read the underlying data with a function like read_images().

Note

This function isn't parallelized. It loads the entire dataset into the head node's memory before moving the data to the distributed object store.

Examples

>>> import ray
>>> import tensorflow_datasets as tfds
>>> dataset, _ = tfds.load('cifar10', split=["train", "test"])  
>>> dataset = ray.data.from_tf(dataset)  
>>> dataset  
Dataset(num_blocks=200, num_rows=50000, schema={id: binary, image: ArrowTensorType(shape=(32, 32, 3), dtype=uint8), label: int64})
>>> dataset.take(1)  
[{'id': b'train_16399', 'image': array([[[143,  96,  70],
        [141,  96,  72],
        [135,  93,  72],
        ...,
        [ 96,  37,  19],
        [105,  42,  18],
        [104,  38,  20]],
       ...,
       [[195, 161, 126],
        [187, 153, 123],
        [186, 151, 128],
        ...,
        [212, 177, 147],
        [219, 185, 155],
        [221, 187, 157]]], dtype=uint8), 'label': 7}]

Parameters

dataset – A TensorFlow dataset.

Returns

A Dataset that contains the samples stored in the TensorFlow dataset.

PublicAPI: This API is stable across Ray releases.

Datasource API#

ray.data.read_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.read_api.T], *, parallelism: int = -1, ray_remote_args: Optional[Dict[str, Any]] = None, **read_args) ray.data.dataset.Dataset[ray.data.read_api.T][source]#

Read a dataset from a custom data source.
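
Examples

A minimal sketch, assuming the built-in RangeDatasource (see the Datasource class below) accepts an n read argument, as it does when used internally by ray.data.range():

>>> import ray
>>> from ray.data.datasource import RangeDatasource
>>> ds = ray.data.read_datasource(RangeDatasource(), n=1000) 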

Parameters
  • datasource – The datasource to read data from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the available partitioning of the datasource. If set to -1, parallelism will be automatically chosen based on the available cluster resources and estimated in-memory data size.

  • read_args – Additional kwargs to pass to the datasource impl.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

Returns

Dataset holding the data read from the datasource.

PublicAPI: This API is stable across Ray releases.

Dataset.write_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.block.T], *, ray_remote_args: Optional[Dict[str, Any]] = None, **write_args) None[source]

Write the dataset to a custom datasource.

Examples

>>> import ray
>>> from ray.data.datasource import Datasource
>>> ds = ray.data.range(100) 
>>> class CustomDatasource(Datasource): 
...     # define custom data source
...     pass 
>>> ds.write_datasource(CustomDatasource(...)) 

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional write args to pass to the datasource.

class ray.data.Datasource(*args, **kwds)[source]#

Interface for defining a custom ray.data.Dataset datasource.

To read a datasource into a dataset, use ray.data.read_datasource(). To write to a writable datasource, use Dataset.write_datasource().

See RangeDatasource and DummyOutputDatasource for examples of how to implement readable and writable datasources.

Datasource instances must be serializable, since create_reader() and do_write() are called in remote tasks.
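
Examples

The following is an illustrative sketch (not from the original reference) of a minimal read-only datasource; the class names and the BlockMetadata fields used here are assumptions that may differ across Ray releases.

>>> import ray
>>> import pyarrow as pa
>>> from ray.data import ReadTask
>>> from ray.data.block import BlockMetadata
>>> from ray.data.datasource import Datasource, Reader
>>> class TenRowReader(Reader):  
...     def estimate_inmemory_data_size(self):
...         return None  # in-memory size is unknown
...     def get_read_tasks(self, parallelism):
...         def read_fn():
...             # Each read task returns one or more blocks.
...             return [pa.table({"value": list(range(10))})]
...         metadata = BlockMetadata(
...             num_rows=10, size_bytes=None, schema=None,
...             input_files=None, exec_stats=None)
...         return [ReadTask(read_fn, metadata)]
>>> class TenRowDatasource(Datasource):  
...     def create_reader(self, **read_args):
...         return TenRowReader()
>>> ray.data.read_datasource(TenRowDatasource()).take(3)  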

PublicAPI: This API is stable across Ray releases.

create_reader(**read_args) ray.data.datasource.datasource.Reader[ray.data.block.T][source]#

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

prepare_read(parallelism: int, **read_args) List[ray.data.datasource.datasource.ReadTask[ray.data.block.T]][source]#

Deprecated: Please implement create_reader() instead.

Warning

DEPRECATED: This API is deprecated and may be removed in a future Ray release.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], ray_remote_args: Dict[str, Any], **write_args) List[ray.types.ObjectRef[Any]][source]#

Launch Ray tasks for writing blocks out to the datasource.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of the output of the write tasks.

on_write_complete(write_results: List[Any], **kwargs) None[source]#

Callback for when a write job completes.

This can be used to "commit" a write output. This method must succeed prior to write_datasource() returning to the user. If this method fails, then on_write_failed() will be called.

Parameters
  • write_results – The list of the write task results.

  • kwargs – Forward-compatibility placeholder.

on_write_failed(write_results: List[ray.types.ObjectRef[Any]], error: Exception, **kwargs) None[source]#

Callback for when a write job fails.

This is called on a best-effort basis on write failures.

Parameters
  • write_results – The list of the write task result futures.

  • error – The first error encountered.

  • kwargs – Forward-compatibility placeholder.

class ray.data.ReadTask(read_fn: Callable[[], Iterable[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: ray.data.block.BlockMetadata)[source]#

A function used to read blocks from the dataset.

Read tasks are generated by reader.get_read_tasks(), and return a list of ray.data.Block when called. Initial metadata about the read operation can be retrieved via get_metadata() prior to executing the read. Final metadata is returned after the read along with the blocks.

Ray will execute read tasks in remote functions to parallelize execution. Note that the number of blocks returned can vary at runtime. For example, if a task is reading a single large file it can return multiple blocks to avoid running out of memory during the read.

The initial metadata should reflect all the blocks returned by the read, e.g., if the metadata says num_rows=1000, the read can return a single block of 1000 rows, or multiple blocks with 1000 rows altogether.

The final metadata (returned with the actual block) reflects the exact contents of the block itself.

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.datasource.Reader(*args, **kwds)[source]#

A bound read operation for a datasource.

This is a stateful class so that reads can be prepared in multiple stages. For example, it is useful for Datasets to know the in-memory size of the read prior to executing it.

PublicAPI: This API is stable across Ray releases.

estimate_inmemory_data_size() Optional[int][source]#

Return an estimate of the in-memory data size, or None if unknown.

Note that the in-memory data size may be larger than the on-disk data size.

get_read_tasks(parallelism: int) List[ray.data.datasource.datasource.ReadTask[ray.data.block.T]][source]#

Execute the read and return read tasks.

Parameters
  • parallelism – The requested read parallelism. The number of read tasks should equal this value if possible.

  • read_args – Additional kwargs to pass to the datasource impl.

Returns

A list of read tasks that can be executed to read blocks from the datasource in parallel.

Built-in Datasources#

class ray.data.datasource.BinaryDatasource(*args, **kwds)[source]#

Binary datasource, for reading and writing binary files.

Examples

>>> import ray
>>> from ray.data.datasource import BinaryDatasource
>>> source = BinaryDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[b"file_data", ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.CSVDatasource(*args, **kwds)[source]#

CSV datasource, for reading and writing CSV files.

Examples

>>> import ray
>>> from ray.data.datasource import CSVDatasource
>>> source = CSVDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.FileBasedDatasource(*args, **kwds)[source]#

File-based datasource, for reading and writing files.

This class should not be used directly, and should instead be subclassed and tailored to particular file formats. Classes deriving from this class must implement _read_file().

If _FILE_EXTENSION is defined, then by default only files with this extension will be read. If it is None, no default file-extension filter is applied.

Current subclasses:

JSONDatasource, CSVDatasource, NumpyDatasource, BinaryDatasource
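
Examples

A hedged sketch of a custom subclass; _read_file() is a protected hook whose exact signature may vary across Ray releases, and the class, extension, and column names below are hypothetical.

>>> import ray
>>> import pyarrow as pa
>>> from ray.data.datasource import FileBasedDatasource
>>> class LineCountDatasource(FileBasedDatasource):  
...     _FILE_EXTENSION = "txt"  # only pick up *.txt files by default
...     def _read_file(self, f, path, **reader_args):
...         data = f.readall()  # f is an open pyarrow file handle
...         return pa.table({"path": [path], "num_lines": [data.count(b"\n")]})
>>> ray.data.read_datasource(  
...     LineCountDatasource(), paths="/path/to/dir").take()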

DeveloperAPI: This API may change across minor Ray releases.

create_reader(**kwargs)[source]#

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], path: str, dataset_uuid: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, write_args_fn: Callable[[], Dict[str, Any]] = <function FileBasedDatasource.<lambda>>, _block_udf: Optional[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]] = None, ray_remote_args: Dict[str, Any] = None, **write_args) List[ray.types.ObjectRef[Any]][source]#

Creates and returns write tasks for a file-based datasource.

class ray.data.datasource.ImageDatasource(*args, **kwds)[source]#

A datasource that lets you read images.
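
Examples

A usage sketch; the path is illustrative, and the size, mode, and include_paths read arguments are forwarded to create_reader() below.

>>> import ray
>>> from ray.data.datasource import ImageDatasource
>>> ds = ray.data.read_datasource(  
...     ImageDatasource(), paths="/path/to/images", size=(64, 64), mode="RGB")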

DeveloperAPI: This API may change across minor Ray releases.

create_reader(size: Optional[Tuple[int, int]] = None, mode: Optional[str] = None, include_paths: bool = False, **kwargs) Reader[T][source]#

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

class ray.data.datasource.JSONDatasource(*args, **kwds)[source]#

JSON datasource, for reading and writing JSON files.

Examples

>>> import ray
>>> from ray.data.datasource import JSONDatasource
>>> source = JSONDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.NumpyDatasource(*args, **kwds)[source]#

Numpy datasource, for reading and writing Numpy files.

Examples

>>> import ray
>>> from ray.data.datasource import NumpyDatasource
>>> source = NumpyDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[array([0., 1., 2.]), ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.ParquetDatasource(*args, **kwds)[source]#

Parquet datasource, for reading and writing Parquet files.

The primary difference from ParquetBaseDatasource is that this uses PyArrow's ParquetDataset abstraction for dataset reads, and thus offers automatic Arrow dataset schema inference and row count collection at the cost of some potential performance and/or compatibility penalties.

Examples

>>> import ray
>>> from ray.data.datasource import ParquetDatasource
>>> source = ParquetDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

create_reader(**kwargs)[source]#

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

class ray.data.datasource.RangeDatasource(*args, **kwds)[source]#

An example datasource that generates ranges of numbers from [0..n).

Examples

>>> import ray
>>> from ray.data.datasource import RangeDatasource
>>> source = RangeDatasource() 
>>> ray.data.read_datasource(source, n=10).take() 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

PublicAPI: This API is stable across Ray releases.

create_reader(n: int, block_format: str = 'list', tensor_shape: Tuple = (1,)) List[ray.data.datasource.datasource.ReadTask][source]#

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

class ray.data.datasource.TFRecordDatasource(*args, **kwds)[source]#

TFRecord datasource, for reading and writing TFRecord files.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.data.datasource.MongoDatasource(*args, **kwds)[source]#

Datasource for reading from and writing to MongoDB.

Examples

>>> import ray
>>> from ray.data.datasource import MongoDatasource
>>> import pyarrow as pa 
>>> from pymongoarrow.api import Schema 
>>> ds = ray.data.read_datasource( 
...     MongoDatasource(), 
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501 
...     database="my_db", 
...     collection="my_collection", 
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}), 
... ) 

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

create_reader(**kwargs) ray.data.datasource.datasource.Reader[source]#

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], ray_remote_args: Optional[Dict[str, Any]], uri: str, database: str, collection: str) List[ray.types.ObjectRef[Any]][source]#

Launch Ray tasks for writing blocks out to the datasource.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of the output of the write tasks.

Partitioning API#

class ray.data.datasource.Partitioning(style: ray.data.datasource.partitioning.PartitionStyle, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None)[source]#

Partition scheme used to describe path-based partitions.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.
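
Examples

A brief sketch describing a HIVE-partitioned layout such as "year=2023/country=US/data.parquet"; the base directory is illustrative.

>>> from ray.data.datasource import Partitioning, PartitionStyle
>>> scheme = Partitioning(PartitionStyle.HIVE, base_dir="/data/sales")  
>>> scheme.style  
<PartitionStyle.HIVE: 'hive'>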

style#

The partition style - may be either HIVE or DIRECTORY.

Type

ray.data.datasource.partitioning.PartitionStyle

base_dir#

"/"-delimited base directory that all partitioned paths should exist under (exclusive). File paths either outside of, or at the first level of, this directory will be considered unpartitioned. Specify None or an empty string to search for partitions in all file path directories.

Type

Optional[str]

field_names#

The partition key field names (i.e. column names for tabular datasets). When non-empty, the order and length of partition key field names must match the order and length of partition values. Required when parsing DIRECTORY partitioned paths or generating HIVE partitioned paths.

Type

Optional[List[str]]

filesystem#

Filesystem that will be used for partition path file I/O.

Type

Optional[pyarrow.fs.FileSystem]

DeveloperAPI: This API may change across minor Ray releases.

property normalized_base_dir: str#

Returns the base directory normalized for compatibility with a filesystem.

property resolved_filesystem: pyarrow.fs.FileSystem#

Returns the filesystem resolved for compatibility with a base directory.

class ray.data.datasource.PartitionStyle(value)[source]#

Supported dataset partition styles.

Inherits from str to simplify plain text serialization/deserialization.

Examples

>>> import json
>>> from ray.data.datasource import PartitionStyle
>>> # Serialize to JSON text.
>>> json.dumps(PartitionStyle.HIVE)  
'"hive"'
>>> # Deserialize from JSON text.
>>> PartitionStyle(json.loads('"hive"'))  
<PartitionStyle.HIVE: 'hive'>

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.datasource.PathPartitionEncoder(partitioning: ray.data.datasource.partitioning.Partitioning)[source]#

Callable that generates directory path strings for path-based partition formats.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

Two path partition formats are currently supported - HIVE and DIRECTORY.

For HIVE Partitioning, all partition directories will be generated using a "{key1}={value1}/{key2}={value2}" naming convention under the base directory. An accompanying ordered list of partition key field names must also be provided, where the order and length of all partition values must match the order and length of field names.

For DIRECTORY Partitioning, all directories will be generated from partition values using a "{value1}/{value2}" naming convention under the base directory.
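
Examples

A sketch of the HIVE convention described above; the directory and field names are illustrative.

>>> from ray.data.datasource import PathPartitionEncoder, PartitionStyle
>>> encoder = PathPartitionEncoder.of(  
...     style=PartitionStyle.HIVE,
...     base_dir="/data/sales",
...     field_names=["year", "country"],
... )
>>> encoder.scheme.style  
<PartitionStyle.HIVE: 'hive'>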

DeveloperAPI: This API may change across minor Ray releases.

static of(style: ray.data.datasource.partitioning.PartitionStyle = PartitionStyle.HIVE, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None) PathPartitionEncoder[source]#

Creates a new partition path encoder.

Parameters
  • style – The partition style - may be either HIVE or DIRECTORY.

  • base_dir – "/"-delimited base directory that all partition paths will be generated under (exclusive).

  • field_names – The partition key field names (i.e. column names for tabular datasets). Required for HIVE partition paths, optional for DIRECTORY partition paths. When non-empty, the order and length of partition key field names must match the order and length of partition values.

  • filesystem – Filesystem that will be used for partition path file I/O.

Returns

The new partition path encoder.

property scheme: ray.data.datasource.partitioning.Partitioning#

Returns the partitioning for this encoder.

class ray.data.datasource.PathPartitionParser(partitioning: ray.data.datasource.partitioning.Partitioning)[source]#

Partition parser for path-based partition formats.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

Two path partition formats are currently supported - HIVE and DIRECTORY.

For HIVE Partitioning, all partition directories under the base directory will be discovered based on "{key1}={value1}/{key2}={value2}" naming conventions. Key/value pairs do not need to be presented in the same order across all paths. Directory names nested under the base directory that don't follow this naming convention will be considered unpartitioned. If a partition filter is defined, then it will be called with an empty input dictionary for each unpartitioned file.

For DIRECTORY Partitioning, all directories under the base directory will be interpreted as partition values of the form "{value1}/{value2}". An accompanying ordered list of partition field names must also be provided, where the order and length of all partition values must match the order and length of field names. Files stored directly in the base directory will be considered unpartitioned. If a partition filter is defined, then it will be called with an empty input dictionary for each unpartitioned file. For example, if the base directory is "foo" then "foo.csv" and "foo/bar.csv" would be considered unpartitioned files but "foo/bar/baz.csv" would be associated with partition "bar". If the base directory is undefined, then "foo.csv" would be unpartitioned, "foo/bar.csv" would be associated with partition "foo", and "foo/bar/baz.csv" would be associated with partition ("foo", "bar").
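
Examples

A hedged sketch, assuming the parser is invoked as a callable on a file path (which is how PathPartitionFilter applies it); the paths and keys are illustrative.

>>> from ray.data.datasource import Partitioning, PartitionStyle, PathPartitionParser
>>> scheme = Partitioning(PartitionStyle.HIVE, base_dir="/data/sales")  
>>> parser = PathPartitionParser(scheme)  
>>> parser("/data/sales/year=2023/country=US/part-0.parquet")  
{'year': '2023', 'country': 'US'}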

DeveloperAPI: This API may change across minor Ray releases.

static of(style: ray.data.datasource.partitioning.PartitionStyle = PartitionStyle.HIVE, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None) PathPartitionParser[source]#

Creates a path-based partition parser using a flattened argument list.

Parameters
  • style – The partition style - may be either HIVE or DIRECTORY.

  • base_dir – "/"-delimited base directory to start searching for partitions (exclusive). File paths outside of this directory will be considered unpartitioned. Specify None or an empty string to search for partitions in all file path directories.

  • field_names – The partition key names. Required for DIRECTORY partitioning. Optional for HIVE partitioning. When non-empty, the order and length of partition key field names must match the order and length of partition directories discovered. Partition key field names are not required to exist in the dataset schema.

  • filesystem – Filesystem that will be used for partition path file I/O.

Returns

The new path-based partition parser.

property scheme: ray.data.datasource.partitioning.Partitioning#

Returns the partitioning for this parser.

class ray.data.datasource.PathPartitionFilter(path_partition_parser: ray.data.datasource.partitioning.PathPartitionParser, filter_fn: Callable[[Dict[str, str]], bool])[source]#

Partition filter for path-based partition formats.

Used to explicitly keep or reject files based on a custom filter function that takes partition keys and values parsed from the file's path as input.
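
Examples

An illustrative sketch that keeps only files from one partition value; the names are hypothetical. Such a filter is typically passed to file-based read functions via their partition_filter argument.

>>> from ray.data.datasource import (  
...     Partitioning, PartitionStyle, PathPartitionParser, PathPartitionFilter)
>>> parser = PathPartitionParser(  
...     Partitioning(PartitionStyle.HIVE, base_dir="/data/sales"))
>>> keep_2023 = PathPartitionFilter(  
...     parser, filter_fn=lambda kv: kv.get("year") == "2023")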

PublicAPI (beta): This API is in beta and may change before becoming stable.

MetadataProvider API#

class ray.data.datasource.FileMetadataProvider[source]#

Abstract callable that provides metadata for the files of a single dataset block.

Current subclasses:

BaseFileMetadataProvider, ParquetMetadataProvider

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.datasource.BaseFileMetadataProvider[source]#

Abstract callable that provides metadata for FileBasedDatasource implementations that reuse the base prepare_read method.

Also supports file and file size discovery in input directory paths.

Current subclasses:

DefaultFileMetadataProvider

DeveloperAPI: This API may change across minor Ray releases.

expand_paths(paths: List[str], filesystem: Optional[pyarrow.fs.FileSystem]) Tuple[List[str], List[Optional[int]]][source]#

Expands all paths into concrete file paths by walking directories.

Also returns a sidecar of file sizes.

The input paths must be normalized for compatibility with the input filesystem prior to invocation.

Parameters
  • paths – A list of file and/or directory paths compatible with the given filesystem.

  • filesystem – The filesystem implementation that should be used for expanding all paths and reading their files.

Returns

A tuple whose first item contains the list of file paths discovered, and whose second item contains the size of each file. None may be returned if a file size is either unknown or will be fetched later by _get_block_metadata(), but the length of both lists must be equal.

class ray.data.datasource.ParquetMetadataProvider[source]#

Abstract callable that provides block metadata for Arrow Parquet file fragments.

All file fragments should belong to a single dataset block.

Supports optional pre-fetching of ordered metadata for all file fragments in a single batch to help optimize metadata resolution.

Current subclasses:

DefaultParquetMetadataProvider

DeveloperAPI: This API may change across minor Ray releases.

prefetch_file_metadata(pieces: List[pyarrow.dataset.ParquetFileFragment], **ray_remote_args) Optional[List[Any]][source]#

Pre-fetches file metadata for all Parquet file fragments in a single batch.

Subsets of the metadata returned will be provided as input to subsequent calls to _get_block_metadata() together with their corresponding Parquet file fragments.

Implementations that don't support pre-fetching file metadata shouldn't override this method.

Parameters

pieces – The Parquet file fragments to fetch metadata for.

Returns

Metadata resolved for each input file fragment, or None. Metadata must be returned in the same order as all input file fragments, such that metadata[i] always contains the metadata for pieces[i].

class ray.data.datasource.DefaultFileMetadataProvider[source]#

Default metadata provider for FileBasedDatasource implementations that reuse the base prepare_read method.

Calculates block size in bytes as the sum of its constituent file sizes, and assumes a fixed number of rows per file.

DeveloperAPI: This API may change across minor Ray releases.

expand_paths(paths: List[str], filesystem: pyarrow.fs.FileSystem) Tuple[List[str], List[Optional[int]]][source]#

Expands all paths into concrete file paths by walking directories.

Also returns a sidecar of file sizes.

The input paths must be normalized for compatibility with the input filesystem prior to invocation.

Parameters
  • paths – A list of file and/or directory paths compatible with the given filesystem.

  • filesystem – The filesystem implementation that should be used for expanding all paths and reading their files.

Returns

A tuple whose first item contains the list of file paths discovered, and whose second item contains the size of each file. None may be returned if a file size is either unknown or will be fetched later by _get_block_metadata(), but the length of both lists must be equal.

class ray.data.datasource.DefaultParquetMetadataProvider[source]#

The default file metadata provider for ParquetDatasource.

Aggregates total block bytes and number of rows using the Parquet file metadata associated with a list of Arrow Parquet dataset file fragments.

DeveloperAPI: This API may change across minor Ray releases.

prefetch_file_metadata(pieces: List[pyarrow.dataset.ParquetFileFragment], **ray_remote_args) Optional[List[pyarrow.parquet.FileMetaData]][source]#

Pre-fetches file metadata for all Parquet file fragments in a single batch.

Subsets of the metadata returned will be provided as input to subsequent calls to _get_block_metadata() together with their corresponding Parquet file fragments.

Implementations that don't support pre-fetching file metadata shouldn't override this method.

Parameters

pieces – The Parquet file fragments to fetch metadata for.

Returns

Metadata resolved for each input file fragment, or None. Metadata must be returned in the same order as all input file fragments, such that metadata[i] always contains the metadata for pieces[i].

class ray.data.datasource.FastFileMetadataProvider[source]#

Fast Metadata provider for FileBasedDatasource implementations.

Offers improved performance vs. DefaultFileMetadataProvider by skipping directory path expansion and file size collection. While this performance improvement may be negligible for local filesystems, it can be substantial for cloud storage service providers.

This should only be used when all input paths are known to be files.
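
Examples

A hedged sketch; the paths are illustrative, and this assumes that file-based read functions in this release (for example, read_csv) accept a meta_provider argument.

>>> import ray
>>> from ray.data.datasource import FastFileMetadataProvider
>>> ds = ray.data.read_csv(  
...     ["s3://bucket/data/part-0.csv", "s3://bucket/data/part-1.csv"],
...     meta_provider=FastFileMetadataProvider(),
... )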

DeveloperAPI: This API may change across minor Ray releases.

expand_paths(paths: List[str], filesystem: pyarrow.fs.FileSystem) Tuple[List[str], List[Optional[int]]][source]#

Expands all paths into concrete file paths by walking directories.

Also returns a sidecar of file sizes.

The input paths must be normalized for compatibility with the input filesystem prior to invocation.

Parameters
  • paths – A list of file and/or directory paths compatible with the given filesystem.

  • filesystem – The filesystem implementation that should be used for expanding all paths and reading their files.

Returns

A tuple whose first item contains the list of file paths discovered, and whose second item contains the size of each file. None may be returned if a file size is either unknown or will be fetched later by _get_block_metadata(), but the length of both lists must be equal.