# Source code for ray.air.util.tensor_extensions.pandas

# Adapted from
# https://github.com/CODAIT/text-extensions-for-pandas/blob/dc03278689fe1c5f131573658ae19815ba25f33e/text_extensions_for_pandas/array/tensor.py
# and
# https://github.com/CODAIT/text-extensions-for-pandas/blob/dc03278689fe1c5f131573658ae19815ba25f33e/text_extensions_for_pandas/array/arrow_conversion.py

#
#  Copyright (c) 2020 IBM Corp.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# Modifications:
# - Added ArrowTensorType.to_pandas_type()
# - Added ArrowTensorArray.__getitem__()
# - Added ArrowTensorArray.__iter__()
# - Added support for column casts to extension types.
# - Fleshed out docstrings and examples.
# - Fixed TensorArray.isna() so it returns an appropriate ExtensionArray.
# - Added different (more vectorized) TensorArray.take() operation.
# - Added support for more reducers (agg funcs) to TensorArray.
# - Added support for logical operators to TensorArray(Element).
# - Added support for heterogeneously-shaped tensors.
# - Miscellaneous small bug fixes and optimizations.

import numbers
import os
from packaging.version import Version
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
import pyarrow as pa
from pandas._typing import Dtype
from pandas.compat import set_function_name
from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries
from pandas.core.indexers import check_array_indexer, validate_indices
from pandas.io.formats.format import ExtensionArrayFormatter

from ray.air.util.tensor_extensions.utils import (
    _create_possibly_ragged_ndarray,
    _is_ndarray_variable_shaped_tensor,
)
from ray.util.annotations import PublicAPI

try:
    from pandas.core.dtypes.generic import ABCIndex
except ImportError:
    # ABCIndexClass changed to ABCIndex in Pandas 1.3
    from pandas.core.dtypes.generic import ABCIndexClass as ABCIndex


#############################################
# Begin patching of ExtensionArrayFormatter #
#############################################


def _format_strings_patched(self) -> List[str]:
    """Format a TensorArray column for display (pandas >= 1.1).

    Replacement for ``ExtensionArrayFormatter._format_strings`` that formats
    each scalar of a (possibly multi-dimensional) tensor column element-wise
    and then renders the resulting array of strings. Falls back to the stock
    formatter (stashed as ``_format_strings_orig``) for non-tensor columns
    and for 1-D data.
    """
    from pandas.core.construction import extract_array
    from pandas.io.formats.format import format_array

    if not isinstance(self.values, TensorArray):
        return self._format_strings_orig()

    extracted = extract_array(self.values, extract_numpy=True)
    ndarray = np.asarray(extracted)

    # The stock formatter already copes with 1-D data.
    if ndarray.ndim == 1:
        return self._format_strings_orig()

    def _run_format(data, fmt):
        # Delegate to pandas' format_array, forwarding this formatter's
        # display options.
        return format_array(
            data,
            fmt,
            float_format=self.float_format,
            na_rep=self.na_rep,
            digits=self.digits,
            space=self.space,
            justify=self.justify,
            decimal=self.decimal,
            leading_space=self.leading_space,
            quoting=self.quoting,
        )

    scalar_formatter = self.formatter
    if scalar_formatter is None:
        scalar_formatter = extracted._formatter(boxed=True)

    # Flatten -> format each scalar -> restore the original shape.
    # (pandas v1.3.0 offers ravel_compat for this.)
    raveled = ndarray.ravel("K")
    formatted_flat = np.asarray(_run_format(raveled, scalar_formatter))
    layout = "F" if ndarray.flags.f_contiguous else "C"
    formatted = formatted_flat.reshape(ndarray.shape, order=layout)

    # Render the array of nested strings with the default formatter.
    return _run_format(formatted, None)


def _format_strings_patched_v1_0_0(self) -> List[str]:
    """Format a TensorArray column for display (pandas < 1.1).

    Same element-wise strategy as ``_format_strings_patched``, but the final
    rendering pass uses a slimmed-down formatter (and no ``quoting`` option)
    to sidestep https://github.com/pandas-dev/pandas/issues/33770.
    """
    from functools import partial

    from pandas.core.construction import extract_array
    from pandas.io.formats.format import format_array
    from pandas.io.formats.printing import pprint_thing

    if not isinstance(self.values, TensorArray):
        return self._format_strings_orig()

    extracted = extract_array(self.values, extract_numpy=True)
    ndarray = np.asarray(extracted)

    # The stock formatter already copes with 1-D data.
    if ndarray.ndim == 1:
        return self._format_strings_orig()

    def _run_format(data, fmt):
        # Forward this formatter's display options (no `quoting` kwarg on
        # pandas 1.0.x).
        return format_array(
            data,
            fmt,
            float_format=self.float_format,
            na_rep=self.na_rep,
            digits=self.digits,
            space=self.space,
            justify=self.justify,
            decimal=self.decimal,
            leading_space=self.leading_space,
        )

    scalar_formatter = self.formatter
    if scalar_formatter is None:
        scalar_formatter = extracted._formatter(boxed=True)

    # Flatten -> format each scalar -> restore the original shape.
    # (pandas v1.3.0 offers ravel_compat for this.)
    raveled = ndarray.ravel("K")
    formatted_flat = np.asarray(_run_format(raveled, scalar_formatter))
    layout = "F" if ndarray.flags.f_contiguous else "C"
    formatted = formatted_flat.reshape(ndarray.shape, order=layout)

    # Slimmed down version of GenericArrayFormatter due to:
    # https://github.com/pandas-dev/pandas/issues/33770
    def _render(values, leading_space):
        escape = partial(pprint_thing, escape_chars=("\t", "\r", "\n"))
        rendered = []
        for value in values:
            template = "{v}" if leading_space is False else " {v}"
            rendered.append(template.format(v=str(escape(value))))
        return rendered

    return _render(formatted, self.leading_space)


