from typing import TYPE_CHECKING, Any, Iterable, Iterator, Optional, Sequence, Union
import ray
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
    from ray import ObjectRef
    from ray.remote_function import RemoteFunction

# ray.wait() has a default num_returns of 1.
# Using a slightly larger batch until the optimization is fully implemented, see
# https://github.com/ray-project/ray/issues/49905
DEFAULT_CHUNK_SIZE = 10
DEFAULT_BACKPRESSURE_SIZE = 100


def _wait_and_get_single_batch(
    refs: "Sequence[ObjectRef]",
    *,
    chunk_size: int,
    yield_obj_refs: bool = False,
    **kwargs,
) -> tuple[list[Union[Any, "ObjectRef"]], list["ObjectRef"]]:
    """Call `ray.wait()` and explicitly return the ready objects/results
    and the remaining Ray object refs.

    Args:
        refs: A list of Ray object refs.
        chunk_size: The `num_returns` parameter to pass to `ray.wait()`.
        yield_obj_refs: If True, return the ready object refs themselves instead of
            their results (i.e., skip the :meth:`~ray.get` call).
        **kwargs: Additional keyword arguments to pass to `ray.wait()`.

    Returns:
        A tuple of two lists: the ready items (fetched results, or object refs if
        `yield_obj_refs` is True) and the not-yet-ready object refs, mirroring the
        return value of `ray.wait()`.
    """
    if chunk_size < 1:
        raise ValueError("`chunk_size` must be >= 1")
    kwargs = kwargs or {}
    # num_returns must be <= len(refs)
    ready, refs = ray.wait(
        refs,
        num_returns=min(chunk_size, len(refs)),
        **kwargs,
    )
    if not yield_obj_refs:
        return ray.get(ready), refs
    return ready, refs
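
# A minimal usage sketch of the helper above (hypothetical remote task `f`;
# assumes `ray.init()` has already been called):
#
#   refs = [f.remote(i) for i in range(5)]
#   ready_results, refs = _wait_and_get_single_batch(refs, chunk_size=2)
#   # `ready_results` holds up to 2 fetched results; `refs` now holds only the
#   # object refs that are still pending.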


@PublicAPI(stability="alpha")
def as_completed(
    refs: "Sequence[ObjectRef]",
    *,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    yield_obj_refs: bool = False,
    **kwargs,
) -> Iterator[Union[Any, "ObjectRef"]]:
    """Given a list of Ray task references, yield results as they become available.

    Unlike calling :meth:`~ray.get` on a list of references (i.e., `ray.get(refs)`),
    which waits for all results to be ready, this function begins to yield results as
    soon as a batch of `chunk_size` results is ready.

    .. note::
        Generally there is no guarantee on the order of results. For example, the first
        result is not necessarily the first one completed, but rather the first one
        submitted in the first available batch (see :meth:`~ray.wait` for more details
        about preservation of submission order).

    .. note::
        Use this function instead of calling :meth:`~ray.get` inside a for loop. See
        https://docs.ray.io/en/latest/ray-core/patterns/ray-get-loop.html for more details.

    Example:
        Suppose we have a function that sleeps for x seconds depending on the input.
        We expect to obtain a partially sorted list of results.

        .. testcode:: python

            import ray
            import time

            @ray.remote
            def f(x):
                time.sleep(x)
                return x

            refs = [f.remote(i) for i in [10, 4, 6, 8, 2]]
            for x in ray.util.as_completed(refs, chunk_size=2):
                print(x)

        .. testoutput::
            :options: +MOCK

            # Output:
            4
            2
            6
            8
            10

    Args:
        refs: A list of Ray object refs.
        chunk_size: The number of tasks to wait for in each iteration (default 10).
            The parameter is passed as `num_returns` to :meth:`~ray.wait` internally.
        yield_obj_refs: If True, yield the Ray object refs themselves instead of their
            results (i.e., skip the :meth:`~ray.get` call).
        **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
            `timeout` and `fetch_local`.

    Yields:
        Union[Any, ObjectRef]: The results (or optionally their Ray object refs) of the
        Ray tasks as they complete.
    """
    if chunk_size < 1:
        raise ValueError("`chunk_size` must be >= 1")
    if "num_returns" in kwargs:
        raise ValueError("Use the `chunk_size` argument instead of `num_returns`.")
    while refs:
        results, refs = _wait_and_get_single_batch(
            refs,
            chunk_size=chunk_size,
            yield_obj_refs=yield_obj_refs,
            **kwargs,
        )
        yield from results
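
# A minimal sketch of the `yield_obj_refs` / `**kwargs` combination (assumptions:
# a hypothetical remote task `f`, a hypothetical downstream task `consumer`, and a
# running Ray cluster). Passing `fetch_local=False` to `ray.wait()` together with
# `yield_obj_refs=True` avoids pulling object values to the local node:
#
#   refs = [f.remote(i) for i in range(100)]
#   for ref in as_completed(refs, yield_obj_refs=True, fetch_local=False):
#       consumer.remote(ref)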


@PublicAPI(stability="alpha")
def map_unordered(
    fn: "RemoteFunction",
    items: Iterable[Any],
    *,
    backpressure_size: Optional[int] = DEFAULT_BACKPRESSURE_SIZE,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    yield_obj_refs: bool = False,
    **kwargs,
) -> Iterator[Union[Any, "ObjectRef"]]:
    """Apply a Ray remote function to a list of items and return an iterator that
    yields the completed results as they become available.

    This helper function applies backpressure to control the number of pending tasks,
    following the design pattern described in
    https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html.

    .. note::
        There is generally no guarantee on the order of results.

    Example:
        Suppose we have a function that sleeps for x seconds depending on the input.
        We expect to obtain a partially sorted list of results.

        .. testcode:: python

            import ray
            import time

            @ray.remote
            def f(x):
                time.sleep(x)
                return x

            # Example 1: chunk_size=2
            for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], chunk_size=2):
                print(x)

        .. testoutput::
            :options: +MOCK

            4
            2
            6
            8
            10

        .. testcode:: python

            # Example 2: backpressure_size=2, chunk_size=1
            for x in ray.util.map_unordered(
                f, [10, 4, 6, 8, 2], backpressure_size=2, chunk_size=1
            ):
                print(x)

        .. testoutput::
            :options: +MOCK

            4
            10
            6
            8
            2

    Args:
        fn: A remote function to apply to the list of items. For more complex use
            cases, use Ray Data's :meth:`~ray.data.Dataset.map` /
            :meth:`~ray.data.Dataset.map_batches` instead.
        items: An iterable of items to apply the function to.
        backpressure_size: Maximum number of in-flight tasks allowed before
            calling a blocking :meth:`~ray.wait` (default 100). If None, no
            backpressure is applied.
        chunk_size: The number of tasks to wait for when the number of in-flight
            tasks reaches `backpressure_size`. The parameter is passed as
            `num_returns` to :meth:`~ray.wait` internally.
        yield_obj_refs: If True, yield the Ray object refs themselves instead of their
            results (i.e., skip the :meth:`~ray.get` call).
        **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
            `timeout` and `fetch_local`.

    Yields:
        Union[Any, ObjectRef]: The results (or optionally their Ray object refs) of the
        Ray tasks as they complete.

    .. seealso::
        :meth:`~ray.util.as_completed`
            Call this method for an existing list of Ray object refs.

        :meth:`~ray.data.Dataset.map`
            Use Ray Data APIs (e.g., :meth:`~ray.data.Dataset.map` and
            :meth:`~ray.data.Dataset.map_batches`) for better control and complex use
            cases, e.g., functions with multiple arguments.

    .. note::
        This is an alternative to `pool.imap_unordered()` in Ray's Actor-based
        `multiprocessing.Pool`. See
        https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html for more details.
    """
    if backpressure_size is None:
        backpressure_size = float("inf")
    elif backpressure_size <= 0:
        raise ValueError("backpressure_size must be positive.")
    if chunk_size < 1:
        raise ValueError("`chunk_size` must be >= 1")
    if "num_returns" in kwargs:
        raise ValueError("Use the `chunk_size` argument instead of `num_returns`.")
    refs = []
    for item in items:
        refs.append(fn.remote(item))
        if len(refs) >= backpressure_size:
            # Too many in-flight tasks: block on a batch before submitting more.
            results, refs = _wait_and_get_single_batch(
                refs,
                chunk_size=chunk_size,
                yield_obj_refs=yield_obj_refs,
                **kwargs,
            )
            yield from results
    else:
        # The `for ... else` clause runs once the input iterable is exhausted:
        # drain the remaining in-flight refs.
        yield from as_completed(
            refs,
            chunk_size=chunk_size,
            yield_obj_refs=yield_obj_refs,
            **kwargs,
        )
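
# A minimal usage sketch of `map_unordered` with a lazy input (assumptions: a
# hypothetical remote task `process` and a running Ray cluster). Backpressure keeps
# at most 32 tasks in flight even though `range(10_000)` items are mapped:
#
#   @ray.remote
#   def process(x):
#       return x * x
#
#   for result in map_unordered(process, range(10_000), backpressure_size=32):
#       print(result)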