# Source code for ray.data._internal.compute

import logging
from typing import Any, Callable, Iterable, Optional, TypeVar, Union

from ray.data._internal.execution.interfaces import TaskContext
from ray.data.block import Block, UserDefinedFunction
from ray.util.annotations import DeveloperAPI, PublicAPI

logger = logging.getLogger(__name__)

# Generic type variables. NOTE(review): neither is referenced in the visible
# portion of this module — confirm whether other code imports them from here.
T = TypeVar("T")
U = TypeVar("U")


# Block transform function applied by task and actor pools.
# The union lists the callable shapes a transform may take. Because the last
# member is ``Callable[..., Iterable[Block]]``, the alias effectively accepts
# any callable returning an iterable of blocks; the narrower members serve as
# documentation of the expected signatures rather than a strict constraint.
BlockTransform = Union[
    # TODO(Clark): Once Ray only supports Python 3.8+, use protocol to constrain block
    # transform type.
    # Callable[[Block, ...], Iterable[Block]]
    # Callable[[Block, UserDefinedFunction, ...], Iterable[Block]],
    Callable[[Iterable[Block], TaskContext], Iterable[Block]],
    Callable[[Iterable[Block], TaskContext, UserDefinedFunction], Iterable[Block]],
    Callable[..., Iterable[Block]],
]


@DeveloperAPI
class ComputeStrategy:
    """Common base type for Dataset-transform compute strategies.

    This class carries no state or behavior of its own; ``TaskPoolStrategy``
    and ``ActorPoolStrategy`` in this module derive from it.
    """


@PublicAPI
class TaskPoolStrategy(ComputeStrategy):
    """Specify the task-based compute strategy for a Dataset transform.

    TaskPoolStrategy executes dataset transformations using Ray tasks that are
    scheduled through a pool. Provide ``size`` to cap the number of concurrent
    tasks; leave it unset to allow Ray Data to scale the task count
    automatically.
    """

    def __init__(
        self,
        size: Optional[int] = None,
    ):
        """Construct TaskPoolStrategy for a Dataset transform.

        Args:
            size: Specify the maximum size of the task pool. ``None`` (the
                default) leaves the task count uncapped.

        Raises:
            ValueError: If ``size`` is given and is less than 1.
        """
        if size is not None:
            if size < 1:
                raise ValueError("`size` must be >= 1", size)
        self.size = size

    def __eq__(self, other: Any) -> bool:
        # Two task pools match when their caps match.
        if isinstance(other, TaskPoolStrategy) and self.size == other.size:
            return True
        # An uncapped task pool also compares equal to the legacy string
        # spec "tasks".
        return other == "tasks" and self.size is None

    def __repr__(self) -> str:
        return "TaskPoolStrategy(size={})".format(self.size)
