Datasets can be passed to Ray tasks or actors and read there, for example with iter_batches().
Passing a Dataset does not incur a copy, since its blocks are passed by reference as Ray objects:
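
    import ray
    from ray.data import Dataset

    @ray.remote
    def consume(data: Dataset[int]) -> int:
        # Count the batches; the blocks are fetched by reference inside the task.
        num_batches = 0
        for batch in data.iter_batches():
            num_batches += 1
        return num_batches

    ds = ray.data.range(10000)
    ray.get(consume.remote(ds))
    # -> 200
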
Datasets can be split up into disjoint sub-datasets.
Locality-aware splitting is supported if you pass in a list of actor handles to the
split() function along with the number of desired splits.
This is a common pattern for loading data and splitting it among distributed training actors:

    import ray

    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self, rank: int):
            pass

        def train(self, shard: ray.data.Dataset[int]) -> int:
            # Iterate over the shard assigned to this worker.
            for batch in shard.iter_batches(batch_size=256):
                pass
            return shard.count()

    workers = [Worker.remote(i) for i in range(16)]
    # -> [Actor(Worker, ...), Actor(Worker, ...), ...]

    ds = ray.data.range(10000)
    # -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

    shards = ds.split(n=16, locality_hints=workers)
    # -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
    #     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

    ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
    # -> [650, 650, ...]
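
The shard sizes in the example output can be sanity-checked with plain arithmetic. The sketch below does not call Ray. It assumes that the split hands out whole blocks as evenly as possible and that every block holds the same number of rows. Both are assumptions made for illustration, not a description of Ray's actual implementation, and `split_block_counts` is a hypothetical helper.

```python
from typing import List


def split_block_counts(num_blocks: int, n: int) -> List[int]:
    """Distribute num_blocks whole blocks over n shards as evenly as possible.

    Hypothetical helper for illustration only; not part of the Ray API.
    """
    base, extra = divmod(num_blocks, n)
    # The first `extra` shards receive one additional block.
    return [base + 1 if i < extra else base for i in range(n)]


num_rows, num_blocks, num_shards = 10000, 200, 16
rows_per_block = num_rows // num_blocks  # assumes equally sized blocks

block_counts = split_block_counts(num_blocks, num_shards)
row_counts = [count * rows_per_block for count in block_counts]

# Every block, and therefore every row, lands in exactly one shard.
assert sum(block_counts) == num_blocks
assert sum(row_counts) == num_rows
```

Under these assumptions, 200 blocks do not divide evenly by 16, so some shards get 13 blocks and the rest get 12. This matches the 13-block, 650-row shards shown above and suggests that the entries hidden behind the ellipsis are smaller.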