ray.util.map_unordered

ray.util.map_unordered(fn: RemoteFunction, items: Iterable[Any], *, backpressure_size: int | None = 100, chunk_size: int = 10, yield_obj_refs: bool = False, **kwargs) → Iterator[Any | ObjectRef]

Apply a Ray remote function to a list of items and return an iterator that yields the completed results as they become available.

This helper function applies backpressure to control the number of pending tasks, following the design pattern described in https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html.

Note

There is generally no guarantee on the order of results.
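If the original order matters, one common workaround is to carry each item's index through the task and place results by index as they arrive. The sketch below illustrates the idea with the standard library's concurrent.futures as a stand-in for Ray, so it runs without a cluster; square_indexed is a hypothetical helper, not part of Ray. With map_unordered, the analogous approach would presumably be to pass enumerate(items) and have the remote function return the index alongside its result.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square_indexed(pair):
    """Hypothetical task: return the item's index together with its result."""
    index, value = pair
    return index, value * value


items = [5, 3, 8, 1]
ordered = [None] * len(items)

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square_indexed, pair) for pair in enumerate(items)]
    # Results arrive in completion order; the index restores input order.
    for future in as_completed(futures):
        index, result = future.result()
        ordered[index] = result

print(ordered)  # [25, 9, 64, 1]
```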

Example

Suppose we have a function that sleeps for x seconds, where x is its input. Because shorter tasks finish first, we expect the results to come back partially sorted in ascending order.

import ray
import time

@ray.remote
def f(x):
    time.sleep(x)
    return x

# Example 1: chunk_size=2
for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], chunk_size=2):
    print(x)
# Output:
# 4
# 2
# 6
# 8
# 10

# Example 2: backpressure_size=2, chunk_size=1
for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], backpressure_size=2, chunk_size=1):
    print(x)
# Output:
# 4
# 10
# 6
# 8
# 2
Parameters:
  • fn – A remote function to apply to the list of items. For more complex use cases, use Ray Data’s map() / map_batches() instead.

  • items – An iterable of items to apply the function to.

  • backpressure_size – Maximum number of in-flight tasks allowed before calling a blocking wait() (default 100). If None, no backpressure is applied.

  • chunk_size – The number of tasks to wait for when the number of in-flight tasks exceeds backpressure_size. The parameter is passed as num_returns to wait() internally.

  • yield_obj_refs – If True, yield Ray object refs instead of the results. If False (default), each result is fetched by calling get() before it is yielded.

  • **kwargs – Additional keyword arguments to pass to wait(), e.g., timeout and fetch_local.

Yields:

Union[Any, ObjectRef] – The results (or optionally their Ray references) of the Ray tasks as they complete.
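To make the interplay of backpressure_size and chunk_size concrete, here is a minimal sketch of the same backpressure pattern. It is not Ray's implementation: it uses the standard library's concurrent.futures in place of Ray tasks so that it runs without a cluster, and the names bounded_map_unordered and wait_for are hypothetical. The exact threshold at which Ray starts waiting, and the order in which it yields results within a chunk, are assumptions here and may differ.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_for(pending, num_returns):
    """Block until at least `num_returns` futures finish (or none remain)."""
    done = set()
    while pending and len(done) < num_returns:
        finished, pending = wait(pending, return_when=FIRST_COMPLETED)
        done |= finished
    return done, pending


def bounded_map_unordered(executor, fn, items, backpressure_size=100, chunk_size=10):
    """Yield fn(item) results as they finish, keeping in-flight work bounded."""
    pending = set()
    for item in items:
        pending.add(executor.submit(fn, item))
        # Assumption: wait as soon as the in-flight count reaches the limit.
        if backpressure_size is not None and len(pending) >= backpressure_size:
            done, pending = wait_for(pending, chunk_size)
            for future in done:
                yield future.result()
    # All items are submitted; drain whatever is still running.
    while pending:
        done, pending = wait_for(pending, chunk_size)
        for future in done:
            yield future.result()


with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(
        bounded_map_unordered(
            executor, lambda x: x * x, range(8), backpressure_size=3, chunk_size=1
        )
    )

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49]
```

In this sketch, a smaller chunk_size resumes submission sooner after each wait, while a larger one blocks until more tasks have finished. Setting backpressure_size to None submits everything up front and only waits in the final drain loop.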

See also

as_completed()

Use this function when you already have a list of Ray object refs.

map()

Use Ray Data APIs (e.g., map() and map_batches()) for better control and complex use cases, e.g., functions with multiple arguments.

Note

This is an alternative to pool.imap_unordered() in Ray’s Actor-based multiprocessing.Pool. See https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html for more details.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.