@PublicAPI
class ActorPoolStrategy(ComputeStrategy):
    """Specify the actor-based compute strategy for a Dataset transform.

    ActorPoolStrategy specifies that an autoscaling pool of actors should be
    used for a given Dataset transform. This is useful for stateful setup of
    callable classes.

    For a fixed-sized pool of size ``n``, use ``ActorPoolStrategy(size=n)``.

    To autoscale from ``m`` to ``n`` actors, use
    ``ActorPoolStrategy(min_size=m, max_size=n)``.

    To autoscale from ``m`` to ``n`` actors, with an initial size of
    ``initial``, use
    ``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.

    To increase opportunities for pipelining task dependency prefetching with
    computation and avoiding actor startup delays, set
    max_tasks_in_flight_per_actor to 2 or greater; to try to decrease the delay
    due to queueing of tasks on the worker actors, set
    max_tasks_in_flight_per_actor to 1.
    """

    def __init__(
        self,
        *,
        size: Optional[int] = None,
        min_size: Optional[int] = None,
        max_size: Optional[int] = None,
        initial_size: Optional[int] = None,
        max_tasks_in_flight_per_actor: Optional[int] = None,
    ):
        """Construct ActorPoolStrategy for a Dataset transform.

        Args:
            size: Specify a fixed size actor pool of this size. It is an error
                to specify `size` together with `min_size`, `max_size`, or
                `initial_size`.
            min_size: The minimum size of the actor pool. Defaults to 1.
            max_size: The maximum size of the actor pool. Defaults to
                unbounded (``float("inf")``).
            initial_size: The initial number of actors to start with. If not
                specified, defaults to min_size. Must be between min_size and
                max_size.
            max_tasks_in_flight_per_actor: The maximum number of tasks to
                concurrently send to a single actor worker. Increasing this
                will increase opportunities for pipelining task dependency
                prefetching with computation and avoiding actor startup delays,
                but will also increase queueing delay.

        Raises:
            ValueError: If any size is < 1, if `size` is combined with another
                size argument, if min_size > max_size, if initial_size falls
                outside [min_size, max_size], or if
                max_tasks_in_flight_per_actor < 1.
        """
        if size is not None:
            if size < 1:
                raise ValueError("size must be >= 1", size)
            if max_size is not None or min_size is not None or initial_size is not None:
                raise ValueError(
                    "min_size, max_size, and initial_size cannot be set at the same time as `size`"
                )
            # A fixed-size pool is expressed as min == max == initial.
            min_size = size
            max_size = size
            initial_size = size
        if min_size is not None and min_size < 1:
            raise ValueError("min_size must be >= 1", min_size)
        if max_size is not None:
            if min_size is None:
                min_size = 1  # Legacy default.
            if min_size > max_size:
                raise ValueError("min_size must be <= max_size", min_size, max_size)
        if (
            max_tasks_in_flight_per_actor is not None
            and max_tasks_in_flight_per_actor < 1
        ):
            raise ValueError(
                "max_tasks_in_flight_per_actor must be >= 1, got: ",
                max_tasks_in_flight_per_actor,
            )
        # At this point min_size is either None or >= 1, so ``or`` only
        # substitutes the default for None.
        self.min_size = min_size or 1
        self.max_size = max_size or float("inf")

        # Validate and set initial_size against the resolved bounds.
        if initial_size is not None:
            if initial_size < self.min_size:
                raise ValueError(
                    f"initial_size ({initial_size}) must be >= min_size ({self.min_size})"
                )
            if self.max_size != float("inf") and initial_size > self.max_size:
                raise ValueError(
                    f"initial_size ({initial_size}) must be <= max_size ({self.max_size})"
                )
        self.initial_size = initial_size or self.min_size
        self.max_tasks_in_flight_per_actor = max_tasks_in_flight_per_actor
        # Not derived from constructor arguments and not part of __eq__.
        # NOTE(review): presumably updated by the executor — confirm.
        self.num_workers = 0
        self.ready_to_total_workers_ratio = 0.8

    def __eq__(self, other: Any) -> bool:
        # Only the user-configurable pool settings participate in equality.
        # NOTE: defining __eq__ without __hash__ makes instances unhashable.
        return isinstance(other, ActorPoolStrategy) and (
            self.min_size == other.min_size
            and self.max_size == other.max_size
            and self.initial_size == other.initial_size
            and self.max_tasks_in_flight_per_actor
            == other.max_tasks_in_flight_per_actor
        )

    def __repr__(self) -> str:
        # Each field is followed by ", " except the last, which closes the
        # parenthesis exactly once.
        return (
            f"ActorPoolStrategy(min_size={self.min_size}, "
            f"max_size={self.max_size}, "
            f"initial_size={self.initial_size}, "
            f"max_tasks_in_flight_per_actor={self.max_tasks_in_flight_per_actor}, "
            f"num_workers={self.num_workers}, "
            f"ready_to_total_workers_ratio={self.ready_to_total_workers_ratio})"
        )
def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
    """Validate and return the compute strategy for a Dataset transform.

    Only ``TaskPoolStrategy`` and ``ActorPoolStrategy`` instances are accepted.
    The legacy string specs (``"tasks"`` / ``"actors"``), ``None``, and other
    ``ComputeStrategy`` subclasses are rejected. ``str`` remains in the
    annotation only to keep the signature backward-compatible.

    Args:
        compute_spec: The compute strategy instance to use.

    Returns:
        ``compute_spec`` itself.

    Raises:
        ValueError: If ``compute_spec`` is not a ``TaskPoolStrategy`` or an
            ``ActorPoolStrategy``.
    """
    # Any string / falsy / generic-ComputeStrategy handling after this guard
    # would be unreachable, so the guard alone defines the accepted inputs.
    if not isinstance(compute_spec, (TaskPoolStrategy, ActorPoolStrategy)):
        raise ValueError(
            "In Ray 2.5, the compute spec must be either "
            f"TaskPoolStrategy or ActorPoolStrategy, was: {compute_spec}."
        )
    return compute_spec