_FORMATTER_ENABLED_ENV_VAR = "TENSOR_COLUMN_EXTENSION_FORMATTER_ENABLED"

if os.getenv(_FORMATTER_ENABLED_ENV_VAR, "1") == "1":
    # Stash the stock formatter so the patches can delegate to it for
    # non-tensor columns and for 1-D data.
    ExtensionArrayFormatter._format_strings_orig = (
        ExtensionArrayFormatter._format_strings
    )
    # BUG FIX: `_format_strings_patched_v1_0_0` is a slimmed-down workaround
    # for https://github.com/pandas-dev/pandas/issues/33770, which was fixed
    # in pandas 1.1.0; it should only be used on pandas < 1.1.0. The previous
    # condition (`1.1.0 <= version < 1.3.0` -> full patch, else the 1.0.x
    # workaround) incorrectly routed modern pandas (>= 1.3.0) to the
    # 1.0.x workaround, silently dropping `quoting` support.
    if Version(pd.__version__) < Version("1.1.0"):
        ExtensionArrayFormatter._format_strings = _format_strings_patched_v1_0_0
    else:
        ExtensionArrayFormatter._format_strings = _format_strings_patched
    # Marker so other code can detect that the formatter has been patched.
    ExtensionArrayFormatter._patched_by_ray_datasets = True

###########################################
# End patching of ExtensionArrayFormatter #
###########################################


