Accessing Datasets

The data underlying a Dataset can be accessed in several ways:

  • Retrieving a limited prefix of rows.

  • Iterating over rows and batches.

  • Converting into a Torch dataset or a TensorFlow dataset.

  • Converting into a RandomAccessDataset for random access (experimental).

Retrieving a limited set of rows

A limited set of rows can be retrieved from a Dataset via the ds.take() API, along with its sibling helper APIs ds.take_all(), for retrieving all rows, and ds.show(), for printing a limited set of rows. These methods are convenient for quickly inspecting a subset (prefix) of rows. They have the benefit that, if used right after reading, they will only trigger reads of the files needed to produce the requested rows; when inspecting a small prefix of rows, often only the first file needs to be read.

import ray

ds = ray.data.range(10000)

print(ds.take(5))
# -> [0, 1, 2, 3, 4]

# Warning: This will print all of the rows!
print(ds.take_all())

ds.show(5)
# -> 0
#    1
#    2
#    3
#    4

Iterating over Datasets

Datasets can be consumed a row at a time using the ds.iter_rows() API:

import ray

ds = ray.data.range(10000)
num_rows = 0

# Consume all rows in the Dataset.
for row in ds.iter_rows():
    assert isinstance(row, int)
    num_rows += 1

print(num_rows)
# -> 10000

Datasets can also be consumed a batch at a time using the ds.iter_batches() API, where you can specify the batch size as well as the desired batch format. By default, the batch format is "native", meaning the format native to the underlying data type is returned: for tabular data, a Pandas DataFrame; for Python objects, a list.

import ray
import pandas as pd

ds = ray.data.range(10000)
num_batches = 0

# Consume all batches in the Dataset.
for batch in ds.iter_batches(batch_size=2):
    assert isinstance(batch, list)
    num_batches += 1

print(num_batches)
# -> 5000

# Consume data as Pandas DataFrame batches.
cum_sum = 0
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    assert isinstance(batch, pd.DataFrame)
    # Simple integer Dataset is converted to a single-column Pandas DataFrame.
    cum_sum += batch["value"]
print(cum_sum)
# -> 49995000

Datasets can be passed to Ray tasks or actors and accessed by these iteration methods. This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:

import ray

@ray.remote
def consume(data: ray.data.Dataset[int]) -> int:
    num_batches = 0
    # Consume data in 2-record batches.
    for batch in data.iter_batches(batch_size=2):
        assert len(batch) == 2
        num_batches += 1
    return num_batches

ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 5000

Converting to Torch dataset

For ingestion into one or more Torch trainers, Datasets offers a ds.to_torch() API that returns a Torch IterableDataset that the Torch trainers can consume. This API takes care of both batching and converting the underlying Dataset's data to Torch tensors, building on top of the ds.iter_batches() API.

Note

The returned torch.utils.data.IterableDataset instance should be consumed directly in your training loop; it should not be wrapped in a Torch data loader. Using Torch’s data loader isn’t necessary because upstream Ray Datasets preprocessing operations, in conjunction with ds.to_torch(), already implement the data loader functionality (shuffling, batching, prefetching, etc.). If you use the Torch data loader with this IterableDataset, it will perform inefficient unbatching and rebatching without adding any value.

import ray
import torch

ds = ray.data.range(10000)

torch_ds: torch.utils.data.IterableDataset = ds.to_torch(batch_size=2)

num_batches = 0
for batch, _ in torch_ds:
    assert isinstance(batch, torch.Tensor)
    assert batch.size(dim=0) == 2
    num_batches += 1

print(num_batches)
# -> 5000

When performing supervised learning, we’ll have both feature columns and a label column that we may want to split into separate tensors. By informing ds.to_torch() of the label column, it will yield (features, label) tensor pairs for each batch.

Note

We set unsqueeze_label_tensor=False in order to avoid a redundant unit column dimension. E.g., with batch_size=2 and unsqueeze_label_tensor=True, you would get (2, 1)-shaped label tensor batches instead of the desired (2,) shape.

import ray
import torch
import pandas as pd

df = pd.DataFrame({
    "feature1": list(range(4)),
    "feature2": [2 * i for i in range(4)],
    "label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)

# Specify the label column; all other columns will be treated as feature columns and
# will be concatenated into the same Torch tensor.
# We set unsqueeze_label_tensor=False in order to remove a redundant unit column
# dimension.
torch_ds: torch.utils.data.IterableDataset = ds.to_torch(
    label_column="label",
    batch_size=2,
    unsqueeze_label_tensor=False,
)

num_batches = 0
for feature, label in torch_ds:
    assert isinstance(feature, torch.Tensor)
    assert isinstance(label, torch.Tensor)
    # Batch dimension.
    assert feature.size(dim=0) == 2
    # Column dimension.
    assert feature.size(dim=1) == 2
    # Batch dimension.
    assert label.size(dim=0) == 2
    num_batches += 1

print(num_batches)
# -> 2

The types of the label and feature columns will be inferred from the data by default; these can be overridden with the label_column_dtype and feature_column_dtypes args.

By default, all feature columns will be concatenated into a single tensor; however, depending on the structure of the feature_columns argument, you can also get feature column batches as a list of tensors or a dict of tensors (with one or more column in each tensor). See the .to_torch() API docs for details.
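
For example, here is a hedged sketch (reusing the small DataFrame from the example above, and assuming the dict forms of feature_columns and feature_column_dtypes described in the .to_torch() API docs) that overrides the feature dtypes and consumes feature batches as a dict of tensors:

import ray
import torch
import pandas as pd

df = pd.DataFrame({
    "feature1": list(range(4)),
    "feature2": [2 * i for i in range(4)],
    "label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)

# Group the feature columns into named tensors and force a float dtype.
# The dict structure of feature_columns / feature_column_dtypes is an
# assumption of this sketch; see the .to_torch() API docs for details.
torch_ds: torch.utils.data.IterableDataset = ds.to_torch(
    label_column="label",
    feature_columns={"f1": ["feature1"], "f2": ["feature2"]},
    feature_column_dtypes={"f1": torch.float32, "f2": torch.float32},
    batch_size=2,
    unsqueeze_label_tensor=False,
)

for features, label in torch_ds:
    # Each feature batch is a dict with one tensor per column group.
    assert isinstance(features, dict)
    assert features["f1"].dtype == torch.float32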

Note

If we have tensor feature columns (where each item in the column is a multi-dimensional tensor) and any of the feature columns have different shapes, these columns are incompatible and we will not be able to stack the column tensors into a single tensor. Instead, we will need to group the columns by compatibility in the feature_columns argument.

Check out the tensor data feature guide for more information on how to handle this.

Converting to TensorFlow dataset

For ingestion into one or more TensorFlow trainers, Datasets offers a ds.to_tf() API that returns a tf.data.Dataset that the TensorFlow trainers can consume. This API takes care of both batching and converting the underlying Dataset's data to TensorFlow tensors, building on top of the ds.iter_batches() API.

import ray
import tensorflow as tf

ds = ray.data.range(10000)

tf_ds: tf.data.Dataset = ds.to_tf(
    batch_size=2,
    output_signature=tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
)

num_batches = 0
for batch in tf_ds:
    assert isinstance(batch, tf.Tensor)
    assert batch.shape[0] == 2, batch.shape
    num_batches += 1

print(num_batches)
# -> 5000

When performing supervised learning, we’ll have both feature columns and a label column that we may want to split into separate tensors. By informing ds.to_tf() of the label column, it will yield (features, label) tensor pairs for each batch.

import ray
import tensorflow as tf
import pandas as pd

df = pd.DataFrame({
    "feature1": list(range(4)),
    "feature2": [2 * i for i in range(4)],
    "label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)

# Specify the label column; all other columns will be treated as feature columns and
# will be concatenated into the same TensorFlow tensor.
tf_ds: tf.data.Dataset = ds.to_tf(
    label_column="label",
    batch_size=2,
    output_signature=(
        tf.TensorSpec(shape=(None, 2), dtype=tf.int64),
        tf.TensorSpec(shape=(None,), dtype=tf.int64),
    ),
)

num_batches = 0
for feature, label in tf_ds:
    assert isinstance(feature, tf.Tensor)
    assert isinstance(label, tf.Tensor)
    # Batch dimension.
    assert feature.shape[0] == 2
    # Column dimension.
    assert feature.shape[1] == 2
    # Batch dimension.
    assert label.shape[0] == 2
    num_batches += 1

print(num_batches)
# -> 2

The types of the label and feature columns will be inferred from the data by default; these can be overridden with the label_column_dtype and feature_column_dtypes args.

By default, all feature columns will be concatenated into a single tensor; however, depending on the structure of the feature_columns argument, you can also get feature column batches as a list of tensors or a dict of tensors (with one or more column in each tensor). See the .to_tf() API docs for details.
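
As a hedged sketch (again reusing the small DataFrame from the previous example, and assuming that a dict-structured feature_columns is mirrored by a dict of tf.TensorSpecs in output_signature), feature batches can likewise be consumed as a dict of tensors:

import ray
import tensorflow as tf
import pandas as pd

df = pd.DataFrame({
    "feature1": list(range(4)),
    "feature2": [2 * i for i in range(4)],
    "label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)

# Group the feature columns into named tensors. The dict structure of
# feature_columns and the matching dict in output_signature are assumptions
# of this sketch; see the .to_tf() API docs for the authoritative signature.
tf_ds: tf.data.Dataset = ds.to_tf(
    label_column="label",
    feature_columns={"f1": ["feature1"], "f2": ["feature2"]},
    batch_size=2,
    output_signature=(
        {
            "f1": tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
            "f2": tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
        },
        tf.TensorSpec(shape=(None,), dtype=tf.int64),
    ),
)

for features, label in tf_ds:
    # Each feature batch is a dict with one tensor per column group.
    assert isinstance(features, dict)
    assert set(features.keys()) == {"f1", "f2"}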

Note

If we have tensor feature columns (where each item in the column is a multi-dimensional tensor) and any of the feature columns have different shapes, these columns are incompatible and we will not be able to stack the column tensors into a single tensor. Instead, we will need to group the columns by compatibility in the feature_columns argument.

Check out the tensor data feature guide for more information on how to handle this.

Splitting Into and Consuming Shards

Datasets can be split up into disjoint sub-datasets, or shards. Locality-aware splitting is supported if you pass in a list of actor handles to the ds.split() function along with the number of desired splits. This is a common pattern useful for loading and sharding data between distributed training actors:

Note

If using Ray Train for distributed training, you do not need to split the dataset; Ray Train will automatically do locality-aware splitting into per-trainer shards for you!

import ray

# @ray.remote(num_gpus=1)  # Uncomment this to run on GPUs.
@ray.remote
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset[int]) -> int:
        for batch in shard.to_torch(batch_size=256):
            pass
        return shard.count()

workers = [Worker.remote(i) for i in range(4)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

shards = ds.split(n=4, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=2500, schema=<class 'int'>),
#     Dataset(num_blocks=13, num_rows=2500, schema=<class 'int'>), ...]

ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [2500, 2500, 2500, 2500]

Random Access Datasets (Experimental)

Datasets can be converted to a format that supports efficient random access with the ds.to_random_access_dataset() API, which partitions the dataset on a sort key and provides random access via distributed binary search.
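
A minimal sketch, assuming a tabular dataset keyed on a hypothetical "key" column and the get_async() / multiget() accessors described in the RandomAccessDataset API docs:

import ray
import pandas as pd

# A small tabular dataset; "key" and "value" are hypothetical column names.
df = pd.DataFrame({
    "key": list(range(100)),
    "value": [2 * i for i in range(100)],
})
ds = ray.data.from_pandas(df)

# Partition the dataset on the sort key and start the serving workers.
rad = ds.to_random_access_dataset(key="key", num_workers=2)

# Point lookup: returns the record whose key is 42 (or None if absent).
print(ray.get(rad.get_async(42)))

# Batched lookup of several keys at once.
print(rad.multiget([0, 1, 99]))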

See the random access feature guide for more information.