Iterating over Data

Ray Data lets you iterate over rows or batches of data.

This guide shows you how to:

- Iterate over rows
- Iterate over batches
- Iterate over batches with shuffling
- Split datasets for distributed parallel training

Iterating over rows

To iterate over the rows of your dataset, call Dataset.iter_rows(). Ray Data represents each row as a dictionary.

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for row in ds.iter_rows():
    print(row)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
...
{'sepal length (cm)': 5.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 5.1, 'petal width (cm)': 1.8, 'target': 2}

For more information on working with rows, see Transforming rows and Inspecting rows.

Iterating over batches

A batch contains data from multiple rows. To iterate over batches of your dataset in formats like NumPy, pandas, Torch, and TensorFlow, call one of the following methods:

- Dataset.iter_batches() with batch_format="numpy" or batch_format="pandas"
- Dataset.iter_torch_batches()
- Dataset.to_tf()

NumPy:

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
    print(batch)
{'image': array([[[[...]]]], dtype=uint8)}
...
{'image': array([[[[...]]]], dtype=uint8)}

pandas:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    print(batch)
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                5.1               3.5                1.4               0.2       0
1                4.9               3.0                1.4               0.2       0
...
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                6.2               3.4                5.4               2.3       2
1                5.9               3.0                5.1               1.8       2

Torch:

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_torch_batches(batch_size=2):
    print(batch)
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
...
{'image': tensor([[[[...]]]], dtype=torch.uint8)}

TensorFlow:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

tf_dataset = ds.to_tf(
    feature_columns="sepal length (cm)",
    label_columns="target",
    batch_size=2
)
for features, labels in tf_dataset:
    print(features, labels)
tf.Tensor([5.1 4.9], shape=(2,), dtype=float64) tf.Tensor([0 0], shape=(2,), dtype=int64)
...
tf.Tensor([6.2 5.9], shape=(2,), dtype=float64) tf.Tensor([2 2], shape=(2,), dtype=int64)

For more information on working with batches, see Transforming batches and Inspecting batches.

Iterating over batches with shuffling

Dataset.random_shuffle() is slow because it shuffles all rows. If a full global shuffle isn’t required, you can shuffle a subset of rows up to a provided buffer size during iteration by specifying local_shuffle_buffer_size. While this isn’t a true global shuffle like random_shuffle, it’s more performant because it doesn’t require excessive data movement. For more details about these options, see Shuffling Data.
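
For contrast, here's a minimal sketch of a full global shuffle with Dataset.random_shuffle(), reusing the Iris dataset from earlier in this guide:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# Global shuffle: materializes and reorders every row across all blocks.
# Fully random, but slower than a local shuffle buffer.
shuffled = ds.random_shuffle()

for batch in shuffled.iter_batches(batch_size=2, batch_format="pandas"):
    print(batch)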

Tip

To configure local_shuffle_buffer_size, choose the smallest value that achieves sufficient randomness. Higher values result in more randomness at the cost of slower iteration. See Local shuffle when iterating over batches on how to diagnose slowdowns.

NumPy:

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="numpy",
    local_shuffle_buffer_size=250,
):
    print(batch)
{'image': array([[[[...]]]], dtype=uint8)}
...
{'image': array([[[[...]]]], dtype=uint8)}

pandas:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="pandas",
    local_shuffle_buffer_size=250,
):
    print(batch)
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                6.3               2.9                5.6               1.8       2
1                5.7               4.4                1.5               0.4       0
...
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                5.6               2.7                4.2               1.3       1
1                4.8               3.0                1.4               0.1       0

Torch:

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_torch_batches(
    batch_size=2,
    local_shuffle_buffer_size=250,
):
    print(batch)
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
...
{'image': tensor([[[[...]]]], dtype=torch.uint8)}

TensorFlow:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

tf_dataset = ds.to_tf(
    feature_columns="sepal length (cm)",
    label_columns="target",
    batch_size=2,
    local_shuffle_buffer_size=250,
)
for features, labels in tf_dataset:
    print(features, labels)
tf.Tensor([5.2 6.3], shape=(2,), dtype=float64) tf.Tensor([1 2], shape=(2,), dtype=int64)
...
tf.Tensor([5.  5.8], shape=(2,), dtype=float64) tf.Tensor([0 0], shape=(2,), dtype=int64)

Splitting datasets for distributed parallel training

If you’re performing distributed data parallel training, call Dataset.streaming_split() to split your dataset into disjoint shards.

Note

If you’re using Ray Train, you don’t need to split the dataset. Ray Train automatically splits your dataset for you. To learn more, see the Data Loading for ML Training guide.
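
As a rough sketch of that workflow (assuming PyTorch and Ray Train are installed; the training logic here is a placeholder), each Ray Train worker fetches its automatically created shard with ray.train.get_dataset_shard():

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker():
    # Ray Train splits the "train" dataset across workers automatically.
    shard = ray.train.get_dataset_shard("train")
    for batch in shard.iter_batches(batch_size=8):
        pass  # Replace with your training logic.

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
trainer = TorchTrainer(
    train_loop_per_worker,
    datasets={"train": ds},
    scaling_config=ScalingConfig(num_workers=4),
)
trainer.fit()

If you manage training workers yourself instead, split and distribute the shards manually, as in the following example.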

import ray

@ray.remote
class Worker:

    def train(self, data_iterator):
        # Each worker iterates over its own disjoint shard of the dataset.
        for batch in data_iterator.iter_batches(batch_size=8):
            pass  # Replace with your training logic.

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
workers = [Worker.remote() for _ in range(4)]
# Split the dataset into four equal shards, one per worker.
shards = ds.streaming_split(n=4, equal=True)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
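
With equal=True, streaming_split forces every shard to yield the same number of rows, which keeps distributed training workers in lockstep; Ray Data may drop some rows to equalize the shards.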