import logging
from collections import Counter
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Hashable,
List,
Optional,
Set,
Tuple,
Union,
)
import numpy as np
import pandas as pd
import pandas.api.types
import pyarrow as pa
import pyarrow.compute as pc
from ray.data._internal.util import is_null
from ray.data.block import BlockAccessor
from ray.data.preprocessor import (
Preprocessor,
PreprocessorNotFittedException,
SerializablePreprocessorBase,
)
from ray.data.preprocessors.utils import (
make_post_processor,
)
from ray.data.preprocessors.version_support import SerializablePreprocessor
from ray.data.util.data_batch_conversion import BatchFormat
from ray.util.annotations import DeveloperAPI, PublicAPI
if TYPE_CHECKING:
from ray.data.dataset import Dataset
logger = logging.getLogger(__name__)
def _get_unique_value_arrow_arrays(
stats: Dict[str, Any], input_col: str
) -> Tuple[pa.Array, pa.Array]:
"""Get Arrow arrays for keys and values from encoder stats.
Args:
stats: The encoder's stats_ dictionary.
input_col: The name of the column to get arrays for.
Returns:
Tuple of (keys_array, values_array) for the column's ordinal mapping.
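
    A minimal doctest-style sketch of the pandas-format branch; the stats
    below are illustrative, not taken from a real fit:

    >>> stats = {"unique_values(color)": {"blue": 0, "red": 1}}  # doctest: +SKIP
    >>> keys, values = _get_unique_value_arrow_arrays(stats, "color")  # doctest: +SKIP
    >>> keys.to_pylist(), values.to_pylist()  # doctest: +SKIP
    (['blue', 'red'], [0, 1])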
"""
stat_value = stats[f"unique_values({input_col})"]
if isinstance(stat_value, dict):
# Stats are in pandas dict format - convert to Arrow format
sorted_keys = sorted(stat_value.keys())
keys_array = pa.array(sorted_keys)
values_array = pa.array([stat_value[k] for k in sorted_keys], type=pa.int64())
else:
# Stats are in Arrow tuple format: (keys_array, values_array)
keys_array, values_array = stat_value
return keys_array, values_array
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.ordinal_encoder")
class OrdinalEncoder(SerializablePreprocessorBase):
r"""Encode values within columns as ordered integer values.
:class:`OrdinalEncoder` encodes categorical features as integers that range from
:math:`0` to :math:`n - 1`, where :math:`n` is the number of categories.
    If you transform a value that isn't in the fitted dataset, then the value is encoded
as ``float("nan")``.
Columns must contain either hashable values or lists of hashable values. Also, you
can't have both scalars and lists in the same column.
Examples:
Use :class:`OrdinalEncoder` to encode categorical features as integers.
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import OrdinalEncoder
>>> df = pd.DataFrame({
... "sex": ["male", "female", "male", "female"],
... "level": ["L4", "L5", "L3", "L4"],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OrdinalEncoder(columns=["sex", "level"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sex level
0 1 1
1 0 2
2 1 0
3 0 1
    :class:`OrdinalEncoder` can also be used in append mode by providing the
    names of the ``output_columns`` that should hold the encoded values.
>>> encoder = OrdinalEncoder(columns=["sex", "level"], output_columns=["sex_encoded", "level_encoded"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sex level sex_encoded level_encoded
0 male L4 1 1
1 female L5 0 2
2 male L3 1 0
3 female L4 0 1
If you transform a value not present in the original dataset, then the value
is encoded as ``float("nan")``.
>>> df = pd.DataFrame({"sex": ["female"], "level": ["L6"]})
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder.transform(ds).to_pandas() # doctest: +SKIP
sex level
0 0 NaN
:class:`OrdinalEncoder` can also encode categories in a list.
>>> df = pd.DataFrame({
... "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
... "genre": [
... ["comedy", "action", "sports"],
... ["animation", "comedy", "action"],
... ["documentary"],
... ],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OrdinalEncoder(columns=["genre"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre
0 Shaolin Soccer [2, 0, 4]
1 Moana [1, 2, 0]
2 The Smartest Guys in the Room [3]
Args:
columns: The columns to separately encode.
encode_lists: If ``True``, encode list elements. If ``False``, encode
whole lists (i.e., replace each list with an integer). ``True``
by default.
        output_columns: The names of the transformed columns. If None, the
            transformed columns will be the same as the input columns. If not
            None, the length of ``output_columns`` must match the length of
            ``columns``, otherwise an error will be raised.
.. seealso::
:class:`OneHotEncoder`
Another preprocessor that encodes categorical data.
"""
def __init__(
self,
columns: List[str],
*,
encode_lists: bool = True,
output_columns: Optional[List[str]] = None,
):
super().__init__()
# TODO: allow user to specify order of values within each column.
self.columns = columns
self.encode_lists = encode_lists
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=self.columns,
encode_lists=self.encode_lists,
key_gen=key_gen,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=self.columns,
)
return self
def _get_ordinal_map(self, column_name: str) -> Dict[Any, int]:
"""Get the ordinal mapping for a column as a dict.
Stats can be stored in either:
- Dict format: {value: index} (from pandas-style processing)
- Arrow format: (keys_array, values_array) tuple
This method returns a dict in either case.
"""
stat_value = self.stats_[f"unique_values({column_name})"]
if isinstance(stat_value, dict):
return stat_value
# Arrow tuple format (keys_array, values_array)
keys_array, values_array = stat_value
return {k.as_py(): v.as_py() for k, v in zip(keys_array, values_array)}
def _get_arrow_arrays(self, input_col: str) -> Tuple[pa.Array, pa.Array]:
"""Get Arrow arrays for keys and values."""
return _get_unique_value_arrow_arrays(self.stats_, input_col)
def _encode_list_element(self, element: list, *, column_name: str):
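        """Encode a single list-valued cell using the fitted ordinal map.

        With ``encode_lists=True``, each element in the cell's list is encoded
        individually; otherwise the whole list is converted to a tuple and
        looked up as one category. Unseen values map to ``None``.
        """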
ordinal_map = self._get_ordinal_map(column_name)
        # When encoding list elements, the fit stage flattened the column, so
        # we encode each element inside this cell's list individually.
if self.encode_lists:
return [ordinal_map.get(x) for x in element]
return ordinal_map.get(tuple(element))
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)
def column_ordinal_encoder(s: pd.Series):
if _is_series_composed_of_lists(s):
return s.map(
lambda elem: self._encode_list_element(elem, column_name=s.name)
)
s_values = self._get_ordinal_map(s.name)
return s.map(s_values)
df[self.output_columns] = df[self.columns].apply(column_ordinal_encoder)
return df
def _transform_arrow(self, table: pa.Table) -> pa.Table:
"""Transform using fast native PyArrow operations for scalar columns.
List-type columns are preferably handled by _transform_pandas, which is selected
via _determine_transform_to_use when a PyArrow schema is available. However,
for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
until runtime, so we fall back to pandas here if list columns are found.
"""
# Validate that columns don't contain null values (consistent with pandas path)
_validate_arrow(table, *self.columns)
# Check for list columns (runtime fallback for PandasBlockSchema datasets)
for col_name in self.columns:
col_type = table.schema.field(col_name).type
if pa.types.is_list(col_type) or pa.types.is_large_list(col_type):
# Fall back to pandas transform for list columns
df = table.to_pandas()
result_df = self._transform_pandas(df)
return pa.Table.from_pandas(result_df, preserve_index=False)
for input_col, output_col in zip(self.columns, self.output_columns):
column = table.column(input_col)
encoded_column = self._encode_column_vectorized(column, input_col)
table = BlockAccessor.for_block(table).upsert_column(
output_col, encoded_column
)
return table
def _encode_column_vectorized(
self, column: pa.ChunkedArray, input_col: str
) -> pa.Array:
"""Encode column using PyArrow's vectorized pc.index_in.
Unseen categories are encoded as null in the output, which becomes NaN
when converted to pandas. Null values should be validated before calling
this method via _validate_arrow.
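
        A hedged doctest-style sketch of the kernel behavior (illustrative
        values): ``pc.index_in`` returns each input's position in the key
        array, or null when absent.

        >>> pc.index_in(pa.array(["b", "z", "a"]), pa.array(["a", "b"])).to_pylist()  # doctest: +SKIP
        [1, None, 0]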
"""
keys_array, values_array = self._get_arrow_arrays(input_col)
if keys_array.type != column.type:
keys_array = pc.cast(keys_array, column.type)
# pc.index_in returns null for values not found in keys_array
# (including null input values and unseen categories)
indices = pc.index_in(column, keys_array)
# pc.take preserves nulls from indices, so null inputs -> null outputs
return pc.take(values_array, indices)
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"encode_lists": self.encode_lists,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
self.encode_lists = fields["encode_lists"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"encode_lists={self.encode_lists!r}, "
f"output_columns={self.output_columns!r})"
)
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.one_hot_encoder")
class OneHotEncoder(SerializablePreprocessorBase):
r"""`One-hot encode <https://en.wikipedia.org/wiki/One-hot#Machine_learning_and_statistics>`_
categorical data.
This preprocessor transforms each specified column into a one-hot encoded vector.
Each element in the vector corresponds to a unique category in the column, with a
value of 1 if the category matches and 0 otherwise.
If a category is infrequent (based on ``max_categories``) or not present in the
fitted dataset, it is encoded as all 0s.
Columns must contain hashable objects or lists of hashable objects.
.. note::
Lists are treated as categories. If you want to encode individual list
elements, use :class:`MultiHotEncoder`.
Example:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import OneHotEncoder
>>>
>>> df = pd.DataFrame({"color": ["red", "green", "red", "red", "blue", "green"]})
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OneHotEncoder(columns=["color"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color
0 [0, 0, 1]
1 [0, 1, 0]
2 [0, 0, 1]
3 [0, 0, 1]
4 [1, 0, 0]
5 [0, 1, 0]
    :class:`OneHotEncoder` can also be used in append mode by providing the
    names of the ``output_columns`` that should hold the encoded values.
>>> encoder = OneHotEncoder(columns=["color"], output_columns=["color_encoded"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color color_encoded
0 red [0, 0, 1]
1 green [0, 1, 0]
2 red [0, 0, 1]
3 red [0, 0, 1]
4 blue [1, 0, 0]
5 green [0, 1, 0]
If you one-hot encode a value that isn't in the fitted dataset, then the
value is encoded with zeros.
>>> df = pd.DataFrame({"color": ["yellow"]})
>>> batch = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder.transform(batch).to_pandas() # doctest: +SKIP
color color_encoded
0 yellow [0, 0, 0]
Likewise, if you one-hot encode an infrequent value, then the value is encoded
with zeros.
>>> encoder = OneHotEncoder(columns=["color"], max_categories={"color": 2})
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color
    0  [0, 1]
    1  [1, 0]
    2  [0, 1]
    3  [0, 1]
    4  [0, 0]
    5  [1, 0]
Args:
columns: The columns to separately encode.
max_categories: The maximum number of features to create for each column.
If a value isn't specified for a column, then a feature is created
for every category in that column.
        output_columns: The names of the transformed columns. If None, the
            transformed columns will be the same as the input columns. If not
            None, the length of ``output_columns`` must match the length of
            ``columns``, otherwise an error will be raised.
.. seealso::
:class:`MultiHotEncoder`
If you want to encode individual list elements, use
:class:`MultiHotEncoder`.
:class:`OrdinalEncoder`
If your categories are ordered, you may want to use
:class:`OrdinalEncoder`.
""" # noqa: E501
def __init__(
self,
columns: List[str],
*,
max_categories: Optional[Dict[str, int]] = None,
output_columns: Optional[List[str]] = None,
):
super().__init__()
# TODO: add `drop` parameter.
self.columns = columns
self.max_categories = max_categories or {}
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=self.columns,
encode_lists=False,
key_gen=key_gen,
max_categories=self.max_categories,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=self.columns,
)
return self
    def safe_get(self, v: Any, stats: Dict[Hashable, int]) -> int:
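        """Map ``v`` to its fitted category index, or ``-1`` if unseen.

        Lists and arrays are converted to tuples first so that whole-list
        categories stay hashable; unhashable values are treated as missing.
        """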
if isinstance(v, (list, np.ndarray)):
v = tuple(v)
if isinstance(v, Hashable):
return stats.get(v, -1)
else:
return -1 # Unhashable type treated as a missing category
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)
# Compute new one-hot encoded columns
for column, output_column in zip(self.columns, self.output_columns):
stats = self.stats_[f"unique_values({column})"]
num_categories = len(stats)
one_hot = np.zeros((len(df), num_categories), dtype=np.uint8)
# Integer indices for each category in the column
codes = df[column].apply(lambda v: self.safe_get(v, stats)).to_numpy()
# Filter to only the rows that have a valid category
valid_category_mask = codes != -1
# Dimension should be (num_rows, ) - 1D boolean array
non_zero_indices = np.nonzero(valid_category_mask)[0]
# Mark the corresponding categories as 1
one_hot[
non_zero_indices,
codes[valid_category_mask],
] = 1
df[output_column] = one_hot.tolist()
return df
def _transform_arrow(self, table: pa.Table) -> pa.Table:
"""Transform using fast native PyArrow operations for scalar columns.
List-type columns are preferably handled by _transform_pandas, which is selected
via _determine_transform_to_use when a PyArrow schema is available. However,
for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
until runtime, so we fall back to pandas here if list columns are found.
"""
# Validate that columns don't contain null values (consistent with pandas path)
_validate_arrow(table, *self.columns)
# Check for list columns (runtime fallback for PandasBlockSchema datasets)
for col_name in self.columns:
col_type = table.schema.field(col_name).type
if pa.types.is_list(col_type) or pa.types.is_large_list(col_type):
# Fall back to pandas transform for list columns
df = table.to_pandas()
result_df = self._transform_pandas(df)
return pa.Table.from_pandas(result_df, preserve_index=False)
for input_col, output_col in zip(self.columns, self.output_columns):
column = table.column(input_col)
encoded_column = self._encode_column_one_hot(column, input_col)
table = BlockAccessor.for_block(table).upsert_column(
output_col, encoded_column
)
return table
def _get_arrow_arrays(self, input_col: str) -> Tuple[pa.Array, pa.Array]:
"""Get Arrow arrays for keys and values."""
return _get_unique_value_arrow_arrays(self.stats_, input_col)
def _encode_column_one_hot(
self, column: pa.ChunkedArray, input_col: str
) -> pa.FixedSizeListArray:
"""Encode a column to one-hot vectors using Arrow arrays.
Unseen categories are encoded as all-zeros vectors, matching the pandas
behavior. Null values should be validated before calling this method
via _validate_arrow.
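
        A hedged doctest-style sketch of the final Arrow conversion
        (illustrative values):

        >>> flat = pa.array([1, 0, 0, 1], type=pa.uint8())  # doctest: +SKIP
        >>> pa.FixedSizeListArray.from_arrays(flat, 2).to_pylist()  # doctest: +SKIP
        [[1, 0], [0, 1]]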
"""
keys_array, _ = self._get_arrow_arrays(input_col)
num_categories = len(keys_array)
# Cast keys to match column type if needed
if keys_array.type != column.type:
keys_array = pc.cast(keys_array, column.type)
# Use pc.index_in to find position of each value in keys_array
# Returns null for null inputs and unseen categories (values not in keys_array)
indices = pc.index_in(column, keys_array)
# Fill nulls with -1 so they can be filtered out below (resulting in all-zeros)
indices_filled = pc.fill_null(indices, -1)
# Create one-hot encoded matrix using vectorized NumPy operations
num_rows = len(column)
indices_np = indices_filled.to_numpy()
one_hot_matrix = np.zeros((num_rows, num_categories), dtype=np.uint8)
# Find valid indices (not -1) and set 1s at the appropriate positions
valid_mask = indices_np != -1
valid_indices = np.nonzero(valid_mask)[0]
if len(valid_indices) > 0:
one_hot_matrix[valid_indices, indices_np[valid_mask]] = 1
# Convert to Arrow FixedSizeListArray for efficient storage
return pa.FixedSizeListArray.from_arrays(one_hot_matrix.ravel(), num_categories)
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"max_categories": self.max_categories,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
self.max_categories = fields["max_categories"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"max_categories={self.max_categories!r}, "
f"output_columns={self.output_columns!r})"
)
@PublicAPI(stability="alpha")
@SerializablePreprocessor(
version=1, identifier="io.ray.preprocessors.multi_hot_encoder"
)
class MultiHotEncoder(SerializablePreprocessorBase):
r"""Multi-hot encode categorical data.
This preprocessor replaces each list of categories with an :math:`m`-length binary
list, where :math:`m` is the number of unique categories in the column or the value
    specified in ``max_categories``. The :math:`i\text{-th}` element of the binary list
is :math:`1` if category :math:`i` is in the input list and :math:`0` otherwise.
Columns must contain hashable objects or lists of hashable objects.
Also, you can't have both types in the same column.
.. note::
        The logic is similar to scikit-learn's `MultiLabelBinarizer
        <https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MultiLabelBinarizer.html>`_.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MultiHotEncoder
>>>
>>> df = pd.DataFrame({
... "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
... "genre": [
... ["comedy", "action", "sports"],
... ["animation", "comedy", "action"],
... ["documentary"],
... ],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>>
>>> encoder = MultiHotEncoder(columns=["genre"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre
0 Shaolin Soccer [1, 0, 1, 0, 1]
1 Moana [1, 1, 1, 0, 0]
2 The Smartest Guys in the Room [0, 0, 0, 1, 0]
    :class:`MultiHotEncoder` can also be used in append mode by providing the
    names of the ``output_columns`` that should hold the encoded values.
>>> encoder = MultiHotEncoder(columns=["genre"], output_columns=["genre_encoded"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre genre_encoded
0 Shaolin Soccer [comedy, action, sports] [1, 0, 1, 0, 1]
1 Moana [animation, comedy, action] [1, 1, 1, 0, 0]
2 The Smartest Guys in the Room [documentary] [0, 0, 0, 1, 0]
If you specify ``max_categories``, then :class:`MultiHotEncoder`
creates features for only the most frequent categories.
>>> encoder = MultiHotEncoder(columns=["genre"], max_categories={"genre": 3})
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre
0 Shaolin Soccer [1, 1, 1]
1 Moana [1, 1, 0]
2 The Smartest Guys in the Room [0, 0, 0]
>>> encoder.stats_ # doctest: +SKIP
    OrderedDict([('unique_values(genre)', {'action': 0, 'comedy': 1, 'sports': 2})])
Args:
columns: The columns to separately encode.
max_categories: The maximum number of features to create for each column.
If a value isn't specified for a column, then a feature is created
for every unique category in that column.
        output_columns: The names of the transformed columns. If None, the
            transformed columns will be the same as the input columns. If not
            None, the length of ``output_columns`` must match the length of
            ``columns``, otherwise an error will be raised.
.. seealso::
:class:`OneHotEncoder`
If you're encoding individual categories instead of lists of
categories, use :class:`OneHotEncoder`.
:class:`OrdinalEncoder`
If your categories are ordered, you may want to use
:class:`OrdinalEncoder`.
"""
def __init__(
self,
columns: List[str],
*,
max_categories: Optional[Dict[str, int]] = None,
output_columns: Optional[List[str]] = None,
):
super().__init__()
# TODO: add `drop` parameter.
self.columns = columns
self.max_categories = max_categories or {}
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=self.columns,
encode_lists=True,
key_gen=key_gen,
max_categories=self.max_categories,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=self.columns,
)
return self
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)
def encode_list(element: list, *, name: str):
if isinstance(element, np.ndarray):
element = element.tolist()
elif not isinstance(element, list):
element = [element]
stats = self.stats_[f"unique_values({name})"]
counter = Counter(element)
return [counter.get(x, 0) for x in stats]
for column, output_column in zip(self.columns, self.output_columns):
df[output_column] = df[column].map(partial(encode_list, name=column))
return df
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"max_categories": self.max_categories,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
self.max_categories = fields["max_categories"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"max_categories={self.max_categories!r}, "
f"output_columns={self.output_columns})"
)
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.label_encoder")
class LabelEncoder(SerializablePreprocessorBase):
r"""Encode labels as integer targets.
:class:`LabelEncoder` encodes labels as integer targets that range from
:math:`0` to :math:`n - 1`, where :math:`n` is the number of unique labels.
    If you transform a label that isn't in the fitted dataset, then the label is encoded
as ``float("nan")``.
Examples:
>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({
... "sepal_width": [5.1, 7, 4.9, 6.2],
... "sepal_height": [3.5, 3.2, 3, 3.4],
... "species": ["setosa", "versicolor", "setosa", "virginica"]
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>>
>>> from ray.data.preprocessors import LabelEncoder
>>> encoder = LabelEncoder(label_column="species")
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sepal_width sepal_height species
0 5.1 3.5 0
1 7.0 3.2 1
2 4.9 3.0 0
3 6.2 3.4 2
You can also provide the name of the output column that should hold the encoded
labels if you want to use :class:`LabelEncoder` in append mode.
>>> encoder = LabelEncoder(label_column="species", output_column="species_encoded")
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sepal_width sepal_height species species_encoded
0 5.1 3.5 setosa 0
1 7.0 3.2 versicolor 1
2 4.9 3.0 setosa 0
3 6.2 3.4 virginica 2
If you transform a label not present in the original dataset, then the new
label is encoded as ``float("nan")``.
>>> df = pd.DataFrame({
... "sepal_width": [4.2],
... "sepal_height": [2.7],
... "species": ["bracteata"]
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder.transform(ds).to_pandas() # doctest: +SKIP
sepal_width sepal_height species
0 4.2 2.7 NaN
Args:
label_column: A column containing labels that you want to encode.
output_column: The name of the column that will contain the encoded
labels. If None, the output column will have the same name as the
input column.
.. seealso::
:class:`OrdinalEncoder`
If you're encoding ordered features, use :class:`OrdinalEncoder` instead of
:class:`LabelEncoder`.
"""
def __init__(self, label_column: str, *, output_column: Optional[str] = None):
super().__init__()
self.label_column = label_column
self.output_column = output_column or label_column
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=[self.label_column],
key_gen=key_gen,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=[self.label_column],
)
return self
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, self.label_column)
def column_label_encoder(s: pd.Series):
s_values = self.stats_[f"unique_values({s.name})"]
return s.map(s_values)
df[self.output_column] = df[self.label_column].transform(column_label_encoder)
return df
def _inverse_transform_pandas(self, df: pd.DataFrame):
def column_label_decoder(s: pd.Series):
inverse_values = {
value: key
for key, value in self.stats_[
f"unique_values({self.label_column})"
].items()
}
return s.map(inverse_values)
df[self.label_column] = df[self.output_column].transform(column_label_decoder)
return df
def get_input_columns(self) -> List[str]:
return [self.label_column]
def get_output_columns(self) -> List[str]:
return [self.output_column]
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"label_column": self.label_column,
"output_column": self.output_column,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.label_column = fields["label_column"]
self.output_column = fields["output_column"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return f"{self.__class__.__name__}(label_column={self.label_column!r}, output_column={self.output_column!r})"
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.categorizer")
class Categorizer(SerializablePreprocessorBase):
r"""Convert columns to ``pd.CategoricalDtype``.
Use this preprocessor with frameworks that have built-in support for
``pd.CategoricalDtype`` like LightGBM.
.. warning::
If you don't specify ``dtypes``, fit this preprocessor before splitting
your dataset into train and test splits. This ensures categories are
consistent across splits.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import Categorizer
>>>
>>> df = pd.DataFrame(
... {
... "sex": ["male", "female", "male", "female"],
... "level": ["L4", "L5", "L3", "L4"],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> categorizer = Categorizer(columns=["sex", "level"])
>>> categorizer.fit_transform(ds).schema().types # doctest: +SKIP
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]
    :class:`Categorizer` can also be used in append mode by providing the
    names of the ``output_columns`` that should hold the categorized values.
>>> categorizer = Categorizer(columns=["sex", "level"], output_columns=["sex_cat", "level_cat"])
>>> categorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
sex level sex_cat level_cat
0 male L4 male L4
1 female L5 female L5
2 male L3 male L3
3 female L4 female L4
If you know the categories in advance, you can specify the categories with the
``dtypes`` parameter.
>>> categorizer = Categorizer(
... columns=["sex", "level"],
... dtypes={"level": pd.CategoricalDtype(["L3", "L4", "L5", "L6"], ordered=True)},
... )
>>> categorizer.fit_transform(ds).schema().types # doctest: +SKIP
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5', 'L6'], ordered=True)]
Args:
columns: The columns to convert to ``pd.CategoricalDtype``.
dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
objects. If you don't include a column in ``dtypes``, the categories
are inferred.
        output_columns: The names of the transformed columns. If None, the
            transformed columns will be the same as the input columns. If not
            None, the length of ``output_columns`` must match the length of
            ``columns``, otherwise an error will be raised.
""" # noqa: E501
def __init__(
self,
columns: List[str],
dtypes: Optional[Dict[str, pd.CategoricalDtype]] = None,
output_columns: Optional[List[str]] = None,
):
super().__init__()
if not dtypes:
dtypes = {}
self.columns = columns
self.dtypes = dtypes
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
columns_to_get = [
column for column in self.columns if column not in self.dtypes
]
self.stats_ |= self.dtypes
if not columns_to_get:
return self
def callback(unique_indices: Dict[str, Dict]) -> pd.CategoricalDtype:
return pd.CategoricalDtype(unique_indices.keys())
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=columns_to_get,
key_gen=key_gen,
),
post_process_fn=make_post_processor(
base_fn=unique_post_fn(drop_na_values=True),
callbacks=[callback],
),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: col,
columns=columns_to_get,
)
return self
def _transform_pandas(self, df: pd.DataFrame):
df[self.output_columns] = df[self.columns].astype(self.stats_)
return df
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"_fitted": getattr(self, "_fitted", None),
"dtypes": {
col: {"categories": list(dtype.categories), "ordered": dtype.ordered}
for col, dtype in self.dtypes.items()
}
if hasattr(self, "dtypes") and self.dtypes
else None,
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
# Handle dtypes field specially
self.dtypes = (
{
col: pd.CategoricalDtype(
categories=dtype_data["categories"], ordered=dtype_data["ordered"]
)
for col, dtype_data in fields["dtypes"].items()
}
if fields.get("dtypes")
else {}
)
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"dtypes={self.dtypes!r}, output_columns={self.output_columns!r})"
)
def compute_unique_value_indices(
*,
dataset: "Dataset",
columns: List[str],
key_gen: Callable,
encode_lists: bool = True,
max_categories: Optional[Dict[str, int]] = None,
):
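    """Collect, per column, the set of unique values observed in ``dataset``.

    Returns a dict mapping ``key_gen(column)`` to the set of unique values:
    individual list elements when ``encode_lists`` is True, whole lists (as
    tuples) otherwise. Frequencies are used only transiently to apply
    ``max_categories`` and are not returned.
    """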
if max_categories is None:
max_categories = {}
columns_set = set(columns)
for column in max_categories:
if column not in columns_set:
raise ValueError(
f"You set `max_categories` for {column}, which is not present in "
f"{columns}."
)
def get_pd_value_counts_per_column(col: pd.Series) -> Dict:
# special handling for lists
if _is_series_composed_of_lists(col):
if encode_lists:
counter = Counter()
def update_counter(element):
counter.update(element)
return element
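                # map() is used purely for its side effect of updating
                # `counter` with each list's elements; the result is discarded.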
col.map(update_counter)
return counter
else:
# convert to tuples to make lists hashable
col = col.map(lambda x: tuple(x))
return Counter(col.value_counts(dropna=False).to_dict())
def get_pd_value_counts(df: pd.DataFrame) -> Dict[str, List[Dict]]:
df_columns = df.columns.tolist()
result = {}
for col in columns:
if col in df_columns:
result[col] = [get_pd_value_counts_per_column(df[col])]
else:
raise ValueError(
f"Column '{col}' does not exist in DataFrame, which has columns: {df_columns}" # noqa: E501
)
return result
value_counts_ds = dataset.map_batches(get_pd_value_counts, batch_format="pandas")
unique_values_by_col: Dict[str, Set] = {key_gen(col): set() for col in columns}
for batch in value_counts_ds.iter_batches(batch_size=None):
for col, counters in batch.items():
for counter in counters:
counter: Dict[Any, int] = {
k: v for k, v in counter.items() if v is not None
}
if col in max_categories:
counter: Dict[Any, int] = dict(
Counter(counter).most_common(max_categories[col])
)
                # Add only the column's values; frequencies are not needed
                # beyond this point.
unique_values_by_col[key_gen(col)].update(counter.keys())
return unique_values_by_col
# FIXME: the arrow format path is broken: https://anyscale1.atlassian.net/browse/DATA-1788
def unique_post_fn(
    drop_na_values: bool = False, batch_format: Optional[BatchFormat] = None
) -> Callable:
"""
Returns a post-processing function that generates an encoding map by
sorting the unique values produced during aggregation or stats computation.
Args:
drop_na_values: If True, NA/null values will be silently dropped from the
encoding map. If False, raises an error if any NA/null values are present.
batch_format: Determines the output format of the encoding map.
- If BatchFormat.ARROW: Returns Arrow format (tuple of arrays) for scalar
types, or dict format for list types that PyArrow can't sort.
- Otherwise: Returns pandas dict format {value: index}.
Returns:
A callable that takes unique values and returns an encoding map.
The map format depends on batch_format and input types:
- Dict format: {value: int} - used for pandas path or list-type data
- Arrow format: (keys_array, values_array) - used for Arrow path with scalar data
"""
def gen_value_index(values: List) -> Dict[Any, int]:
"""
Generate an encoding map from a list of unique values using Python sorting.
Args:
values: List of unique values to encode (can include lists/tuples).
Returns:
Dict mapping each value to a unique integer index.
List values are converted to tuples for hashability.
Raises:
ValueError: If null values are present and drop_na_values is False.
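
        A small doctest-style sketch (illustrative values; sorting keeps the
        encoding stable across runs):

        >>> gen_value_index(["b", "c", "a"])  # doctest: +SKIP
        {'a': 0, 'b': 1, 'c': 2}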
"""
        # NOTE: Nulls are special-cased here because their presence makes the
        # provided sequence of values unsortable.
if any(is_null(v) for v in values) and not drop_na_values:
raise ValueError(
"Unable to fit column because it contains null"
" values. Consider imputing missing values first."
)
non_null_values = [v for v in values if not is_null(v)]
return {
(v if not isinstance(v, list) else tuple(v)): i
# NOTE: Sorting applied to produce stable encoding
for i, v in enumerate(sorted(non_null_values))
}
def gen_value_index_arrow_from_arrow(
values: Union["pa.ListScalar", "pa.Array"],
) -> Union[Tuple["pa.Array", "pa.Array"], Dict[Any, int]]:
"""Generate an encoding map from unique values using Arrow-native operations.
Args:
values: The aggregation result as a pa.ListScalar (list of unique values)
or a pa.Array of values directly.
Returns:
For scalar types that PyArrow can sort natively, returns a tuple of
(sorted_keys, indices) as pa.Array. For list types that require fallback,
returns a dict mapping {value: index}.
Note:
PyArrow's sort_indices doesn't support list types, so we fall back to
dict format for columns containing lists. The _transform_arrow method
handles this by detecting dict-format stats and converting as needed.
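
        A hedged doctest-style sketch of the scalar-type path (illustrative
        values):

        >>> keys, idx = gen_value_index_arrow_from_arrow(pa.array(["b", "a"]))  # doctest: +SKIP
        >>> keys.to_pylist(), idx.to_pylist()  # doctest: +SKIP
        (['a', 'b'], [0, 1])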
"""
# Handle ListScalar from aggregation result
if isinstance(values, pa.ListScalar):
values = values.values
# Check if values contain list types - PyArrow can't sort these
# Fall back to pandas dict format for list types
if pa.types.is_list(values.type) or pa.types.is_large_list(values.type):
return gen_value_index(values.to_pylist())
# Drop nulls if requested
if drop_na_values:
values = pc.drop_null(values)
else:
if pc.any(pc.is_null(values)).as_py():
raise ValueError(
"Unable to fit column because it contains null"
" values. Consider imputing missing values first."
)
# Sort the values
sorted_indices = pc.sort_indices(values)
sorted_values = pc.take(values, sorted_indices)
# Create the index array
values_array = pa.array(range(len(sorted_values)), type=pa.int64())
return (sorted_values, values_array)
return (
gen_value_index_arrow_from_arrow
if batch_format == BatchFormat.ARROW
else gen_value_index
)
def _validate_df(df: pd.DataFrame, *columns: str) -> None:
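    """Raise ``ValueError`` if any of the given columns in ``df`` contain null values."""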
null_columns = [column for column in columns if df[column].isnull().values.any()]
if null_columns:
raise ValueError(
f"Unable to transform columns {null_columns} because they contain "
f"null values. Consider imputing missing values first."
)
def _validate_arrow(table: pa.Table, *columns: str) -> None:
"""Validate that specified columns in an Arrow table do not contain null values.
Args:
table: The Arrow table to validate.
*columns: Column names to check for null values.
Raises:
ValueError: If any of the specified columns contain null values.
"""
null_columns = [
column for column in columns if pc.any(pc.is_null(table.column(column))).as_py()
]
if null_columns:
raise ValueError(
f"Unable to transform columns {null_columns} because they contain "
f"null values. Consider imputing missing values first."
)
def _is_series_composed_of_lists(series: pd.Series) -> bool:
    # Assume the series is homogeneous: if the first non-None element is a
    # list (or ndarray), the whole column is treated as list-typed.
first_not_none_element = next(
(element for element in series if element is not None), None
)
return pandas.api.types.is_object_dtype(series.dtype) and isinstance(
first_not_none_element, (list, np.ndarray)
)