Source code for ray.data.preprocessors.normalizer

from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

from ray.data.preprocessor import SerializablePreprocessorBase
from ray.data.preprocessors.utils import _Computed, _PublicField, migrate_private_fields
from ray.data.preprocessors.version_support import SerializablePreprocessor
from ray.util.annotations import PublicAPI


@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.normalizer")
class Normalizer(SerializablePreprocessorBase):
    r"""Scales each sample to have unit norm.

    This preprocessor works by dividing each sample (i.e., row) by the sample's norm.
    The general formula is given by

    .. math::

        s' = \frac{s}{\lVert s \rVert_p}

    where :math:`s` is the sample, :math:`s'` is the transformed sample,
    :math:`\lVert s \rVert_p` is the norm of the sample, and :math:`p` is the
    norm type.

    The following norms are supported:

    * `"l1"` (:math:`L^1`): Sum of the absolute values.
    * `"l2"` (:math:`L^2`): Square root of the sum of the squared values.
    * `"max"` (:math:`L^\infty`): Maximum absolute value.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import Normalizer
        >>>
        >>> df = pd.DataFrame({"X1": [1, 1], "X2": [1, 0], "X3": [0, 1]})
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> ds.to_pandas()  # doctest: +SKIP
           X1  X2  X3
        0   1   1   0
        1   1   0   1

        The :math:`L^2`-norm of the first sample is :math:`\sqrt{2}`, and the
        :math:`L^2`-norm of the second sample is :math:`1`.

        >>> preprocessor = Normalizer(columns=["X1", "X2"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
                 X1        X2  X3
        0  0.707107  0.707107   0
        1  1.000000  0.000000   1

        The :math:`L^1`-norm of the first sample is :math:`2`, and the
        :math:`L^1`-norm of the second sample is :math:`1`.

        >>> preprocessor = Normalizer(columns=["X1", "X2"], norm="l1")
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
            X1   X2  X3
        0  0.5  0.5   0
        1  1.0  0.0   1

        The :math:`L^\infty`-norm of both samples is :math:`1`.

        >>> preprocessor = Normalizer(columns=["X1", "X2"], norm="max")
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
            X1   X2  X3
        0  1.0  1.0   0
        1  1.0  0.0   1

        :class:`Normalizer` can also be used in append mode by providing the
        name of the output_columns that should hold the normalized values.

        >>> preprocessor = Normalizer(
        ...     columns=["X1", "X2"],
        ...     output_columns=["X1_normalized", "X2_normalized"],
        ... )
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2  X3  X1_normalized  X2_normalized
        0   1   1   0       0.707107       0.707107
        1   1   0   1       1.000000       0.000000

    Args:
        columns: The columns to scale. For each row, these columns are scaled to
            unit-norm.
        norm: The norm to use. The supported values are ``"l1"``, ``"l2"``, or
            ``"max"``. Defaults to ``"l2"``.
        output_columns: The names of the transformed columns. If None, the
            transformed columns will be the same as the input columns. If not
            None, the length of ``output_columns`` must match the length of
            ``columns``, otherwise an error will be raised.

    Raises:
        ValueError: if ``norm`` is not ``"l1"``, ``"l2"``, or ``"max"``.
    """

    # Maps each supported norm name to a function that reduces the selected
    # columns row-wise (axis=1) to one norm value per row.
    _norm_fns = {
        "l1": lambda cols: np.abs(cols).sum(axis=1),
        "l2": lambda cols: np.sqrt(np.power(cols, 2).sum(axis=1)),
        "max": lambda cols: np.abs(cols).max(axis=1),
    }

    _is_fittable = False

    def __init__(
        self,
        columns: List[str],
        norm: str = "l2",
        *,
        output_columns: Optional[List[str]] = None,
    ):
        super().__init__()

        # Reject an unknown norm before storing any configuration on the instance.
        if norm not in self._norm_fns:
            raise ValueError(
                f"Norm {norm} is not supported. "
                f"Supported values are: {list(self._norm_fns)}"
            )

        self._columns = columns
        self._norm = norm
        self._output_columns = (
            SerializablePreprocessorBase._derive_and_validate_output_columns(
                columns, output_columns
            )
        )

    @property
    def columns(self) -> List[str]:
        """The input columns that are normalized."""
        return self._columns

    @property
    def norm(self) -> str:
        """The name of the norm in use (a key of ``_norm_fns``)."""
        return self._norm

    @property
    def output_columns(self) -> List[str]:
        """The columns that receive the normalized values."""
        return self._output_columns

    def _transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        """Divide the configured columns of ``df`` by their row-wise norm.

        The result is written to ``output_columns`` of ``df`` itself, and that
        same frame is returned.
        """
        columns = df.loc[:, self._columns]
        column_norms = self._norm_fns[self._norm](columns)

        # NOTE: there is no guard against a zero norm; a row whose selected
        # values are all zero is divided by zero and does not come out finite.
        df[self._output_columns] = columns.div(column_norms, axis=0)
        return df

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(columns={self._columns!r}, "
            f"norm={self._norm!r}, "
            f"output_columns={self._output_columns!r})"
        )

    def _get_serializable_fields(self) -> Dict[str, Any]:
        """Return the configuration to serialize, keyed by public field name."""
        return {
            "columns": self._columns,
            "norm": self._norm,
            "output_columns": self._output_columns,
        }

    def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
        """Restore the configuration from ``fields``.

        All three keys are required; ``version`` is not consulted here.
        """
        # required fields
        self._columns = fields["columns"]
        self._norm = fields["norm"]
        self._output_columns = fields["output_columns"]

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore pickled state, then reconcile the private attributes.

        Each private attribute is paired with its public field name and, where
        given, a fallback: ``"l2"`` for the norm and the input columns for the
        output columns.
        """
        super().__setstate__(state)
        # NOTE(review): presumably this backfills the private attributes for
        # objects pickled when the fields were stored under their public names;
        # confirm against migrate_private_fields.
        migrate_private_fields(
            self,
            fields={
                "_columns": _PublicField(public_field="columns"),
                "_norm": _PublicField(public_field="norm", default="l2"),
                "_output_columns": _PublicField(
                    public_field="output_columns",
                    default=_Computed(lambda obj: obj._columns),
                ),
            },
        )