Input/Output

Synthetic Data

ray.data.range(n: int, *, parallelism: int = -1) ray.data.dataset.Dataset[int][source]

Create a dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range(10000) 
>>> ds 
Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
>>> ds.map(lambda x: x * 2).take(4) 
[0, 2, 4, 6]
Parameters
  • n – The upper bound of the range of integers.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers.

PublicAPI: This API is stable across Ray releases.

ray.data.range_table(n: int, *, parallelism: int = -1) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a tabular dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range_table(1000) 
>>> ds 
Dataset(num_blocks=200, num_rows=1000, schema={value: int64})
>>> ds.map(lambda r: {"v2": r["value"] * 2}).take(2) 
[ArrowRow({'v2': 0}), ArrowRow({'v2': 2})]

This is similar to range(), but stores the integers as Arrow records in Arrow tables. The dataset elements take the form {"value": N}.

Parameters
  • n – The upper bound of the range of integer records.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow records.

PublicAPI: This API is stable across Ray releases.

ray.data.range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a Tensor dataset from a range of integers [0..n).

Examples

>>> import ray
>>> ds = ray.data.range_tensor(1000, shape=(2, 2)) 
>>> ds 
Dataset(
    num_blocks=200,
    num_rows=1000,
    schema={__value__: <ArrowTensorType: shape=(2, 2), dtype=int64>},
)
>>> ds.map_batches(lambda arr: arr * 2).take(2) 
[array([[0, 0],
        [0, 0]]),
array([[2, 2],
        [2, 2]])]

This is similar to range_table(), but uses the ArrowTensorArray extension type. The dataset elements take the form {"__value__": array(N, shape=shape)}.

Parameters
  • n – The upper bound of the range of integer records.

  • shape – The shape of each record.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the integers as Arrow tensor records.

PublicAPI: This API is stable across Ray releases.

Python Objects

ray.data.from_items(items: List[Any], *, parallelism: int = -1) ray.data.dataset.Dataset[Any][source]

Create a dataset from a list of local Python objects.

Examples

>>> import ray
>>> ds = ray.data.from_items([1, 2, 3, 4, 5]) 
>>> ds 
Dataset(num_blocks=5, num_rows=5, schema=<class 'int'>)
>>> ds.take(2) 
[1, 2]
Parameters
  • items – List of local Python objects.

  • parallelism – The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items.

Returns

Dataset holding the items.

PublicAPI: This API is stable across Ray releases.

Parquet

ray.data.read_parquet(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.ParquetMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultParquetMetadataProvider object>, **arrow_parquet_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from parquet files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) 
Parameters
  • paths – A single file path or directory, or a list of file paths. Multiple directories are not supported.

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.read_parquet_bulk(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_file_args: Optional[Dict[str, Any]] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.FastFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.parquet'], allow_if_no_extensions=False), **arrow_parquet_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from a large number (e.g. >1K) of parquet files quickly.

By default, ONLY file paths should be provided as input (i.e. no directory paths), and an OSError will be raised if one or more paths point to directories. If your use-case requires directory paths, then the metadata provider should be changed to one that supports directory expansion (e.g. DefaultFileMetadataProvider).

Offers improved performance vs. read_parquet() by not using PyArrow’s ParquetDataset abstraction, whose latency scales linearly with the number of input files because it collects all file metadata on a single node.

Also supports a wider variety of input Parquet file types than read_parquet(), since it does not try to merge and resolve a unified schema for all files.

However, unlike read_parquet(), this does not offer file metadata resolution by default, so a custom metadata provider should be provided if your use-case requires a unified dataset schema, block sizes, row counts, etc.

Examples

>>> import ray
>>> from ray.data.datasource import DefaultFileMetadataProvider
>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known to minimize read latency.
>>> ray.data.read_parquet_bulk( 
...     ["/path/to/file1", "/path/to/file2"])
>>> # Read a directory of files in remote storage. Caution should be taken
>>> # when providing directory paths, since the time to both check each path
>>> # type and expand its contents may result in greatly increased latency
>>> # and/or request rate throttling from cloud storage service providers.
>>> ray.data.read_parquet_bulk( 
...     "s3://bucket/path",
...     meta_provider=DefaultFileMetadataProvider())
Parameters
  • paths – A single file path or a list of file paths. If one or more directories are provided, then meta_provider should also be set to an implementation that supports directory expansion (e.g. DefaultFileMetadataProvider).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_file_args – kwargs passed to pyarrow.fs.FileSystem.open_input_file.

  • tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()).

  • meta_provider – File metadata provider. Defaults to a fast file metadata provider that skips file size collection and requires all input paths to be files. Change to DefaultFileMetadataProvider or a custom metadata provider if directory expansion and/or file metadata resolution is required.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.parquet”.

  • arrow_parquet_args – Other parquet read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Dataset.write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args) None[source]

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_parquet("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used instead of arrow_parquet_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_parquet_args – Options to pass to pyarrow.parquet.write_table(), which is used to write out each block to a file.

CSV

ray.data.read_csv(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.csv'], allow_if_no_extensions=False), **arrow_csv_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from csv files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_csv( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.csv”.

  • arrow_csv_args – Other csv read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Dataset.write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_csv_args) None[source]

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_csv("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where csv files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. This should be used instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_csv_args – Other CSV write options to pass to pyarrow.

JSON

ray.data.read_json(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.json'], allow_if_no_extensions=False), **arrow_json_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from json files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_json( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.json”.

  • arrow_json_args – Other json read options to pass to pyarrow.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Dataset.write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **pandas_json_args) None[source]

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_json("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where json files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from pandas_json_args. This should be used instead of pandas_json_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • pandas_json_args – These args will be passed to pandas.DataFrame.to_json(), which we use under the hood to write out each Datasets block. These are dict(orient="records", lines=True) by default.

Text

ray.data.read_text(paths: Union[str, List[str]], *, encoding: str = 'utf-8', errors: str = 'ignore', drop_empty_lines: bool = True, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Optional[Dict[str, Any]] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None) ray.data.dataset.Dataset[str][source]

Create a dataset from lines stored in text files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_text("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_text(["/path/to/file1", "/path/to/file2"]) 
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • encoding – The encoding of the files (e.g., “utf-8” or “ascii”).

  • errors – What to do with errors on decoding. Specify either “strict”, “ignore”, or “replace”. Defaults to “ignore”.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – Kwargs passed to ray.remote in the read tasks and in the subsequent text decoding map task.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files. To filter out all file paths except those whose file extension matches, e.g., ".txt", provide a FileExtensionFilter("txt").

Returns

Dataset holding lines of text read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Binary

ray.data.read_binary_files(paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = None) ray.data.dataset.Dataset[Union[Tuple[str, bytes], bytes]][source]

Create a dataset from binary files of arbitrary contents.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_binary_files( 
...     ["/path/to/file1", "/path/to/file2"])
Parameters
  • paths – A single file path or a list of file paths (or directories).

  • include_paths – Whether to include the full path of the file in the dataset records. When specified, the dataset records will be a tuple of the file path and the file contents.

  • filesystem – The filesystem implementation to read from.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this does not filter out any files.

Returns

Dataset holding Arrow records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

Pandas

ray.data.from_pandas(dfs: Union[pandas.DataFrame, List[pandas.DataFrame]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of Pandas dataframes.
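
Examples

A minimal sketch; the example dataframe contents below are illustrative.

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
>>> ds = ray.data.from_pandas(df) 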

Parameters

dfs – A Pandas dataframe or a list of Pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

PublicAPI: This API is stable across Ray releases.

ray.data.from_pandas_refs(dfs: Union[ray.types.ObjectRef[pandas.DataFrame], List[ray.types.ObjectRef[pandas.DataFrame]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of Ray object references to Pandas dataframes.

Parameters

dfs – A Ray object reference to a pandas dataframe, or a list of Ray object references to pandas dataframes.

Returns

Dataset holding Arrow records read from the dataframes.

DeveloperAPI: This API may change across minor Ray releases.

Dataset.to_pandas(limit: int = 100000) pandas.DataFrame[source]

Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow or Pandas records. An error is raised if the number of records exceeds the provided limit. Note that you can use .limit() on the dataset beforehand to truncate the dataset manually.
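
Examples

A minimal sketch of collecting a small dataset into a local DataFrame.

>>> import ray
>>> ds = ray.data.range_table(100) 
>>> df = ds.to_pandas() 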

Time complexity: O(dataset size)

Parameters

limit – The maximum number of records to return. An error will be raised if the limit is exceeded.

Returns

A Pandas DataFrame created from this dataset, containing a limited number of records.

Dataset.to_pandas_refs() List[ray.types.ObjectRef[pandas.DataFrame]][source]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().
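
Examples

A minimal sketch; ray.get() is used to materialize the remote dataframes.

>>> import ray
>>> ds = ray.data.range_table(100) 
>>> refs = ds.to_pandas_refs() 
>>> dfs = ray.get(refs) 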

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

NumPy

ray.data.read_numpy(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, parallelism: int = -1, arrow_open_stream_args: Optional[Dict[str, Any]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.DefaultFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.npy'], allow_if_no_extensions=False), **numpy_load_args) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create an Arrow dataset from numpy files.

Examples

>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_numpy("s3://bucket/path") 
>>> # Read multiple local files.
>>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) 
>>> # Read multiple directories.
>>> ray.data.read_numpy( 
...     ["s3://bucket/path1", "s3://bucket/path2"])
Parameters
  • paths – A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories.

  • filesystem – The filesystem implementation to read from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_input_stream

  • numpy_load_args – Other options to pass to np.load.

  • meta_provider – File metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.npy”.

Returns

Dataset holding Tensor records read from the specified paths.

PublicAPI: This API is stable across Ray releases.

ray.data.from_numpy(ndarrays: Union[numpy.ndarray, List[numpy.ndarray]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of NumPy ndarrays.
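
Examples

A minimal sketch using a single in-memory ndarray.

>>> import numpy as np
>>> import ray
>>> ds = ray.data.from_numpy(np.arange(12).reshape(3, 4)) 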

Parameters

ndarrays – A NumPy ndarray or a list of NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

PublicAPI: This API is stable across Ray releases.

ray.data.from_numpy_refs(ndarrays: Union[ray.types.ObjectRef[numpy.ndarray], List[ray.types.ObjectRef[numpy.ndarray]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of NumPy ndarray futures.

Parameters

ndarrays – A Ray object reference to a NumPy ndarray or a list of Ray object references to NumPy ndarrays.

Returns

Dataset holding the given ndarrays.

DeveloperAPI: This API may change across minor Ray releases.

Dataset.write_numpy(path: str, *, column: str = '__value__', filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, ray_remote_args: Dict[str, Any] = None) None[source]

Write a tensor column of the dataset to npy files.

This is only supported for datasets convertible to Arrow records that contain a TensorArray column. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_numpy("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where npy files will be written to.

  • column – The name of the table column that contains the tensor to be written. The default is "__value__", the column name that Datasets uses for storing tensors in single-column tables.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

Dataset.to_numpy_refs(*, column: Optional[str] = None) List[ray.types.ObjectRef[numpy.ndarray]][source]

Convert this dataset into a distributed set of NumPy ndarrays.

This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Parameters
  • column – The name of the column to convert to numpy, or None to specify the entire row. If not specified for Arrow or Pandas blocks, each returned future will represent a dict of column ndarrays.

Returns

A list of remote NumPy ndarrays created from this dataset.

Arrow

ray.data.from_arrow(tables: Union[pyarrow.Table, bytes, List[Union[pyarrow.Table, bytes]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a list of Arrow tables.
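
Examples

A minimal sketch; the table contents below are illustrative.

>>> import pyarrow as pa
>>> import ray
>>> table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
>>> ds = ray.data.from_arrow(table) 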

Parameters

tables – An Arrow table or a list of Arrow tables; each table may also be given as bytes in the Arrow streaming format.

Returns

Dataset holding Arrow records from the tables.

PublicAPI: This API is stable across Ray releases.

ray.data.from_arrow_refs(tables: Union[ray.types.ObjectRef[Union[pyarrow.Table, bytes]], List[ray.types.ObjectRef[Union[pyarrow.Table, bytes]]]]) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a set of Arrow tables.

Parameters

tables – A Ray object reference to an Arrow table, or a list of Ray object references to Arrow tables; each table may also be given as bytes in the Arrow streaming format.

Returns

Dataset holding Arrow records from the tables.

DeveloperAPI: This API may change across minor Ray releases.

Dataset.to_arrow_refs() List[ray.types.ObjectRef[pyarrow.Table]][source]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.
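
Examples

A minimal sketch; ray.get() materializes the remote Arrow tables.

>>> import ray
>>> ds = ray.data.range_table(100) 
>>> tables = ray.get(ds.to_arrow_refs()) 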

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

Dask

ray.data.from_dask(df: dask.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a Dask DataFrame.
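
Examples

A minimal sketch, assuming Dask is installed alongside Ray; the dataframe contents are illustrative.

>>> import dask.dataframe as dd
>>> import pandas as pd
>>> import ray
>>> ddf = dd.from_pandas(pd.DataFrame({"a": range(100)}), npartitions=4) 
>>> ds = ray.data.from_dask(ddf) 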

Parameters

df – A Dask DataFrame.

Returns

Dataset holding Arrow records read from the DataFrame.

PublicAPI: This API is stable across Ray releases.

Dataset.to_dask() dask.DataFrame[source]

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will set the Dask scheduler to Dask-on-Ray globally, via the config.
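
Examples

A minimal sketch; .compute() runs on the Dask-on-Ray scheduler configured by this call.

>>> import ray
>>> ds = ray.data.range_table(100) 
>>> ddf = ds.to_dask() 
>>> pdf = ddf.compute() 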

Time complexity: O(dataset size / parallelism)

Returns

A Dask DataFrame created from this dataset.

Spark

ray.data.from_spark(df: pyspark.sql.DataFrame, *, parallelism: Optional[int] = None) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a Spark dataframe.
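
Examples

A hedged sketch, assuming RayDP (Spark-on-Ray) is installed; the init_spark resource arguments below are illustrative.

>>> import ray
>>> import raydp 
>>> spark = raydp.init_spark( 
...     "example", num_executors=1, executor_cores=1, executor_memory="1GB")
>>> df = spark.range(100) 
>>> ds = ray.data.from_spark(df) 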

Parameters
  • df – A Spark dataframe, which must be created by RayDP (Spark-on-Ray).

  • parallelism – The amount of parallelism to use for the dataset. If not provided, it will be equal to the number of partitions of the original Spark dataframe.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

Dataset.to_spark(spark: pyspark.sql.SparkSession) pyspark.sql.DataFrame[source]

Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

Modin

ray.data.from_modin(df: modin.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a Modin dataframe.
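
Examples

A minimal sketch, assuming Modin is configured to use the Ray backend; the dataframe contents are illustrative.

>>> import modin.pandas as mpd 
>>> import ray
>>> mdf = mpd.DataFrame({"a": [1, 2, 3]}) 
>>> ds = ray.data.from_modin(mdf) 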

Parameters

df – A Modin dataframe, which must be using the Ray backend.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

Dataset.to_modin() modin.DataFrame[source]

Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of Pandas dataframes (using .to_pandas_refs()). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using modin.distributed.dataframe.pandas.partitions.from_partitions().

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

Mars

ray.data.from_mars(df: mars.DataFrame) ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow][source]

Create a dataset from a MARS dataframe.

Parameters

df – A MARS dataframe, which must be executed by MARS-on-Ray.

Returns

Dataset holding Arrow records read from the dataframe.

PublicAPI: This API is stable across Ray releases.

Dataset.to_mars() mars.DataFrame[source]

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

HuggingFace

ray.data.from_huggingface(dataset: Union[datasets.Dataset, datasets.DatasetDict]) Union[ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow], Dict[str, ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow]]][source]

Create a dataset from a Hugging Face Datasets Dataset.

This function is not parallelized, and is intended to be used with Hugging Face Datasets that are loaded into memory (as opposed to memory-mapped).
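
Examples

A minimal sketch using a small in-memory Hugging Face Dataset; the column contents are illustrative.

>>> import datasets 
>>> import ray
>>> hf_ds = datasets.Dataset.from_dict({"text": ["a", "b", "c"]}) 
>>> ray_ds = ray.data.from_huggingface(hf_ds) 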

Parameters

dataset – A Hugging Face Dataset, or DatasetDict. IterableDataset is not supported.

Returns

Dataset holding Arrow records from the Hugging Face Dataset, or a dict of datasets in case dataset is a DatasetDict.

PublicAPI: This API is stable across Ray releases.

Datasource API

ray.data.read_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.read_api.T], *, parallelism: int = -1, ray_remote_args: Optional[Dict[str, Any]] = None, **read_args) ray.data.dataset.Dataset[ray.data.read_api.T][source]

Read a dataset from a custom data source.

Parameters
  • datasource – The datasource to read data from.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the available partitioning of the datasource. If set to -1, parallelism will be automatically chosen based on the available cluster resources and estimated in-memory data size.

  • read_args – Additional kwargs to pass to the datasource impl.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

Returns

Dataset holding the data read from the datasource.

PublicAPI: This API is stable across Ray releases.

Dataset.write_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.block.T], *, ray_remote_args: Optional[Dict[str, Any]] = None, **write_args) None[source]

Write the dataset to a custom datasource.

Examples

>>> import ray
>>> from ray.data.datasource import Datasource
>>> ds = ray.data.range(100) 
>>> class CustomDatasource(Datasource): 
...     # define custom data source
...     pass 
>>> ds.write_datasource(CustomDatasource(...)) 

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional write args to pass to the datasource.

class ray.data.Datasource(*args, **kwds)[source]

Interface for defining a custom ray.data.Dataset datasource.

To read a datasource into a dataset, use ray.data.read_datasource(). To write to a writable datasource, use Dataset.write_datasource().

See RangeDatasource and DummyOutputDatasource for examples of how to implement readable and writable datasources.

Datasource instances must be serializable, since create_reader() and do_write() are called in remote tasks.

PublicAPI: This API is stable across Ray releases.

create_reader(**read_args) ray.data.datasource.datasource.Reader[ray.data.block.T][source]

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

prepare_read(parallelism: int, **read_args) List[ray.data.datasource.datasource.ReadTask[ray.data.block.T]][source]

Deprecated: Implement create_reader() instead. This API is deprecated and may be removed in future Ray releases.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], ray_remote_args: Dict[str, Any], **write_args) List[ray.types.ObjectRef[Any]][source]

Launch Ray tasks for writing blocks out to the datasource.

Parameters
  • blocks – List of data block references. It is recommended that one write task be generated per block.

  • metadata – List of block metadata.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional kwargs to pass to the datasource impl.

Returns

A list of the output of the write tasks.

on_write_complete(write_results: List[Any], **kwargs) None[source]

Callback for when a write job completes.

This can be used to “commit” a write output. This method must succeed prior to write_datasource() returning to the user. If this method fails, then on_write_failed() will be called.

Parameters
  • write_results – The list of the write task results.

  • kwargs – Forward-compatibility placeholder.

on_write_failed(write_results: List[ray.types.ObjectRef[Any]], error: Exception, **kwargs) None[source]

Callback for when a write job fails.

This is called on a best-effort basis on write failures.

Parameters
  • write_results – The list of the write task result futures.

  • error – The first error encountered.

  • kwargs – Forward-compatibility placeholder.

class ray.data.ReadTask(read_fn: Callable[[], Iterable[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: BlockMetadata)[source]

A function used to read blocks from the dataset.

Read tasks are generated by reader.get_read_tasks(), and return a list of ray.data.Block when called. Initial metadata about the read operation can be retrieved via get_metadata() prior to executing the read. Final metadata is returned after the read along with the blocks.

Ray will execute read tasks in remote functions to parallelize execution. Note that the number of blocks returned can vary at runtime. For example, if a task is reading a single large file it can return multiple blocks to avoid running out of memory during the read.

The initial metadata should reflect all the blocks returned by the read, e.g., if the metadata says num_rows=1000, the read can return a single block of 1000 rows, or multiple blocks with 1000 rows altogether.

The final metadata (returned with the actual block) reflects the exact contents of the block itself.

DeveloperAPI: This API may change across minor Ray releases.

Built-in Datasources

class ray.data.datasource.BinaryDatasource(*args, **kwds)[source]

Binary datasource, for reading and writing binary files.

Examples

>>> import ray
>>> from ray.data.datasource import BinaryDatasource
>>> source = BinaryDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[b"file_data", ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.CSVDatasource(*args, **kwds)[source]

CSV datasource, for reading and writing CSV files.

Examples

>>> import ray
>>> from ray.data.datasource import CSVDatasource
>>> source = CSVDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.FileBasedDatasource(*args, **kwds)[source]

File-based datasource, for reading and writing files.

This class should not be used directly, and should instead be subclassed and tailored to particular file formats. Classes deriving from this class must implement _read_file().

If _FILE_EXTENSION is defined, then by default only files with this extension will be read. If it is None, no default filter is used.

Current subclasses:

JSONDatasource, CSVDatasource, NumpyDatasource, BinaryDatasource

DeveloperAPI: This API may change across minor Ray releases.

create_reader(**kwargs)[source]

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

do_write(blocks: List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]], metadata: List[ray.data.block.BlockMetadata], path: str, dataset_uuid: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, write_args_fn: Callable[[], Dict[str, Any]] = <function FileBasedDatasource.<lambda>>, _block_udf: Optional[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]] = None, ray_remote_args: Dict[str, Any] = None, **write_args) List[ray.types.ObjectRef[Any]][source]

Creates and returns write tasks for a file-based datasource.

class ray.data.datasource.ImageFolderDatasource(*args, **kwds)[source]

A datasource that lets you read datasets like ImageNet.

This datasource works with any dataset where images are arranged in this way:

root/dog/xxx.png
root/dog/xxy.png
root/dog/[...]/xxz.png

root/cat/123.png
root/cat/nsdf3.png
root/cat/[...]/asd932_.png

Datasets read with this datasource contain two columns: 'image' and 'label'.

  • The 'image' column is of type TensorDtype. The tensors have shape (H, W) if the images are grayscale and (H, W, C) otherwise.

  • The 'label' column contains strings representing class names (e.g., ‘cat’).

Examples

>>> import ray
>>> from ray.data.datasource import ImageFolderDatasource
>>> ds = ray.data.read_datasource(  
...     ImageFolderDatasource(),
...     root="/data/imagenet/train",
...     size=(224, 224)
... )
>>> sample = ds.take(1)[0]  
>>> sample["image"].to_numpy().shape  
(224, 224, 3)
>>> sample["label"]  
'n01443537'

To convert class labels to integer-valued targets, use OrdinalEncoder.

>>> import ray
>>> from ray.data.preprocessors import OrdinalEncoder
>>> ds = ray.data.read_datasource(  
...     ImageFolderDatasource(),
...     root="/data/imagenet/train",
...     size=(224, 224)
... )
>>> oe = OrdinalEncoder(columns=["label"])  
>>> ds = oe.fit_transform(ds)  
>>> sample = ds.take(1)[0]  
>>> sample["label"]  
71

DeveloperAPI: This API may change across minor Ray releases.

create_reader(root: str, size: Optional[Tuple[int, int]] = None, mode: Optional[str] = None) Reader[T][source]

Return a Reader that reads images.

Warning

If your dataset contains images of varying sizes and you don’t specify size, this datasource will error. To prevent errors, specify size or disable tensor extension casting.

Parameters
  • root – Path to the dataset root.

  • size – The desired height and width of loaded images. If unspecified, images retain their original shape.

  • mode – A Pillow mode describing the desired type and depth of pixels. If unspecified, image modes are inferred by Pillow.

Raises
  • ValueError – if size contains non-positive numbers.

  • ValueError – if mode is unsupported.

class ray.data.datasource.JSONDatasource(*args, **kwds)[source]

JSON datasource, for reading and writing JSON files.

Examples

>>> import ray
>>> from ray.data.datasource import JSONDatasource
>>> source = JSONDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.NumpyDatasource(*args, **kwds)[source]

Numpy datasource, for reading and writing Numpy files.

Examples

>>> import ray
>>> from ray.data.datasource import NumpyDatasource
>>> source = NumpyDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[array([0., 1., 2.]), ...]

PublicAPI: This API is stable across Ray releases.

class ray.data.datasource.ParquetDatasource(*args, **kwds)[source]

Parquet datasource, for reading and writing Parquet files.

The primary difference from ParquetBaseDatasource is that this uses PyArrow’s ParquetDataset abstraction for dataset reads, and thus offers automatic Arrow dataset schema inference and row count collection at the cost of some potential performance and/or compatibility penalties.

Examples

>>> import ray
>>> from ray.data.datasource import ParquetDatasource
>>> source = ParquetDatasource() 
>>> ray.data.read_datasource( 
...     source, paths="/path/to/dir").take()
[{"a": 1, "b": "foo"}, ...]

PublicAPI: This API is stable across Ray releases.

create_reader(**kwargs)[source]

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

class ray.data.datasource.RangeDatasource(*args, **kwds)[source]

An example datasource that generates ranges of numbers from [0..n).

Examples

>>> import ray
>>> from ray.data.datasource import RangeDatasource
>>> source = RangeDatasource() 
>>> ray.data.read_datasource(source, n=10).take() 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

PublicAPI: This API is stable across Ray releases.

create_reader(n: int, block_format: str = 'list', tensor_shape: Tuple = (1,)) List[ray.data.datasource.datasource.ReadTask][source]

Return a Reader for the given read arguments.

The reader object will be responsible for querying the read metadata, and generating the actual read tasks to retrieve the data blocks upon request.

Parameters

read_args – Additional kwargs to pass to the datasource impl.

class ray.data.datasource.SimpleTensorFlowDatasource(*args, **kwds)[source]

A datasource that lets you use TensorFlow datasets with Ray Data.

Warning

SimpleTensorFlowDatasource doesn’t support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.

Example

>>> import ray.data
>>> from ray.data.datasource import SimpleTensorFlowDatasource
>>> import tensorflow_datasets as tfds  
>>>
>>> def dataset_factory():
...     return tfds.load("cifar10", split=["train"], as_supervised=True)[0]
...
>>> dataset = ray.data.read_datasource(  
...     SimpleTensorFlowDatasource(),
...     parallelism=1,
...     dataset_factory=dataset_factory
... )
>>> features, label = dataset.take(1)[0]  
>>> features.shape  
TensorShape([32, 32, 3])
>>> label  
<tf.Tensor: shape=(), dtype=int64, numpy=7>

PublicAPI: This API is stable across Ray releases.

prepare_read(parallelism: int, dataset_factory: Callable[[], tf.data.Dataset]) List[ray.data.datasource.datasource.ReadTask][source]

Return a read task that loads a TensorFlow dataset.

Parameters
  • parallelism – This argument isn’t used.

  • dataset_factory – A no-argument function that returns the TensorFlow dataset to be read.

class ray.data.datasource.SimpleTorchDatasource(*args, **kwds)[source]

A datasource that lets you use Torch datasets with Ray Data.

Warning

SimpleTorchDatasource doesn’t support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.

Example

>>> import ray
>>> from ray.data.datasource import SimpleTorchDatasource
>>>
>>> import torchvision 
>>> dataset_factory = lambda: torchvision.datasets.MNIST("data", download=True)
>>> dataset = ray.data.read_datasource(  
...     SimpleTorchDatasource(), parallelism=1, dataset_factory=dataset_factory
... )
>>> dataset.take(1)  
(<PIL.Image.Image image mode=L size=28x28 at 0x1142CCA60>, 5)

PublicAPI: This API is stable across Ray releases.

prepare_read(parallelism: int, dataset_factory: Callable[[], torch.utils.data.Dataset]) List[ray.data.datasource.datasource.ReadTask][source]

Return a read task that loads a Torch dataset.

Parameters
  • parallelism – This argument isn’t used.

  • dataset_factory – A no-argument function that returns the Torch dataset to be read.

Partitioning API

class ray.data.datasource.PartitionStyle(value)[source]

Supported dataset partition styles.

Inherits from str to simplify plain text serialization/deserialization.

Examples

>>> import json  
>>> from ray.data.datasource import PartitionStyle  
>>> # Serialize to JSON text.
>>> json.dumps(PartitionStyle.HIVE)  
'"hive"'
>>> # Deserialize from JSON text.
>>> PartitionStyle(json.loads('"hive"'))  
<PartitionStyle.HIVE: 'hive'>

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.datasource.PathPartitionScheme(style: ray.data.datasource.partitioning.PartitionStyle, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None)[source]

Partition scheme used to describe path-based partitions.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

DeveloperAPI: This API may change across minor Ray releases.

property style: ray.data.datasource.partitioning.PartitionStyle

Gets the path partitioning style.

property base_dir: str

Gets the original base directory supplied during object construction.

property normalized_base_dir: str

Returns the base directory normalized for compatibility with a filesystem.

property field_names: Optional[List[str]]

Gets the partition key field names.

property filesystem: Optional[pyarrow.fs.FileSystem]

Gets the original filesystem supplied during object construction.

property resolved_filesystem: pyarrow.fs.FileSystem

Returns the filesystem resolved for compatibility with a base directory.

class ray.data.datasource.PathPartitionEncoder(path_partition_scheme: ray.data.datasource.partitioning.PathPartitionScheme)[source]

Callable that generates directory path strings for path-based partition formats.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

Two path partition formats are currently supported - HIVE and DIRECTORY.

For HIVE Partitioning, all partition directories will be generated using a “{key1}={value1}/{key2}={value2}” naming convention under the base directory. An accompanying ordered list of partition key field names must also be provided, where the order and length of all partition values must match the order and length of field names.

For DIRECTORY Partitioning, all directories will be generated from partition values using a “{value1}/{value2}” naming convention under the base directory.

DeveloperAPI: This API may change across minor Ray releases.

static of(style: ray.data.datasource.partitioning.PartitionStyle = PartitionStyle.HIVE, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None) PathPartitionEncoder[source]

Creates a new partition path encoder.

Parameters
  • style – The partition style - may be either HIVE or DIRECTORY.

  • base_dir – “/”-delimited base directory that all partition paths will be generated under (exclusive).

  • field_names – The partition key field names (i.e. column names for tabular datasets). Required for HIVE partition paths, optional for DIRECTORY partition paths. When non-empty, the order and length of partition key field names must match the order and length of partition values.

  • filesystem – Filesystem that will be used for partition path file I/O.

Returns

The new partition path encoder.

property scheme: ray.data.datasource.partitioning.PathPartitionScheme

Returns the path partition scheme for this encoder.

class ray.data.datasource.PathPartitionParser(path_partition_scheme: ray.data.datasource.partitioning.PathPartitionScheme)[source]

Partition parser for path-based partition formats.

Path-based partition formats embed all partition keys and values directly in their dataset file paths.

Two path partition formats are currently supported - HIVE and DIRECTORY.

For HIVE Partitioning, all partition directories under the base directory will be discovered based on “{key1}={value1}/{key2}={value2}” naming conventions. Key/value pairs do not need to be presented in the same order across all paths. Directory names nested under the base directory that don’t follow this naming condition will be considered unpartitioned. If a partition filter is defined, then it will be called with an empty input dictionary for each unpartitioned file.

For DIRECTORY Partitioning, all directories under the base directory will be interpreted as partition values of the form “{value1}/{value2}”. An accompanying ordered list of partition field names must also be provided, where the order and length of all partition values must match the order and length of field names. Files stored directly in the base directory will be considered unpartitioned. If a partition filter is defined, then it will be called with an empty input dictionary for each unpartitioned file. For example, if the base directory is “foo” then “foo.csv” and “foo/bar.csv” would be considered unpartitioned files but “foo/bar/baz.csv” would be associated with partition “bar”. If the base directory is undefined, then “foo.csv” would be unpartitioned, “foo/bar.csv” would be associated with partition “foo”, and “foo/bar/baz.csv” would be associated with partition (“foo”, “bar”).

DeveloperAPI: This API may change across minor Ray releases.

static of(style: ray.data.datasource.partitioning.PartitionStyle = PartitionStyle.HIVE, base_dir: Optional[str] = None, field_names: Optional[List[str]] = None, filesystem: Optional[pyarrow.fs.FileSystem] = None) PathPartitionParser[source]

Creates a path-based partition parser using a flattened argument list.

Parameters
  • style – The partition style - may be either HIVE or DIRECTORY.

  • base_dir – “/”-delimited base directory to start searching for partitions (exclusive). File paths outside of this directory will be considered unpartitioned. Specify None or an empty string to search for partitions in all file path directories.

  • field_names – The partition key names. Required for DIRECTORY partitioning. Optional for HIVE partitioning. When non-empty, the order and length of partition key field names must match the order and length of partition directories discovered. Partition key field names are not required to exist in the dataset schema.

  • filesystem – Filesystem that will be used for partition path file I/O.

Returns

The new path-based partition parser.

property scheme: ray.data.datasource.partitioning.PathPartitionScheme

Returns the path partition scheme for this parser.

class ray.data.datasource.PathPartitionFilter(path_partition_parser: ray.data.datasource.partitioning.PathPartitionParser, filter_fn: Callable[[Dict[str, str]], bool])[source]

Partition filter for path-based partition formats.

Used to explicitly keep or reject files based on a custom filter function that takes partition keys and values parsed from the file’s path as input.
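
Examples

A hedged sketch of keeping only one Hive-style partition when reading CSV files; the partition key "country", the base directory, and the paths below are illustrative.

>>> import ray
>>> from ray.data.datasource import (
...     PartitionStyle, PathPartitionParser, PathPartitionFilter)
>>> parser = PathPartitionParser.of( 
...     style=PartitionStyle.HIVE, base_dir="s3://bucket/path")
>>> keep_us = PathPartitionFilter( 
...     parser, filter_fn=lambda kv: kv.get("country") == "us")
>>> ds = ray.data.read_csv("s3://bucket/path", partition_filter=keep_us) 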

PublicAPI (beta): This API is in beta and may change before becoming stable.

MetadataProvider API

class ray.data.datasource.FileMetadataProvider[source]

Abstract callable that provides metadata for the files of a single dataset block.

Current subclasses:

BaseFileMetadataProvider ParquetMetadataProvider

DeveloperAPI: This API may change across minor Ray releases.

class ray.data.datasource.BaseFileMetadataProvider[source]

Abstract callable that provides metadata for FileBasedDatasource implementations that reuse the base prepare_read method.

Also supports file and file size discovery in input directory paths.

Current subclasses:

DefaultFileMetadataProvider

DeveloperAPI: This API may change across minor Ray releases.

expand_paths(paths: List[str], filesystem: Optional[pyarrow.fs.FileSystem]) Tuple[List[str], List[Optional[int]]][source]

Expands all paths into concrete file paths by walking directories.

Also returns a sidecar of file sizes.

The input paths must be normalized for compatibility with the input filesystem prior to invocation.

Parameters
  • paths – A list of file and/or directory paths compatible with the given filesystem.

  • filesystem – The filesystem implementation that should be used for expanding all paths and reading their files.

Returns

A tuple whose first item contains the list of file paths discovered, and whose second item contains the size of each file. None may be returned if a file size is either unknown or will be fetched later by _get_block_metadata(), but the length of both lists must be equal.

class ray.data.datasource.ParquetMetadataProvider[source]

Abstract callable that provides block metadata for Arrow Parquet file fragments.

All file fragments should belong to a single dataset block.

Supports optional pre-fetching of ordered metadata for all file fragments in a single batch to help optimize metadata resolution.

Current subclasses:

DefaultParquetMetadataProvider

DeveloperAPI: This API may change across minor Ray releases.

prefetch_file_metadata(pieces: List[pyarrow.dataset.ParquetFileFragment]) Optional[List[Any]][source]

Pre-fetches file metadata for all Parquet file fragments in a single batch.

Subsets of the metadata returned will be provided as input to subsequent calls to _get_block_metadata() together with their corresponding Parquet file fragments.

Implementations that don’t support pre-fetching file metadata shouldn’t override this method.

Parameters

pieces – The Parquet file fragments to fetch metadata for.

Returns

Metadata resolved for each input file fragment, or None. Metadata must be returned in the same order as all input file fragments, such that metadata[i] always contains the metadata for pieces[i].

class ray.data.datasource.DefaultFileMetadataProvider[source]

Default metadata provider for FileBasedDatasource implementations that reuse the base prepare_read method.

Calculates block size in bytes as the sum of its constituent file sizes, and assumes a fixed number of rows per file.

DeveloperAPI: This API may change across minor Ray releases.

expand_paths(paths: List[str], filesystem: pyarrow.fs.FileSystem) Tuple[List[str], List[Optional[int]]][source]

Expands all paths into concrete file paths by walking directories.

Also returns a sidecar of file sizes.

The input paths must be normalized for compatibility with the input filesystem prior to invocation.

Parameters
  • paths – A list of file and/or directory paths compatible with the given filesystem.

  • filesystem – The filesystem implementation that should be used for expanding all paths and reading their files.

Returns

A tuple whose first item contains the list of file paths discovered, and whose second item contains the size of each file. None may be returned if a file size is either unknown or will be fetched later by _get_block_metadata(), but the length of both lists must be equal.

class ray.data.datasource.DefaultParquetMetadataProvider[source]

The default file metadata provider for ParquetDatasource.

Aggregates total block bytes and number of rows using the Parquet file metadata associated with a list of Arrow Parquet dataset file fragments.

DeveloperAPI: This API may change across minor Ray releases.

prefetch_file_metadata(pieces: List[pyarrow.dataset.ParquetFileFragment]) Optional[List[pyarrow.parquet.FileMetaData]][source]

Pre-fetches file metadata for all Parquet file fragments in a single batch.

Subsets of the metadata returned will be provided as input to subsequent calls to _get_block_metadata() together with their corresponding Parquet file fragments.

Implementations that don’t support pre-fetching file metadata shouldn’t override this method.

Parameters

pieces – The Parquet file fragments to fetch metadata for.

Returns

Metadata resolved for each input file fragment, or None. Metadata must be returned in the same order as all input file fragments, such that metadata[i] always contains the metadata for pieces[i].

class ray.data.datasource.FastFileMetadataProvider[source]

Fast Metadata provider for FileBasedDatasource implementations.

Offers improved performance vs. DefaultFileMetadataProvider by skipping directory path expansion and file size collection. While this performance improvement may be negligible for local filesystems, it can be substantial for cloud storage service providers.

This should only be used when all input paths are known to be files.

DeveloperAPI: This API may change across minor Ray releases.

expand_paths(paths: List[str], filesystem: pyarrow.fs.FileSystem) Tuple[List[str], List[Optional[int]]][source]

Expands all paths into concrete file paths by walking directories.

Also returns a sidecar of file sizes.

The input paths must be normalized for compatibility with the input filesystem prior to invocation.

Parameters
  • paths – A list of file and/or directory paths compatible with the given filesystem.

  • filesystem – The filesystem implementation that should be used for expanding all paths and reading their files.

Returns

A tuple whose first item contains the list of file paths discovered, and whose second item contains the size of each file. None may be returned if a file size is either unknown or will be fetched later by _get_block_metadata(), but the length of both lists must be equal.