Transforming Datasets

Datasets can be transformed in parallel using .map(). Transformations are executed eagerly and block until the operation is finished. Datasets also support .filter() and .flat_map().

import ray

ds = ray.data.range(10000)
ds = ds.map(lambda x: x * 2)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 2, 4, 6, 8]

ds.filter(lambda x: x > 5).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
# -> [6, 8, 10, 12, 14]

ds.flat_map(lambda x: [x, -x]).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
# -> [0, 0, 2, -2, 4]

To take advantage of vectorized functions, use .map_batches(). Note that you can also implement filter and flat_map using .map_batches(), since your map function can return an output batch of any size.

ds = ray.data.range_arrow(10000)
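# Each batch arrives as a pandas DataFrame, so the whole batch can be
# multiplied in one vectorized operation instead of element by element.
ds = ds.map_batches(
    lambda df: df * 2, batch_format="pandas")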
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [{'value': 0}, {'value': 2}, ...]
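
To illustrate why filter and flat_map can be expressed through .map_batches(), the following plain-Python sketch mimics batch-wise mapping on ordinary lists. It does not use Ray. The helper map_batches_sketch and the two batch functions are hypothetical names introduced only for this illustration, and the batch size of 4 is an arbitrary assumption.

```python
from typing import Callable, List


def map_batches_sketch(rows: List[int],
                       fn: Callable[[List[int]], List[int]],
                       batch_size: int = 4) -> List[int]:
    """Apply fn to consecutive batches and concatenate the outputs.

    Hypothetical stand-in for batch-wise mapping; not part of Ray.
    """
    out: List[int] = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # fn may return fewer or more rows than it received.
        out.extend(fn(batch))
    return out


def filter_batch(batch: List[int]) -> List[int]:
    # Filter: the output batch is never larger than the input batch.
    return [x for x in batch if x > 5]


def flat_map_batch(batch: List[int]) -> List[int]:
    # Flat map: each input row becomes two output rows.
    return [y for x in batch for y in (x, -x)]


rows = [x * 2 for x in range(10)]
filtered = map_batches_sketch(rows, filter_batch)
expanded = map_batches_sketch(rows, flat_map_batch)
```

Because the batch function is free to return a list of a different length, the same batch-wise driver covers mapping, filtering, and flat-mapping.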

By default, transformations are executed using Ray tasks. For transformations that require expensive setup, such as loading a model, specify compute=ray.data.ActorPoolStrategy(min, max) and Ray will use an autoscaling actor pool of min to max actors to execute your transforms. For a fixed-size actor pool, specify ActorPoolStrategy(n, n). The following is an end-to-end example of reading, transforming, and saving batch inference results using Ray Data:

import pandas as pd
import ray
from ray.data import ActorPoolStrategy

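# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    # Placeholder preprocessing step: returns the image bytes unchanged.
    return image

class BatchInferModel:
    def __init__(self):
        # Runs once per actor, so the model is loaded only once per worker.
        # ImageNetModel is a placeholder for a user-defined model class;
        # it is not defined in this example.
        self.model = ImageNetModel()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        # Runs once per batch, reusing the already-loaded model.
        return self.model(batch)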

ds = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(
    BatchInferModel, compute=ActorPoolStrategy(10, 20),
    batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")
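
A callable class suits an actor pool because setup in __init__ runs once per worker, while __call__ runs once per batch. The plain-Python sketch below shows that pattern without Ray. ExpensiveSetupModel and run_on_pool are hypothetical names introduced for illustration, and the sequential round-robin dispatch is an assumption chosen for simplicity, not a description of how Ray schedules batches.

```python
from typing import List


class ExpensiveSetupModel:
    """Hypothetical callable class that counts how often setup runs."""
    setup_calls = 0

    def __init__(self):
        # Stands in for costly setup such as loading model weights.
        ExpensiveSetupModel.setup_calls += 1
        self.scale = 2

    def __call__(self, batch: List[int]) -> List[int]:
        return [x * self.scale for x in batch]


def run_on_pool(cls, batches, pool_size):
    """Dispatch batches round-robin over pool_size long-lived instances."""
    workers = [cls() for _ in range(pool_size)]
    return [workers[i % pool_size](b) for i, b in enumerate(batches)]


batches = [[i, i + 1] for i in range(0, 12, 2)]  # six batches of two rows
results = run_on_pool(ExpensiveSetupModel, batches, pool_size=2)
```

Six batches are processed here, but setup runs only twice, once per pool member. With one stateless task per batch, the same setup would be repeated for every batch.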