Source code for ray.data.preprocessors.concatenator

import logging

from typing import List, Optional, Union
import numpy as np
import pandas as pd

from ray.data.preprocessor import Preprocessor
from ray.util.annotations import PublicAPI


logger = logging.getLogger(__name__)


[docs]@PublicAPI(stability="alpha") class Concatenator(Preprocessor): """Combine numeric columns into a column of type :class:`~ray.air.util.tensor_extensions.pandas.TensorDtype`. This preprocessor concatenates numeric columns and stores the result in a new column. The new column contains :class:`~ray.air.util.tensor_extensions.pandas.TensorArrayElement` objects of shape :math:`(m,)`, where :math:`m` is the number of columns concatenated. The :math:`m` concatenated columns are dropped after concatenation. Examples: >>> import numpy as np >>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import Concatenator :py:class:`Concatenator` combines numeric columns into a column of :py:class:`~ray.air.util.tensor_extensions.pandas.TensorDtype`. >>> df = pd.DataFrame({"X0": [0, 3, 1], "X1": [0.5, 0.2, 0.9]}) >>> ds = ray.data.from_pandas(df) # doctest: +SKIP >>> concatenator = Concatenator() >>> concatenator.fit_transform(ds).to_pandas() # doctest: +SKIP concat_out 0 [0.0, 0.5] 1 [3.0, 0.2] 2 [1.0, 0.9] By default, the created column is called `"concat_out"`, but you can specify a different name. >>> concatenator = Concatenator(output_column_name="tensor") >>> concatenator.fit_transform(ds).to_pandas() # doctest: +SKIP tensor 0 [0.0, 0.5] 1 [3.0, 0.2] 2 [1.0, 0.9] Sometimes, you might not want to concatenate all of of the columns in your dataset. In this case, you can exclude columns with the ``exclude`` parameter. >>> df = pd.DataFrame({"X0": [0, 3, 1], "X1": [0.5, 0.2, 0.9], "Y": ["blue", "orange", "blue"]}) >>> ds = ray.data.from_pandas(df) # doctest: +SKIP >>> concatenator = Concatenator(exclude=["Y"]) >>> concatenator.fit_transform(ds).to_pandas() # doctest: +SKIP Y concat_out 0 blue [0.0, 0.5] 1 orange [3.0, 0.2] 2 blue [1.0, 0.9] Alternatively, you can specify which columns to concatenate with the ``include`` parameter. >>> concatenator = Concatenator(include=["X0", "X1"]) >>> concatenator.fit_transform(ds).to_pandas() # doctest: +SKIP Y concat_out 0 blue [0.0, 0.5] 1 orange [3.0, 0.2] 2 blue [1.0, 0.9] Note that if a column is in both ``include`` and ``exclude``, the column is excluded. >>> concatenator = Concatenator(include=["X0", "X1", "Y"], exclude=["Y"]) >>> concatenator.fit_transform(ds).to_pandas() # doctest: +SKIP Y concat_out 0 blue [0.0, 0.5] 1 orange [3.0, 0.2] 2 blue [1.0, 0.9] By default, the concatenated tensor is a ``dtype`` common to the input columns. However, you can also explicitly set the ``dtype`` with the ``dtype`` parameter. >>> concatenator = Concatenator(include=["X0", "X1"], dtype=np.float32) >>> concatenator.fit_transform(ds) # doctest: +SKIP Dataset(num_blocks=1, num_rows=3, schema={Y: object, concat_out: TensorDtype(shape=(2,), dtype=float32)}) Args: output_column_name: The desired name for the new column. Defaults to ``"concat_out"``. include: A list of columns to concatenate. If ``None``, all columns are concatenated. exclude: A list of column to exclude from concatenation. If a column is in both ``include`` and ``exclude``, the column is excluded from concatenation. dtype: The ``dtype`` to convert the output tensors to. If unspecified, the ``dtype`` is determined by standard coercion rules. raise_if_missing: If ``True``, an error is raised if any of the columns in ``include`` or ``exclude`` don't exist. Defaults to ``False``. Raises: ValueError: if `raise_if_missing` is `True` and a column in `include` or `exclude` doesn't exist in the dataset. """ # noqa: E501 _is_fittable = False def __init__( self, output_column_name: str = "concat_out", include: Optional[List[str]] = None, exclude: Optional[Union[str, List[str]]] = None, dtype: Optional[np.dtype] = None, raise_if_missing: bool = False, ): if isinstance(include, str): include = [include] if isinstance(exclude, str): exclude = [exclude] self.output_column_name = output_column_name self.include = include self.exclude = exclude or [] self.dtype = dtype self.raise_if_missing = raise_if_missing def _validate(self, df: pd.DataFrame) -> None: for parameter in "include", "exclude": columns = getattr(self, parameter) if columns is None: continue missing_columns = set(columns) - set(df) if not missing_columns: continue message = f"Missing columns specified in '{parameter}': {missing_columns}" if self.raise_if_missing: raise ValueError(message) else: logger.warning(message) def _transform_pandas(self, df: pd.DataFrame): self._validate(df) included_columns = set(df) if self.include: # subset of included columns included_columns = set(self.include) columns_to_concat = list(included_columns - set(self.exclude)) ordered_columns_to_concat = [ col for col in df.columns if col in columns_to_concat ] concatenated = df[ordered_columns_to_concat].to_numpy(dtype=self.dtype) df = df.drop(columns=columns_to_concat) # Use a Pandas Series for column assignment to get more consistent # behavior across Pandas versions. df.loc[:, self.output_column_name] = pd.Series(list(concatenated)) return df def __repr__(self): default_values = { "output_column_name": "concat_out", "include": None, "exclude": [], "dtype": None, "raise_if_missing": False, } non_default_arguments = [] for parameter, default_value in default_values.items(): value = getattr(self, parameter) if value != default_value: non_default_arguments.append(f"{parameter}={value}") return f"{self.__class__.__name__}({', '.join(non_default_arguments)})"