ray.data.read_parquet#

ray.data.read_parquet(paths: str | List[str], *, filesystem: pyarrow.fs.FileSystem | None = None, columns: List[str] | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, tensor_column_schema: Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]] | None = None, meta_provider: ParquetMetadataProvider | None = None, partition_filter: PathPartitionFilter | None = None, partitioning: Partitioning | None = Partitioning(style='hive', base_dir='', field_names=None, field_types={}, filesystem=None), shuffle: Literal['files'] | FileShuffleConfig | None = None, include_paths: bool = False, file_extensions: List[str] | None = None, concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_parquet_args) Dataset[source]#

Creates a Dataset from parquet files.

Examples

Read a file in remote storage.

>>> import ray
>>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
>>> ds.schema()
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

Read a directory in remote storage.

>>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris-parquet/")

Read multiple local files.

>>> ray.data.read_parquet(
...    ["local:///path/to/file1", "local:///path/to/file2"]) 

Specify a schema for the parquet file.

>>> import pyarrow as pa
>>> fields = [("sepal.length", pa.float32()),
...           ("sepal.width", pa.float32()),
...           ("petal.length", pa.float32()),
...           ("petal.width", pa.float32()),
...           ("variety", pa.string())]
>>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet",
...     schema=pa.schema(fields))
>>> ds.schema()
Column        Type
------        ----
sepal.length  float
sepal.width   float
petal.length  float
petal.width   float
variety       string

The Parquet reader also supports projection and filter pushdown, allowing column selection and row filtering to be pushed down to the file scan.

import pyarrow as pa

# Create a Dataset by reading a Parquet file, pushing column selection and
# row filtering down to the file scan.
ds = ray.data.read_parquet(
    "s3://anonymous@ray-example-data/iris.parquet",
    columns=["sepal.length", "variety"],
    filter=pa.dataset.field("sepal.length") > 5.0,
)

ds.show(2)
{'sepal.length': 5.1, 'variety': 'Setosa'}
{'sepal.length': 5.4, 'variety': 'Setosa'}

For further arguments you can pass to PyArrow as a keyword argument, see the PyArrow API reference.

Parameters:
  • paths – A single file path or directory, or a list of file paths. Multiple directories are not supported.

  • filesystem – The PyArrow filesystem implementation to read from. These filesystems are specified in the pyarrow docs. Specify this parameter if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with s3://, the S3FileSystem is used. If None, this function uses a system-chosen implementation.

  • columns – A list of column names to read. Only the specified columns are read during the file scan.

  • parallelism – This argument is deprecated. Use override_num_blocks argument.

  • ray_remote_args – kwargs passed to ray.remote() in the read tasks.

  • tensor_column_schema – A dict of column name to PyArrow dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to PyArrow tensors. This function assumes that the tensors are serialized in the raw NumPy array format in C-contiguous order (e.g., via arr.tobytes()).

  • meta_provider – A file metadata provider. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately. In most cases you do not need to set this parameter.

  • partition_filter – A PathPartitionFilter. Use with a custom callback to read only selected partitions of a dataset.

  • partitioning – A Partitioning object that describes how paths are organized. Defaults to HIVE partitioning.

  • shuffle – If setting to “files”, randomly shuffle input files order before read. If setting to FileShuffleConfig, you can pass a seed to shuffle the input files. Defaults to not shuffle with None.

  • arrow_parquet_args – Other parquet read options to pass to PyArrow. For the full set of arguments, see the PyArrow API

  • include_paths – If True, include the path to each file. File paths are stored in the 'path' column.

  • file_extensions – A list of file extensions to filter files by.

  • concurrency – The maximum number of Ray tasks to run concurrently. Set this to control number of tasks to run concurrently. This doesn’t change the total number of tasks run or the total number of output blocks. By default, concurrency is dynamically decided based on the available resources.

  • override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

Returns:

Dataset producing records read from the specified parquet files.