Pattern: Using resources to limit the number of concurrently running tasks

In this pattern, we use resources to limit the number of concurrently running tasks.

By default, Ray tasks require 1 CPU each and Ray actors require 0 CPU each, so the scheduler limits task concurrency to the number of available CPUs and places no limit on actor concurrency. Tasks that use more than 1 CPU (e.g., via multithreading) may slow down due to interference from concurrently running tasks, but are otherwise safe to run.

However, tasks or actors that use more than their proportionate share of memory may overload a node and cause issues like OOM. If that is the case, we can reduce the number of concurrently running tasks or actors on each node by increasing the amount of resources requested by them. This works because Ray makes sure that the sum of the resource requirements of all of the concurrently running tasks and actors on a given node does not exceed the node’s total resources.
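The arithmetic behind this limit can be sketched in plain Python. This is only an illustration of the rule described above, not Ray's scheduler: the helper name max_concurrent_tasks and the resource dictionaries are hypothetical, and the node is assumed to advertise 16 CPUs and 16 GiB of the memory resource.

```python
GIB = 1024 ** 3


def max_concurrent_tasks(node_resources, task_requirements):
    """Return how many identical tasks fit on one node at the same time.

    Hypothetical helper for illustration only. For each resource the task
    requests, at most floor(node total / per-task request) tasks fit; the
    tightest resource sets the overall limit. Returns None when the task
    requests nothing, i.e. resources impose no limit.
    """
    limits = []
    for name, required in task_requirements.items():
        if required <= 0:
            continue  # A zero request never constrains concurrency.
        available = node_resources.get(name, 0)
        limits.append(int(available // required))
    return min(limits) if limits else None


# Assumed node: 16 CPUs and 16 GiB of logical memory.
node = {"CPU": 16, "memory": 16 * GIB}

print(max_concurrent_tasks(node, {"CPU": 1}))                     # 16
print(max_concurrent_tasks(node, {"CPU": 1, "memory": 2 * GIB}))  # 8
print(max_concurrent_tasks(node, {"CPU": 2}))                     # 8
```

Under these assumptions, requesting 2 GiB of memory per task or 2 CPUs per task both cap concurrency at 8 tasks on the node, because whichever resource runs out first determines the limit.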

Note

For actor tasks, the number of running actors limits the number of concurrently running actor tasks we can have.

Example use case

You have a data processing workload that processes each input file independently using Ray remote functions. Since each task needs to load the input data into heap memory and do the processing, running too many of them at once can cause OOM. In this case, you can use the memory resource to limit the number of concurrently running tasks (other resources, such as num_cpus, can achieve the same goal). Note that, like num_cpus, the memory resource requirement is logical: Ray uses it for scheduling only and will not stop a task whose physical memory usage exceeds this amount.

Code example

Without limit:

import ray

# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()


@ray.remote
def process(file):
    # Actual work is reading the file and processing the data.
    # Assume it needs to use 2G memory.
    pass


NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
    # By default, each process task uses 1 CPU resource and no other resources.
    # This means 16 tasks can run concurrently,
    # which will cause OOM since they need 32G memory while the node only has 16G.
    result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)

With limit:

result_refs = []
for i in range(NUM_FILES):
    # Now each task requests 2G of the memory resource,
    # so the number of concurrently running tasks is limited to 8 (16G / 2G).
    # In this case, setting num_cpus to 2 has the same effect (16 CPUs / 2).
    result_refs.append(
        process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
    )
ray.get(result_refs)