"""Source code for ``ray.data.datasource.partitioning``."""

import posixpath
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, Union

from ray.util.annotations import DeveloperAPI, PublicAPI

if TYPE_CHECKING:
    import pyarrow


# Concrete Python types that a parsed partition value string may be cast to
# (see `_cast_value`); string is the default when no type is specified.
PartitionDataType = Type[Union[int, float, str, bool]]


@DeveloperAPI
class PartitionStyle(str, Enum):
    """Supported dataset partition styles.

    Subclasses `str` so that members serialize/deserialize as plain text.

    Examples:
        >>> # Serialize to JSON text.
        >>> json.dumps(PartitionStyle.HIVE)  # doctest: +SKIP
        '"hive"'

        >>> # Deserialize from JSON text.
        >>> PartitionStyle(json.loads('"hive"'))  # doctest: +SKIP
        <PartitionStyle.HIVE: 'hive'>
    """

    HIVE = "hive"
    DIRECTORY = "dir"
@DeveloperAPI
@dataclass
class Partitioning:
    """Partition scheme used to describe path-based partitions.

    Path-based partition formats embed all partition keys and values directly
    in their dataset file paths.

    For example, to read a dataset with
    `Hive-style partitions <https://athena.guide/articles/hive-style-partitioning>`_:

    >>> import ray
    >>> from ray.data.datasource.partitioning import Partitioning
    >>> ds = ray.data.read_csv(
    ...     "s3://anonymous@ray-example-data/iris.csv",
    ...     partitioning=Partitioning("hive"),
    ... )

    Instead, if your files are arranged in a directory structure such as:

    .. code::

        root/dog/dog_0.jpeg
        root/dog/dog_1.jpeg
        ...

        root/cat/cat_0.jpeg
        root/cat/cat_1.jpeg
        ...

    Then you can use directory-based partitioning:

    >>> import ray
    >>> from ray.data.datasource.partitioning import Partitioning
    >>> root = "s3://anonymous@air-example-data/cifar-10/images"
    >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
    >>> ds = ray.data.read_images(root, partitioning=partitioning)
    """

    #: The partition style - may be either HIVE or DIRECTORY.
    style: PartitionStyle
    #: "/"-delimited base directory that all partitioned paths should
    #: exist under (exclusive). File paths either outside of, or at the first
    #: level of, this directory will be considered unpartitioned. Specify
    #: `None` or an empty string to search for partitions in all file path
    #: directories.
    base_dir: Optional[str] = None
    #: The partition key field names (i.e. column names for tabular
    #: datasets). When non-empty, the order and length of partition key
    #: field names must match the order and length of partition values.
    #: Required when parsing DIRECTORY partitioned paths or generating
    #: HIVE partitioned paths.
    field_names: Optional[List[str]] = None
    #: A dictionary that maps partition key names to their desired data type.
    #: If not provided, the data type defaults to string.
    field_types: Optional[Dict[str, PartitionDataType]] = None
    #: Filesystem that will be used for partition path file I/O.
    filesystem: Optional["pyarrow.fs.FileSystem"] = None

    def __post_init__(self):
        # Replace optional sentinels with concrete empty values so downstream
        # code never has to branch on None.
        if self.base_dir is None:
            self.base_dir = ""
        if self.field_types is None:
            self.field_types = {}
        # Lazily-computed caches, populated on first property access.
        self._normalized_base_dir = None
        self._resolved_filesystem = None

    @property
    def normalized_base_dir(self) -> str:
        """Returns the base directory normalized for compatibility with a
        filesystem."""
        if self._normalized_base_dir is None:
            self._normalize_base_dir()
        return self._normalized_base_dir

    @property
    def resolved_filesystem(self) -> "pyarrow.fs.FileSystem":
        """Returns the filesystem resolved for compatibility with a base
        directory."""
        if self._resolved_filesystem is None:
            self._normalize_base_dir()
        return self._resolved_filesystem

    def _normalize_base_dir(self):
        """Normalizes the partition base directory and resolves its filesystem.

        Appending a trailing "/" (when missing) ensures the base directory is
        correctly discovered at the root of all partitioned file paths.
        """
        from ray.data.datasource.path_util import _resolve_paths_and_filesystem

        paths, self._resolved_filesystem = _resolve_paths_and_filesystem(
            self.base_dir,
            self.filesystem,
        )
        assert len(paths) == 1, (
            f"Expected 1 normalized base directory, but found {len(paths)}"
        )
        base = paths[0]
        if base and not base.endswith("/"):
            base += "/"
        self._normalized_base_dir = base
