import logging
from collections import Counter
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Hashable,
List,
Optional,
Set,
Tuple,
Union,
)
import numpy as np
import pandas as pd
import pandas.api.types
import pyarrow as pa
import pyarrow.compute as pc
from ray.data._internal.util import is_null
from ray.data.block import BlockAccessor
from ray.data.preprocessor import (
Preprocessor,
PreprocessorNotFittedException,
SerializablePreprocessorBase,
)
from ray.data.preprocessors.utils import (
make_post_processor,
)
from ray.data.preprocessors.version_support import SerializablePreprocessor
from ray.data.util.data_batch_conversion import BatchFormat
from ray.util.annotations import DeveloperAPI, PublicAPI
if TYPE_CHECKING:
from ray.data.dataset import Dataset
logger = logging.getLogger(__name__)
def _get_unique_value_arrow_arrays(
stats: Dict[str, Any], input_col: str
) -> Tuple[pa.Array, pa.Array]:
"""Get Arrow arrays for keys and values from encoder stats.
Args:
stats: The encoder's stats_ dictionary.
input_col: The name of the column to get arrays for.
Returns:
Tuple of (keys_array, values_array) for the column's ordinal mapping.
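Example (illustrative sketch; assumes the dict-format stats produced by the
pandas fit path):
    >>> stats = {"unique_values(color)": {"blue": 0, "red": 1}}
    >>> keys, values = _get_unique_value_arrow_arrays(stats, "color")  # doctest: +SKIP
    >>> keys.to_pylist(), values.to_pylist()  # doctest: +SKIP
    (['blue', 'red'], [0, 1])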
"""
stat_value = stats[f"unique_values({input_col})"]
if isinstance(stat_value, dict):
# Stats are in pandas dict format - convert to Arrow format
sorted_keys = sorted(stat_value.keys())
keys_array = pa.array(sorted_keys)
values_array = pa.array([stat_value[k] for k in sorted_keys], type=pa.int64())
else:
# Stats are in Arrow tuple format: (keys_array, values_array)
keys_array, values_array = stat_value
return keys_array, values_array
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.ordinal_encoder")
class OrdinalEncoder(SerializablePreprocessorBase):
r"""Encode values within columns as ordered integer values.
:class:`OrdinalEncoder` encodes categorical features as integers that range from
:math:`0` to :math:`n - 1`, where :math:`n` is the number of categories.
If you transform a value that isn't in the fitted dataset, then the value is encoded
as ``float("nan")``.
Columns must contain either hashable values or lists of hashable values. Also, you
can't have both scalars and lists in the same column.
Examples:
Use :class:`OrdinalEncoder` to encode categorical features as integers.
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import OrdinalEncoder
>>> df = pd.DataFrame({
... "sex": ["male", "female", "male", "female"],
... "level": ["L4", "L5", "L3", "L4"],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OrdinalEncoder(columns=["sex", "level"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sex level
0 1 1
1 0 2
2 1 0
3 0 1
:class:`OrdinalEncoder` can also be used in append mode by providing ``output_columns``,
the names of the columns that will hold the encoded values.
>>> encoder = OrdinalEncoder(columns=["sex", "level"], output_columns=["sex_encoded", "level_encoded"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sex level sex_encoded level_encoded
0 male L4 1 1
1 female L5 0 2
2 male L3 1 0
3 female L4 0 1
If you transform a value not present in the original dataset, then the value
is encoded as ``float("nan")``.
>>> df = pd.DataFrame({"sex": ["female"], "level": ["L6"]})
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder.transform(ds).to_pandas() # doctest: +SKIP
sex level
0 0 NaN
:class:`OrdinalEncoder` can also encode categories in a list.
>>> df = pd.DataFrame({
... "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
... "genre": [
... ["comedy", "action", "sports"],
... ["animation", "comedy", "action"],
... ["documentary"],
... ],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OrdinalEncoder(columns=["genre"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre
0 Shaolin Soccer [2, 0, 4]
1 Moana [1, 2, 0]
2 The Smartest Guys in the Room [3]
Args:
columns: The columns to separately encode.
encode_lists: If ``True``, encode list elements. If ``False``, encode
whole lists (i.e., replace each list with an integer). ``True``
by default.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
:class:`OneHotEncoder`
Another preprocessor that encodes categorical data.
"""
def __init__(
self,
columns: List[str],
*,
encode_lists: bool = True,
output_columns: Optional[List[str]] = None,
):
super().__init__()
# TODO: allow user to specify order of values within each column.
self.columns = columns
self.encode_lists = encode_lists
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=self.columns,
encode_lists=self.encode_lists,
key_gen=key_gen,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=self.columns,
)
return self
def _get_ordinal_map(self, column_name: str) -> Dict[Any, int]:
"""Get the ordinal mapping for a column as a dict.
Stats can be stored in either:
- Dict format: {value: index} (from pandas-style processing)
- Arrow format: (keys_array, values_array) tuple
This method returns a dict in either case.
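For illustration, both of these layouts map "L3" -> 0 and "L4" -> 1:
    {"L3": 0, "L4": 1}                            # pandas dict format
    (pa.array(["L3", "L4"]), pa.array([0, 1]))    # Arrow tuple format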
"""
stat_value = self.stats_[f"unique_values({column_name})"]
if isinstance(stat_value, dict):
return stat_value
# Arrow tuple format (keys_array, values_array)
keys_array, values_array = stat_value
return {k.as_py(): v.as_py() for k, v in zip(keys_array, values_array)}
def _get_arrow_arrays(self, input_col: str) -> Tuple[pa.Array, pa.Array]:
"""Get Arrow arrays for keys and values."""
return _get_unique_value_arrow_arrays(self.stats_, input_col)
def _encode_list_element(self, element: list, *, column_name: str):
ordinal_map = self._get_ordinal_map(column_name)
# When encode_lists is True, the column was flattened during fitting, so the
# ordinal map indexes individual list elements; encode each element separately.
if self.encode_lists:
return [ordinal_map.get(x) for x in element]
return ordinal_map.get(tuple(element))
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)
def column_ordinal_encoder(s: pd.Series):
if _is_series_composed_of_lists(s):
return s.map(
lambda elem: self._encode_list_element(elem, column_name=s.name)
)
s_values = self._get_ordinal_map(s.name)
return s.map(s_values)
df[self.output_columns] = df[self.columns].apply(column_ordinal_encoder)
return df
def _transform_arrow(self, table: pa.Table) -> pa.Table:
"""Transform using fast native PyArrow operations for scalar columns.
List-type columns are preferably handled by _transform_pandas, which is selected
via _determine_transform_to_use when a PyArrow schema is available. However,
for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
until runtime, so we fall back to pandas here if list columns are found.
"""
# Validate that columns don't contain null values (consistent with pandas path)
_validate_arrow(table, *self.columns)
# Check for list columns (runtime fallback for PandasBlockSchema datasets)
for col_name in self.columns:
col_type = table.schema.field(col_name).type
if pa.types.is_list(col_type) or pa.types.is_large_list(col_type):
# Fall back to pandas transform for list columns
df = table.to_pandas()
result_df = self._transform_pandas(df)
return pa.Table.from_pandas(result_df, preserve_index=False)
for input_col, output_col in zip(self.columns, self.output_columns):
column = table.column(input_col)
encoded_column = self._encode_column_vectorized(column, input_col)
table = BlockAccessor.for_block(table).upsert_column(
output_col, encoded_column
)
return table
def _encode_column_vectorized(
self, column: pa.ChunkedArray, input_col: str
) -> pa.Array:
"""Encode column using PyArrow's vectorized pc.index_in.
Unseen categories are encoded as null in the output, which becomes NaN
when converted to pandas. Null values should be validated before calling
this method via _validate_arrow.
"""
keys_array, values_array = self._get_arrow_arrays(input_col)
if keys_array.type != column.type:
keys_array = pc.cast(keys_array, column.type)
# pc.index_in returns null for values not found in keys_array
# (including null input values and unseen categories)
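# Illustrative example (values assumed, not from the original code path): with
# keys_array = ["L3", "L4", "L5"] and column = ["L4", "L6"], pc.index_in yields
# [1, null] and pc.take then yields [1, null]; the null becomes NaN once the
# block is converted to pandas.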
indices = pc.index_in(column, keys_array)
# pc.take preserves nulls from indices, so null inputs -> null outputs
return pc.take(values_array, indices)
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"encode_lists": self.encode_lists,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
self.encode_lists = fields["encode_lists"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"encode_lists={self.encode_lists!r}, "
f"output_columns={self.output_columns!r})"
)
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.one_hot_encoder")
class OneHotEncoder(SerializablePreprocessorBase):
r"""`One-hot encode <https://en.wikipedia.org/wiki/One-hot#Machine_learning_and_statistics>`_
categorical data.
This preprocessor transforms each specified column into a one-hot encoded vector.
Each element in the vector corresponds to a unique category in the column, with a
value of 1 if the category matches and 0 otherwise.
If a category is infrequent (based on ``max_categories``) or not present in the
fitted dataset, it is encoded as all 0s.
Columns must contain hashable objects or lists of hashable objects.
.. note::
Lists are treated as categories. If you want to encode individual list
elements, use :class:`MultiHotEncoder`.
Example:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import OneHotEncoder
>>>
>>> df = pd.DataFrame({"color": ["red", "green", "red", "red", "blue", "green"]})
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder = OneHotEncoder(columns=["color"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color
0 [0, 0, 1]
1 [0, 1, 0]
2 [0, 0, 1]
3 [0, 0, 1]
4 [1, 0, 0]
5 [0, 1, 0]
OneHotEncoder can also be used in append mode by providing ``output_columns``,
the names of the columns that will hold the encoded values.
>>> encoder = OneHotEncoder(columns=["color"], output_columns=["color_encoded"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color color_encoded
0 red [0, 0, 1]
1 green [0, 1, 0]
2 red [0, 0, 1]
3 red [0, 0, 1]
4 blue [1, 0, 0]
5 green [0, 1, 0]
If you one-hot encode a value that isn't in the fitted dataset, then the
value is encoded with zeros.
>>> df = pd.DataFrame({"color": ["yellow"]})
>>> batch = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder.transform(batch).to_pandas() # doctest: +SKIP
color color_encoded
0 yellow [0, 0, 0]
Likewise, if you one-hot encode an infrequent value, then the value is encoded
with zeros.
>>> encoder = OneHotEncoder(columns=["color"], max_categories={"color": 2})
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
color
0 [1, 0]
1 [0, 1]
2 [1, 0]
3 [1, 0]
4 [0, 0]
5 [0, 1]
Args:
columns: The columns to separately encode.
max_categories: The maximum number of features to create for each column.
If a value isn't specified for a column, then a feature is created
for every category in that column.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
:class:`MultiHotEncoder`
If you want to encode individual list elements, use
:class:`MultiHotEncoder`.
:class:`OrdinalEncoder`
If your categories are ordered, you may want to use
:class:`OrdinalEncoder`.
""" # noqa: E501
def __init__(
self,
columns: List[str],
*,
max_categories: Optional[Dict[str, int]] = None,
output_columns: Optional[List[str]] = None,
):
super().__init__()
# TODO: add `drop` parameter.
self.columns = columns
self.max_categories = max_categories or {}
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=self.columns,
encode_lists=False,
key_gen=key_gen,
max_categories=self.max_categories,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=self.columns,
)
return self
def safe_get(self, v: Any, stats: Dict[str, int]):
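# Map a single raw value to its fitted category index. Lists and arrays are
# converted to tuples before lookup; unseen or unhashable values map to -1
# and are skipped below.
# Illustrative example (assumed stats): safe_get("red", {"blue": 0, "red": 1})
# returns 1, while safe_get("yellow", {"blue": 0, "red": 1}) returns -1.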
if isinstance(v, (list, np.ndarray)):
v = tuple(v)
if isinstance(v, Hashable):
return stats.get(v, -1)
else:
return -1 # Unhashable type treated as a missing category
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)
# Compute new one-hot encoded columns
for column, output_column in zip(self.columns, self.output_columns):
stats = self.stats_[f"unique_values({column})"]
num_categories = len(stats)
one_hot = np.zeros((len(df), num_categories), dtype=np.uint8)
# Integer indices for each category in the column
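# Illustrative example (assumed stats): with stats {"blue": 0, "green": 1,
# "red": 2}, a column ["red", "yellow"] produces codes [2, -1]; the -1 row
# stays all zeros in the one-hot matrix below.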
codes = df[column].apply(lambda v: self.safe_get(v, stats)).to_numpy()
# Filter to only the rows that have a valid category
valid_category_mask = codes != -1
# Dimension should be (num_rows, ) - 1D boolean array
non_zero_indices = np.nonzero(valid_category_mask)[0]
# Mark the corresponding categories as 1
one_hot[
non_zero_indices,
codes[valid_category_mask],
] = 1
df[output_column] = one_hot.tolist()
return df
def _transform_arrow(self, table: pa.Table) -> pa.Table:
"""Transform using fast native PyArrow operations for scalar columns.
List-type columns are preferably handled by _transform_pandas, which is selected
via _determine_transform_to_use when a PyArrow schema is available. However,
for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
until runtime, so we fall back to pandas here if list columns are found.
"""
# Validate that columns don't contain null values (consistent with pandas path)
_validate_arrow(table, *self.columns)
# Check for list columns (runtime fallback for PandasBlockSchema datasets)
for col_name in self.columns:
col_type = table.schema.field(col_name).type
if pa.types.is_list(col_type) or pa.types.is_large_list(col_type):
# Fall back to pandas transform for list columns
df = table.to_pandas()
result_df = self._transform_pandas(df)
return pa.Table.from_pandas(result_df, preserve_index=False)
for input_col, output_col in zip(self.columns, self.output_columns):
column = table.column(input_col)
encoded_column = self._encode_column_one_hot(column, input_col)
table = BlockAccessor.for_block(table).upsert_column(
output_col, encoded_column
)
return table
def _get_arrow_arrays(self, input_col: str) -> Tuple[pa.Array, pa.Array]:
"""Get Arrow arrays for keys and values."""
return _get_unique_value_arrow_arrays(self.stats_, input_col)
def _encode_column_one_hot(
self, column: pa.ChunkedArray, input_col: str
) -> pa.FixedSizeListArray:
"""Encode a column to one-hot vectors using Arrow arrays.
Unseen categories are encoded as all-zeros vectors, matching the pandas
behavior. Null values should be validated before calling this method
via _validate_arrow.
"""
keys_array, _ = self._get_arrow_arrays(input_col)
num_categories = len(keys_array)
# Cast keys to match column type if needed
if keys_array.type != column.type:
keys_array = pc.cast(keys_array, column.type)
# Use pc.index_in to find position of each value in keys_array
# Returns null for null inputs and unseen categories (values not in keys_array)
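# Illustrative example (assumed values): with keys_array = ["blue", "green",
# "red"] and column = ["red", "yellow"], pc.index_in yields [2, null], which
# becomes rows [0, 0, 1] and [0, 0, 0] in the one-hot matrix below.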
indices = pc.index_in(column, keys_array)
# Fill nulls with -1 so they can be filtered out below (resulting in all-zeros)
indices_filled = pc.fill_null(indices, -1)
# Create one-hot encoded matrix using vectorized NumPy operations
num_rows = len(column)
indices_np = indices_filled.to_numpy()
one_hot_matrix = np.zeros((num_rows, num_categories), dtype=np.uint8)
# Find valid indices (not -1) and set 1s at the appropriate positions
valid_mask = indices_np != -1
valid_indices = np.nonzero(valid_mask)[0]
if len(valid_indices) > 0:
one_hot_matrix[valid_indices, indices_np[valid_mask]] = 1
# Convert to Arrow FixedSizeListArray for efficient storage
return pa.FixedSizeListArray.from_arrays(one_hot_matrix.ravel(), num_categories)
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"max_categories": self.max_categories,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
self.max_categories = fields["max_categories"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"max_categories={self.max_categories!r}, "
f"output_columns={self.output_columns!r})"
)
@PublicAPI(stability="alpha")
@SerializablePreprocessor(
version=1, identifier="io.ray.preprocessors.multi_hot_encoder"
)
class MultiHotEncoder(SerializablePreprocessorBase):
r"""Multi-hot encode categorical data.
This preprocessor replaces each list of categories with an :math:`m`-length binary
list, where :math:`m` is the number of unique categories in the column or the value
specified in ``max_categories``. The :math:`i\text{-th}` element of the binary list
is :math:`1` if category :math:`i` is in the input list and :math:`0` otherwise.
Columns must contain hashable objects or lists of hashable objects.
Also, you can't have both types in the same column.
.. note::
The logic is similar to scikit-learn's `MultiLabelBinarizer <https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MultiLabelBinarizer.html>`_.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MultiHotEncoder
>>>
>>> df = pd.DataFrame({
... "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
... "genre": [
... ["comedy", "action", "sports"],
... ["animation", "comedy", "action"],
... ["documentary"],
... ],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>>
>>> encoder = MultiHotEncoder(columns=["genre"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre
0 Shaolin Soccer [1, 0, 1, 0, 1]
1 Moana [1, 1, 1, 0, 0]
2 The Smartest Guys in the Room [0, 0, 0, 1, 0]
:class:`MultiHotEncoder` can also be used in append mode by providing ``output_columns``,
the names of the columns that will hold the encoded values.
>>> encoder = MultiHotEncoder(columns=["genre"], output_columns=["genre_encoded"])
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre genre_encoded
0 Shaolin Soccer [comedy, action, sports] [1, 0, 1, 0, 1]
1 Moana [animation, comedy, action] [1, 1, 1, 0, 0]
2 The Smartest Guys in the Room [documentary] [0, 0, 0, 1, 0]
If you specify ``max_categories``, then :class:`MultiHotEncoder`
creates features for only the most frequent categories.
>>> encoder = MultiHotEncoder(columns=["genre"], max_categories={"genre": 3})
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
name genre
0 Shaolin Soccer [1, 1, 1]
1 Moana [1, 1, 0]
2 The Smartest Guys in the Room [0, 0, 0]
>>> encoder.stats_ # doctest: +SKIP
OrderedDict([('unique_values(genre)', {'comedy': 0, 'action': 1, 'sports': 2})])
Args:
columns: The columns to separately encode.
max_categories: The maximum number of features to create for each column.
If a value isn't specified for a column, then a feature is created
for every unique category in that column.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
.. seealso::
:class:`OneHotEncoder`
If you're encoding individual categories instead of lists of
categories, use :class:`OneHotEncoder`.
:class:`OrdinalEncoder`
If your categories are ordered, you may want to use
:class:`OrdinalEncoder`.
"""
def __init__(
self,
columns: List[str],
*,
max_categories: Optional[Dict[str, int]] = None,
output_columns: Optional[List[str]] = None,
):
super().__init__()
# TODO: add `drop` parameter.
self.columns = columns
self.max_categories = max_categories or {}
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=self.columns,
encode_lists=True,
key_gen=key_gen,
max_categories=self.max_categories,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=self.columns,
)
return self
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, *self.columns)
def encode_list(element: list, *, name: str):
if isinstance(element, np.ndarray):
element = element.tolist()
elif not isinstance(element, list):
element = [element]
stats = self.stats_[f"unique_values({name})"]
counter = Counter(element)
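# Illustrative example (assumed stats): with stats {"action": 0, "comedy": 1,
# "sports": 2}, the list ["comedy", "action", "action"] encodes to [2, 1, 0]
# (a count per category; counts are 0/1 when list elements are unique).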
return [counter.get(x, 0) for x in stats]
for column, output_column in zip(self.columns, self.output_columns):
df[output_column] = df[column].map(partial(encode_list, name=column))
return df
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"max_categories": self.max_categories,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
self.max_categories = fields["max_categories"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"max_categories={self.max_categories!r}, "
f"output_columns={self.output_columns})"
)
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.label_encoder")
class LabelEncoder(SerializablePreprocessorBase):
r"""Encode labels as integer targets.
:class:`LabelEncoder` encodes labels as integer targets that range from
:math:`0` to :math:`n - 1`, where :math:`n` is the number of unique labels.
If you transform a label that isn't in the fitted dataset, then the label is encoded
as ``float("nan")``.
Examples:
>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({
... "sepal_width": [5.1, 7, 4.9, 6.2],
... "sepal_height": [3.5, 3.2, 3, 3.4],
... "species": ["setosa", "versicolor", "setosa", "virginica"]
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>>
>>> from ray.data.preprocessors import LabelEncoder
>>> encoder = LabelEncoder(label_column="species")
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sepal_width sepal_height species
0 5.1 3.5 0
1 7.0 3.2 1
2 4.9 3.0 0
3 6.2 3.4 2
You can also provide the name of the output column that should hold the encoded
labels if you want to use :class:`LabelEncoder` in append mode.
>>> encoder = LabelEncoder(label_column="species", output_column="species_encoded")
>>> encoder.fit_transform(ds).to_pandas() # doctest: +SKIP
sepal_width sepal_height species species_encoded
0 5.1 3.5 setosa 0
1 7.0 3.2 versicolor 1
2 4.9 3.0 setosa 0
3 6.2 3.4 virginica 2
If you transform a label not present in the original dataset, then the new
label is encoded as ``float("nan")``.
>>> df = pd.DataFrame({
... "sepal_width": [4.2],
... "sepal_height": [2.7],
... "species": ["bracteata"]
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> encoder.transform(ds).to_pandas() # doctest: +SKIP
sepal_width sepal_height species
0 4.2 2.7 NaN
Args:
label_column: A column containing labels that you want to encode.
output_column: The name of the column that will contain the encoded
labels. If None, the output column will have the same name as the
input column.
.. seealso::
:class:`OrdinalEncoder`
If you're encoding ordered features, use :class:`OrdinalEncoder` instead of
:class:`LabelEncoder`.
"""
def __init__(self, label_column: str, *, output_column: Optional[str] = None):
super().__init__()
self.label_column = label_column
self.output_column = output_column or label_column
def _fit(self, dataset: "Dataset") -> Preprocessor:
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=[self.label_column],
key_gen=key_gen,
),
post_process_fn=unique_post_fn(),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: f"unique_values({col})",
columns=[self.label_column],
)
return self
def _transform_pandas(self, df: pd.DataFrame):
_validate_df(df, self.label_column)
def column_label_encoder(s: pd.Series):
s_values = self.stats_[f"unique_values({s.name})"]
return s.map(s_values)
df[self.output_column] = df[self.label_column].transform(column_label_encoder)
return df
def _inverse_transform_pandas(self, df: pd.DataFrame):
def column_label_decoder(s: pd.Series):
inverse_values = {
value: key
for key, value in self.stats_[
f"unique_values({self.label_column})"
].items()
}
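# Illustrative example (assumed stats): {"setosa": 0, "virginica": 1} inverts
# to {0: "setosa", 1: "virginica"}, so encoded labels map back to their names.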
return s.map(inverse_values)
df[self.label_column] = df[self.output_column].transform(column_label_decoder)
return df
def get_input_columns(self) -> List[str]:
return [self.label_column]
def get_output_columns(self) -> List[str]:
return [self.output_column]
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"label_column": self.label_column,
"output_column": self.output_column,
"_fitted": getattr(self, "_fitted", None),
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
self.label_column = fields["label_column"]
self.output_column = fields["output_column"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return f"{self.__class__.__name__}(label_column={self.label_column!r}, output_column={self.output_column!r})"
@PublicAPI(stability="alpha")
@SerializablePreprocessor(version=1, identifier="io.ray.preprocessors.categorizer")
class Categorizer(SerializablePreprocessorBase):
r"""Convert columns to ``pd.CategoricalDtype``.
Use this preprocessor with frameworks that have built-in support for
``pd.CategoricalDtype`` like LightGBM.
.. warning::
If you don't specify ``dtypes``, fit this preprocessor before splitting
your dataset into train and test splits. This ensures categories are
consistent across splits.
Examples:
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import Categorizer
>>>
>>> df = pd.DataFrame(
... {
... "sex": ["male", "female", "male", "female"],
... "level": ["L4", "L5", "L3", "L4"],
... })
>>> ds = ray.data.from_pandas(df) # doctest: +SKIP
>>> categorizer = Categorizer(columns=["sex", "level"])
>>> categorizer.fit_transform(ds).schema().types # doctest: +SKIP
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]
:class:`Categorizer` can also be used in append mode by providing ``output_columns``,
the names of the columns that will hold the categorized values.
>>> categorizer = Categorizer(columns=["sex", "level"], output_columns=["sex_cat", "level_cat"])
>>> categorizer.fit_transform(ds).to_pandas() # doctest: +SKIP
sex level sex_cat level_cat
0 male L4 male L4
1 female L5 female L5
2 male L3 male L3
3 female L4 female L4
If you know the categories in advance, you can specify the categories with the
``dtypes`` parameter.
>>> categorizer = Categorizer(
... columns=["sex", "level"],
... dtypes={"level": pd.CategoricalDtype(["L3", "L4", "L5", "L6"], ordered=True)},
... )
>>> categorizer.fit_transform(ds).schema().types # doctest: +SKIP
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5', 'L6'], ordered=True)]
Args:
columns: The columns to convert to ``pd.CategoricalDtype``.
dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
objects. If you don't include a column in ``dtypes``, the categories
are inferred.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
""" # noqa: E501
def __init__(
self,
columns: List[str],
dtypes: Optional[Dict[str, pd.CategoricalDtype]] = None,
output_columns: Optional[List[str]] = None,
):
super().__init__()
if not dtypes:
dtypes = {}
self.columns = columns
self.dtypes = dtypes
self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)
def _fit(self, dataset: "Dataset") -> Preprocessor:
columns_to_get = [
column for column in self.columns if column not in self.dtypes
]
self.stats_ |= self.dtypes
if not columns_to_get:
return self
def callback(unique_indices: Dict[str, Dict]) -> pd.CategoricalDtype:
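# Illustrative example (assumed unique indices): {"female": 0, "male": 1}
# becomes CategoricalDtype(categories=['female', 'male'], ordered=False).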
return pd.CategoricalDtype(unique_indices.keys())
self.stat_computation_plan.add_callable_stat(
stat_fn=lambda key_gen: compute_unique_value_indices(
dataset=dataset,
columns=columns_to_get,
key_gen=key_gen,
),
post_process_fn=make_post_processor(
base_fn=unique_post_fn(drop_na_values=True),
callbacks=[callback],
),
stat_key_fn=lambda col: f"unique({col})",
post_key_fn=lambda col: col,
columns=columns_to_get,
)
return self
def _transform_pandas(self, df: pd.DataFrame):
df[self.output_columns] = df[self.columns].astype(self.stats_)
return df
def _get_serializable_fields(self) -> Dict[str, Any]:
return {
"columns": self.columns,
"output_columns": self.output_columns,
"_fitted": getattr(self, "_fitted", None),
"dtypes": {
col: {"categories": list(dtype.categories), "ordered": dtype.ordered}
for col, dtype in self.dtypes.items()
}
if hasattr(self, "dtypes") and self.dtypes
else None,
}
def _set_serializable_fields(self, fields: Dict[str, Any], version: int):
# required fields
# Handle dtypes field specially
self.dtypes = (
{
col: pd.CategoricalDtype(
categories=dtype_data["categories"], ordered=dtype_data["ordered"]
)
for col, dtype_data in fields["dtypes"].items()
}
if fields.get("dtypes")
else {}
)
self.columns = fields["columns"]
self.output_columns = fields["output_columns"]
# optional fields
self._fitted = fields.get("_fitted")
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"dtypes={self.dtypes!r}, output_columns={self.output_columns!r})"
)
def compute_unique_value_indices(
*,
dataset: "Dataset",
columns: List[str],
key_gen: Callable,
encode_lists: bool = True,
max_categories: Optional[Dict[str, int]] = None,
):
"""Compute the set of unique values for each column across the full dataset.
Counts value frequencies globally (summed across all partitions) and then,
if ``max_categories`` is specified for a column, selects only the top-k most
frequent values. This ensures that a value appearing moderately in many
partitions is not missed — e.g. a value with count 3 in each of two
partitions (global count 6) is correctly preferred over a value with count 5
in a single partition.
Args:
dataset: The Ray Dataset to compute value counts over.
columns: Column names to compute unique values for.
key_gen: A callable that maps a column name to the key used in the
returned dictionary (e.g. ``lambda col: f"unique({col})"``).
encode_lists: If ``True``, list-type column elements are exploded so
that each list element is counted individually. If ``False``, entire
lists are treated as single categorical values (converted to tuples
for hashability).
max_categories: Optional mapping from column name to the maximum number
of unique values to keep. Only the most frequent values (by global
count) are retained. Columns not present in the mapping keep all
unique values.
Returns:
Dict[str, Set]: A mapping from ``key_gen(col)`` to the set of unique
values for that column (limited to top-k if ``max_categories`` applies).
Raises:
ValueError: If a column in ``max_categories`` is not in ``columns``.
ValueError: If a column listed in ``columns`` is missing from the
dataset.
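Example (illustrative sketch; assumes ``ds`` is a Dataset whose ``color``
column holds three ``"red"``, two ``"green"``, and one ``"blue"`` value):
    >>> compute_unique_value_indices(  # doctest: +SKIP
    ...     dataset=ds,
    ...     columns=["color"],
    ...     key_gen=lambda col: f"unique({col})",
    ...     max_categories={"color": 2},
    ... )
    {'unique(color)': {'red', 'green'}}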
"""
if max_categories is None:
max_categories = {}
columns_set = set(columns)
for column in max_categories:
if column not in columns_set:
raise ValueError(
f"You set `max_categories` for {column}, which is not present in "
f"{columns}."
)
def get_pd_value_counts_per_column(col: pd.Series) -> Dict:
# special handling for lists
if _is_series_composed_of_lists(col):
if encode_lists:
counter = Counter()
def update_counter(element):
counter.update(element)
return element
col.map(update_counter)
return counter
else:
# convert to tuples to make lists hashable
col = col.map(lambda x: tuple(x))
return Counter(col.value_counts(dropna=False).to_dict())
def get_pd_value_counts(df: pd.DataFrame) -> Dict[str, List[Dict]]:
df_columns = df.columns.tolist()
result = {}
for col in columns:
if col in df_columns:
result[col] = [get_pd_value_counts_per_column(df[col])]
else:
raise ValueError(
f"Column '{col}' does not exist in DataFrame, which has columns: {df_columns}" # noqa: E501
)
return result
value_counts_ds = dataset.map_batches(get_pd_value_counts, batch_format="pandas")
# Aggregate counters globally per column before applying max_categories,
# so that top-k is computed over the full dataset rather than per-partition.
global_counters: Dict[str, Counter] = {col: Counter() for col in columns}
for batch in value_counts_ds.iter_batches(batch_size=None):
for col, counters in batch.items():
for counter in counters:
filtered: Dict[Any, int] = {
k: v for k, v in counter.items() if v is not None
}
global_counters[col].update(filtered)
unique_values_by_col: Dict[str, Set] = {key_gen(col): set() for col in columns}
for col in columns:
counter = global_counters[col]
if col in max_categories:
top_k_values = dict(counter.most_common(max_categories[col]))
unique_values_by_col[key_gen(col)].update(top_k_values.keys())
else:
unique_values_by_col[key_gen(col)].update(counter.keys())
return unique_values_by_col
# FIXME: the arrow format path is broken: https://anyscale1.atlassian.net/browse/DATA-1788
def unique_post_fn(
drop_na_values: bool = False, batch_format: Optional[BatchFormat] = None
) -> Callable:
"""
Returns a post-processing function that generates an encoding map by
sorting the unique values produced during aggregation or stats computation.
Args:
drop_na_values: If True, NA/null values will be silently dropped from the
encoding map. If False, raises an error if any NA/null values are present.
batch_format: Determines the output format of the encoding map.
- If BatchFormat.ARROW: Returns Arrow format (tuple of arrays) for scalar
types, or dict format for list types that PyArrow can't sort.
- Otherwise: Returns pandas dict format {value: index}.
Returns:
A callable that takes unique values and returns an encoding map.
The map format depends on batch_format and input types:
- Dict format: {value: int} - used for pandas path or list-type data
- Arrow format: (keys_array, values_array) - used for Arrow path with scalar data
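Example (illustrative; default pandas dict format):
    >>> gen = unique_post_fn()
    >>> gen(["red", "green", "blue"])  # doctest: +SKIP
    {'blue': 0, 'green': 1, 'red': 2}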
"""
def gen_value_index(values: List) -> Dict[Any, int]:
"""
Generate an encoding map from a list of unique values using Python sorting.
Args:
values: List of unique values to encode (can include lists/tuples).
Returns:
Dict mapping each value to a unique integer index.
List values are converted to tuples for hashability.
Raises:
ValueError: If null values are present and drop_na_values is False.
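Example (illustrative): [["b"], ["a"]] maps to {("a",): 0, ("b",): 1}, since
lists are converted to tuples and values are sorted before indexing.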
"""
# NOTE: We special-case nulls here because they prevent the provided
# sequence of values from being sortable
if any(is_null(v) for v in values) and not drop_na_values:
raise ValueError(
"Unable to fit column because it contains null"
" values. Consider imputing missing values first."
)
non_null_values = [v for v in values if not is_null(v)]
return {
(v if not isinstance(v, list) else tuple(v)): i
# NOTE: Sorting applied to produce stable encoding
for i, v in enumerate(sorted(non_null_values))
}
def gen_value_index_arrow_from_arrow(
values: Union["pa.ListScalar", "pa.Array"],
) -> Union[Tuple["pa.Array", "pa.Array"], Dict[Any, int]]:
"""Generate an encoding map from unique values using Arrow-native operations.
Args:
values: The aggregation result as a pa.ListScalar (list of unique values)
or a pa.Array of values directly.
Returns:
For scalar types that PyArrow can sort natively, returns a tuple of
(sorted_keys, indices) as pa.Array. For list types that require fallback,
returns a dict mapping {value: index}.
Note:
PyArrow's sort_indices doesn't support list types, so we fall back to
dict format for columns containing lists. The _transform_arrow method
handles this by detecting dict-format stats and converting as needed.
"""
# Handle ListScalar from aggregation result
if isinstance(values, pa.ListScalar):
values = values.values
# Check if values contain list types - PyArrow can't sort these
# Fall back to pandas dict format for list types
if pa.types.is_list(values.type) or pa.types.is_large_list(values.type):
return gen_value_index(values.to_pylist())
# Drop nulls if requested
if drop_na_values:
values = pc.drop_null(values)
else:
if pc.any(pc.is_null(values)).as_py():
raise ValueError(
"Unable to fit column because it contains null"
" values. Consider imputing missing values first."
)
# Sort the values
sorted_indices = pc.sort_indices(values)
sorted_values = pc.take(values, sorted_indices)
# Create the index array
values_array = pa.array(range(len(sorted_values)), type=pa.int64())
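# Illustrative example (assumed values): unique values ["red", "blue"] sort to
# ["blue", "red"], so the result is (pa.array(["blue", "red"]), pa.array([0, 1])).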
return (sorted_values, values_array)
return (
gen_value_index_arrow_from_arrow
if batch_format == BatchFormat.ARROW
else gen_value_index
)
def _validate_df(df: pd.DataFrame, *columns: str) -> None:
null_columns = [column for column in columns if df[column].isnull().values.any()]
if null_columns:
raise ValueError(
f"Unable to transform columns {null_columns} because they contain "
f"null values. Consider imputing missing values first."
)
def _validate_arrow(table: pa.Table, *columns: str) -> None:
"""Validate that specified columns in an Arrow table do not contain null values.
Args:
table: The Arrow table to validate.
*columns: Column names to check for null values.
Raises:
ValueError: If any of the specified columns contain null values.
"""
null_columns = [
column for column in columns if pc.any(pc.is_null(table.column(column))).as_py()
]
if null_columns:
raise ValueError(
f"Unable to transform columns {null_columns} because they contain "
f"null values. Consider imputing missing values first."
)
def _is_series_composed_of_lists(series: pd.Series) -> bool:
# We assume all elements share the same type, so checking the first non-None element suffices.
first_not_none_element = next(
(element for element in series if element is not None), None
)
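# Illustrative example: a series like pd.Series([[1, 2], [3]]) returns True,
# while pd.Series(["a", "b"]) returns False.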
return pandas.api.types.is_object_dtype(series.dtype) and isinstance(
first_not_none_element, (list, np.ndarray)
)