@PublicAPI(stability="beta")
@pd.api.extensions.register_extension_dtype
class TensorDtype(pd.api.extensions.ExtensionDtype):
    """
    Pandas extension type for a column of homogeneous-typed tensors.

    This extension supports tensors in which the elements have different
    shapes. However, each tensor element must be non-ragged, i.e. each
    tensor element must have a well-defined, non-ragged shape.

    See:
    https://github.com/pandas-dev/pandas/blob/master/pandas/core/dtypes/base.py
    for up-to-date interface documentation and the subclassing contract. The
    docstrings of the below properties and methods were copied from the base
    ExtensionDtype.

    Examples:
        >>> # Create a DataFrame with a list of ndarrays as a column.
        >>> import pandas as pd
        >>> import numpy as np
        >>> import ray
        >>> df = pd.DataFrame({
        ...     "one": [1, 2, 3],
        ...     "two": list(np.arange(24).reshape((3, 2, 2, 2)))})
        >>> # Note the opaque np.object dtype for this column.
        >>> df.dtypes # doctest: +SKIP
        one     int64
        two    object
        dtype: object
        >>> # Cast column to our TensorDtype extension type; note that the
        >>> # constructor takes the element shape first, then the dtype.
        >>> from ray.data.extensions import TensorDtype
        >>> df["two"] = df["two"].astype(TensorDtype((3, 2, 2, 2), np.int64))
        >>> # Note that the column dtype is now TensorDtype instead of
        >>> # np.object.
        >>> df.dtypes # doctest: +SKIP
        one                                            int64
        two    TensorDtype(shape=(3, 2, 2, 2), dtype=int64)
        dtype: object
        >>> # Pandas is now aware of this tensor column, and we can do the
        >>> # typical DataFrame operations on this column.
        >>> col = 2 * (df["two"] + 10)
        >>> # The ndarrays underlying the tensor column will be manipulated,
        >>> # but the column itself will continue to be a Pandas type.
        >>> type(col) # doctest: +SKIP
        pandas.core.series.Series
        >>> # Once you do an aggregation on that column that returns a single
        >>> # row's value, you get back our TensorArrayElement type.
        >>> tensor = col.mean()
        >>> type(tensor) # doctest: +SKIP
        ray.data.extensions.tensor_extension.TensorArrayElement
        >>> # This is a light wrapper around a NumPy ndarray, and can easily
        >>> # be converted to an ndarray.
        >>> type(tensor.to_numpy()) # doctest: +SKIP
        numpy.ndarray
        >>> # In addition to doing Pandas operations on the tensor column,
        >>> # you can now put the DataFrame into a Dataset.
        >>> ds = ray.data.from_pandas(df) # doctest: +SKIP
        >>> # Internally, this column is represented as the corresponding
        >>> # Arrow tensor extension type.
        >>> ds.schema() # doctest: +SKIP
        one: int64
        two: extension<arrow.py_extension_type<ArrowTensorType>>
        >>> # You can write the dataset to Parquet...
        >>> ds.write_parquet("/some/path") # doctest: +SKIP
        >>> # ...and read it back.
        >>> read_ds = ray.data.read_parquet("/some/path") # doctest: +SKIP
        >>> read_df = ray.get(read_ds.to_pandas_refs())[0] # doctest: +SKIP
        >>> # The tensor extension type is preserved along the
        >>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas
        >>> # conversion chain.
        >>> read_df.equals(df) # doctest: +SKIP
        True
    """

    # NOTE(Clark): This is apparently required to prevent integer indexing
    # errors, but is an undocumented ExtensionDtype attribute. See issue:
    # https://github.com/CODAIT/text-extensions-for-pandas/issues/166
    base = None

    def __init__(self, shape: Tuple[Optional[int], ...], dtype: np.dtype):
        """
        Args:
            shape: Shape of the tensor elements; a tuple of all-None
                dimension sizes denotes variable-shaped elements.
            dtype: NumPy dtype of the underlying tensor elements.
        """
        self._shape = shape
        self._dtype = dtype

    @property
    def type(self):
        """
        The scalar type for the array, e.g. ``int``

        It's expected ``ExtensionArray[item]`` returns an instance of
        ``ExtensionDtype.type`` for scalar ``item``, assuming that value is
        valid (not NA). NA values do not need to be instances of `type`.
        """
        return TensorArrayElement

    @property
    def element_dtype(self):
        """
        The dtype of the underlying tensor elements.
        """
        return self._dtype

    @property
    def element_shape(self):
        """
        The shape of the underlying tensor elements. This will be a tuple of
        Nones if the corresponding TensorArray for this TensorDtype holds
        variable-shaped tensor elements.
        """
        return self._shape

    @property
    def is_variable_shaped(self):
        """
        Whether the corresponding TensorArray for this TensorDtype holds
        variable-shaped tensor elements.
        """
        # BUG FIX: this previously iterated over `self.shape`, which neither
        # this class nor the ExtensionDtype base defines (the element shape is
        # stored as `self._shape`), so accessing this property raised an
        # AttributeError.
        return all(dim_size is None for dim_size in self._shape)

    @property
    def name(self) -> str:
        """
        A string identifying the data type.

        Will be used for display in, e.g. ``Series.dtype``
        """
        return f"{type(self).__name__}(shape={self._shape}, dtype={self._dtype})"

    @classmethod
    def construct_from_string(cls, string: str):
        r"""
        Construct this type from a string.

        This is useful mainly for data types that accept parameters.
        The expected format is the one produced by ``name``, e.g.
        ``TensorDtype(shape=(1, 2, 3), dtype=int64)``.

        Parameters
        ----------
        string : str
            The name of the type, for example
            ``TensorDtype(shape=(1, 2, 3), dtype=int64)``.

        Returns
        -------
        ExtensionDtype
            Instance of the dtype.

        Raises
        ------
        TypeError
            If a class cannot be constructed from this 'string'.
        """
        import ast
        import re

        if not isinstance(string, str):
            raise TypeError(
                f"'construct_from_string' expects a string, got {type(string)}"
            )
        # Upstream code uses exceptions as part of its normal control flow and
        # will pass this method bogus class names.
        regex = r"^TensorDtype\(shape=(\((?:(?:\d+|None),?\s?)*\)), dtype=(\w+)\)$"
        m = re.search(regex, string)
        err_msg = (
            f"Cannot construct a '{cls.__name__}' from '{string}'; expected a string "
            "like 'TensorDtype(shape=(1, 2, 3), dtype=int64)'."
        )
        if m is None:
            raise TypeError(err_msg)
        groups = m.groups()
        if len(groups) != 2:
            raise TypeError(err_msg)
        shape, dtype = groups
        # The shape group is a literal tuple expression, e.g. "(1, 2, 3)".
        shape = ast.literal_eval(shape)
        dtype = np.dtype(dtype)
        return cls(shape, dtype)

    @classmethod
    def construct_array_type(cls):
        """
        Return the array type associated with this dtype.

        Returns
        -------
        type
        """
        return TensorArray

    def __from_arrow__(self, array: Union[pa.Array, pa.ChunkedArray]):
        """
        Convert a pyarrow (chunked) array to a TensorArray.

        This and TensorArray.__arrow_array__ make up the Pandas extension
        type + array <--> Arrow extension type + array interoperability
        protocol. See
        https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
        for more information.
        """
        if isinstance(array, pa.ChunkedArray):
            if array.num_chunks > 1:
                # TODO(Clark): Remove concat and construct from list with
                # shape.
                values = np.concatenate(
                    [chunk.to_numpy() for chunk in array.iterchunks()]
                )
            else:
                values = array.chunk(0).to_numpy()
        else:
            values = array.to_numpy()
        return TensorArray(values)

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return str(self)

    @property
    def _is_boolean(self):
        """
        Whether this extension array should be considered boolean.

        By default, ExtensionArrays are assumed to be non-numeric.
        Setting this to True will affect the behavior of several places,
        e.g.

        * is_bool
        * boolean indexing

        Returns
        -------
        bool
        """
        # This is needed to support returning a TensorArray from .isnan().
        from pandas.core.dtypes.common import is_bool_dtype

        return is_bool_dtype(self._dtype)
class _TensorOpsMixin(pd.api.extensions.ExtensionScalarOpsMixin):
    """
    Mixin for TensorArray operator support, applying operations on the
    underlying ndarrays.
    """

    @classmethod
    def _create_method(cls, op, coerce_to_dtype=True, result_dtype=None):
        """
        Add support for binary operators by unwrapping, applying, and
        rewrapping.
        """

        # NOTE(Clark): This overrides, but coerce_to_dtype, result_dtype might
        # not be needed

        def _binop(self, other):
            lvalues = self._tensor

            if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndex)):
                # Rely on Pandas to unbox and dispatch to us.
                return NotImplemented

            # divmod returns a tuple
            # NOTE: `op_name` is a late-bound closure variable, assigned below
            # before `_binop` can ever be called.
            if op_name in ["__divmod__", "__rdivmod__"]:
                # TODO(Clark): Add support for divmod and rdivmod.
                # div, mod = result
                raise NotImplementedError

            # Unwrap tensor extension wrappers to raw ndarrays before applying
            # the operator.
            if isinstance(other, (TensorArray, TensorArrayElement)):
                rvalues = other._tensor
            else:
                rvalues = other

            result = op(lvalues, rvalues)

            # Force a TensorArray if rvalue is not a scalar.
            if isinstance(self, TensorArrayElement) and (
                not isinstance(other, TensorArrayElement) or not np.isscalar(other)
            ):
                result_wrapped = TensorArray(result)
            else:
                result_wrapped = cls(result)

            return result_wrapped

        op_name = f"__{op.__name__}__"
        return set_function_name(_binop, op_name, cls)

    @classmethod
    def _create_logical_method(cls, op):
        # Logical ops (&, |, ^) use the same unwrap/apply/rewrap path as the
        # arithmetic/comparison ops.
        return cls._create_method(op)


