Anti-pattern: Returning ray.put() ObjectRefs from a task harms performance and fault tolerance#

TLDR: Avoid calling ray.put() on task return values and returning the resulting ObjectRefs. Instead, return these values directly if possible.

Returning ray.put() ObjectRefs are considered anti-patterns for the following reasons:

  • It disallows inlining small return values: Ray has a performance optimization to return small (<= 100KB) values inline directly to the caller, avoiding going through the distributed object store. On the other hand, ray.put() will unconditionally store the value to the object store which makes the optimization for small return values impossible.

  • Returning ObjectRefs involves extra distributed reference counting protocol which is slower than returning the values directly.

  • It’s less fault tolerant: the worker process that calls ray.put() is the “owner” of the returned ObjectRef and the return value fate shares with the owner. If the worker process dies, the return value is lost. In contrast, the caller process (often the driver) is the owner of the return value if it’s returned directly.

Code example#

If you want to return a single value regardless if it’s small or large, you should return it directly.

import ray
import numpy as np


@ray.remote
def task_with_single_small_return_value_bad():
    small_return_value = 1
    # The value will be stored in the object store
    # and the reference is returned to the caller.
    small_return_value_ref = ray.put(small_return_value)
    return small_return_value_ref


@ray.remote
def task_with_single_small_return_value_good():
    small_return_value = 1
    # Ray will return the value inline to the caller
    # which is faster than the previous approach.
    return small_return_value


assert ray.get(ray.get(task_with_single_small_return_value_bad.remote())) == ray.get(
    task_with_single_small_return_value_good.remote()
)


@ray.remote
def task_with_single_large_return_value_bad():
    large_return_value = np.zeros(10 * 1024 * 1024)
    large_return_value_ref = ray.put(large_return_value)
    return large_return_value_ref


@ray.remote
def task_with_single_large_return_value_good():
    # Both approaches will store the large array to the object store
    # but this is better since it's faster and more fault tolerant.
    large_return_value = np.zeros(10 * 1024 * 1024)
    return large_return_value


assert np.array_equal(
    ray.get(ray.get(task_with_single_large_return_value_bad.remote())),
    ray.get(task_with_single_large_return_value_good.remote()),
)


# Same thing applies for actor tasks as well.
@ray.remote
class Actor:
    def task_with_single_return_value_bad(self):
        single_return_value = np.zeros(9 * 1024 * 1024)
        return ray.put(single_return_value)

    def task_with_single_return_value_good(self):
        return np.zeros(9 * 1024 * 1024)


actor = Actor.remote()
assert np.array_equal(
    ray.get(ray.get(actor.task_with_single_return_value_bad.remote())),
    ray.get(actor.task_with_single_return_value_good.remote()),
)

If you want to return multiple values and you know the number of returns before calling the task, you should use the num_returns option.

# This will return a single object
# which is a tuple of two ObjectRefs to the actual values.
@ray.remote(num_returns=1)
def task_with_static_multiple_returns_bad1():
    return_value_1_ref = ray.put(1)
    return_value_2_ref = ray.put(2)
    return (return_value_1_ref, return_value_2_ref)


# This will return two objects each of which is an ObjectRef to the actual value.
@ray.remote(num_returns=2)
def task_with_static_multiple_returns_bad2():
    return_value_1_ref = ray.put(1)
    return_value_2_ref = ray.put(2)
    return (return_value_1_ref, return_value_2_ref)


# This will return two objects each of which is the actual value.
@ray.remote(num_returns=2)
def task_with_static_multiple_returns_good():
    return_value_1 = 1
    return_value_2 = 2
    return (return_value_1, return_value_2)


assert (
    ray.get(ray.get(task_with_static_multiple_returns_bad1.remote())[0])
    == ray.get(ray.get(task_with_static_multiple_returns_bad2.remote()[0]))
    == ray.get(task_with_static_multiple_returns_good.remote()[0])
)


@ray.remote
class Actor:
    @ray.method(num_returns=1)
    def task_with_static_multiple_returns_bad1(self):
        return_value_1_ref = ray.put(1)
        return_value_2_ref = ray.put(2)
        return (return_value_1_ref, return_value_2_ref)

    @ray.method(num_returns=2)
    def task_with_static_multiple_returns_bad2(self):
        return_value_1_ref = ray.put(1)
        return_value_2_ref = ray.put(2)
        return (return_value_1_ref, return_value_2_ref)

    @ray.method(num_returns=2)
    def task_with_static_multiple_returns_good(self):
        # This is faster and more fault tolerant.
        return_value_1 = 1
        return_value_2 = 2
        return (return_value_1, return_value_2)


actor = Actor.remote()
assert (
    ray.get(ray.get(actor.task_with_static_multiple_returns_bad1.remote())[0])
    == ray.get(ray.get(actor.task_with_static_multiple_returns_bad2.remote()[0]))
    == ray.get(actor.task_with_static_multiple_returns_good.remote()[0])
)

If you don’t know the number of returns before calling the task, you should use the dynamic generator pattern if possible.

@ray.remote(num_returns=1)
def task_with_dynamic_returns_bad(n):
    return_value_refs = []
    for i in range(n):
        return_value_refs.append(ray.put(np.zeros(i * 1024 * 1024)))
    return return_value_refs


@ray.remote(num_returns="dynamic")
def task_with_dynamic_returns_good(n):
    for i in range(n):
        yield np.zeros(i * 1024 * 1024)


assert np.array_equal(
    ray.get(ray.get(task_with_dynamic_returns_bad.remote(2))[0]),
    ray.get(next(iter(ray.get(task_with_dynamic_returns_good.remote(2))))),
)