Accessing Datasets¶
The data underlying a Dataset
can be accessed in several ways:
Retrieving a limited prefix of rows.
Iterating over rows and batches.
Converting into a Torch dataset or a TensorFlow dataset.
Converting into a RandomAccessDataset for random access (experimental).
Retrieving a limited set of rows¶
A limited set of rows can be retrieved from a Dataset via the ds.take() API, along with its sibling helper APIs ds.take_all(), for retrieving all rows, and ds.show(), for printing a limited set of rows. These methods are convenient for quickly inspecting a subset (prefix) of rows. They have the benefit that, if used right after reading, they only trigger additional files to be read when more rows are needed; if inspecting a small prefix of rows, often only the first file needs to be read.
import ray
ds = ray.data.range(10000)
print(ds.take(5))
# -> [0, 1, 2, 3, 4]
# Warning: This will print all of the rows!
print(ds.take_all())
ds.show(5)
# -> 0
# 1
# 2
# 3
# 4
Iterating over Datasets¶
Datasets can be consumed a row at a time using the ds.iter_rows() API:
import ray
ds = ray.data.range(10000)
num_rows = 0
# Consume all rows in the Dataset.
for row in ds.iter_rows():
    assert isinstance(row, int)
    num_rows += 1
print(num_rows)
# -> 10000
Datasets can also be consumed a batch at a time using the ds.iter_batches() API, where you can specify the batch size as well as the desired batch format. By default, the batch format is "native", which means that the batch format that’s native to the data type will be returned. For tabular data, the native format is a Pandas DataFrame; for Python objects, it’s a list.
import ray
import pandas as pd
ds = ray.data.range(10000)
num_batches = 0
# Consume all batches in the Dataset.
for batch in ds.iter_batches(batch_size=2):
    assert isinstance(batch, list)
    num_batches += 1
print(num_batches)
# -> 5000
# Consume data as Pandas DataFrame batches.
cum_sum = 0
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    assert isinstance(batch, pd.DataFrame)
    # Simple integer Dataset is converted to a single-column Pandas DataFrame.
    cum_sum += batch["value"].sum()
print(cum_sum)
# -> 49995000
Datasets can be passed to Ray tasks or actors and accessed by these iteration methods. This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:
import ray
@ray.remote
def consume(data: ray.data.Dataset[int]) -> int:
    num_batches = 0
    # Consume data in 2-record batches.
    for batch in data.iter_batches(batch_size=2):
        assert len(batch) == 2
        num_batches += 1
    return num_batches
ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 5000
Converting to Torch dataset¶
For ingestion into one or more Torch trainers, Datasets offers a ds.to_torch() API that returns a Torch IterableDataset that the Torch trainers can consume. This API takes care of both batching and converting the underlying Datasets data to Torch tensors, building on top of the ds.iter_batches() API.
Note
The returned torch.utils.data.IterableDataset instance should be consumed directly in your training loop; it should not be used with the Torch data loader. Using Torch’s data loader isn’t necessary because upstream Ray Datasets preprocessing operations in conjunction with ds.to_torch() implement the data loader functionality (shuffling, batching, prefetching, etc.). If you use the Torch data loader with this IterableDataset, it will perform inefficient unbatching and rebatching without adding any value.
import ray
import torch
ds = ray.data.range(10000)
torch_ds: torch.utils.data.IterableDataset = ds.to_torch(batch_size=2)
num_batches = 0
for batch, _ in torch_ds:
    assert isinstance(batch, torch.Tensor)
    assert batch.size(dim=0) == 2
    num_batches += 1
print(num_batches)
# -> 5000
When performing supervised learning, we’ll have both feature columns and a label column that we may want to split into separate tensors. By informing ds.to_torch() of the label column, it will yield (features, label) tensor pairs for each batch.
Note
We set unsqueeze_label_tensor=False in order to remove a redundant unit column dimension. E.g., with batch_size=2 and unsqueeze_label_tensor=True, you would get (2, 1)-shaped label tensor batches instead of the desired (2,) shape.
import ray
import torch
import pandas as pd
df = pd.DataFrame({
"feature1": list(range(4)),
"feature2": [2 * i for i in range(4)],
"label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)
# Specify the label column; all other columns will be treated as feature columns and
# will be concatenated into the same Torch tensor.
# We set unsqueeze_label_tensor=False in order to remove a redundant unit column
# dimension.
torch_ds: torch.utils.data.IterableDataset = ds.to_torch(
label_column="label",
batch_size=2,
unsqueeze_label_tensor=False,
)
num_batches = 0
for feature, label in torch_ds:
    assert isinstance(feature, torch.Tensor)
    assert isinstance(label, torch.Tensor)
    # Batch dimension.
    assert feature.size(dim=0) == 2
    # Column dimension.
    assert feature.size(dim=1) == 2
    # Batch dimension.
    assert label.size(dim=0) == 2
    num_batches += 1
print(num_batches)
# -> 2
The types of the label and feature columns will be inferred from the data by default; these can be overridden with the label_column_dtype and feature_column_dtypes args.
By default, all feature columns will be concatenated into a single tensor; however, depending on the structure of the feature_columns argument, you can also get feature column batches as a list of tensors or a dict of tensors (with one or more columns in each tensor). See the .to_torch() API docs for details; a hedged sketch of the dict form follows below.
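As an illustration of the dict form, here is a minimal sketch (not from this guide): it assumes, per the .to_torch() API docs, that feature_columns also accepts a dict mapping output keys to lists of column names, in which case each batch yields a dict of feature tensors keyed accordingly; the group names "f1" and "f2" are arbitrary.
import ray
import torch
import pandas as pd
df = pd.DataFrame({
    "feature1": list(range(4)),
    "feature2": [2 * i for i in range(4)],
    "label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)
# Assumption: a dict feature_columns yields a dict of feature tensors per batch,
# one entry per key. The keys "f1" and "f2" are arbitrary group names.
torch_ds = ds.to_torch(
    label_column="label",
    feature_columns={"f1": ["feature1"], "f2": ["feature2"]},
    batch_size=2,
    unsqueeze_label_tensor=False,
)
for features, label in torch_ds:
    assert isinstance(features, dict)
    assert isinstance(features["f1"], torch.Tensor)
    assert isinstance(features["f2"], torch.Tensor)
    # Batch dimension.
    assert features["f1"].size(dim=0) == 2
    assert label.size(dim=0) == 2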
Note
If we have tensor feature columns (where each item in the column is a multi-dimensional tensor) and any of the feature columns have different shapes, these columns are incompatible and we will not be able to stack the column tensors into a single tensor. Instead, we will need to group the columns by compatibility in the feature_columns argument.
Check out the tensor data feature guide for more information on how to handle this.
Converting to TensorFlow dataset¶
For ingestion into one or more TensorFlow trainers, Datasets offers a ds.to_tf() API that returns a tf.data.Dataset that the TensorFlow trainers can consume. This API takes care of both batching and converting the underlying Datasets data to TensorFlow tensors, building on top of the ds.iter_batches() API.
import ray
import tensorflow as tf
ds = ray.data.range(10000)
tf_ds: tf.data.Dataset = ds.to_tf(
    batch_size=2,
    output_signature=tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
)
num_batches = 0
for batch in tf_ds:
    assert isinstance(batch, tf.Tensor)
    assert batch.shape[0] == 2, batch.shape
    num_batches += 1
print(num_batches)
# -> 5000
When performing supervised learning, we’ll have both feature columns and a label column that we may want to split into separate tensors. By informing ds.to_tf() of the label column, it will yield (features, label) tensor pairs for each batch.
import ray
import tensorflow as tf
import pandas as pd
df = pd.DataFrame({
"feature1": list(range(4)),
"feature2": [2 * i for i in range(4)],
"label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)
# Specify the label column; all other columns will be treated as feature columns and
# will be concatenated into the same TensorFlow tensor.
tf_ds: tf.data.Dataset = ds.to_tf(
label_column="label",
batch_size=2,
output_signature=(
tf.TensorSpec(shape=(None, 2), dtype=tf.int64),
tf.TensorSpec(shape=(None,), dtype=tf.int64),
),
)
num_batches = 0
for feature, label in tf_ds:
    assert isinstance(feature, tf.Tensor)
    assert isinstance(label, tf.Tensor)
    # Batch dimension.
    assert feature.shape[0] == 2
    # Column dimension.
    assert feature.shape[1] == 2
    # Batch dimension.
    assert label.shape[0] == 2
    num_batches += 1
print(num_batches)
# -> 2
The types of the label and feature columns will be inferred from the data by default; these can be overridden with the label_column_dtype and feature_column_dtypes args.
By default, all feature columns will be concatenated into a single tensor; however, depending on the structure of the feature_columns argument, you can also get feature column batches as a list of tensors or a dict of tensors (with one or more columns in each tensor). See the .to_tf() API docs for details; a hedged sketch of the dict form follows below.
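Similarly, here is a minimal sketch of the dict form for TensorFlow (not from this guide): it assumes that a dict passed as feature_columns is mirrored by a dict of TensorSpecs in the feature part of output_signature, and that each batch then yields a dict of feature tensors; the group names "f1" and "f2" are arbitrary.
import ray
import tensorflow as tf
import pandas as pd
df = pd.DataFrame({
    "feature1": list(range(4)),
    "feature2": [2 * i for i in range(4)],
    "label": [True, False, True, False],
})
ds = ray.data.from_pandas(df)
# Assumption: a dict feature_columns is mirrored by a dict of TensorSpecs in the
# feature part of output_signature; "f1" and "f2" are arbitrary group names.
tf_ds = ds.to_tf(
    label_column="label",
    feature_columns={"f1": ["feature1"], "f2": ["feature2"]},
    batch_size=2,
    output_signature=(
        {
            "f1": tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
            "f2": tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
        },
        tf.TensorSpec(shape=(None,), dtype=tf.int64),
    ),
)
for features, label in tf_ds:
    assert isinstance(features, dict)
    # Batch dimension.
    assert features["f1"].shape[0] == 2
    assert label.shape[0] == 2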
Note
If we have tensor feature columns (where each item in the column is a multi-dimensional tensor) and any of the feature columns have different shapes, these columns are incompatible and we will not be able to stack the column tensors into a single tensor. Instead, we will need to group the columns by compatibility in the feature_columns argument.
Check out the tensor data feature guide for more information on how to handle this.
Splitting Into and Consuming Shards¶
Datasets can be split up into disjoint sub-datasets, or shards.
Locality-aware splitting is supported if you pass in a list of actor handles to the ds.split() function along with the number of desired splits. This is a common pattern, useful for loading and sharding data between distributed training actors:
Note
If using Ray Train for distributed training, you do not need to split the dataset; Ray Train will automatically do locality-aware splitting into per-trainer shards for you!
import ray

# @ray.remote(num_gpus=1) # Uncomment this to run on GPUs.
@ray.remote
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset[int]) -> int:
        for batch in shard.to_torch(batch_size=256):
            pass
        return shard.count()
workers = [Worker.remote(i) for i in range(4)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
shards = ds.split(n=4, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=2500, schema=<class 'int'>),
# Dataset(num_blocks=13, num_rows=2500, schema=<class 'int'>), ...]
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [2500, 2500, 2500, 2500]
Random Access Datasets (Experimental)¶
Datasets can be converted to a format that supports efficient random access with the ds.to_random_access_dataset() API, which partitions the dataset on a sort key and provides random access via distributed binary search.
See the random access feature guide for more information; a hedged sketch follows below.
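For orientation, here is a minimal sketch (based on the RandomAccessDataset API docs, not this guide): it assumes a tabular dataset such as the one produced by ray.data.range_table(), keyed on its "value" column, and uses the get_async() and multiget() lookup methods.
import ray
# Build a tabular dataset; records have the form {"value": i}.
ds = ray.data.range_table(1000)
# Partition the dataset on the "value" key to enable random access.
rad = ds.to_random_access_dataset(key="value")
# Single-key lookup is asynchronous and returns an object ref.
record = ray.get(rad.get_async(42))
# -> the record whose "value" is 42 (or None if no such record exists)
# Batched lookup of several keys at once.
records = rad.multiget([1, 2, 3])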