class _TensorScalarCastMixin:
    """
    Mixin for casting scalar tensors to a particular numeric type.
    """

    def _scalarfunc(self, func: Callable[[Any], Any]):
        # Apply a builtin numeric conversion to the wrapped scalar tensor.
        return func(self._tensor)

    def __complex__(self):
        return self._scalarfunc(complex)

    def __float__(self):
        return self._scalarfunc(float)

    def __int__(self):
        return self._scalarfunc(int)

    def __hex__(self):
        return self._scalarfunc(hex)

    def __oct__(self):
        return self._scalarfunc(oct)


@PublicAPI(stability="beta")
class TensorArrayElement(_TensorOpsMixin, _TensorScalarCastMixin):
    """
    Single element of a TensorArray, wrapping an underlying ndarray.
    """

    def __init__(self, values: np.ndarray):
        """
        Construct a TensorArrayElement from a NumPy ndarray.

        Args:
            values: ndarray that underlies this TensorArray element.
        """
        self._tensor = values

    def __repr__(self):
        return self._tensor.__repr__()

    def __str__(self):
        return self._tensor.__str__()

    @property
    def numpy_dtype(self):
        """
        Get the dtype of the tensor.
        :return: The numpy dtype of the backing ndarray
        """
        return self._tensor.dtype

    @property
    def numpy_ndim(self):
        """
        Get the number of tensor dimensions.
        :return: integer for the number of dimensions
        """
        return self._tensor.ndim

    @property
    def numpy_shape(self):
        """
        Get the shape of the tensor.
        :return: A tuple of integers for the numpy shape of the backing
            ndarray
        """
        return self._tensor.shape

    @property
    def numpy_size(self):
        """
        Get the size of the tensor.
        :return: integer for the number of elements in the tensor
        """
        return self._tensor.size

    def to_numpy(self):
        """
        Return the values of this element as a NumPy ndarray.
        """
        return np.asarray(self._tensor)

    def __array__(self, dtype: np.dtype = None, **kwargs) -> np.ndarray:
        # NumPy protocol hook: lets np.asarray() and ufuncs see the wrapped
        # ndarray directly.
        return np.asarray(self._tensor, dtype=dtype, **kwargs)
