# Source code for ray.data.preprocessors.discretizer

from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Type, Union

import numpy as np
import pandas as pd

from ray.data.aggregate import Max, Min
from ray.data.preprocessor import SerializablePreprocessorBase
from ray.data.preprocessors.utils import (
    _Computed,
    _PublicField,
    migrate_private_fields,
)
from ray.data.preprocessors.version_support import SerializablePreprocessor
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
    from ray.data.dataset import Dataset


class _AbstractKBinsDiscretizer(SerializablePreprocessorBase):
    """Abstract base class for all KBinsDiscretizers.

    Essentially a thin wrapper around ``pd.cut``.

    Expects either ``self.stats_`` or ``self.bins`` to be set and
    contain {column:list_of_bin_intervals}.
    """

    def _transform_pandas(self, df: pd.DataFrame):
        def bin_values(s: pd.Series) -> pd.Series:
            # Columns that aren't being discretized pass through untouched.
            if s.name not in self.columns:
                return s
            # ``labels=False`` makes ``pd.cut`` return integer bin codes.
            # Default to ``False`` (not ``None``) for columns absent from a
            # provided ``dtypes`` dict, so they get integer codes as the
            # subclasses document — ``labels=None`` would instead produce
            # interval labels.
            labels = self.dtypes.get(s.name, False) if self.dtypes else False
            ordered = True
            if labels:
                if isinstance(labels, pd.CategoricalDtype):
                    # Respect the user's ordering and category names.
                    ordered = labels.ordered
                    labels = list(labels.categories)
                else:
                    # Integer dtypes (e.g. ``np.integer``): integer bin codes.
                    labels = False

            # Fitted discretizers read bin edges from ``stats_``; the
            # non-fittable ones use the user-supplied ``bins`` directly.
            bins = self.stats_ if self._is_fittable else self.bins
            return pd.cut(
                s,
                bins[s.name] if isinstance(bins, dict) else bins,
                right=self.right,
                labels=labels,
                ordered=ordered,
                retbins=False,
                include_lowest=self.include_lowest,
                duplicates=self.duplicates,
            )

        binned_df = df.apply(bin_values, axis=0)
        # Write results to ``output_columns`` (may equal ``columns`` for
        # in-place mode, or be distinct names for append mode).
        df[self.output_columns] = binned_df[self.columns]
        return df

    def _validate_bins_columns(self):
        """Raise ``ValueError`` if a dict ``bins`` doesn't cover every column."""
        if isinstance(self.bins, dict) and not all(
            col in self.bins for col in self.columns
        ):
            raise ValueError(
                "If `bins` is a dictionary, all elements of `columns` must be present "
                "in it."
            )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}("
            f"columns={self.columns!r}, "
            f"bins={self.bins!r}, "
            f"right={self.right!r}, "
            f"include_lowest={self.include_lowest!r}, "
            f"duplicates={self.duplicates!r}, "
            f"dtypes={self.dtypes!r}, "
            f"output_columns={self.output_columns!r})"
        )


