# Source code for ray.data._internal.compute

import logging
from typing import Any, Callable, Iterable, Optional, TypeVar, Union

from ray.data._internal.execution.interfaces import TaskContext
from ray.data.block import Block, UserDefinedFunction
from ray.util.annotations import DeveloperAPI, PublicAPI

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Generic type variables; no use of them is visible in this part of the file.
T = TypeVar("T")
U = TypeVar("U")


# Type alias for the block transform functions applied by task and actor pools.
#
# The accepted call shapes all return an iterable of blocks:
#   * (blocks, task_context) -> blocks
#   * (blocks, task_context, user_defined_function) -> blocks
#   * any other argument list (the catch-all ``Callable[..., Iterable[Block]]``)
# Because of the catch-all entry, a type checker accepts any callable returning
# ``Iterable[Block]``; the two explicit shapes serve as documentation.
BlockTransform = Union[
    # TODO(Clark): Once Ray only supports Python 3.8+, use protocol to constrain block
    # transform type. The commented-out entries below sketch the shapes that
    # cannot be expressed with plain ``Callable``:
    # Callable[[Block, ...], Iterable[Block]]
    # Callable[[Block, UserDefinedFunction, ...], Iterable[Block]],
    Callable[[Iterable[Block], TaskContext], Iterable[Block]],
    Callable[[Iterable[Block], TaskContext, UserDefinedFunction], Iterable[Block]],
    Callable[..., Iterable[Block]],
]


@DeveloperAPI
class ComputeStrategy:
    """Common base class for the compute strategies defined in this module.

    It carries no state or behavior of its own; ``TaskPoolStrategy`` and
    ``ActorPoolStrategy`` derive from it so that they share a common type.
    """


@PublicAPI
class TaskPoolStrategy(ComputeStrategy):
    """Specify the task-based compute strategy for a Dataset transform.

    With this strategy, dataset transformations run as Ray tasks that are
    scheduled through a pool. Pass ``size`` to cap the number of concurrent
    tasks, or leave it unset to let Ray Data scale the task count
    automatically.
    """

    def __init__(
        self,
        size: Optional[int] = None,
    ):
        """Construct TaskPoolStrategy for a Dataset transform.

        Args:
            size: The maximum size of the task pool. ``None`` (the default)
                means no explicit cap.

        Raises:
            ValueError: If ``size`` is given and is smaller than 1.
        """
        size_is_invalid = size is not None and size < 1
        if size_is_invalid:
            raise ValueError("`size` must be >= 1", size)
        self.size = size

    def __eq__(self, other: Any) -> bool:
        """Compare by ``size``; an uncapped pool also equals the string "tasks"."""
        if isinstance(other, TaskPoolStrategy) and self.size == other.size:
            return True
        # Not an equal-sized TaskPoolStrategy: the only remaining match is the
        # string spec "tasks", and only when this pool has no size cap.
        return other == "tasks" and self.size is None

    def __repr__(self) -> str:
        """Return a constructor-like representation showing ``size``."""
        return f"TaskPoolStrategy(size={self.size})"
