ray.data.read_parquet_bulk

ray.data.read_parquet_bulk(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_file_args: Optional[Dict[str, Any]] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.FastFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.parquet'], allow_if_no_extensions=False), **arrow_parquet_args) → ray.data.dataset.Dataset

Quickly create an Arrow dataset from a large number (such as >1K) of Parquet files.

By default, only file paths should be provided as input (i.e. no directory paths); an OSError is raised if one or more paths point to directories. If your use case requires directory paths, change the metadata provider to one that supports directory expansion (e.g. DefaultFileMetadataProvider).

Offers improved performance over read_parquet() because it does not use PyArrow’s ParquetDataset abstraction, whose latency scales linearly with the number of input files since it collects all file metadata on a single node.

Also supports a wider variety of input Parquet file types than read_parquet() because it does not attempt to merge and resolve a unified schema across all files.

However, unlike read_parquet(), this does not offer file metadata resolution by default, so supply a custom metadata provider if your use case requires a unified schema, block sizes, row counts, etc.

Examples

>>> import ray
>>> from ray.data.datasource import DefaultFileMetadataProvider

>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known, to minimize read latency.
>>> ray.data.read_parquet_bulk(
...     ["/path/to/file1", "/path/to/file2"])

>>> # Read a directory of files in remote storage. Use caution when providing
>>> # directory paths, since the time to both check each path type and expand
>>> # its contents may greatly increase latency and/or trigger request-rate
>>> # throttling from cloud storage providers.
>>> ray.data.read_parquet_bulk(
...     "s3://bucket/path",
...     meta_provider=DefaultFileMetadataProvider())
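A further, hypothetical sketch (not part of the original examples): reading a known list of files while projecting columns and passing an explicit PyArrow filesystem. The bucket, file paths, column names, and region below are placeholders.

>>> # Hypothetical sketch: read only the "id" and "value" columns through an
>>> # explicit PyArrow S3 filesystem; all names and paths are placeholders.
>>> import pyarrow.fs as pafs
>>> ray.data.read_parquet_bulk(
...     ["s3://bucket/path/file1.parquet", "s3://bucket/path/file2.parquet"],
...     columns=["id", "value"],
...     filesystem=pafs.S3FileSystem(region="us-west-2"))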
Parameters
  • paths – A single file path or a list of file paths. If one or more directories are provided, then meta_provider should also be set to an implementation that supports directory expansion (e.g. DefaultFileMetadataProvider).

  • filesystem – The filesystem implementation to read from.

  • columns – A list of column names to read.

  • parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • arrow_open_file_args – kwargs passed to pyarrow.fs.FileSystem.open_input_file.

  • tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()). See the sketch after this parameter list.

  • meta_provider – File metadata provider. Defaults to a fast file metadata provider that skips file size collection and requires all input paths to be files. Change to DefaultFileMetadataProvider or a custom metadata provider if directory expansion and/or file metadata resolution is required.

  • partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.parquet”.

  • arrow_parquet_args – Other Parquet read options to pass to pyarrow.
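As noted in the tensor_column_schema entry above, the following is a minimal, hypothetical sketch of restoring serialized tensors from a Parquet column. The file path, column name, dtype, and shape are assumptions for illustration only.

>>> # Hypothetical sketch: the "image" column is assumed to hold raw,
>>> # C-contiguous uint8 arrays of shape (28, 28) serialized via arr.tobytes().
>>> import numpy as np
>>> ray.data.read_parquet_bulk(
...     ["/path/to/images.parquet"],
...     tensor_column_schema={"image": (np.uint8, (28, 28))})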

Returns

Dataset producing Arrow records read from the specified paths.
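For context, a short hedged sketch of consuming the returned Dataset; the file paths are placeholders, and schema() and take() are standard Dataset methods.

>>> # Hypothetical usage: inspect the schema and fetch a couple of records
>>> # from the returned Dataset; paths are placeholders.
>>> ds = ray.data.read_parquet_bulk(["/path/to/file1", "/path/to/file2"])
>>> ds.schema()
>>> ds.take(2)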

PublicAPI: This API is stable across Ray releases.