ray.util.as_completed#

ray.util.as_completed(refs: Sequence[ObjectRef], *, chunk_size: int = 10, yield_obj_refs: bool = False, **kwargs) Iterator[Any | ObjectRef][source]#

Given a list of Ray task references, yield results as they become available.

Unlike calling get() on a list of references (i.e., ray.get(refs)) which waits for all results to be ready, this function begins to yield result as soon as a batch of chunk_size results are ready.

Note

Generally there is no guarantee on the order of results. For example, the first result is not necessarily the first one completed, but rather the first one submitted in the first available batch (See wait() for more details about preservation of submission order).

Note

Use this function instead of calling get() inside a for loop. See https://docs.ray.io/en/latest/ray-core/patterns/ray-get-loop.html for more details.

Example

Suppose we have a function that sleeps for x seconds depending on the input. We expect to obtain a partially sorted list of results.

@ray.remote
def f(x):
    time.sleep(x)
    return x

refs = [f.remote(i) for i in [10, 4, 6, 8, 2]]
for x in ray.util.as_completed(refs, chunk_size=2):
    print(x)
# Output:
4
2
6
8
10
Parameters:
  • refs – A list of Ray object refs.

  • chunk_size – The number of tasks to wait for in each iteration (default 10). The parameter is passed as num_returns to wait() internally.

  • yield_obj_refs – If True, return Ray remote refs instead of results (by calling get()).

  • **kwargs – Additional keyword arguments to pass to wait(), e.g., timeout and fetch_local.

Yields:

Union[Any, ObjectRef] – The results (or optionally their Ray references) of the Ray tasks as they complete.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.