@DeveloperAPI
class PathPartitionParser:
    """Partition parser for path-based partition formats.

    Path-based partition formats embed all partition keys and values directly
    in their dataset file paths.

    Two path partition formats are currently supported - `HIVE` and
    `DIRECTORY`.

    For `HIVE` Partitioning, all partition directories under the base directory
    will be discovered based on `{key1}={value1}/{key2}={value2}` naming
    conventions. Key/value pairs do not need to be presented in the same order
    across all paths. Directory names nested under the base directory that
    don't follow this naming condition will be considered unpartitioned. If a
    partition filter is defined, then it will be called with an empty input
    dictionary for each unpartitioned file.

    For `DIRECTORY` Partitioning, all directories under the base directory will
    be interpreted as partition values of the form `{value1}/{value2}`. An
    accompanying ordered list of partition field names must also be provided,
    where the order and length of all partition values must match the order and
    length of field names. Files stored directly in the base directory will be
    considered unpartitioned. If a partition filter is defined, then it will be
    called with an empty input dictionary for each unpartitioned file. For
    example, if the base directory is `"foo"`, then `"foo.csv"` and
    `"foo/bar.csv"` would be considered unpartitioned files but
    `"foo/bar/baz.csv"` would be associated with partition `"bar"`. If the base
    directory is undefined, then `"foo.csv"` would be unpartitioned,
    `"foo/bar.csv"` would be associated with partition `"foo"`, and
    `"foo/bar/baz.csv"` would be associated with partition `("foo", "bar")`.
    """

    @staticmethod
    def of(
        style: PartitionStyle = PartitionStyle.HIVE,
        base_dir: Optional[str] = None,
        field_names: Optional[List[str]] = None,
        field_types: Optional[Dict[str, PartitionDataType]] = None,
        filesystem: Optional["pyarrow.fs.FileSystem"] = None,
    ) -> "PathPartitionParser":
        """Creates a path-based partition parser using a flattened argument
        list.

        Args:
            style: The partition style - may be either HIVE or DIRECTORY.
            base_dir: "/"-delimited base directory to start searching for
                partitions (exclusive). File paths outside of this directory
                will be considered unpartitioned. Specify `None` or an empty
                string to search for partitions in all file path directories.
            field_names: The partition key names. Required for DIRECTORY
                partitioning. Optional for HIVE partitioning. When non-empty,
                the order and length of partition key field names must match
                the order and length of partition directories discovered.
                Partition key field names are not required to exist in the
                dataset schema.
            field_types: A dictionary that maps partition key names to their
                desired data type. If not provided, the data type defaults to
                string.
            filesystem: Filesystem that will be used for partition path file
                I/O.

        Returns:
            The new path-based partition parser.
        """
        scheme = Partitioning(style, base_dir, field_names, field_types, filesystem)
        return PathPartitionParser(scheme)

    def __init__(self, partitioning: Partitioning):
        """Creates a path-based partition parser.

        Args:
            partitioning: The path-based partition scheme. The parser starts
                searching for partitions from this scheme's base directory.
                File paths outside the base directory will be considered
                unpartitioned. If the base directory is `None` or an empty
                string then this will search for partitions in all file path
                directories. Field names are required for DIRECTORY
                partitioning, and optional for HIVE partitioning. When
                non-empty, the order and length of partition key field names
                must match the order and length of partition directories
                discovered.

        Raises:
            ValueError: If DIRECTORY partitioning is requested without field
                names, or if the partition style is unsupported.
        """
        style = partitioning.style
        field_names = partitioning.field_names
        if style == PartitionStyle.DIRECTORY and not field_names:
            raise ValueError(
                "Directory partitioning requires a corresponding list of "
                "partition key field names. Please retry your request with one "
                "or more field names specified."
            )
        parsers = {
            PartitionStyle.HIVE: self._parse_hive_path,
            PartitionStyle.DIRECTORY: self._parse_dir_path,
        }
        self._parser_fn: Callable[[str], Dict[str, str]] = parsers.get(style)
        if self._parser_fn is None:
            raise ValueError(
                f"Unsupported partition style: {style}. "
                f"Supported styles: {parsers.keys()}"
            )
        self._scheme = partitioning

    def __call__(self, path: str) -> Dict[str, str]:
        """Parses partition keys and values from a single file path.

        Args:
            path: Input file path to parse.

        Returns:
            Dictionary mapping directory partition keys to values from the
            input file path. Returns an empty dictionary for unpartitioned
            files.
        """
        dir_path = self._dir_path_trim_base(path)
        if dir_path is None:
            return {}
        partitions: Dict[str, str] = self._parser_fn(dir_path)
        for field, data_type in self._scheme.field_types.items():
            # Only cast fields that were actually discovered in the path.
            # Unpartitioned files (and HIVE paths that omit a typed key)
            # produce no entry for `field`; the documented contract is to
            # return whatever partitions were found rather than raise KeyError.
            if field in partitions:
                partitions[field] = _cast_value(partitions[field], data_type)
        return partitions

    @property
    def scheme(self) -> Partitioning:
        """Returns the partitioning for this parser."""
        return self._scheme

    def _dir_path_trim_base(self, path: str) -> Optional[str]:
        """Trims the normalized base directory and returns the directory path.

        Returns None if the path does not start with the normalized base
        directory. Simply returns the directory path if the base directory is
        undefined.
        """
        if not path.startswith(self._scheme.normalized_base_dir):
            return None
        path = path[len(self._scheme.normalized_base_dir) :]
        return posixpath.dirname(path)

    def _parse_hive_path(self, dir_path: str) -> Dict[str, str]:
        """Hive partition path parser.

        Returns a dictionary mapping partition keys to values given a
        hive-style partition path of the form
        "{key1}={value1}/{key2}={value2}/..." or an empty dictionary for
        unpartitioned files.
        """
        # Only directory names with exactly one "=" qualify as key/value pairs;
        # everything else is treated as unpartitioned structure.
        dirs = [d for d in dir_path.split("/") if d and (d.count("=") == 1)]
        kv_pairs = [d.split("=") for d in dirs] if dirs else []
        field_names = self._scheme.field_names
        if field_names and kv_pairs:
            if len(kv_pairs) != len(field_names):
                raise ValueError(
                    f"Expected {len(field_names)} partition value(s) but found "
                    f"{len(kv_pairs)}: {kv_pairs}."
                )
            for i, field_name in enumerate(field_names):
                if kv_pairs[i][0] != field_name:
                    raise ValueError(
                        f"Expected partition key {field_name} but found "
                        f"{kv_pairs[i][0]}"
                    )
        return dict(kv_pairs)

    def _parse_dir_path(self, dir_path: str) -> Dict[str, str]:
        """Directory partition path parser.

        Returns a dictionary mapping directory partition keys to values from a
        partition path of the form "{value1}/{value2}/..." or an empty
        dictionary for unpartitioned files.

        Requires a corresponding ordered list of partition key field names to
        map the correct key to each value.
        """
        dirs = [d for d in dir_path.split("/") if d]
        field_names = self._scheme.field_names
        if dirs and len(dirs) != len(field_names):
            raise ValueError(
                f"Expected {len(field_names)} partition value(s) but found "
                f"{len(dirs)}: {dirs}."
            )
        if not dirs:
            return {}
        return {
            field: directory
            for field, directory in zip(field_names, dirs)
            if field is not None
        }
