ray.data.read_parquet_bulk
- ray.data.read_parquet_bulk(paths: Union[str, List[str]], *, filesystem: Optional[pyarrow.fs.FileSystem] = None, columns: Optional[List[str]] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_file_args: Optional[Dict[str, Any]] = None, tensor_column_schema: Optional[Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]]] = None, meta_provider: ray.data.datasource.file_meta_provider.BaseFileMetadataProvider = <ray.data.datasource.file_meta_provider.FastFileMetadataProvider object>, partition_filter: Optional[ray.data.datasource.partitioning.PathPartitionFilter] = FileExtensionFilter(extensions=['.parquet'], allow_if_no_extensions=False), **arrow_parquet_args) → ray.data.dataset.Dataset
Create an Arrow dataset from a large number (such as >1K) of Parquet files quickly.

By default, ONLY file paths should be provided as input (i.e. no directory paths), and an OSError will be raised if one or more paths point to directories. If your use case requires directory paths, then the metadata provider should be changed to one that supports directory expansion (e.g. DefaultFileMetadataProvider).

Offers improved performance vs. read_parquet() due to not using PyArrow’s ParquetDataset abstraction, whose latency scales linearly with the number of input files due to collecting all file metadata on a single node.

Also supports a wider variety of input Parquet file types than read_parquet() due to not trying to merge and resolve a unified schema for all files.

However, unlike read_parquet(), this does not offer file metadata resolution by default, so a custom metadata provider should be provided if your use case requires a unified schema, block sizes, row counts, etc.

Examples
>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known to minimize read latency.
>>> ray.data.read_parquet_bulk( 
...     ["/path/to/file1", "/path/to/file2"]) 

>>> # Read a directory of files in remote storage. Caution should be taken
>>> # when providing directory paths, since the time to both check each path
>>> # type and expand its contents may result in greatly increased latency
>>> # and/or request rate throttling from cloud storage service providers.
>>> ray.data.read_parquet_bulk( 
...     "s3://bucket/path",
...     meta_provider=DefaultFileMetadataProvider()) 
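A minimal additional sketch (the file paths and column names below are hypothetical): column pruning via columns and an explicit read parallelism can be combined with the bulk reader.

>>> # Read only the columns you need and request a specific read parallelism.
>>> # Paths and column names are placeholders for illustration.
>>> ray.data.read_parquet_bulk( 
...     ["/path/to/file1", "/path/to/file2"],
...     columns=["col1", "col2"],
...     parallelism=100) 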
- Parameters
paths – A single file path or a list of file paths. If one or more directories are provided, then meta_provider should also be set to an implementation that supports directory expansion (e.g. DefaultFileMetadataProvider).
filesystem – The filesystem implementation to read from.
columns – A list of column names to read.
parallelism – The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset.
ray_remote_args – kwargs passed to ray.remote in the read tasks.
arrow_open_file_args – kwargs passed to pyarrow.fs.FileSystem.open_input_file.
tensor_column_schema – A dict of column name -> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via arr.tobytes()). See the sketch after the Returns section.
meta_provider – File metadata provider. Defaults to a fast file metadata provider that skips file size collection and requires all input paths to be files. Change to DefaultFileMetadataProvider or a custom metadata provider if directory expansion and/or file metadata resolution is required.
partition_filter – Path-based partition filter, if any. Can be used with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match “.parquet”.
arrow_parquet_args – Other Parquet read options to pass to PyArrow.
- Returns
Dataset producing Arrow records read from the specified paths.
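The following is a minimal sketch of tensor_column_schema usage, assuming a hypothetical Parquet column named "image" that stores 28x28 uint8 ndarrays serialized with arr.tobytes(); adjust the column name, dtype, and shape to match your data.

>>> import numpy as np
>>> # Hypothetical column "image" holding serialized 28x28 uint8 tensors.
>>> ray.data.read_parquet_bulk( 
...     ["/path/to/file1", "/path/to/file2"],
...     tensor_column_schema={"image": (np.uint8, (28, 28))}) 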
PublicAPI: This API is stable across Ray releases.