import itertools
import sys
from typing import Iterable, Optional, Tuple, List, Sequence, Union
from pkg_resources._vendor.packaging.version import parse as parse_version
import numpy as np
import pyarrow as pa
from ray.air.util.tensor_extensions.utils import (
_is_ndarray_variable_shaped_tensor,
create_ragged_ndarray,
)
from ray._private.utils import _get_pyarrow_version
from ray.util.annotations import PublicAPI
PYARROW_VERSION = _get_pyarrow_version()
if PYARROW_VERSION is not None:
PYARROW_VERSION = parse_version(PYARROW_VERSION)
# Minimum version of Arrow that supports ExtensionScalars.
# TODO(Clark): Remove conditional definition once we only support Arrow 8.0.0+.
MIN_PYARROW_VERSION_SCALAR = parse_version("8.0.0")
# Minimum version of Arrow that supports subclassable ExtensionScalars.
# TODO(Clark): Remove conditional definition once we only support Arrow 9.0.0+.
MIN_PYARROW_VERSION_SCALAR_SUBCLASS = parse_version("9.0.0")
NUM_BYTES_PER_UNICODE_CHAR = 4
def _arrow_supports_extension_scalars():
"""
Whether Arrow ExtensionScalars are supported in the current pyarrow version.
This returns True if the pyarrow version is 8.0.0+, or if the pyarrow version is
unknown.
"""
# TODO(Clark): Remove utility once we only support Arrow 8.0.0+.
return PYARROW_VERSION is None or PYARROW_VERSION >= MIN_PYARROW_VERSION_SCALAR
def _arrow_extension_scalars_are_subclassable():
"""
Whether Arrow ExtensionScalars support subclassing in the current pyarrow version.
This returns True if the pyarrow version is 9.0.0+, or if the pyarrow version is
unknown.
"""
# TODO(Clark): Remove utility once we only support Arrow 9.0.0+.
return (
PYARROW_VERSION is None
or PYARROW_VERSION >= MIN_PYARROW_VERSION_SCALAR_SUBCLASS
)
[docs]@PublicAPI(stability="beta")
class ArrowTensorType(pa.PyExtensionType):
"""
Arrow ExtensionType for an array of fixed-shaped, homogeneous-typed
tensors.
This is the Arrow side of TensorDtype.
See Arrow extension type docs:
https://arrow.apache.org/docs/python/extending_types.html#defining-extension-types-user-defined-types
"""
def __init__(self, shape: Tuple[int, ...], dtype: pa.DataType):
"""
Construct the Arrow extension type for array of fixed-shaped tensors.
Args:
shape: Shape of contained tensors.
dtype: pyarrow dtype of tensor elements.
"""
self._shape = shape
super().__init__(pa.list_(dtype))
@property
def shape(self):
"""
Shape of contained tensors.
"""
return self._shape
@property
def scalar_type(self):
"""Returns the type of the underlying tensor elements."""
return self.storage_type.value_type
[docs] def to_pandas_dtype(self):
"""
Convert Arrow extension type to corresponding Pandas dtype.
Returns:
An instance of pd.api.extensions.ExtensionDtype.
"""
from ray.air.util.tensor_extensions.pandas import TensorDtype
return TensorDtype(self._shape, self.storage_type.value_type.to_pandas_dtype())
def __reduce__(self):
return ArrowTensorType, (self._shape, self.storage_type.value_type)
def __arrow_ext_class__(self):
"""
ExtensionArray subclass with custom logic for this array of tensors
type.
Returns:
A subclass of pd.api.extensions.ExtensionArray.
"""
return ArrowTensorArray
if _arrow_extension_scalars_are_subclassable():
# TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+.
def __arrow_ext_scalar_class__(self):
"""
ExtensionScalar subclass with custom logic for this array of tensors type.
"""
return ArrowTensorScalar
if _arrow_supports_extension_scalars():
# TODO(Clark): Remove this version guard once we only support Arrow 8.0.0+.
def _extension_scalar_to_ndarray(
self, scalar: pa.ExtensionScalar
) -> np.ndarray:
"""
Convert an ExtensionScalar to a tensor element.
"""
raw_values = scalar.value.values
shape = scalar.type.shape
value_type = raw_values.type
offset = raw_values.offset
data_buffer = raw_values.buffers()[1]
return _to_ndarray_helper(shape, value_type, offset, data_buffer)
def __str__(self) -> str:
return (
f"ArrowTensorType(shape={self.shape}, dtype={self.storage_type.value_type})"
)
def __repr__(self) -> str:
return str(self)
@classmethod
def _need_variable_shaped_tensor_array(
cls,
array_types: Sequence[
Union["ArrowTensorType", "ArrowVariableShapedTensorType"]
],
) -> bool:
"""
Whether the provided list of tensor types needs a variable-shaped
representation (i.e. `ArrowVariableShapedTensorType`) when concatenating
or chunking. If one or more of the tensor types in `array_types` are
variable-shaped and/or any of the tensor arrays have a different shape
than the others, a variable-shaped tensor array representation will be
required and this method will return True.
Args:
array_types: List of tensor types to check if a variable-shaped
representation is required for concatenation
Returns:
True if concatenating arrays with types `array_types` requires
a variable-shaped representation
"""
shape = None
for arr_type in array_types:
# If at least one of the arrays is variable-shaped, we can immediately
# short-circuit since we require a variable-shaped representation.
if isinstance(arr_type, ArrowVariableShapedTensorType):
return True
if not isinstance(arr_type, ArrowTensorType):
raise ValueError(
"All provided array types must be an instance of either "
"ArrowTensorType or ArrowVariableShapedTensorType, but "
f"got {arr_type}"
)
# We need variable-shaped representation if any of the tensor arrays have
# different shapes.
if shape is not None and arr_type.shape != shape:
return True
shape = arr_type.shape
return False
if _arrow_extension_scalars_are_subclassable():
# TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+.
@PublicAPI(stability="beta")
class ArrowTensorScalar(pa.ExtensionScalar):
def as_py(self) -> np.ndarray:
return self.type._extension_scalar_to_ndarray(self)
def __array__(self) -> np.ndarray:
return self.as_py()
# TODO(Clark): Remove this mixin once we only support Arrow 9.0.0+.
class _ArrowTensorScalarIndexingMixin:
"""
A mixin providing support for scalar indexing in tensor extension arrays for
Arrow < 9.0.0, before full ExtensionScalar support was added. This mixin overrides
__getitem__, __iter__, and to_pylist.
"""
# This mixin will be a no-op (no methods added) for Arrow 9.0.0+.
if not _arrow_extension_scalars_are_subclassable():
# NOTE: These __iter__ and to_pylist definitions are shared for both
# Arrow < 8.0.0 and Arrow 8.*.
def __iter__(self):
# Override pa.Array.__iter__() in order to return an iterator of
# properly shaped tensors instead of an iterator of flattened tensors.
# See comment in above __getitem__ method.
for i in range(len(self)):
# Use overridden __getitem__ method.
yield self.__getitem__(i)
def to_pylist(self):
# Override pa.Array.to_pylist() due to a lack of ExtensionScalar
# support (see comment in __getitem__).
return list(self)
if _arrow_supports_extension_scalars():
# NOTE(Clark): This __getitem__ override is only needed for Arrow 8.*,
# before ExtensionScalar subclassing support was added.
# TODO(Clark): Remove these methods once we only support Arrow 9.0.0+.
def __getitem__(self, key):
# This __getitem__ hook allows us to support proper indexing when
# accessing a single tensor (a "scalar" item of the array). Without this
# hook for integer keys, the indexing will fail on pyarrow < 9.0.0 due
# to a lack of ExtensionScalar subclassing support.
# NOTE(Clark): We'd like to override the pa.Array.getitem() helper
# instead, which would obviate the need for overriding __iter__(), but
# unfortunately overriding Cython cdef methods with normal Python
# methods isn't allowed.
item = super().__getitem__(key)
if not isinstance(key, slice):
item = item.type._extension_scalar_to_ndarray(item)
return item
else:
# NOTE(Clark): This __getitem__ override is only needed for Arrow < 8.0.0,
# before any ExtensionScalar support was added.
# TODO(Clark): Remove these methods once we only support Arrow 8.0.0+.
def __getitem__(self, key):
# This __getitem__ hook allows us to support proper indexing when
# accessing a single tensor (a "scalar" item of the array). Without this
# hook for integer keys, the indexing will fail on pyarrow < 8.0.0 due
# to a lack of ExtensionScalar support.
# NOTE(Clark): We'd like to override the pa.Array.getitem() helper
# instead, which would obviate the need for overriding __iter__(), but
# unfortunately overriding Cython cdef methods with normal Python
# methods isn't allowed.
if isinstance(key, slice):
return super().__getitem__(key)
return self._to_numpy(key)
# NOTE: We need to inherit from the mixin before pa.ExtensionArray to ensure that the
# mixin's overriding methods appear first in the MRO.
# TODO(Clark): Remove this mixin once we only support Arrow 9.0.0+.
[docs]@PublicAPI(stability="beta")
class ArrowTensorArray(_ArrowTensorScalarIndexingMixin, pa.ExtensionArray):
"""
An array of fixed-shape, homogeneous-typed tensors.
This is the Arrow side of TensorArray.
See Arrow docs for customizing extension arrays:
https://arrow.apache.org/docs/python/extending_types.html#custom-extension-array-class
"""
OFFSET_DTYPE = np.int32
[docs] @classmethod
def from_numpy(
cls, arr: Union[np.ndarray, Iterable[np.ndarray]]
) -> Union["ArrowTensorArray", "ArrowVariableShapedTensorArray"]:
"""
Convert an ndarray or an iterable of ndarrays to an array of homogeneous-typed
tensors. If given fixed-shape tensor elements, this will return an
``ArrowTensorArray``; if given variable-shape tensor elements, this will return
an ``ArrowVariableShapedTensorArray``.
Args:
arr: An ndarray or an iterable of ndarrays.
Returns:
- If fixed-shape tensor elements, an ``ArrowTensorArray`` containing
``len(arr)`` tensors of fixed shape.
- If variable-shaped tensor elements, an ``ArrowVariableShapedTensorArray``
containing ``len(arr)`` tensors of variable shape.
- If scalar elements, a ``pyarrow.Array``.
"""
if isinstance(arr, (list, tuple)) and arr and isinstance(arr[0], np.ndarray):
# Stack ndarrays and pass through to ndarray handling logic below.
try:
arr = np.stack(arr, axis=0)
except ValueError:
# ndarray stacking may fail if the arrays are heterogeneously-shaped.
arr = np.array(arr, dtype=object)
if isinstance(arr, np.ndarray):
if len(arr) > 0 and np.isscalar(arr[0]):
# Elements are scalar so a plain Arrow Array will suffice.
return pa.array(arr)
if _is_ndarray_variable_shaped_tensor(arr):
# Tensor elements have variable shape, so we delegate to
# ArrowVariableShapedTensorArray.
return ArrowVariableShapedTensorArray.from_numpy(arr)
if not arr.flags.c_contiguous:
# We only natively support C-contiguous ndarrays.
arr = np.ascontiguousarray(arr)
pa_dtype = pa.from_numpy_dtype(arr.dtype)
if pa.types.is_string(pa_dtype):
if arr.dtype.byteorder == ">" or (
arr.dtype.byteorder == "=" and sys.byteorder == "big"
):
raise ValueError(
"Only little-endian string tensors are supported, "
f"but got: {arr.dtype}",
)
pa_dtype = pa.binary(arr.dtype.itemsize)
outer_len = arr.shape[0]
element_shape = arr.shape[1:]
total_num_items = arr.size
num_items_per_element = np.prod(element_shape) if element_shape else 1
# Data buffer.
if pa.types.is_boolean(pa_dtype):
# NumPy doesn't represent boolean arrays as bit-packed, so we manually
# bit-pack the booleans before handing the buffer off to Arrow.
# NOTE: Arrow expects LSB bit-packed ordering.
# NOTE: This creates a copy.
arr = np.packbits(arr, bitorder="little")
data_buffer = pa.py_buffer(arr)
data_array = pa.Array.from_buffers(
pa_dtype, total_num_items, [None, data_buffer]
)
# Offset buffer.
offset_buffer = pa.py_buffer(
cls.OFFSET_DTYPE(
[i * num_items_per_element for i in range(outer_len + 1)]
)
)
storage = pa.Array.from_buffers(
pa.list_(pa_dtype),
outer_len,
[None, offset_buffer],
children=[data_array],
)
type_ = ArrowTensorType(element_shape, pa_dtype)
return pa.ExtensionArray.from_storage(type_, storage)
elif isinstance(arr, Iterable):
return cls.from_numpy(list(arr))
else:
raise ValueError("Must give ndarray or iterable of ndarrays.")
def _to_numpy(self, index: Optional[int] = None, zero_copy_only: bool = False):
"""
Helper for getting either an element of the array of tensors as an
ndarray, or the entire array of tensors as a single ndarray.
Args:
index: The index of the tensor element that we wish to return as
an ndarray. If not given, the entire array of tensors is
returned as an ndarray.
zero_copy_only: If True, an exception will be raised if the
conversion to a NumPy array would require copying the
underlying data (e.g. in presence of nulls, or for
non-primitive types). This argument is currently ignored, so
zero-copy isn't enforced even if this argument is true.
Returns:
The corresponding tensor element as an ndarray if an index was
given, or the entire array of tensors as an ndarray otherwise.
"""
# TODO(Clark): Enforce zero_copy_only.
# TODO(Clark): Support strides?
# Buffers schema:
# [None, offset_buffer, None, data_buffer]
buffers = self.buffers()
data_buffer = buffers[3]
storage_list_type = self.storage.type
value_type = storage_list_type.value_type
ext_dtype = value_type.to_pandas_dtype()
shape = self.type.shape
if pa.types.is_boolean(value_type):
# Arrow boolean array buffers are bit-packed, with 8 entries per byte,
# and are accessed via bit offsets.
buffer_item_width = value_type.bit_width
else:
# We assume all other array types are accessed via byte array
# offsets.
buffer_item_width = value_type.bit_width // 8
# Number of items per inner ndarray.
num_items_per_element = np.prod(shape) if shape else 1
# Base offset into data buffer, e.g. due to zero-copy slice.
buffer_offset = self.offset * num_items_per_element
# Offset of array data in buffer.
offset = buffer_item_width * buffer_offset
if index is not None:
# Getting a single tensor element of the array.
offset_buffer = buffers[1]
offset_array = np.ndarray(
(len(self),), buffer=offset_buffer, dtype=self.OFFSET_DTYPE
)
# Offset into array to reach logical index.
index_offset = offset_array[index]
# Add the index offset to the base offset.
offset += buffer_item_width * index_offset
else:
# Getting the entire array of tensors.
shape = (len(self),) + shape
if pa.types.is_boolean(value_type):
# Special handling for boolean arrays, since Arrow bit-packs boolean arrays
# while NumPy does not.
# Cast as uint8 array and let NumPy unpack into a boolean view.
# Offset into uint8 array, where each element is a bucket for 8 booleans.
byte_bucket_offset = offset // 8
# Offset for a specific boolean, within a uint8 array element.
bool_offset = offset % 8
# The number of uint8 array elements (buckets) that our slice spans.
# Note that, due to the offset for a specific boolean, the slice can span
# byte boundaries even if it contains less than 8 booleans.
num_boolean_byte_buckets = 1 + ((bool_offset + np.prod(shape) - 1) // 8)
# Construct the uint8 array view on the buffer.
arr = np.ndarray(
(num_boolean_byte_buckets,),
dtype=np.uint8,
buffer=data_buffer,
offset=byte_bucket_offset,
)
# Unpack into a byte per boolean, using LSB bit-packed ordering.
arr = np.unpackbits(arr, bitorder="little")
# Interpret buffer as boolean array.
return np.ndarray(shape, dtype=np.bool_, buffer=arr, offset=bool_offset)
# Special handling of binary/string types. Assumes unicode string tensor columns
if pa.types.is_fixed_size_binary(value_type):
ext_dtype = np.dtype(
f"<U{value_type.byte_width // NUM_BYTES_PER_UNICODE_CHAR}"
)
return np.ndarray(shape, dtype=ext_dtype, buffer=data_buffer, offset=offset)
[docs] def to_numpy(self, zero_copy_only: bool = True):
"""
Convert the entire array of tensors into a single ndarray.
Args:
zero_copy_only: If True, an exception will be raised if the
conversion to a NumPy array would require copying the
underlying data (e.g. in presence of nulls, or for
non-primitive types). This argument is currently ignored, so
zero-copy isn't enforced even if this argument is true.
Returns:
A single ndarray representing the entire array of tensors.
"""
return self._to_numpy(zero_copy_only=zero_copy_only)
@classmethod
def _concat_same_type(
cls,
to_concat: Sequence[
Union["ArrowTensorArray", "ArrowVariableShapedTensorArray"]
],
) -> Union["ArrowTensorArray", "ArrowVariableShapedTensorArray"]:
"""
Concatenate multiple tensor arrays.
If one or more of the tensor arrays in to_concat are variable-shaped and/or any
of the tensor arrays have a different shape than the others, a variable-shaped
tensor array will be returned.
"""
to_concat_types = [arr.type for arr in to_concat]
if ArrowTensorType._need_variable_shaped_tensor_array(to_concat_types):
# Need variable-shaped tensor array.
# TODO(Clark): Eliminate this NumPy roundtrip by directly constructing the
# underlying storage array buffers (NumPy roundtrip will not be zero-copy
# for e.g. boolean arrays).
# NOTE(Clark): Iterating over a tensor extension array converts each element
# to an ndarray view.
return ArrowVariableShapedTensorArray.from_numpy(
[e for a in to_concat for e in a]
)
else:
storage = pa.concat_arrays([c.storage for c in to_concat])
return ArrowTensorArray.from_storage(to_concat[0].type, storage)
@classmethod
def _chunk_tensor_arrays(
cls, arrs: Sequence[Union["ArrowTensorArray", "ArrowVariableShapedTensorArray"]]
) -> pa.ChunkedArray:
"""
Create a ChunkedArray from multiple tensor arrays.
"""
arrs_types = [arr.type for arr in arrs]
if ArrowTensorType._need_variable_shaped_tensor_array(arrs_types):
new_arrs = []
for a in arrs:
if isinstance(a.type, ArrowTensorType):
a = a.to_variable_shaped_tensor_array()
assert isinstance(a.type, ArrowVariableShapedTensorType)
new_arrs.append(a)
arrs = new_arrs
return pa.chunked_array(arrs)
[docs] def to_variable_shaped_tensor_array(self) -> "ArrowVariableShapedTensorArray":
"""
Convert this tensor array to a variable-shaped tensor array.
This is primarily used when concatenating multiple chunked tensor arrays where
at least one chunked array is already variable-shaped and/or the shapes of the
chunked arrays differ, in which case the resulting concatenated tensor array
will need to be in the variable-shaped representation.
"""
# TODO(Clark): Eliminate this NumPy roundtrip by directly constructing the
# underlying storage array buffers (NumPy roundtrip will not be zero-copy for
# e.g. boolean arrays).
return ArrowVariableShapedTensorArray.from_numpy(self.to_numpy())
[docs]@PublicAPI(stability="alpha")
class ArrowVariableShapedTensorType(pa.PyExtensionType):
"""
Arrow ExtensionType for an array of heterogeneous-shaped, homogeneous-typed
tensors.
This is the Arrow side of TensorDtype for tensor elements with different shapes.
Note that this extension only supports non-ragged tensor elements; i.e., when
considering each tensor element in isolation, they must have a well-defined,
non-ragged shape.
See Arrow extension type docs:
https://arrow.apache.org/docs/python/extending_types.html#defining-extension-types-user-defined-types
"""
def __init__(self, dtype: pa.DataType, ndim: int):
"""
Construct the Arrow extension type for array of heterogeneous-shaped tensors.
Args:
dtype: pyarrow dtype of tensor elements.
ndim: The number of dimensions in the tensor elements.
"""
self._ndim = ndim
super().__init__(
pa.struct([("data", pa.list_(dtype)), ("shape", pa.list_(pa.int64()))])
)
[docs] def to_pandas_dtype(self):
"""
Convert Arrow extension type to corresponding Pandas dtype.
Returns:
An instance of pd.api.extensions.ExtensionDtype.
"""
from ray.air.util.tensor_extensions.pandas import TensorDtype
return TensorDtype(
(None,) * self.ndim,
self.storage_type["data"].type.value_type.to_pandas_dtype(),
)
@property
def ndim(self) -> int:
"""Return the number of dimensions in the tensor elements."""
return self._ndim
@property
def scalar_type(self):
"""Returns the type of the underlying tensor elements."""
data_field_index = self.storage_type.get_field_index("data")
return self.storage_type[data_field_index].type.value_type
def __reduce__(self):
return (
ArrowVariableShapedTensorType,
(self.storage_type["data"].type.value_type, self._ndim),
)
def __arrow_ext_class__(self):
"""
ExtensionArray subclass with custom logic for this array of tensors
type.
Returns:
A subclass of pd.api.extensions.ExtensionArray.
"""
return ArrowVariableShapedTensorArray
if _arrow_extension_scalars_are_subclassable():
# TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+.
def __arrow_ext_scalar_class__(self):
"""
ExtensionScalar subclass with custom logic for this array of tensors type.
"""
return ArrowTensorScalar
def __str__(self) -> str:
dtype = self.storage_type["data"].type.value_type
return f"ArrowVariableShapedTensorType(dtype={dtype}, ndim={self.ndim})"
def __repr__(self) -> str:
return str(self)
if _arrow_supports_extension_scalars():
# TODO(Clark): Remove this version guard once we only support Arrow 8.0.0+.
def _extension_scalar_to_ndarray(
self, scalar: pa.ExtensionScalar
) -> np.ndarray:
"""
Convert an ExtensionScalar to a tensor element.
"""
data = scalar.value.get("data")
raw_values = data.values
shape = tuple(scalar.value.get("shape").as_py())
value_type = raw_values.type
offset = raw_values.offset
data_buffer = raw_values.buffers()[1]
return _to_ndarray_helper(shape, value_type, offset, data_buffer)
# NOTE: We need to inherit from the mixin before pa.ExtensionArray to ensure that the
# mixin's overriding methods appear first in the MRO.
# TODO(Clark): Remove this mixin once we only support Arrow 9.0.0+.
[docs]@PublicAPI(stability="alpha")
class ArrowVariableShapedTensorArray(
_ArrowTensorScalarIndexingMixin, pa.ExtensionArray
):
"""
An array of heterogeneous-shaped, homogeneous-typed tensors.
This is the Arrow side of TensorArray for tensor elements that have differing
shapes. Note that this extension only supports non-ragged tensor elements; i.e.,
when considering each tensor element in isolation, they must have a well-defined
shape. This extension also only supports tensor elements that all have the same
number of dimensions.
See Arrow docs for customizing extension arrays:
https://arrow.apache.org/docs/python/extending_types.html#custom-extension-array-class
"""
OFFSET_DTYPE = np.int32
[docs] @classmethod
def from_numpy(
cls, arr: Union[np.ndarray, List[np.ndarray], Tuple[np.ndarray]]
) -> "ArrowVariableShapedTensorArray":
"""
Convert an ndarray or an iterable of heterogeneous-shaped ndarrays to an array
of heterogeneous-shaped, homogeneous-typed tensors.
Args:
arr: An ndarray or an iterable of heterogeneous-shaped ndarrays.
Returns:
An ArrowVariableShapedTensorArray containing len(arr) tensors of
heterogeneous shape.
"""
# Implementation note - Arrow representation of ragged tensors:
#
# We represent an array of ragged tensors using a struct array containing two
# fields:
# - data: a variable-sized list array, where each element in the array is a
# tensor element stored in a 1D (raveled) variable-sized list of the
# underlying scalar data type.
# - shape: a variable-sized list array containing the shapes of each tensor
# element.
if isinstance(arr, Iterable):
arr = list(arr)
elif not isinstance(arr, (list, tuple)):
raise ValueError(
"ArrowVariableShapedTensorArray can only be constructed from an "
f"ndarray or a list/tuple of ndarrays, but got: {type(arr)}"
)
if len(arr) == 0:
# Empty ragged tensor arrays are not supported.
raise ValueError("Creating empty ragged tensor arrays is not supported.")
# Whether all subndarrays are contiguous views of the same ndarray.
shapes, sizes, raveled = [], [], []
ndim = None
for a in arr:
a = np.asarray(a)
if ndim is not None and a.ndim != ndim:
raise ValueError(
"ArrowVariableShapedTensorArray only supports tensor elements that "
"all have the same number of dimensions, but got tensor elements "
f"with dimensions: {ndim}, {a.ndim}"
)
ndim = a.ndim
shapes.append(a.shape)
sizes.append(a.size)
# Convert to 1D array view; this should be zero-copy in the common case.
# NOTE: If array is not in C-contiguous order, this will convert it to
# C-contiguous order, incurring a copy.
a = np.ravel(a, order="C")
raveled.append(a)
# Get size offsets and total size.
sizes = np.array(sizes)
size_offsets = np.cumsum(sizes)
total_size = size_offsets[-1]
# Concatenate 1D views into a contiguous 1D array.
if all(_is_contiguous_view(curr, prev) for prev, curr in _pairwise(raveled)):
# An optimized zero-copy path if raveled tensor elements are already
# contiguous in memory, e.g. if this tensor array has already done a
# roundtrip through our Arrow representation.
np_data_buffer = raveled[-1].base
else:
np_data_buffer = np.concatenate(raveled)
dtype = np_data_buffer.dtype
if dtype.type is np.object_:
types_and_shapes = [(f"dtype={a.dtype}", f"shape={a.shape}") for a in arr]
raise ValueError(
"ArrowVariableShapedTensorArray only supports heterogeneous-shaped "
"tensor collections, not arbitrarily nested ragged tensors. Got "
f"arrays: {types_and_shapes}"
)
pa_dtype = pa.from_numpy_dtype(dtype)
if pa.types.is_string(pa_dtype):
if dtype.byteorder == ">" or (
dtype.byteorder == "=" and sys.byteorder == "big"
):
raise ValueError(
"Only little-endian string tensors are supported, "
f"but got: {dtype}"
)
pa_dtype = pa.binary(dtype.itemsize)
if dtype.type is np.bool_:
# NumPy doesn't represent boolean arrays as bit-packed, so we manually
# bit-pack the booleans before handing the buffer off to Arrow.
# NOTE: Arrow expects LSB bit-packed ordering.
# NOTE: This creates a copy.
np_data_buffer = np.packbits(np_data_buffer, bitorder="little")
data_buffer = pa.py_buffer(np_data_buffer)
# Construct underlying data array.
value_array = pa.Array.from_buffers(pa_dtype, total_size, [None, data_buffer])
# Construct array for offsets into the 1D data array, where each offset
# corresponds to a tensor element.
size_offsets = np.insert(size_offsets, 0, 0)
offset_array = pa.array(size_offsets)
data_array = pa.ListArray.from_arrays(offset_array, value_array)
# We store the tensor element shapes so we can reconstruct each tensor when
# converting back to NumPy ndarrays.
shape_array = pa.array(shapes)
# Build storage array containing tensor data and the tensor element shapes.
storage = pa.StructArray.from_arrays(
[data_array, shape_array],
["data", "shape"],
)
type_ = ArrowVariableShapedTensorType(pa_dtype, ndim)
return pa.ExtensionArray.from_storage(type_, storage)
def _to_numpy(self, index: Optional[int] = None, zero_copy_only: bool = False):
"""
Helper for getting either an element of the array of tensors as an ndarray, or
the entire array of tensors as a single ndarray.
Args:
index: The index of the tensor element that we wish to return as an
ndarray. If not given, the entire array of tensors is returned as an
ndarray.
zero_copy_only: If True, an exception will be raised if the conversion to a
NumPy array would require copying the underlying data (e.g. in presence
of nulls, or for non-primitive types). This argument is currently
ignored, so zero-copy isn't enforced even if this argument is true.
Returns:
The corresponding tensor element as an ndarray if an index was given, or
the entire array of tensors as an ndarray otherwise.
"""
# TODO(Clark): Enforce zero_copy_only.
# TODO(Clark): Support strides?
if index is None:
# Get individual ndarrays for each tensor element.
arrs = [self._to_numpy(i, zero_copy_only) for i in range(len(self))]
# Return ragged NumPy ndarray in the ndarray of ndarray pointers
# representation.
return create_ragged_ndarray(arrs)
data = self.storage.field("data")
shapes = self.storage.field("shape")
shape = shapes[index].as_py()
value_type = data.type.value_type
offset = data.offsets[index].as_py()
data_buffer = data.buffers()[3]
return _to_ndarray_helper(shape, value_type, offset, data_buffer)
[docs] def to_numpy(self, zero_copy_only: bool = True):
"""
Convert the entire array of tensors into a single ndarray.
Args:
zero_copy_only: If True, an exception will be raised if the conversion to a
NumPy array would require copying the underlying data (e.g. in presence
of nulls, or for non-primitive types). This argument is currently
ignored, so zero-copy isn't enforced even if this argument is true.
Returns:
A single ndarray representing the entire array of tensors.
"""
return self._to_numpy(zero_copy_only=zero_copy_only)
def _is_contiguous_view(curr: np.ndarray, prev: Optional[np.ndarray]) -> bool:
"""Check if the provided tensor element is contiguous with the previous tensor
element.
Args:
curr: The tensor element whose contiguity that we wish to check.
prev: The previous tensor element in the tensor array.
Returns:
Whether the provided tensor element is contiguous with the previous tensor
element.
"""
if (
curr.base is None
or not curr.data.c_contiguous
or (prev is not None and curr.base is not prev.base)
):
# curr is either:
# - not a view,
# - not in C-contiguous order,
# - a view that does not share its base with the other subndarrays.
return False
else:
# curr is a C-contiguous view that shares the same base with the seen
# subndarrays, but we need to confirm that it is contiguous with the
# previous subndarray.
if prev is not None and (
_get_buffer_address(curr) - _get_buffer_address(prev)
!= prev.base.dtype.itemsize * prev.size
):
# This view is not contiguous with the previous view.
return False
else:
return True
def _get_buffer_address(arr: np.ndarray) -> int:
"""Get the address of the buffer underlying the provided NumPy ndarray."""
return arr.__array_interface__["data"][0]
def _pairwise(iterable):
# pairwise('ABCDEFG') --> AB BC CD DE EF FG
# Backport of itertools.pairwise for Python < 3.10.
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
def _to_ndarray_helper(shape, value_type, offset, data_buffer):
if pa.types.is_boolean(value_type):
# Arrow boolean array buffers are bit-packed, with 8 entries per byte,
# and are accessed via bit offsets.
buffer_item_width = value_type.bit_width
else:
# We assume all other array types are accessed via byte array
# offsets.
buffer_item_width = value_type.bit_width // 8
data_offset = buffer_item_width * offset
if pa.types.is_boolean(value_type):
# Special handling for boolean arrays, since Arrow
# bit-packs boolean arrays while NumPy does not.
# Cast as uint8 array and let NumPy unpack into a boolean view.
# Offset into uint8 array, where each element is
# a bucket for 8 booleans.
byte_bucket_offset = data_offset // 8
# Offset for a specific boolean, within a uint8 array element.
bool_offset = data_offset % 8
# The number of uint8 array elements (buckets) that our slice spans.
# Note that, due to the offset for a specific boolean,
# the slice can span byte boundaries even if it contains
# less than 8 booleans.
num_boolean_byte_buckets = 1 + ((bool_offset + np.prod(shape) - 1) // 8)
# Construct the uint8 array view on the buffer.
arr = np.ndarray(
(num_boolean_byte_buckets,),
dtype=np.uint8,
buffer=data_buffer,
offset=byte_bucket_offset,
)
# Unpack into a byte per boolean, using LSB bit-packed ordering.
arr = np.unpackbits(arr, bitorder="little")
# Interpret buffer as boolean array.
return np.ndarray(shape, dtype=np.bool_, buffer=arr, offset=bool_offset)
ext_dtype = value_type.to_pandas_dtype()
# Special handling of ragged string tensors
if pa.types.is_fixed_size_binary(value_type):
ext_dtype = np.dtype(f"<U{value_type.byte_width // NUM_BYTES_PER_UNICODE_CHAR}")
return np.ndarray(shape, dtype=ext_dtype, buffer=data_buffer, offset=data_offset)