# Source code for ray.data.preprocessors.chain
from typing import TYPE_CHECKING, Optional

from ray.air.util.data_batch_conversion import BatchFormat
from ray.data import Dataset
from ray.data.preprocessor import Preprocessor

if TYPE_CHECKING:
    from ray.air.data_batch_type import DataBatchType
class Chain(Preprocessor):
    """Combine multiple preprocessors into a single :py:class:`Preprocessor`.

    When you call ``fit``, each preprocessor is fit on the dataset produced by the
    preceding preprocessor's ``fit_transform``.

    Example:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import *
        >>>
        >>> df = pd.DataFrame({
        ...     "X0": [0, 1, 2],
        ...     "X1": [3, 4, 5],
        ...     "Y": ["orange", "blue", "orange"],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>>
        >>> preprocessor = Chain(
        ...     StandardScaler(columns=["X0", "X1"]),
        ...     Concatenator(columns=["X0", "X1"], output_column_name="X"),
        ...     LabelEncoder(label_column="Y")
        ... )
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           Y                                         X
        0  1  [-1.224744871391589, -1.224744871391589]
        1  0                                [0.0, 0.0]
        2  1    [1.224744871391589, 1.224744871391589]

    Args:
        preprocessors: The preprocessors to sequentially compose.
    """

    def fit_status(self) -> "Preprocessor.FitStatus":
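        """Return the overall fit status of the chain.

        The chain is ``FITTED`` when every fittable preprocessor is fitted,
        ``NOT_FITTABLE`` when none of them are fittable, and ``NOT_FITTED`` or
        ``PARTIALLY_FITTED`` otherwise, depending on how many have been fit.
        """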
        fittable_count = 0
        fitted_count = 0
        for p in self.preprocessors:
            if p.fit_status() == Preprocessor.FitStatus.FITTED:
                fittable_count += 1
                fitted_count += 1
            elif p.fit_status() in (
                Preprocessor.FitStatus.NOT_FITTED,
                Preprocessor.FitStatus.PARTIALLY_FITTED,
            ):
                fittable_count += 1
            else:
                assert p.fit_status() == Preprocessor.FitStatus.NOT_FITTABLE
        if fittable_count > 0:
            if fitted_count == fittable_count:
                return Preprocessor.FitStatus.FITTED
            elif fitted_count > 0:
                return Preprocessor.FitStatus.PARTIALLY_FITTED
            else:
                return Preprocessor.FitStatus.NOT_FITTED
        else:
            return Preprocessor.FitStatus.NOT_FITTABLE
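
    # A minimal sketch of the aggregation above (an illustrative assumption:
    # StandardScaler is fittable, while a stateless Concatenator is NOT_FITTABLE):
    #
    #   chain = Chain(StandardScaler(columns=["X0"]), Concatenator(columns=["X0"]))
    #   chain.fit_status()  # NOT_FITTED: the scaler has not been fit yet
    #   chain.fit(ds)
    #   chain.fit_status()  # FITTED: every fittable preprocessor is fitted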

    def __init__(self, *preprocessors: Preprocessor):
        self.preprocessors = preprocessors

    def _fit(self, ds: Dataset) -> Preprocessor:
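        # Fit and transform every preprocessor except the last, so that each
        # subsequent preprocessor is fit on already-transformed data; the last
        # preprocessor only needs to be fit.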
        for preprocessor in self.preprocessors[:-1]:
            ds = preprocessor.fit_transform(ds)
        self.preprocessors[-1].fit(ds)
        return self

    def fit_transform(self, ds: Dataset) -> Dataset:
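        # Unlike ``_fit``, this also fit-transforms the last preprocessor,
        # since the caller expects the fully transformed dataset back.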
        for preprocessor in self.preprocessors:
            ds = preprocessor.fit_transform(ds)
        return ds

    def _transform(
        self,
        ds: Dataset,
        batch_size: Optional[int],
        num_cpus: Optional[float] = None,
        memory: Optional[float] = None,
        concurrency: Optional[int] = None,
    ) -> Dataset:
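        # Apply each preprocessor in order, forwarding the shared compute
        # arguments (batch size, CPUs, memory, concurrency) to every stage.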
        for preprocessor in self.preprocessors:
            ds = preprocessor.transform(
                ds,
                batch_size=batch_size,
                num_cpus=num_cpus,
                memory=memory,
                concurrency=concurrency,
            )
        return ds

    def _transform_batch(self, df: "DataBatchType") -> "DataBatchType":
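        # Chain the in-memory batch transforms: each stage consumes the
        # previous stage's output batch.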
        for preprocessor in self.preprocessors:
            df = preprocessor.transform_batch(df)
        return df

    def __repr__(self):
        arguments = ", ".join(repr(preprocessor) for preprocessor in self.preprocessors)
        return f"{self.__class__.__name__}({arguments})"

    def _determine_transform_to_use(self) -> BatchFormat:
        # This is relevant for BatchPrediction.
        # For the Chain preprocessor, we pick the first preprocessor as the
        # entry point for determining the batch format.
        # TODO (jiaodong): We should revisit whether our Chain preprocessor is
        # still optimal in the context of lazy execution.
        return self.preprocessors[0]._determine_transform_to_use()
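
# A minimal usage sketch (assumes the ``ds`` and ``preprocessor`` built in the
# class docstring example above; ``transform_batch`` is the public per-batch
# entry point whose format is resolved by ``_determine_transform_to_use``):
#
#   fitted = preprocessor.fit(ds)
#   batch = pd.DataFrame({"X0": [1], "X1": [4], "Y": ["blue"]})
#   out = fitted.transform_batch(batch)  # applies each stage in order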