ML Tensor Support
Tensor (multi-dimensional array) data is ubiquitous in ML workloads. However, popular data formats such as Pandas, Parquet, and Arrow don’t natively support tensor data types. To bridge this gap, Datasets provides a unified tensor data type that can be used to represent, transform, and store tensor data:
- For Pandas, Datasets will transparently convert List[np.ndarray] columns to and from the TensorDtype extension type.
- For Parquet, Datasets has an Arrow extension ArrowTensorType that allows tensors to be loaded from and stored in the Parquet format.
- In addition, single-column tensor datasets can be created from NumPy (.npy) files.

Datasets automatically converts between the extension types/arrays above. This means you can think of a Tensor as a first-class data type in Datasets.
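As a quick illustration of the Pandas integration, converting a tensor dataset to a DataFrame should surface the TensorDtype extension type rather than a plain object column. A minimal sketch, using the range_tensor and to_pandas APIs covered later in this guide:

import ray

# Create a small tensor dataset and pull it into Pandas.
ds = ray.data.range_tensor(4, shape=(2, 2))
df = ds.to_pandas()

# The single "__value__" column should report TensorDtype, not object.
print(df.dtypes)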
Creating Tensor Datasets
This section shows how to create single and multi-column tensor datasets.
Create a synthetic tensor dataset from a range of integers.
Single-column only:
import ray
# Create a Dataset of tensors.
ds = ray.data.range_tensor(10000, shape=(64, 64))
# -> Dataset(num_blocks=200, num_rows=10000,
# schema={__value__: ArrowTensorType(shape=(64, 64), dtype=int64)})
ds.take(2)
# -> [array([[0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0],
# ...,
# [0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0]]),
# array([[1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# ...,
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1]])]
Create tensor datasets by returning List[np.ndarray] columns from a Pandas user-defined function.
Single-column:
import ray
import pandas as pd
import numpy as np
# Start with a tabular base dataset.
ds = ray.data.range_table(1000)

# Create a single TensorArray column.
def single_col_udf(batch: pd.DataFrame) -> pd.DataFrame:
    bs = len(batch)
    # Lists of ndarrays are automatically cast to TensorArray.
    arr = [np.zeros((128, 128, 3), dtype=np.int64) for _ in range(bs)]
    return pd.DataFrame({"__value__": arr})

    ## Alternatively, manually construct a TensorArray from a single ndarray.
    # from ray.data.extensions.tensor_extension import TensorArray
    # arr = TensorArray(np.zeros((bs, 128, 128, 3), dtype=np.int64))
    # return pd.DataFrame({"__value__": arr})

ds = ds.map_batches(single_col_udf)
ds.materialize()
# -> Dataset(num_blocks=17, num_rows=1000,
#            schema={__value__: TensorDtype(shape=(128, 128, 3), dtype=int64)})
Multi-column:
# Create multiple TensorArray columns.
def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame:
    bs = len(batch)
    # Lists of ndarrays are automatically cast to TensorArray.
    image = [np.zeros((128, 128, 3), dtype=np.int64) for _ in range(bs)]
    embed = [np.zeros((256,), dtype=np.uint8) for _ in range(bs)]
    return pd.DataFrame({"image": image, "embed": embed})

    ## Alternatively, manually construct TensorArrays from ndarray batches.
    # image = TensorArray(np.zeros((bs, 128, 128, 3), dtype=np.int64))
    # embed = TensorArray(np.zeros((bs, 256), dtype=np.uint8))
    # return pd.DataFrame({"image": image, "embed": embed})

ds = ds.map_batches(multi_col_udf)
ds.materialize()
# -> Dataset(num_blocks=17, num_rows=1000,
#            schema={image: TensorDtype(shape=(128, 128, 3), dtype=int64),
#                    embed: TensorDtype(shape=(256,), dtype=uint8)})
Create from in-memory NumPy data or from previously saved NumPy (.npy) files.
Single-column only:
import ray
import numpy as np

# From in-memory numpy data.
ray.data.from_numpy(np.zeros((1000, 128, 128, 3), dtype=np.int64))
# -> Dataset(num_blocks=1, num_rows=1000,
# schema={__value__: ArrowTensorType(shape=(128, 128, 3), dtype=int64)})
# From saved numpy files.
ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
There are two ways to construct a Parquet tensor dataset: (1) loading a previously-saved tensor dataset, or (2) casting non-tensor Parquet columns to tensor type. When casting data, a tensor schema or deserialization user-defined function must be provided. The following are examples for each method.
Previously-saved tensor datasets:
import ray
# Reading previously saved Tensor data works out of the box.
ds = ray.data.read_parquet("example://parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
# label: string})
ds.take(1)
# -> [{'image':
# array([[[ 92, 71, 57],
# [107, 87, 72],
# ...,
# [141, 161, 185],
# [139, 158, 184]],
#
# ...,
#
# [[135, 135, 109],
# [135, 135, 108],
# ...,
# [167, 150, 89],
# [165, 146, 90]]], dtype=uint8),
# 'label': 'cat',
# }]
Cast from data stored in C-contiguous format:
For tensors stored as raw NumPy ndarray bytes in C-contiguous order (e.g., via ndarray.tobytes()), all you need to specify is the tensor column schema. The following is an end-to-end example:
import ray
import numpy as np
import pandas as pd
path = "/tmp/some_path"
# Create a DataFrame with a list of serialized ndarrays as a column.
# Note that we do not cast it to a tensor array, so each element in the
# column is an opaque blob of bytes.
arr = np.arange(24).reshape((3, 2, 2, 2))
df = pd.DataFrame({
    "one": [1, 2, 3],
    "two": [tensor.tobytes() for tensor in arr],
})
# Write the dataset to Parquet. The tensor column will be written as an
# array of opaque byte blobs.
ds = ray.data.from_pandas([df])
ds.write_parquet(path)
# Read the Parquet files into a new Dataset, with the serialized tensors
# automatically cast to our tensor column extension type.
ds = ray.data.read_parquet(
    path, tensor_column_schema={"two": (np.int_, (2, 2, 2))})

# The new column is represented as a Tensor extension type.
print(ds.schema())
# -> one: int64
# two: extension<arrow.py_extension_type<ArrowTensorType>>
Cast from data stored in custom formats:
For tensors stored in other formats (e.g., pickled), you can specify a deserializer user-defined function that returns TensorArray columns:
import pickle

import numpy as np
import pandas as pd
import pyarrow as pa
import ray

from ray.data.extensions import TensorArray
path = "/tmp/some_path"
# Create a DataFrame with a list of pickled ndarrays as a column.
arr = np.arange(24).reshape((3, 2, 2, 2))
df = pd.DataFrame({
    "one": [1, 2, 3],
    "two": [pickle.dumps(tensor) for tensor in arr],
})
# Write the dataset to Parquet. The tensor column will be written as an
# array of opaque byte blobs.
ds = ray.data.from_pandas([df])
ds.write_parquet(path)
# Manually deserialize the tensor pickle bytes and cast to our tensor
# extension type.
def cast_udf(block: pa.Table) -> pa.Table:
    block = block.to_pandas()
    block["two"] = TensorArray([pickle.loads(a) for a in block["two"]])
    return pa.Table.from_pandas(block)
# Read the Parquet files into a new Dataset, applying the casting UDF
# on-the-fly within the underlying read tasks.
ds = ray.data.read_parquet(path, _block_udf=cast_udf)
# The new column is represented as a Tensor extension type.
print(ds.schema())
# -> one: int64
# two: extension<arrow.py_extension_type<ArrowTensorType>>
Load image data stored as individual files using read_images(). The images are read into a single tensor column:
ds = ray.data.read_images("example://image-datasets/simple")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(32, 32, 3), dtype=uint8)})
ds.take(1)
# -> [array([[[ 88, 70, 68],
# [103, 88, 85],
# [112, 96, 97],
# ...,
# [168, 151, 81],
# [167, 149, 83],
# [166, 148, 82]]], dtype=uint8)]
Note

By convention, single-column tensor datasets are represented with a single __value__ column. This kind of dataset will be converted automatically to/from NumPy ndarray format in all transformation and consumption APIs.
Transforming / Consuming Tensor Data
Like any other Dataset, Datasets with tensor columns can be consumed / transformed in batches via the ds.iter_batches(batch_format=<format>) and ds.map_batches(fn, batch_format=<format>) APIs. This section shows the available batch formats and their behavior:
Default batch format, single-column:
import ray
import numpy as np
# Read a single-column example dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
def add_one(batch: np.ndarray) -> np.ndarray:
    return batch + 1
# This processes batches in numpy.ndarray format.
ds = ds.map_batches(add_one)
# This returns batches in numpy.ndarray format.
next(ds.iter_batches())
# -> array([[[1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# ...,
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1]],
#
# ...,
#
# [[1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# ...,
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1]]], dtype=uint8)
Default batch format, multi-column:
import ray
import pandas as pd
# Read a multi-column example dataset.
ds = ray.data.read_parquet("example://parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
# label: string})
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
    batch["image"] += 1
    return batch
# This processes batches in pd.DataFrame format.
ds = ds.map_batches(add_one)
# This returns pandas batches with List[np.ndarray] columns.
next(ds.iter_batches())
# -> image label
# 0 [[[ 96, 76, 61], [ 92, 72, 57], [ 92, 72,... cat
# 1 [[[ 38, 38, 39], [ 39, 39, 40], [ 39, 39,... cat
# 2 [[[ 47, 39, 33], [ 43, 36, 29], [ 43, 36,... dog
Pandas batch format ("pandas"), single-column:
import ray
import pandas as pd
# Read a single-column example dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
    batch["__value__"] += 1
    return batch
# This processes batches in pd.DataFrame format.
ds = ds.map_batches(add_one, batch_format="pandas")
# This returns pandas batches with List[np.ndarray] columns.
next(ds.iter_batches(batch_format="pandas"))
# -> __value__
# 0 [[ 1, 1, 1, 1, 1, 1, 1, 1, 1,...
# 1 [[ 1, 1, 1, 1, 1, 1, 1, 1, 1,...
# 2 [[ 1, 1, 1, 1, 1, 1, 1, 1, 1,...
Pandas batch format ("pandas"), multi-column:
import ray
import pandas as pd
# Read a multi-column example dataset.
ds = ray.data.read_parquet("example://parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
# label: string})
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
    batch["image"] += 1
    return batch
# This processes batches in pd.DataFrame format.
ds = ds.map_batches(add_one, batch_format="pandas")
# This returns pandas batches with List[np.ndarray] columns.
next(ds.iter_batches(batch_format="pandas"))
# -> image label
# 0 [[[ 96, 76, 61], [ 92, 72, 57], [ 92, 72,... cat
# 1 [[[ 38, 38, 39], [ 39, 39, 40], [ 39, 39,... cat
# 2 [[[ 47, 39, 33], [ 43, 36, 29], [ 43, 36,... dog
PyArrow batch format ("pyarrow"), single-column:
import ray
import numpy as np
import pyarrow

from ray.data.extensions.tensor_extension import ArrowTensorArray
# Read a single-column example dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
def add_one(batch: pyarrow.Table) -> pyarrow.Table:
    np_col = np.array(
        [
            np.ndarray((28, 28), buffer=buf, dtype=np.uint8)
            for buf in batch.column("__value__")
        ]
    )
    np_col += 1
    return batch.set_column(
        batch.schema.get_field_index("__value__"),
        "__value__",
        ArrowTensorArray.from_numpy(np_col),
    )
# This processes batches in pyarrow.Table format.
ds = ds.map_batches(add_one, batch_format="pyarrow")
# This returns batches in pyarrow.Table format.
next(ds.iter_batches(batch_format="pyarrow"))
# pyarrow.Table
# __value__: extension<arrow.py_extension_type<ArrowTensorType>>
# ----
# __value__: [[[1,1,1,1,1,1,1,1,1,1,...],...,[1,1,1,1,1,1,1,1,1,1,...]]]
PyArrow batch format ("pyarrow"), multi-column:
import ray
import numpy as np
import pyarrow

from ray.data.extensions.tensor_extension import ArrowTensorArray

# Read a multi-column example dataset.
ds = ray.data.read_parquet("example://parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
#            schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
#                    label: string})
def add_one(batch: pyarrow.Table) -> pyarrow.Table:
    np_col = np.array(
        [
            np.ndarray((128, 128, 3), buffer=buf, dtype=np.uint8)
            for buf in batch.column("image")
        ]
    )
    np_col += 1
    return batch.set_column(
        batch.schema.get_field_index("image"),
        "image",
        ArrowTensorArray.from_numpy(np_col),
    )
# This processes batches in pyarrow.Table format.
ds = ds.map_batches(add_one, batch_format="pyarrow")
# This returns batches in pyarrow.Table format.
next(ds.iter_batches(batch_format="pyarrow"))
# pyarrow.Table
# image: extension<arrow.py_extension_type<ArrowTensorType>>
# label: string
# ----
# image: [[[92,71,57,107,87,72,113,97,85,122,...,85,170,152,88,167,150,89,165,146,90]]]
# label: [["cat"]]
NumPy batch format ("numpy"), single-column:
import ray
import numpy as np
# Read a single-column example dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
def add_one(batch: np.ndarray) -> np.ndarray:
    batch += 1
    return batch
# This processes batches in np.ndarray format.
ds = ds.map_batches(add_one, batch_format="numpy")
# This returns batches in np.ndarray format.
next(ds.iter_batches(batch_format="numpy"))
# -> array([[[1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# ...,
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1]],
#
# ...,
#
# [[1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# ...,
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1]]], dtype=uint8)
NumPy batch format ("numpy"), multi-column:
from typing import Any, Dict

import ray
import numpy as np

# Read a multi-column example dataset.
ds = ray.data.read_parquet("example://parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
#            schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
#                    label: string})
def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:
    assert isinstance(batch, dict)
    batch["image"] += 1
    return batch

# This processes batches in Dict[str, np.ndarray] format.
ds = ds.map_batches(add_one, batch_format="numpy")
# This returns batches in Dict[str, np.ndarray] format.
next(ds.iter_batches(batch_format="numpy"))
# -> {'image': array([[[[ 92, 71, 57],
# [107, 87, 72],
# ...,
# [141, 161, 185],
# [139, 158, 184]],
#
# ...,
#
# [[135, 135, 109],
# [135, 135, 108],
# ...,
# [167, 150, 89],
# [165, 146, 90]]]], dtype=uint8),
# 'label': array(['cat'], dtype=object)}
Saving Tensor Datasets
Because tensor datasets rely on Datasets-specific extension types, they can only be saved in formats that preserve Arrow metadata (currently only Parquet). In addition, single-column tensor datasets can be saved in NumPy format.
import ray

# Read a multi-column example dataset.
ds = ray.data.read_parquet("example://parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
#            schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
#                    label: string})
# You can write the dataset to Parquet.
ds.write_parquet("/tmp/some_path")
# And you can read it back.
read_ds = ray.data.read_parquet("/tmp/some_path")
print(read_ds.schema())
# -> image: extension<arrow.py_extension_type<ArrowTensorType>>
# label: string
# Read a single-column example dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
# You can write the dataset to NumPy files.
ds.write_numpy("/tmp/some_path")
# And you can read it back.
read_ds = ray.data.read_numpy("/tmp/some_path")
print(read_ds.schema())
# -> __value__: extension<arrow.py_extension_type<ArrowTensorType>>
Ragged Tensor Support
Ragged tensors, i.e. tensors with non-uniform dimensions, pop up in NLP (textual sentences/documents of different lengths, N-grams), computer vision (images of differing resolution, ssd300_vgg16 detection outputs), and audio ML (differing durations). Datasets has basic support for ragged tensors, namely tensors that are a collection (batch) of variably-shaped subtensors, e.g. a batch of images of differing sizes or a batch of sentences of differing lengths.
import ray
import numpy as np
import pandas as pd

# Create a Dataset of variable-shaped tensors.
ragged_array = np.array([np.ones((2, 2)), np.ones((3, 3))], dtype=object)
df = pd.DataFrame({"feature": ragged_array, "label": [1, 1]})
ds = ray.data.from_pandas([df, df])
# -> Dataset(num_blocks=2, num_rows=4,
# schema={feature: TensorDtype(shape=(None, None), dtype=float64),
# label: int64})
ds.take(2)
# -> [{'feature': array([[1., 1.],
# [1., 1.]]),
# 'label': 1},
# {'feature': array([[1., 1., 1.],
# [1., 1., 1.],
# [1., 1., 1.]]),
# 'label': 1}]
These variable-shaped tensors can be exchanged with popular training frameworks that support ragged tensors, such as TensorFlow.
# Convert Ray Dataset to a TensorFlow Dataset.
tf_ds = ds.to_tf(
    batch_size=2,
    feature_columns="feature",
    label_columns="label",
)
# Iterate through the tf.RaggedTensors.
for ragged_tensor in tf_ds:
    print(ragged_tensor)
# -> (<tf.RaggedTensor [[[1.0, 1.0], [1.0, 1.0]],
# [[1.0, 1.0, 1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]]>,
# <tf.Tensor: shape=(2,), dtype=int64, numpy=array([1, 1])>)
# (<tf.RaggedTensor [[[1.0, 1.0], [1.0, 1.0]],
# [[1.0, 1.0, 1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]]>,
# <tf.Tensor: shape=(2,), dtype=int64, numpy=array([1, 1])>)
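You can also consume ragged tensors directly as NumPy batches. A hedged sketch of what to expect given the TensorDtype(shape=(None, None), dtype=float64) schema above: a variable-shaped column should arrive as an object-dtype ndarray holding the individual subtensors.

# Iterate the ragged dataset in NumPy batch format. The "feature" column is
# expected to come back as an object-dtype ndarray of variable-shaped
# subarrays, and "label" as a regular int64 ndarray.
for batch in ds.iter_batches(batch_format="numpy", batch_size=2):
    print(batch["feature"].dtype, batch["label"])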
Disabling Tensor Extension Casting
To disable automatic casting of Pandas and Arrow arrays to TensorArray, run the code below.
from ray.data import DatasetContext
ctx = DatasetContext.get_current()
ctx.enable_tensor_extension_casting = False
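With casting disabled, ndarray columns stay as plain Pandas object columns instead of being converted to the tensor extension type. A minimal sketch of the expected effect, assuming the flag is set before any datasets are created:

import numpy as np
import pandas as pd
import ray
from ray.data import DatasetContext

ctx = DatasetContext.get_current()
ctx.enable_tensor_extension_casting = False

# With casting disabled, this ndarray column is expected to remain a plain
# object column rather than being cast to TensorDtype.
df = pd.DataFrame({"feature": [np.zeros((2, 2)) for _ in range(4)]})
ds = ray.data.from_pandas(df)
print(ds.to_pandas().dtypes)
# Expected: feature    object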
Limitations
The following are current limitations of tensor datasets.
Arbitrarily nested or ragged tensors are not supported. Datasets supports tensors with fully uniform dimensions (i.e., a well-defined shape) and tensors representing a collection of variable-shaped tensor elements (e.g., a collection of images with different sizes); deeper nesting or arbitrary raggedness beyond that is not supported.