@PublicAPI
class ActorPoolStrategy(ComputeStrategy):
    """Specify the actor-based compute strategy for a Dataset transform.

    ActorPoolStrategy specifies that an autoscaling pool of actors should be used
    for a given Dataset transform. This is useful for stateful setup of callable
    classes.

    For a fixed-sized pool of size ``n``, use ``ActorPoolStrategy(size=n)``.

    To autoscale from ``m`` to ``n`` actors, use
    ``ActorPoolStrategy(min_size=m, max_size=n)``.

    To autoscale from ``m`` to ``n`` actors, with an initial size of ``initial``,
    use ``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.

    To increase opportunities for pipelining task dependency prefetching with
    computation and avoiding actor startup delays, set
    max_tasks_in_flight_per_actor to 2 or greater; to try to decrease the delay
    due to queueing of tasks on the worker actors, set
    max_tasks_in_flight_per_actor to 1.

    The `enable_true_multi_threading` argument primarily exists to prevent GPU
    OOM issues with multi-threaded actors. The life cycle of an actor task
    involves 3 main steps:

        1. Batching Inputs
        2. Running actor UDF
        3. Batching Outputs

    The `enable_true_multi_threading` flag affects step 2. If set to `True`,
    then the UDF can be run concurrently. By default, it is set to `False`, so
    at most 1 actor UDF is running at a time per actor. The `max_concurrency`
    flag on `ray.remote` affects steps 1 and 3. Below is a matrix summary:

    - [`enable_true_multi_threading=False or True`, `max_concurrency=1`] =
      1 actor task running per actor. So at most 1 of steps 1, 2, or 3 is
      running at any point in time.
    - [`enable_true_multi_threading=False`, `max_concurrency>1`] = multiple
      tasks running per actor (respecting GIL) but UDF runs 1 at a time. This
      is useful for doing CPU and GPU work, where you want to use a large batch
      size but want to hide the overhead of *batching* the inputs. In this
      case, CPU *batching* is done concurrently, while GPU *inference* is done
      1 at a time. Concretely, steps 1 and 3 can have multiple threads, while
      step 2 is done serially.
    - [`enable_true_multi_threading=True`, `max_concurrency>1`] = multiple
      tasks running per actor. Unlike the previous case, the UDF runs
      concurrently (respecting GIL). No restrictions on steps 1, 2, or 3.

    NOTE: `enable_true_multi_threading` does not apply to async actors.
    """

    def __init__(
        self,
        *,
        size: Optional[int] = None,
        min_size: Optional[int] = None,
        max_size: Optional[int] = None,
        initial_size: Optional[int] = None,
        max_tasks_in_flight_per_actor: Optional[int] = None,
        enable_true_multi_threading: bool = False,
    ):
        """Construct ActorPoolStrategy for a Dataset transform.

        Args:
            size: Specify a fixed size actor pool of this size. It is an error to
                specify `size` together with `min_size`, `max_size`, or
                `initial_size`.
            min_size: The minimum size of the actor pool. Defaults to 1.
            max_size: The maximum size of the actor pool. Defaults to unbounded
                (``float("inf")``).
            initial_size: The initial number of actors to start with. If not
                specified, defaults to min_size. Must be between min_size and
                max_size.
            max_tasks_in_flight_per_actor: The maximum number of tasks to
                concurrently send to a single actor worker. Increasing this will
                increase opportunities for pipelining task dependency
                prefetching with computation and avoiding actor startup delays,
                but will also increase queueing delay.
            enable_true_multi_threading: If True, the UDF may run concurrently
                within an actor (respecting the GIL and the `max_concurrency`
                argument). If False (the default), at most 1 UDF call runs at a
                time per actor. Does not apply to async actors.

        Raises:
            ValueError: If any size argument is smaller than 1, if `size` is
                combined with `min_size`/`max_size`/`initial_size`, if
                `min_size` exceeds `max_size`, if `initial_size` falls outside
                ``[min_size, max_size]``, or if `max_tasks_in_flight_per_actor`
                is smaller than 1.
        """
        if size is not None:
            if size < 1:
                raise ValueError("size must be >= 1", size)
            if max_size is not None or min_size is not None or initial_size is not None:
                raise ValueError(
                    "min_size, max_size, and initial_size cannot be set at the same time as `size`"
                )
            # A fixed-size pool is an autoscaling pool whose bounds coincide.
            min_size = size
            max_size = size
            initial_size = size
        if min_size is not None and min_size < 1:
            raise ValueError("min_size must be >= 1", min_size)
        if max_size is not None:
            if min_size is None:
                min_size = 1  # Legacy default.
            # Because min_size >= 1 here, this also rejects max_size < 1.
            if min_size > max_size:
                raise ValueError("min_size must be <= max_size", min_size, max_size)
        if (
            max_tasks_in_flight_per_actor is not None
            and max_tasks_in_flight_per_actor < 1
        ):
            raise ValueError(
                "max_tasks_in_flight_per_actor must be >= 1, got: ",
                max_tasks_in_flight_per_actor,
            )

        self.min_size = min_size or 1
        self.max_size = max_size or float("inf")

        # Validate initial_size against the resolved bounds.
        if initial_size is not None:
            if initial_size < self.min_size:
                raise ValueError(
                    f"initial_size ({initial_size}) must be >= min_size ({self.min_size})"
                )
            if self.max_size != float("inf") and initial_size > self.max_size:
                raise ValueError(
                    f"initial_size ({initial_size}) must be <= max_size ({self.max_size})"
                )

        self.initial_size = initial_size or self.min_size
        self.max_tasks_in_flight_per_actor = max_tasks_in_flight_per_actor
        # Fixed starting values; within this class they are only reported by
        # __repr__ and are not part of equality.
        # NOTE(review): presumably updated by the actor pool at runtime - confirm.
        self.num_workers = 0
        self.ready_to_total_workers_ratio = 0.8
        self.enable_true_multi_threading = enable_true_multi_threading

    def __eq__(self, other: Any) -> bool:
        """Compare the pool bounds, initial size, threading flag and in-flight cap."""
        return isinstance(other, ActorPoolStrategy) and (
            self.min_size == other.min_size
            and self.max_size == other.max_size
            and self.initial_size == other.initial_size
            and self.enable_true_multi_threading == other.enable_true_multi_threading
            and self.max_tasks_in_flight_per_actor
            == other.max_tasks_in_flight_per_actor
        )

    def __repr__(self) -> str:
        """Return a single parenthesized, comma-separated listing of all fields."""
        return (
            f"ActorPoolStrategy(min_size={self.min_size}, "
            f"max_size={self.max_size}, "
            f"initial_size={self.initial_size}, "
            f"max_tasks_in_flight_per_actor={self.max_tasks_in_flight_per_actor}, "
            f"num_workers={self.num_workers}, "
            f"enable_true_multi_threading={self.enable_true_multi_threading}, "
            f"ready_to_total_workers_ratio={self.ready_to_total_workers_ratio})"
        )
def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
    """Validate a compute spec and return it as a compute strategy.

    Only ``TaskPoolStrategy`` and ``ActorPoolStrategy`` instances are accepted.
    Anything else (including the strings ``"tasks"`` / ``"actors"`` and
    ``None``) is rejected, even though the annotation still admits ``str``.

    Args:
        compute_spec: The compute strategy to validate.

    Returns:
        ``compute_spec`` itself, unchanged.

    Raises:
        ValueError: If ``compute_spec`` is not a ``TaskPoolStrategy`` or an
            ``ActorPoolStrategy``.
    """
    if not isinstance(compute_spec, (TaskPoolStrategy, ActorPoolStrategy)):
        raise ValueError(
            "In Ray 2.5, the compute spec must be either "
            f"TaskPoolStrategy or ActorPoolStrategy, was: {compute_spec}."
        )
    # The former string-spec branches could never see a string past the guard
    # above; returning the validated instance keeps its identity and subclass.
    return compute_spec