[docs]@PublicAPI(stability="beta") class TensorArray( pd.api.extensions.ExtensionArray, _TensorOpsMixin, _TensorScalarCastMixin, ): """ Pandas `ExtensionArray` representing a tensor column, i.e. a column consisting of ndarrays as elements. This extension supports tensors in which the elements have different shapes. However, each tensor element must be non-ragged, i.e. each tensor element must have a well-defined, non-ragged shape. Examples: >>> # Create a DataFrame with a list of ndarrays as a column. >>> import pandas as pd >>> import numpy as np >>> import ray >>> from ray.data.extensions import TensorArray >>> df = pd.DataFrame({ ... "one": [1, 2, 3], ... "two": TensorArray(np.arange(24).reshape((3, 2, 2, 2)))}) >>> # Note that the column dtype is TensorDtype. >>> df.dtypes # doctest: +SKIP one int64 two TensorDtype(shape=(3, 2, 2, 2), dtype=int64) dtype: object >>> # Pandas is aware of this tensor column, and we can do the >>> # typical DataFrame operations on this column. >>> col = 2 * (df["two"] + 10) >>> # The ndarrays underlying the tensor column will be manipulated, >>> # but the column itself will continue to be a Pandas type. >>> type(col) # doctest: +SKIP pandas.core.series.Series >>> col # doctest: +SKIP 0 [[[ 2 4] [ 6 8]] [[10 12] [14 16]]] 1 [[[18 20] [22 24]] [[26 28] [30 32]]] 2 [[[34 36] [38 40]] [[42 44] [46 48]]] Name: two, dtype: TensorDtype(shape=(3, 2, 2, 2), dtype=int64) >>> # Once you do an aggregation on that column that returns a single >>> # row's value, you get back our TensorArrayElement type. >>> tensor = col.mean() # doctest: +SKIP >>> type(tensor) # doctest: +SKIP ray.data.extensions.tensor_extension.TensorArrayElement >>> tensor # doctest: +SKIP array([[[18., 20.], [22., 24.]], [[26., 28.], [30., 32.]]]) >>> # This is a light wrapper around a NumPy ndarray, and can easily >>> # be converted to an ndarray. 
>>> type(tensor.to_numpy()) # doctest: +SKIP numpy.ndarray >>> # In addition to doing Pandas operations on the tensor column, >>> # you can now put the DataFrame into a Dataset. >>> ds = ray.data.from_pandas(df) # doctest: +SKIP >>> # Internally, this column is represented the corresponding >>> # Arrow tensor extension type. >>> ds.schema() # doctest: +SKIP one: int64 two: extension<arrow.py_extension_type<ArrowTensorType>> >>> # You can write the dataset to Parquet. >>> ds.write_parquet("/some/path") # doctest: +SKIP >>> # And you can read it back. >>> read_ds = ray.data.read_parquet("/some/path") # doctest: +SKIP >>> read_ds.schema() # doctest: +SKIP one: int64 two: extension<arrow.py_extension_type<ArrowTensorType>> >>> read_df = ray.get(read_ds.to_pandas_refs())[0] # doctest: +SKIP >>> read_df.dtypes # doctest: +SKIP one int64 two TensorDtype(shape=(3, 2, 2, 2), dtype=int64) dtype: object >>> # The tensor extension type is preserved along the >>> # Pandas --> Arrow --> Parquet --> Arrow --> Pandas >>> # conversion chain. >>> read_df.equals(df) # doctest: +SKIP True """ SUPPORTED_REDUCERS = { "sum": np.sum, "all": np.all, "any": np.any, "min": np.min, "max": np.max, "mean": np.mean, "median": np.median, "prod": np.prod, "std": np.std, "var": np.var, } # See https://github.com/pandas-dev/pandas/blob/master/pandas/core/arrays/base.py # for interface documentation and the subclassing contract. def __init__( self, values: Union[ np.ndarray, ABCSeries, Sequence[Union[np.ndarray, TensorArrayElement]], TensorArrayElement, Any, ], ): """ Args: values: A NumPy ndarray or sequence of NumPy ndarrays of equal shape. """ # Try to convert some well-known objects to ndarrays before handing off to # ndarray handling logic. 
if isinstance(values, ABCSeries): values = _create_possibly_ragged_ndarray(values) elif isinstance(values, Sequence): values = [ np.asarray(v) if isinstance(v, TensorArrayElement) else v for v in values ] values = _create_possibly_ragged_ndarray(values) elif isinstance(values, TensorArrayElement): values = np.array([np.asarray(values)], copy=False) if isinstance(values, np.ndarray): if values.dtype.type is np.object_: if len(values) == 0: # Tensor is empty, pass through to create empty TensorArray. pass elif all( isinstance(v, (np.ndarray, TensorArrayElement, Sequence)) and not isinstance(v, str) for v in values ): values = [np.asarray(v) for v in values] # Try to convert ndarrays of ndarrays/TensorArrayElements with an # opaque object type to a properly typed ndarray of ndarrays. values = _create_possibly_ragged_ndarray(values) else: raise TypeError( "Expected a well-typed ndarray or an object-typed ndarray of " "ndarray pointers, but got an object-typed ndarray whose " f"subndarrays are of type {type(values[0])}." ) elif isinstance(values, TensorArray): raise TypeError("Use the copy() method to create a copy of a TensorArray.") else: raise TypeError( "Expected a numpy.ndarray or sequence of numpy.ndarray, " f"but received {values} of type {type(values).__name__} instead." ) assert isinstance(values, np.ndarray) self._tensor = values self._is_variable_shaped = None @classmethod def _from_sequence( cls, scalars, *, dtype: Optional[Dtype] = None, copy: bool = False ): """ Construct a new ExtensionArray from a sequence of scalars. Parameters ---------- scalars : Sequence Each element will be an instance of the scalar type for this array, ``cls.dtype.type`` or be converted into this type in this method. dtype : dtype, optional Construct for this particular dtype. This should be a Dtype compatible with the ExtensionArray. copy : bool, default False If True, copy the underlying data. 
Returns ------- ExtensionArray """ if copy and isinstance(scalars, np.ndarray): scalars = scalars.copy() elif isinstance(scalars, TensorArray): scalars = scalars._tensor.copy() if copy else scalars._tensor return TensorArray(scalars) @classmethod def _from_factorized( cls, values: np.ndarray, original: pd.api.extensions.ExtensionArray ): """ Reconstruct an ExtensionArray after factorization. Parameters ---------- values : ndarray An integer ndarray with the factorized values. original : ExtensionArray The original ExtensionArray that factorize was called on. See Also -------- factorize : Top-level factorize method that dispatches here. ExtensionArray.factorize : Encode the extension array as an enumerated type. """ raise NotImplementedError def __getitem__( self, item: Union[int, slice, np.ndarray] ) -> Union["TensorArray", "TensorArrayElement"]: """ Select a subset of self. Parameters ---------- item : int, slice, or ndarray * int: The position in 'self' to get. * slice: A slice object, where 'start', 'stop', and 'step' are integers or None * ndarray: A 1-d boolean NumPy ndarray the same length as 'self' Returns ------- item : scalar or ExtensionArray Notes ----- For scalar ``item``, return a scalar value suitable for the array's type. This should be an instance of ``self.dtype.type``. For slice ``key``, return an instance of ``ExtensionArray``, even if the slice is length 0 or 1. For a boolean mask, return an instance of ``ExtensionArray``, filtered to the values where ``item`` is True. """ # Return scalar if single value is selected, a TensorArrayElement for # single array element, or TensorArray for slice. 
if isinstance(item, int): value = self._tensor[item] if np.isscalar(value): return value else: return TensorArrayElement(value) else: # BEGIN workaround for Pandas issue #42430 if isinstance(item, tuple) and len(item) > 1 and item[0] == Ellipsis: if len(item) > 2: # Hopefully this case is not possible, but can't be sure raise ValueError( "Workaround Pandas issue #42430 not " "implemented for tuple length > 2" ) item = item[1] # END workaround for issue #42430 if isinstance(item, TensorArray): item = np.asarray(item) item = check_array_indexer(self, item) return TensorArray(self._tensor[item]) def __len__(self) -> int: """ Length of this array. Returns ------- length : int """ return len(self._tensor) @property def dtype(self) -> pd.api.extensions.ExtensionDtype: """ An instance of 'ExtensionDtype'. """ if self.is_variable_shaped: # A tensor is only considered variable-shaped if it's non-empty, so no # non-empty check is needed here. dtype = self._tensor[0].dtype shape = (None,) * self._tensor[0].ndim else: dtype = self.numpy_dtype shape = self.numpy_shape[1:] return TensorDtype(shape, dtype) @property def is_variable_shaped(self): """ Whether this TensorArray holds variable-shaped tensor elements. """ if self._is_variable_shaped is None: self._is_variable_shaped = _is_ndarray_variable_shaped_tensor(self._tensor) return self._is_variable_shaped @property def nbytes(self) -> int: """ The number of bytes needed to store this object in memory. """ return self._tensor.nbytes
[docs] def isna(self) -> "TensorArray": """ A 1-D array indicating if each value is missing. Returns ------- na_values : Union[np.ndarray, ExtensionArray] In most cases, this should return a NumPy ndarray. For exceptional cases like ``SparseArray``, where returning an ndarray would be expensive, an ExtensionArray may be returned. Notes ----- If returning an ExtensionArray, then * ``na_values._is_boolean`` should be True * `na_values` should implement :func:`ExtensionArray._reduce` * ``na_values.any`` and ``na_values.all`` should be implemented """ if self._tensor.dtype.type is np.object_: # Avoid comparing with __eq__ because the elements of the tensor # may do something funny with that operation. return np.array( [self._tensor[i] is None for i in range(len(self))], dtype=bool ) elif self._tensor.dtype.type is np.str_: return np.all(self._tensor == "", axis=tuple(range(1, self._tensor.ndim))) else: return np.all( np.isnan(self._tensor), axis=tuple(range(1, self._tensor.ndim)) )
[docs] def take( self, indices: Sequence[int], allow_fill: bool = False, fill_value: Any = None ) -> "TensorArray": """ Take elements from an array. Parameters ---------- indices : sequence of int Indices to be taken. allow_fill : bool, default False How to handle negative values in `indices`. * False: negative values in `indices` indicate positional indices from the right (the default). This is similar to :func:`numpy.take`. * True: negative values in `indices` indicate missing values. These values are set to `fill_value`. Any other other negative values raise a ``ValueError``. fill_value : any, optional Fill value to use for NA-indices when `allow_fill` is True. This may be ``None``, in which case the default NA value for the type, ``self.dtype.na_value``, is used. For many ExtensionArrays, there will be two representations of `fill_value`: a user-facing "boxed" scalar, and a low-level physical NA value. `fill_value` should be the user-facing version, and the implementation should handle translating that to the physical version for processing the take if necessary. Returns ------- ExtensionArray Raises ------ IndexError When the indices are out of bounds for the array. ValueError When `indices` contains negative values other than ``-1`` and `allow_fill` is True. See Also -------- numpy.take : Take elements from an array along an axis. api.extensions.take : Take elements from an array. Notes ----- ExtensionArray.take is called by ``Series.__getitem__``, ``.loc``, ``iloc``, when `indices` is a sequence of values. Additionally, it's called by :meth:`Series.reindex`, or any other method that causes realignment, with a `fill_value`. Examples -------- Here's an example implementation, which relies on casting the extension array to object dtype. This uses the helper method :func:`pandas.api.extensions.take`. .. 
code-block:: python def take(self, indices, allow_fill=False, fill_value=None): from pandas.core.algorithms import take # If the ExtensionArray is backed by an ndarray, then # just pass that here instead of coercing to object. data = self.astype(object) if allow_fill and fill_value is None: fill_value = self.dtype.na_value # fill value should always be translated from the scalar # type for the array, to the physical storage type for # the data, before passing to take. result = take(data, indices, fill_value=fill_value, allow_fill=allow_fill) return self._from_sequence(result, dtype=self.dtype) """ if allow_fill: # With allow_fill being True, negative values in `indices` indicate # missing values and should be set to `fill_value`. indices = np.asarray(indices, dtype=np.intp) validate_indices(indices, len(self._tensor)) # Check if there are missing indices to fill, otherwise we can # delegate to NumPy ndarray .take(). has_missing = np.any(indices < 0) if has_missing: if fill_value is None: fill_value = np.nan # Create an array populated with fill value. values = np.full((len(indices),) + self._tensor.shape[1:], fill_value) # Put tensors at the given positive indices into array. is_nonneg = indices >= 0 np.put(values, np.where(is_nonneg)[0], self._tensor[indices[is_nonneg]]) return TensorArray(values) # Delegate take to NumPy array. values = self._tensor.take(indices, axis=0) return TensorArray(values)
[docs] def copy(self) -> "TensorArray": """ Return a copy of the array. Returns ------- ExtensionArray """ # TODO(Clark): Copy cached properties. return TensorArray(self._tensor.copy())
    @classmethod
    def _concat_same_type(cls, to_concat: Sequence["TensorArray"]) -> "TensorArray":
        """
        Concatenate multiple array of this dtype.

        Parameters
        ----------
        to_concat : sequence of this type

        Returns
        -------
        ExtensionArray
        """
        # If the inputs disagree on element shape (or any is already
        # variable-shaped), fall back to a 1D object ndarray of per-element
        # tensors; otherwise a single contiguous concatenation suffices.
        should_flatten = False
        shape = None
        for a in to_concat:
            if shape is None:
                # The first array's element shape is the reference shape.
                shape = a.dtype.element_shape
            if a.is_variable_shaped or a.dtype.element_shape != shape:
                should_flatten = True
                break
        if should_flatten:
            # Heterogeneous shapes: each element tensor is stored as an
            # object in a 1D ndarray.
            concated = TensorArray(
                np.array([e for a in to_concat for e in a._tensor], dtype=object)
            )
        else:
            # Homogeneous shapes: concatenate along the rows (elements) axis.
            concated = TensorArray(np.concatenate([a._tensor for a in to_concat]))
        return concated

    def __setitem__(self, key: Union[int, np.ndarray], value: Any) -> None:
        """
        Set one or more values inplace.

        This method is not required to satisfy the pandas extension array
        interface.

        Parameters
        ----------
        key : int, ndarray, or slice
            When called from, e.g. ``Series.__setitem__``, ``key`` will be
            one of

            * scalar int
            * ndarray of integers.
            * boolean ndarray
            * slice object

        value : ExtensionDtype.type, Sequence[ExtensionDtype.type], or object
            value or values to be set of ``key``.

        Returns
        -------
        None
        """
        key = check_array_indexer(self, key)
        # Unwrap tensor elements / scalars into plain ndarrays so plain NumPy
        # assignment semantics apply below.
        if isinstance(value, TensorArrayElement) or np.isscalar(value):
            value = np.asarray(value)
        if isinstance(value, list):
            value = [
                np.asarray(v) if isinstance(v, TensorArrayElement) else v for v in value
            ]
        # Unwrap a tensor-extension Series down to its backing TensorArray.
        if isinstance(value, ABCSeries) and isinstance(value.dtype, TensorDtype):
            value = value.values
        if value is None or isinstance(value, Sequence) and len(value) == 0:
            # Treat assignment of None / an empty sequence as setting missing
            # values: NaN-filled tensors of the same shape as the targets.
            self._tensor[key] = np.full_like(self._tensor[key], np.nan)
        elif isinstance(key, (int, slice, np.ndarray)):
            self._tensor[key] = value
        else:
            raise NotImplementedError(
                f"__setitem__ with key type '{type(key)}' not implemented"
            )

    def __contains__(self, item) -> bool:
        """
        Return for `item in self`.
        """
        if isinstance(item, TensorArrayElement):
            np_item = np.asarray(item)
            # A size-1, all-NaN element means "does self contain a missing
            # value?", which maps to an isna() membership check rather than
            # a value-equality scan.
            if np_item.size == 1 and np.isnan(np_item).all():
                return self.isna().any()
        return super().__contains__(item)

    def __repr__(self):
        # Delegate rendering to the backing ndarray.
        return self._tensor.__repr__()

    def __str__(self):
        # Delegate rendering to the backing ndarray.
        return self._tensor.__str__()

    def _values_for_factorize(self) -> Tuple[np.ndarray, Any]:
        # TODO(Clark):
        # return self._tensor, np.nan
        raise NotImplementedError

    def _reduce(self, name: str, skipna: bool = True, **kwargs):
        """
        Return a scalar result of performing the reduction operation.

        Parameters
        ----------
        name : str
            Name of the function, supported values are:
            { any, all, min, max, sum, mean, median, prod, std, var, sem,
            kurt, skew }.
        skipna : bool, default True
            If True, skip NaN values.
        **kwargs
            Additional keyword arguments passed to the reduction function.
            Currently, `ddof` is the only supported kwarg.

        Returns
        -------
        scalar

        Raises
        ------
        TypeError : subclass does not define reductions
        """
        # NOTE(review): `skipna` is accepted for interface compatibility but
        # is not forwarded to the reducer — confirm this is intended.
        # Only forward kwargs the NumPy reducers understand.
        supported_kwargs = ["ddof"]
        reducer_kwargs = {}
        for kw in supported_kwargs:
            try:
                reducer_kwargs[kw] = kwargs[kw]
            except KeyError:
                pass
        try:
            # Reduce along the rows axis, producing a single tensor element.
            return TensorArrayElement(
                self.SUPPORTED_REDUCERS[name](self._tensor, axis=0, **reducer_kwargs)
            )
        except KeyError:
            raise NotImplementedError(f"'{name}' aggregate not implemented.") from None

    def __array__(self, dtype: np.dtype = None, **kwargs) -> np.ndarray:
        # Support np.asarray(tensor_array), delegating to the backing ndarray.
        return np.asarray(self._tensor, dtype=dtype, **kwargs)

    def __array_ufunc__(self, ufunc: Callable, method: str, *inputs, **kwargs):
        """
        Supports NumPy ufuncs without requiring sloppy coercion to an
        ndarray.
        """
        out = kwargs.get("out", ())
        for x in inputs + out:
            # Only handle types we know how to unwrap; anything else defers
            # to the other operand's implementation.
            if not isinstance(x, (TensorArray, np.ndarray, numbers.Number)):
                return NotImplemented
        # Defer to the implementation of the ufunc on unwrapped values.
        inputs = tuple(x._tensor if isinstance(x, TensorArray) else x for x in inputs)
        if out:
            kwargs["out"] = tuple(
                x._tensor if isinstance(x, TensorArray) else x for x in out
            )
        result = getattr(ufunc, method)(*inputs, **kwargs)
        if type(result) is tuple:
            # Multiple return values.
            return tuple(type(self)(x) for x in result)
        elif method == "at":
            # No return value.
            return None
        else:
            # One return value.
            return type(self)(result)