@PublicAPI(stability="alpha")
@SerializablePreprocessor(
    version=1, identifier="io.ray.preprocessors.custom_kbins_discretizer"
)
class CustomKBinsDiscretizer(_AbstractKBinsDiscretizer):
    """Bin values into discrete intervals using custom bin edges.

    Columns must contain numerical values.

    Examples:
        Use :class:`CustomKBinsDiscretizer` to bin continuous features.

        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import CustomKBinsDiscretizer
        >>> df = pd.DataFrame({
        ...     "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1],
        ...     "value_2": [10, 15, 13, 12, 23, 25],
        ... })
        >>> ds = ray.data.from_pandas(df)
        >>> discretizer = CustomKBinsDiscretizer(
        ...     columns=["value_1", "value_2"],
        ...     bins=[0, 1, 4, 10, 25]
        ... )
        >>> discretizer.transform(ds).to_pandas()
           value_1  value_2
        0        0        2
        1        1        3
        2        1        3
        3        2        3
        4        2        3
        5        1        3

        :class:`CustomKBinsDiscretizer` can also be used in append mode by providing
        the name of the output_columns that should hold the encoded values.

        >>> discretizer = CustomKBinsDiscretizer(
        ...     columns=["value_1", "value_2"],
        ...     bins=[0, 1, 4, 10, 25],
        ...     output_columns=["value_1_discretized", "value_2_discretized"]
        ... )
        >>> discretizer.fit_transform(ds).to_pandas()  # doctest: +SKIP
           value_1  value_2  value_1_discretized  value_2_discretized
        0      0.2       10                    0                    2
        1      1.4       15                    1                    3
        2      2.5       13                    1                    3
        3      6.2       12                    2                    3
        4      9.7       23                    2                    3
        5      2.1       25                    1                    3

        You can also specify different bin edges per column.

        >>> discretizer = CustomKBinsDiscretizer(
        ...     columns=["value_1", "value_2"],
        ...     bins={"value_1": [0, 1, 4], "value_2": [0, 18, 35, 70]},
        ... )
        >>> discretizer.transform(ds).to_pandas()
           value_1  value_2
        0      0.0        0
        1      1.0        0
        2      1.0        0
        3      NaN        0
        4      NaN        1
        5      1.0        1

    Args:
        columns: The columns to discretize.
        bins: Defines custom bin edges. Can be an iterable of numbers,
            a ``pd.IntervalIndex``, or a dict mapping columns to either of them.
            Note that ``pd.IntervalIndex`` for bins must be non-overlapping.
        right: Indicates whether bins include the rightmost edge.
        include_lowest: Indicates whether the first interval should be
            left-inclusive.
        duplicates: Can be either 'raise' or 'drop'. If bin edges are not unique,
            raise ``ValueError`` or drop non-uniques.
        dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
            objects or ``np.integer`` types. If you don't include a column in
            ``dtypes`` or specify it as an integer dtype, the outputted column
            will consist of ordered integers corresponding to bins. If you
            use a ``pd.CategoricalDtype``, the outputted column will be a
            ``pd.CategoricalDtype`` with the categories being mapped to bins.
            You can use ``pd.CategoricalDtype(categories, ordered=True)`` to
            preserve information about bin order.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, otherwise an
            error will be raised.

    .. seealso::

        :class:`UniformKBinsDiscretizer`
            If you want to bin data into uniform width bins.
    """

    def __init__(
        self,
        columns: List[str],
        bins: Union[
            Iterable[float],
            pd.IntervalIndex,
            Dict[str, Union[Iterable[float], pd.IntervalIndex]],
        ],
        *,
        right: bool = True,
        include_lowest: bool = False,
        duplicates: str = "raise",
        dtypes: Optional[
            Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]
        ] = None,
        output_columns: Optional[List[str]] = None,
    ):
        self._columns = columns
        self._bins = bins
        self._right = right
        self._include_lowest = include_lowest
        self._duplicates = duplicates
        self._dtypes = dtypes
        # Derives output columns (identity when ``output_columns`` is None)
        # and validates that the lengths match.
        self._output_columns = (
            SerializablePreprocessorBase._derive_and_validate_output_columns(
                columns, output_columns
            )
        )
        self._validate_bins_columns()

    @property
    def columns(self) -> List[str]:
        return self._columns

    @property
    def bins(
        self,
    ) -> Union[
        Iterable[float],
        pd.IntervalIndex,
        Dict[str, Union[Iterable[float], pd.IntervalIndex]],
    ]:
        return self._bins

    @property
    def right(self) -> bool:
        return self._right

    @property
    def include_lowest(self) -> bool:
        return self._include_lowest

    @property
    def duplicates(self) -> str:
        return self._duplicates

    @property
    def dtypes(
        self,
    ) -> Optional[Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]]:
        return self._dtypes

    @property
    def output_columns(self) -> List[str]:
        return self._output_columns

    # Bin edges are supplied by the user, so there is nothing to fit.
    _is_fittable = False

    def _get_serializable_fields(self) -> Dict[str, Any]:
        return {
            "columns": self._columns,
            "bins": self._bins,
            "right": self._right,
            "include_lowest": self._include_lowest,
            "duplicates": self._duplicates,
            "dtypes": self._dtypes,
            "output_columns": self._output_columns,
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        # required fields
        self._columns = fields["columns"]
        self._bins = fields["bins"]
        self._right = fields["right"]
        self._include_lowest = fields["include_lowest"]
        self._duplicates = fields["duplicates"]
        self._dtypes = fields["dtypes"]
        self._output_columns = fields["output_columns"]

    def __setstate__(self, state: Dict[str, Any]) -> None:
        super().__setstate__(state)
        # Backwards compatibility: migrate legacy pickles that stored public
        # attributes to the current private-attribute layout.
        migrate_private_fields(
            self,
            fields={
                "_columns": _PublicField(public_field="columns"),
                "_bins": _PublicField(public_field="bins"),
                "_right": _PublicField(public_field="right", default=True),
                "_include_lowest": _PublicField(
                    public_field="include_lowest", default=False
                ),
                "_duplicates": _PublicField(public_field="duplicates", default="raise"),
                "_dtypes": _PublicField(public_field="dtypes", default=None),
                "_output_columns": _PublicField(
                    public_field="output_columns",
                    default=_Computed(lambda obj: obj._columns),
                ),
            },
        )
@PublicAPI(stability="alpha")
@SerializablePreprocessor(
    version=1, identifier="io.ray.preprocessors.uniform_kbins_discretizer"
)
class UniformKBinsDiscretizer(_AbstractKBinsDiscretizer):
    """Bin values into discrete intervals (bins) of uniform width.

    Columns must contain numerical values.

    Examples:
        Use :class:`UniformKBinsDiscretizer` to bin continuous features.

        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import UniformKBinsDiscretizer
        >>> df = pd.DataFrame({
        ...     "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1],
        ...     "value_2": [10, 15, 13, 12, 23, 25],
        ... })
        >>> ds = ray.data.from_pandas(df)
        >>> discretizer = UniformKBinsDiscretizer(
        ...     columns=["value_1", "value_2"], bins=4
        ... )
        >>> discretizer.fit_transform(ds).to_pandas()
           value_1  value_2
        0        0        0
        1        0        1
        2        0        0
        3        2        0
        4        3        3
        5        0        3

        :class:`UniformKBinsDiscretizer` can also be used in append mode by providing
        the name of the output_columns that should hold the encoded values.

        >>> discretizer = UniformKBinsDiscretizer(
        ...     columns=["value_1", "value_2"],
        ...     bins=4,
        ...     output_columns=["value_1_discretized", "value_2_discretized"]
        ... )
        >>> discretizer.fit_transform(ds).to_pandas()  # doctest: +SKIP
           value_1  value_2  value_1_discretized  value_2_discretized
        0      0.2       10                    0                    0
        1      1.4       15                    0                    1
        2      2.5       13                    0                    0
        3      6.2       12                    2                    0
        4      9.7       23                    3                    3
        5      2.1       25                    0                    3

        You can also specify different number of bins per column.

        >>> discretizer = UniformKBinsDiscretizer(
        ...     columns=["value_1", "value_2"], bins={"value_1": 4, "value_2": 3}
        ... )
        >>> discretizer.fit_transform(ds).to_pandas()
           value_1  value_2
        0        0        0
        1        0        0
        2        0        0
        3        2        0
        4        3        2
        5        0        2

    Args:
        columns: The columns to discretize.
        bins: Defines the number of equal-width bins.
            Can be either an integer (which will be applied to all columns),
            or a dict that maps columns to integers.
            The range is extended by .1% on each side to include
            the minimum and maximum values.
        right: Indicates whether bins include the rightmost edge.
        include_lowest: Whether the first interval should be left-inclusive.
        duplicates: Can be either 'raise' or 'drop'. If bin edges are not unique,
            raise ``ValueError`` or drop non-uniques.
        dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
            objects or ``np.integer`` types. If you don't include a column in
            ``dtypes`` or specify it as an integer dtype, the outputted column
            will consist of ordered integers corresponding to bins. If you
            use a ``pd.CategoricalDtype``, the outputted column will be a
            ``pd.CategoricalDtype`` with the categories being mapped to bins.
            You can use ``pd.CategoricalDtype(categories, ordered=True)`` to
            preserve information about bin order.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, otherwise an
            error will be raised.

    .. seealso::

        :class:`CustomKBinsDiscretizer`
            If you want to specify your own bin edges.
    """

    def __init__(
        self,
        columns: List[str],
        bins: Union[int, Dict[str, int]],
        *,
        right: bool = True,
        include_lowest: bool = False,
        duplicates: str = "raise",
        dtypes: Optional[
            Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]
        ] = None,
        output_columns: Optional[List[str]] = None,
    ):
        super().__init__()
        self._columns = columns
        self._bins = bins
        self._right = right
        self._include_lowest = include_lowest
        self._duplicates = duplicates
        self._dtypes = dtypes
        # Derives output columns (identity when ``output_columns`` is None)
        # and validates that the lengths match.
        self._output_columns = (
            SerializablePreprocessorBase._derive_and_validate_output_columns(
                columns, output_columns
            )
        )

    @property
    def columns(self) -> List[str]:
        return self._columns

    @property
    def bins(self) -> Union[int, Dict[str, int]]:
        return self._bins

    @property
    def right(self) -> bool:
        return self._right

    @property
    def include_lowest(self) -> bool:
        return self._include_lowest

    @property
    def duplicates(self) -> str:
        return self._duplicates

    @property
    def dtypes(
        self,
    ) -> Optional[Dict[str, Union[pd.CategoricalDtype, Type[np.integer]]]]:
        return self._dtypes

    @property
    def output_columns(self) -> List[str]:
        return self._output_columns

    def _fit(self, dataset: "Dataset") -> SerializablePreprocessorBase:
        """Register min/max aggregators for every column to be binned."""
        self._validate_on_fit()
        # A dict ``bins`` may cover a superset of ``columns``; fit on its keys.
        if isinstance(self.bins, dict):
            columns = self.bins.keys()
        else:
            columns = self.columns

        for column in columns:
            bins = self.bins[column] if isinstance(self.bins, dict) else self.bins
            if not isinstance(bins, int):
                raise TypeError(
                    f"`bins` must be an integer or a dict of integers, got {bins}"
                )

        # Uniform bin edges only need each column's min and max.
        self._stat_computation_plan.add_aggregator(
            aggregator_fn=Min,
            columns=columns,
        )
        self._stat_computation_plan.add_aggregator(
            aggregator_fn=Max,
            columns=columns,
        )
        return self

    def _validate_on_fit(self):
        self._validate_bins_columns()

    def _fit_execute(self, dataset: "Dataset"):
        """Compute the aggregated stats and turn them into bin edges."""
        stats = self._stat_computation_plan.compute(dataset)
        self.stats_ = post_fit_processor(stats, self.bins, self.right)
        return self

    def _get_serializable_fields(self) -> Dict[str, Any]:
        return {
            "columns": self._columns,
            "bins": self._bins,
            "right": self._right,
            "include_lowest": self._include_lowest,
            "duplicates": self._duplicates,
            "dtypes": self._dtypes,
            "output_columns": self._output_columns,
            "_fitted": getattr(self, "_fitted", None),
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        # required fields
        self._columns = fields["columns"]
        self._bins = fields["bins"]
        self._right = fields["right"]
        self._include_lowest = fields["include_lowest"]
        self._duplicates = fields["duplicates"]
        self._dtypes = fields["dtypes"]
        self._output_columns = fields["output_columns"]
        # optional fields
        self._fitted = fields.get("_fitted")

    def __setstate__(self, state: Dict[str, Any]) -> None:
        super().__setstate__(state)
        # Backwards compatibility: migrate legacy pickles that stored public
        # attributes to the current private-attribute layout.
        migrate_private_fields(
            self,
            fields={
                "_columns": _PublicField(public_field="columns"),
                "_bins": _PublicField(public_field="bins"),
                "_right": _PublicField(public_field="right", default=True),
                "_include_lowest": _PublicField(
                    public_field="include_lowest", default=False
                ),
                "_duplicates": _PublicField(public_field="duplicates", default="raise"),
                "_dtypes": _PublicField(public_field="dtypes", default=None),
                "_output_columns": _PublicField(
                    public_field="output_columns",
                    default=_Computed(lambda obj: obj._columns),
                ),
            },
        )
def post_fit_processor(
    aggregate_stats: dict, bins: Union[int, Dict[str, int]], right: bool
):
    """Convert aggregated per-column min/max stats into bin edges.

    Args:
        aggregate_stats: Mapping of ``"min(column)"`` / ``"max(column)"`` keys
            to the aggregated values, as produced by the Min/Max aggregators.
        bins: Number of equal-width bins — either a single integer applied to
            all columns, or a dict mapping column names to integers.
        right: Whether bins include the rightmost edge (passed through to the
            edge computation).

    Returns:
        Dict mapping each column name to its computed list of bin edges.
    """
    mins, maxes, stats = {}, {}, {}
    for key, value in aggregate_stats.items():
        # Both "min(" and "max(" prefixes are 4 characters long.
        column_name = key[4:-1]  # min(column) -> column
        if key.startswith("min"):
            mins[column_name] = value
        if key.startswith("max"):
            maxes[column_name] = value

    for column in mins.keys():
        stats[column] = _translate_min_max_number_of_bins_to_bin_edges(
            mn=mins[column],
            mx=maxes[column],
            bins=bins[column] if isinstance(bins, dict) else bins,
            right=right,
        )
    return stats


# Copied from
# https://github.com/pandas-dev/pandas/blob/v1.4.4/pandas/core/reshape/tile.py#L257
# under
# BSD 3-Clause License
#
# Copyright (c) 2008-2011, AQR Capital Management, LLC, Lambda Foundry, Inc.
# and PyData Development Team
# All rights reserved.
#
# Copyright (c) 2011-2022, Open source contributors.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. def _translate_min_max_number_of_bins_to_bin_edges( mn: float, mx: float, bins: int, right: bool ) -> List[float]: """Translates a range and desired number of bins into list of bin edges.""" rng = (mn, mx) mn, mx = (mi + 0.0 for mi in rng) if np.isinf(mn) or np.isinf(mx): raise ValueError( "Cannot specify integer `bins` when input data contains infinity." ) elif mn == mx: # adjust end points before binning mn -= 0.001 * abs(mn) if mn != 0 else 0.001 mx += 0.001 * abs(mx) if mx != 0 else 0.001 bins = np.linspace(mn, mx, bins + 1, endpoint=True) else: # adjust end points after binning bins = np.linspace(mn, mx, bins + 1, endpoint=True) adj = (mx - mn) * 0.001 # 0.1% of the range if right: bins[0] -= adj else: bins[-1] += adj return bins # TODO(ml-team) # Add QuantileKBinsDiscretizer