Anti-pattern: Fetching too many objects at once with ray.get causes failure

TLDR: Avoid calling ray.get() on too many objects at once, since this can lead to heap out-of-memory or object store out-of-space errors. Instead, fetch and process one batch at a time.

If you have a large number of tasks that you want to run in parallel, calling ray.get() on all of them at once can fail with heap out-of-memory or object store out-of-space errors, because Ray has to fetch all the objects to the caller at the same time. Instead, get and process the results one batch at a time. Once a batch is processed, Ray will evict objects in that batch to make space for future batches.

Figure: Fetching too many objects at once with ray.get()

Code example

Anti-pattern:

import ray
import numpy as np

ray.init()


def process_results(results):
    # custom process logic
    pass


@ray.remote
def return_big_object():
    return np.zeros(1024 * 10)


NUM_TASKS = 1000

object_refs = [return_big_object.remote() for _ in range(NUM_TASKS)]
# This will fail with heap out-of-memory
# or object store out-of-space if NUM_TASKS is large enough.
results = ray.get(object_refs)
process_results(results)

Better approach:

BATCH_SIZE = 100

while object_refs:
    # Process results in the finish order instead of the submission order.
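    # ray.wait() raises ValueError if num_returns exceeds the number of refs,
    # so cap it for a final batch that is smaller than BATCH_SIZE.
    num_returns = min(BATCH_SIZE, len(object_refs))
    ready_object_refs, object_refs = ray.wait(object_refs, num_returns=num_returns)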
    # The node only needs enough space to store
    # a batch of objects instead of all objects.
    results = ray.get(ready_object_refs)
    process_results(results)

Besides fetching one batch at a time to avoid failure, this version also uses ray.wait() to process results in completion order rather than submission order, which reduces the runtime. See "Anti-pattern: Processing results in submission order using ray.get increases runtime" for more details.
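To put rough numbers on the batching argument, the sketch below estimates how much array data the caller holds at once in each version of the example above. It is plain arithmetic rather than a Ray API: it assumes the arrays use NumPy's default float64 dtype (8 bytes per element) and ignores serialization and metadata overhead. The helper to_mib is a hypothetical name introduced only for this illustration.

```python
# Back-of-the-envelope payload sizes for the example above.
# Assumption: np.zeros() uses its default float64 dtype (8 bytes per element);
# serialization and metadata overhead are ignored.
FLOAT64_BYTES = 8
ELEMENTS_PER_OBJECT = 1024 * 10  # matches np.zeros(1024 * 10)
NUM_TASKS = 1000
BATCH_SIZE = 100

object_bytes = ELEMENTS_PER_OBJECT * FLOAT64_BYTES
all_at_once_bytes = object_bytes * NUM_TASKS  # ray.get(object_refs)
per_batch_bytes = object_bytes * BATCH_SIZE   # ray.get(ready_object_refs)


def to_mib(num_bytes):
    """Convert a byte count to mebibytes (hypothetical helper)."""
    return num_bytes / (1024 * 1024)


print(f"one object:           {object_bytes / 1024:.0f} KiB")
print(f"ray.get on all refs:  {to_mib(all_at_once_bytes):.1f} MiB")
print(f"ray.get on one batch: {to_mib(per_batch_bytes):.1f} MiB")
```

With these particular values the all-at-once total is still modest, so the anti-pattern is unlikely to fail as written. The point is how the two figures scale: the all-at-once total grows linearly with NUM_TASKS, while the per-batch figure depends only on BATCH_SIZE and the object size.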