[docs] def to_numpy( self, dtype: np.dtype = None, copy: bool = False, na_value: Any = pd.api.extensions.no_default, ): """ Convert to a NumPy ndarray. .. versionadded:: 1.0.0 This is similar to :meth:`numpy.asarray`, but may provide additional control over how the conversion is done. Parameters ---------- dtype : str or numpy.dtype, optional The dtype to pass to :meth:`numpy.asarray`. copy : bool, default False Whether to ensure that the returned value is a not a view on another array. Note that ``copy=False`` does not *ensure* that ``to_numpy()`` is no-copy. Rather, ``copy=True`` ensure that a copy is made, even if not strictly necessary. na_value : Any, optional The value to use for missing values. The default value depends on `dtype` and the type of the array. Returns ------- numpy.ndarray """ if dtype is not None: dtype = pd.api.types.pandas_dtype(dtype) if copy: values = np.array(self._tensor, dtype=dtype, copy=True) else: values = self._tensor.astype(dtype) elif copy: values = self._tensor.copy() else: values = self._tensor return values
@property def numpy_dtype(self): """ Get the dtype of the tensor. :return: The numpy dtype of the backing ndarray """ return self._tensor.dtype @property def numpy_ndim(self): """ Get the number of tensor dimensions. :return: integer for the number of dimensions """ return self._tensor.ndim @property def numpy_shape(self): """ Get the shape of the tensor. :return: A tuple of integers for the numpy shape of the backing ndarray """ return self._tensor.shape @property def numpy_size(self): """ Get the size of the tensor. :return: integer for the number of elements in the tensor """ return self._tensor.size
[docs] def astype(self, dtype, copy=True): """ Cast to a NumPy array with 'dtype'. Parameters ---------- dtype : str or dtype Typecode or data-type to which the array is cast. copy : bool, default True Whether to copy the data, even if not necessary. If False, a copy is made only if the old dtype does not match the new dtype. Returns ------- array : ndarray NumPy ndarray with 'dtype' for its dtype. """ dtype = pd.api.types.pandas_dtype(dtype) if isinstance(dtype, TensorDtype): values = TensorArray(self._tensor.copy()) if copy else self elif not ( pd.api.types.is_object_dtype(dtype) and pd.api.types.is_string_dtype(dtype) ): values = np.array([str(t) for t in self._tensor]) if isinstance(dtype, pd.StringDtype): return dtype.construct_array_type()._from_sequence(values, copy=False) else: return values elif pd.api.types.is_object_dtype(dtype): # Interpret astype(object) as "cast to an array of numpy arrays" values = np.empty(len(self), dtype=object) for i in range(len(self)): values[i] = self._tensor[i] else: values = self._tensor.astype(dtype, copy=copy) return values
[docs] def any(self, axis=None, out=None, keepdims=False): """ Test whether any array element along a given axis evaluates to True. See numpy.any() documentation for more information https://numpy.org/doc/stable/reference/generated/numpy.any.html#numpy.any :param axis: Axis or axes along which a logical OR reduction is performed. :param out: Alternate output array in which to place the result. :param keepdims: If this is set to True, the axes which are reduced are left in the result as dimensions with size one. :return: single boolean unless axis is not None else TensorArray """ result = self._tensor.any(axis=axis, out=out, keepdims=keepdims) return result if axis is None else TensorArray(result)
[docs] def all(self, axis=None, out=None, keepdims=False): """ Test whether all array elements along a given axis evaluate to True. :param axis: Axis or axes along which a logical AND reduction is performed. :param out: Alternate output array in which to place the result. :param keepdims: If this is set to True, the axes which are reduced are left in the result as dimensions with size one. :return: single boolean unless axis is not None else TensorArray """ result = self._tensor.all(axis=axis, out=out, keepdims=keepdims) return result if axis is None else TensorArray(result)
def __arrow_array__(self, type=None): """ Convert this TensorArray to an ArrowTensorArray extension array. This and TensorDtype.__from_arrow__ make up the Pandas extension type + array <--> Arrow extension type + array interoperability protocol. See https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow for more information. """ from ray.air.util.tensor_extensions.arrow import ( ArrowTensorArray, ArrowVariableShapedTensorArray, ) if self.is_variable_shaped: return ArrowVariableShapedTensorArray.from_numpy(self._tensor) else: return ArrowTensorArray.from_numpy(self._tensor) @property def _is_boolean(self): """ Whether this extension array should be considered boolean. By default, ExtensionArrays are assumed to be non-numeric. Setting this to True will affect the behavior of several places, e.g. * is_bool * boolean indexing Returns ------- bool """ # This is needed to support returning a TensorArray from .isnan(). return self.dtype._is_boolean()
# Add operators from the mixin to the TensorArrayElement and TensorArray # classes. TensorArrayElement._add_arithmetic_ops() TensorArrayElement._add_comparison_ops() TensorArrayElement._add_logical_ops() TensorArray._add_arithmetic_ops() TensorArray._add_comparison_ops() TensorArray._add_logical_ops() @PublicAPI(stability="beta") def column_needs_tensor_extension(s: pd.Series) -> bool: """Return whether the provided pandas Series column needs a tensor extension representation. This tensor extension representation provides more efficient slicing and interop with ML frameworks. Args: s: The pandas Series column that may need to be represented using the tensor extension. Returns: Whether the provided Series needs a tensor extension representation. """ # NOTE: This is an O(1) check. return ( s.dtype.type is np.object_ and not s.empty and isinstance(s.iloc[0], np.ndarray) )