@PublicAPI(stability="beta")
class PathPartitionFilter:
    """Partition filter for path-based partition formats.

    Used to explicitly keep or reject files based on a custom filter function
    that takes partition keys and values parsed from the file's path as input.
    """

    @staticmethod
    def of(
        filter_fn: Callable[[Dict[str, str]], bool],
        style: PartitionStyle = PartitionStyle.HIVE,
        base_dir: Optional[str] = None,
        field_names: Optional[List[str]] = None,
        field_types: Optional[Dict[str, PartitionDataType]] = None,
        filesystem: Optional["pyarrow.fs.FileSystem"] = None,
    ) -> "PathPartitionFilter":
        """Creates a path-based partition filter using a flattened argument
        list.

        Args:
            filter_fn: Callback used to filter partitions. Takes a dictionary
                mapping partition keys to values as input. Unpartitioned files
                are denoted with an empty input dictionary. Returns `True` to
                read a file for that partition or `False` to skip it. Partition
                keys and values are always strings read from the filesystem
                path. For example, this removes all unpartitioned files:

                .. code:: python

                    lambda d: True if d else False

                This raises an assertion error for any unpartitioned file
                found:

                .. code:: python

                    def do_assert(val, msg):
                        assert val, msg

                    lambda d: do_assert(d, "Expected all files to be partitioned!")

                And this only reads files from January, 2022 partitions:

                .. code:: python

                    lambda d: d["month"] == "January" and d["year"] == "2022"

            style: The partition style - may be either HIVE or DIRECTORY.
            base_dir: "/"-delimited base directory to start searching for
                partitions (exclusive). File paths outside of this directory
                will be considered unpartitioned. Specify `None` or an empty
                string to search for partitions in all file path directories.
            field_names: The partition key names. Required for DIRECTORY
                partitioning. Optional for HIVE partitioning. When non-empty,
                the order and length of partition key field names must match
                the order and length of partition directories discovered.
                Partition key field names are not required to exist in the
                dataset schema.
            field_types: A dictionary that maps partition key names to their
                desired data type. If not provided, the data type defaults to
                string.
            filesystem: Filesystem that will be used for partition path file
                I/O.

        Returns:
            The new path-based partition filter.
        """
        scheme = Partitioning(style, base_dir, field_names, field_types, filesystem)
        parser = PathPartitionParser(scheme)
        return PathPartitionFilter(parser, filter_fn)

    def __init__(
        self,
        path_partition_parser: PathPartitionParser,
        filter_fn: Callable[[Dict[str, str]], bool],
    ):
        """Creates a new path-based partition filter based on a parser.

        Args:
            path_partition_parser: The path-based partition parser.
            filter_fn: Callback used to filter partitions. Takes a dictionary
                mapping partition keys to values as input. Unpartitioned files
                are denoted with an empty input dictionary. Returns `True` to
                read a file for that partition or `False` to skip it. Partition
                keys and values are always strings read from the filesystem
                path. For example, this removes all unpartitioned files:
                ``lambda d: True if d else False``
                This raises an assertion error for any unpartitioned file
                found:
                ``lambda d: assert d, "Expected all files to be partitioned!"``
                And this only reads files from January, 2022 partitions:
                ``lambda d: d["month"] == "January" and d["year"] == "2022"``
        """
        self._parser = path_partition_parser
        self._filter_fn = filter_fn

    def __call__(self, paths: List[str]) -> List[str]:
        """Returns all paths that pass this partition scheme's partition
        filter.

        If no partition filter is set, then returns all input paths. If a base
        directory is set, then only paths under this base directory will be
        parsed for partitions. All paths outside of this base directory will
        automatically be considered unpartitioned, and passed into the filter
        function as empty dictionaries.

        Also normalizes the partition base directory for compatibility with
        the given filesystem before applying the filter.

        Args:
            paths: Paths to pass through the partition filter function. All
                paths should be normalized for compatibility with the given
                filesystem.

        Returns:
            List of paths that pass the partition filter, or all paths if no
            partition filter is defined.
        """
        # No filter configured: pass every path through untouched.
        if self._filter_fn is None:
            return paths
        return [path for path in paths if self._filter_fn(self._parser(path))]

    @property
    def parser(self) -> PathPartitionParser:
        """Returns the path partition parser for this filter."""
        return self._parser
def _cast_value(value: str, data_type: PartitionDataType) -> Any: if data_type is int: return int(value) elif data_type is float: return float(value) elif data_type is bool: return value.lower() == "true" else: return value