Exchanging Datasets

Datasets can be passed to Ray tasks or actors and read with .iter_batches() or .iter_rows(). This does not incur a copy: the blocks of the Dataset are stored as Ray objects and passed by reference, so only those references are sent to the task or actor:

import ray
from ray.data import Dataset

@ray.remote
def consume(data: Dataset[int]) -> int:
    # Count the batches; blocks are fetched by reference,
    # so no bulk copy of the data is made up front.
    num_batches = 0
    for batch in data.iter_batches():
        num_batches += 1
    return num_batches

ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 200
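
The same by-reference hand-off works with row-based iteration. A minimal sketch, assuming the same ds as above (the helper name count_rows is purely illustrative):

@ray.remote
def count_rows(data: Dataset[int]) -> int:
    # Iterate row by row over the same referenced blocks.
    num_rows = 0
    for row in data.iter_rows():
        num_rows += 1
    return num_rows

ray.get(count_rows.remote(ds))
# -> 10000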

Datasets can be split into disjoint sub-datasets. Locality-aware splitting is supported by passing a list of actor handles to split() along with the desired number of splits. This is a common pattern for loading data and sharding it across distributed training actors:

@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, rank: int):
        # The rank identifies the worker; it is unused in this example.
        pass

    def train(self, shard: ray.data.Dataset[int]) -> int:
        # Each worker iterates over only its own shard of the data.
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()

workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
#     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [650, 650, ...]
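
Locality hints are optional: if the data only needs to be partitioned, split() can also be called with just the number of splits. A minimal sketch, again using the ds defined above:

plain_shards = ds.split(n=4)
# Blocks are divided evenly across the sub-datasets, e.g.
# -> [Dataset(num_blocks=50, num_rows=2500, schema=<class 'int'>), ...]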