from typing import List, Tuple
import numpy as np
import pandas as pd
from ray.data import Dataset
from ray.data._internal.aggregate import AbsMax, Max, Mean, Min, Std
from ray.data.preprocessor import Preprocessor
from ray.util.annotations import PublicAPI
@PublicAPI(stability="alpha")
class StandardScaler(Preprocessor):
r"""Translate and scale each column by its mean and standard deviation,
respectively.
The general formula is given by
.. math::
x' = \frac{x - \bar{x}}{s}
where :math:`x` is the column, :math:`x'` is the transformed column,
:math:`\bar{x}` is the column average, and :math:`s` is the column's sample
standard deviation. If :math:`s = 0` (i.e., the column is constant-valued),
then the transformed column will contain zeros.
.. warning::
        :class:`StandardScaler` works best when your data is approximately
        normally distributed. If it isn't, the transformed features won't be
        meaningful.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import StandardScaler
>>>
>>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> ds.to_pandas() # doctest: +SKIP
X1 X2 X3
0 -2 -3 1
1 0 -3 1
2 2 3 1
Columns are scaled separately.
>>> preprocessor = StandardScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 -1.224745 -0.707107 1
1 0.000000 -0.707107 1
2 1.224745 1.414214 1
Constant-valued columns get filled with zeros.
>>> preprocessor = StandardScaler(columns=["X3"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 -2 -3 0.0
1 0 -3 0.0
2 2 3 0.0
Args:
columns: The columns to separately scale.
"""
def __init__(self, columns: List[str]):
self.columns = columns
def _fit(self, dataset: Dataset) -> Preprocessor:
mean_aggregates = [Mean(col) for col in self.columns]
std_aggregates = [Std(col, ddof=0) for col in self.columns]
self.stats_ = dataset.aggregate(*mean_aggregates, *std_aggregates)
return self
def _transform_pandas(self, df: pd.DataFrame):
def column_standard_scaler(s: pd.Series):
s_mean = self.stats_[f"mean({s.name})"]
s_std = self.stats_[f"std({s.name})"]
# Handle division by zero.
# TODO: extend this to handle near-zero values.
if s_std == 0:
s_std = 1
return (s - s_mean) / s_std
df.loc[:, self.columns] = df.loc[:, self.columns].transform(
column_standard_scaler
)
return df
def __repr__(self):
return f"{self.__class__.__name__}(columns={self.columns!r})"
@PublicAPI(stability="alpha")
class MinMaxScaler(Preprocessor):
r"""Scale each column by its range.
The general formula is given by
.. math::
        x' = \frac{x - \min(x)}{\max(x) - \min(x)}
    where :math:`x` is the column and :math:`x'` is the transformed column. If
    :math:`\max(x) - \min(x) = 0` (i.e., the column is constant-valued), then the
transformed column will get filled with zeros.
Transformed values are always in the range :math:`[0, 1]`.
.. tip::
This can be used as an alternative to :py:class:`StandardScaler`.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MinMaxScaler
>>>
>>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]}) # noqa: E501
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> ds.to_pandas() # doctest: +SKIP
X1 X2 X3
0 -2 -3 1
1 0 -3 1
2 2 3 1
Columns are scaled separately.
>>> preprocessor = MinMaxScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 0.0 0.0 1
1 0.5 0.0 1
2 1.0 1.0 1
Constant-valued columns get filled with zeros.
>>> preprocessor = MinMaxScaler(columns=["X3"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 -2 -3 0.0
1 0 -3 0.0
2 2 3 0.0
Args:
columns: The columns to separately scale.
"""
def __init__(self, columns: List[str]):
self.columns = columns
def _fit(self, dataset: Dataset) -> Preprocessor:
aggregates = [Agg(col) for Agg in [Min, Max] for col in self.columns]
self.stats_ = dataset.aggregate(*aggregates)
return self
def _transform_pandas(self, df: pd.DataFrame):
def column_min_max_scaler(s: pd.Series):
s_min = self.stats_[f"min({s.name})"]
s_max = self.stats_[f"max({s.name})"]
diff = s_max - s_min
# Handle division by zero.
# TODO: extend this to handle near-zero values.
if diff == 0:
diff = 1
return (s - s_min) / diff
df.loc[:, self.columns] = df.loc[:, self.columns].transform(
column_min_max_scaler
)
return df
def __repr__(self):
return f"{self.__class__.__name__}(columns={self.columns!r})"
@PublicAPI(stability="alpha")
class MaxAbsScaler(Preprocessor):
r"""Scale each column by its absolute max value.
The general formula is given by
.. math::
x' = \frac{x}{\max{\vert x \vert}}
where :math:`x` is the column and :math:`x'` is the transformed column. If
:math:`\max{\vert x \vert} = 0` (i.e., the column contains all zeros), then the
column is unmodified.
.. tip::
        This is the recommended way to scale sparse data. If your data isn't sparse,
        you can use :class:`MinMaxScaler` or :class:`StandardScaler` instead.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MaxAbsScaler
>>>
>>> df = pd.DataFrame({"X1": [-6, 3], "X2": [2, -4], "X3": [0, 0]}) # noqa: E501
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> ds.to_pandas() # doctest: +SKIP
X1 X2 X3
0 -6 2 0
1 3 -4 0
Columns are scaled separately.
>>> preprocessor = MaxAbsScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 -1.0 0.5 0
1 0.5 -1.0 0
Zero-valued columns aren't scaled.
>>> preprocessor = MaxAbsScaler(columns=["X3"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 -6 2 0.0
1 3 -4 0.0
Args:
columns: The columns to separately scale.
"""
def __init__(self, columns: List[str]):
self.columns = columns
def _fit(self, dataset: Dataset) -> Preprocessor:
aggregates = [AbsMax(col) for col in self.columns]
self.stats_ = dataset.aggregate(*aggregates)
return self
def _transform_pandas(self, df: pd.DataFrame):
def column_abs_max_scaler(s: pd.Series):
s_abs_max = self.stats_[f"abs_max({s.name})"]
# Handle division by zero.
# All values are 0.
if s_abs_max == 0:
s_abs_max = 1
return s / s_abs_max
df.loc[:, self.columns] = df.loc[:, self.columns].transform(
column_abs_max_scaler
)
return df
def __repr__(self):
return f"{self.__class__.__name__}(columns={self.columns!r})"
@PublicAPI(stability="alpha")
class RobustScaler(Preprocessor):
r"""Scale and translate each column using quantiles.
The general formula is given by
.. math::
x' = \frac{x - \mu_{1/2}}{\mu_h - \mu_l}
    where :math:`x` is the column, :math:`x'` is the transformed column,
    :math:`\mu_{1/2}` is the column median, and :math:`\mu_{h}` and :math:`\mu_{l}`
    are the high and low quantiles, respectively. By default, :math:`\mu_{h}` is the third
quartile and :math:`\mu_{l}` is the first quartile.
.. tip::
This scaler works well when your data contains many outliers.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import RobustScaler
>>>
>>> df = pd.DataFrame({
... "X1": [1, 2, 3, 4, 5],
... "X2": [13, 5, 14, 2, 8],
... "X3": [1, 2, 2, 2, 3],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> ds.to_pandas() # doctest: +SKIP
X1 X2 X3
0 1 13 1
1 2 5 2
2 3 14 2
3 4 2 2
4 5 8 3
:class:`RobustScaler` separately scales each column.
>>> preprocessor = RobustScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X1 X2 X3
0 -1.0 0.625 1
1 -0.5 -0.375 2
2 0.0 0.750 2
3 0.5 -0.750 2
4 1.0 0.000 3
Args:
columns: The columns to separately scale.
quantile_range: A tuple that defines the lower and upper quantiles. Values
must be between 0 and 1. Defaults to the 1st and 3rd quartiles:
``(0.25, 0.75)``.
"""
def __init__(
self, columns: List[str], quantile_range: Tuple[float, float] = (0.25, 0.75)
):
self.columns = columns
self.quantile_range = quantile_range
def _fit(self, dataset: Dataset) -> Preprocessor:
low = self.quantile_range[0]
med = 0.50
high = self.quantile_range[1]
num_records = dataset.count()
max_index = num_records - 1
split_indices = [int(percentile * max_index) for percentile in (low, med, high)]
self.stats_ = {}
# TODO(matt): Handle case where quantile lands between 2 numbers.
# The current implementation will simply choose the closest index.
# This will affect the results of small datasets more than large datasets.
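        # Strategy: sort each column, then cut the sorted dataset with
        # ``split_at_indices`` at the precomputed row indices. Each piece after
        # a cut starts at one of those indices, so its first row holds that
        # quantile's value and can be read with a single ``take(1)``.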
        for col in self.columns:
            filtered_dataset = dataset.map_batches(
                # Bind `col` as a default argument so each lambda keeps its
                # own column rather than closing over the loop variable.
                lambda df, col=col: df[[col]],
                batch_format="pandas",
            )
            sorted_dataset = filtered_dataset.sort(col)
            _, low_ds, med_ds, high_ds = sorted_dataset.split_at_indices(
                split_indices
            )

            def _get_first_value(ds: Dataset, c: str):
                return ds.take(1)[0][c]

            low_val = _get_first_value(low_ds, col)
            med_val = _get_first_value(med_ds, col)
            high_val = _get_first_value(high_ds, col)
self.stats_[f"low_quantile({col})"] = low_val
self.stats_[f"median({col})"] = med_val
self.stats_[f"high_quantile({col})"] = high_val
return self
def _transform_pandas(self, df: pd.DataFrame):
def column_robust_scaler(s: pd.Series):
s_low_q = self.stats_[f"low_quantile({s.name})"]
s_median = self.stats_[f"median({s.name})"]
s_high_q = self.stats_[f"high_quantile({s.name})"]
diff = s_high_q - s_low_q
# Handle division by zero.
# Return all zeros.
if diff == 0:
return np.zeros_like(s)
return (s - s_median) / diff
df.loc[:, self.columns] = df.loc[:, self.columns].transform(
column_robust_scaler
)
return df
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"quantile_range={self.quantile_range!r})"
)
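# Illustrative single-node sketch, not part of the Ray API: `_robust_scale_reference`
# is a hypothetical helper that applies the same formula with plain pandas, assuming
# the column fits in memory. Note that ``Series.quantile`` interpolates between rows,
# whereas ``_fit`` above reads the row at a truncated integer index, so results can
# differ slightly on small datasets.
def _robust_scale_reference(
    s: pd.Series, quantile_range: Tuple[float, float] = (0.25, 0.75)
) -> pd.Series:
    """Return ``(s - median) / (high_quantile - low_quantile)``.

    >>> _robust_scale_reference(pd.Series([1, 2, 3, 4, 5]))
    0   -1.0
    1   -0.5
    2    0.0
    3    0.5
    4    1.0
    dtype: float64
    """
    low = s.quantile(quantile_range[0])
    high = s.quantile(quantile_range[1])
    diff = high - low
    if diff == 0:
        # Degenerate quantile range: emit zeros, matching RobustScaler.
        return pd.Series(np.zeros(len(s)), index=s.index)
    return (s - s.median()) / diff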