Source code for ray.data._internal.execution.interfaces.execution_options

import math
import os
from typing import Any, Dict, List, Optional, Union

from .common import NodeIdStr
from ray.data._internal.execution.util import memory_string
from ray.util.annotations import DeveloperAPI


class ExecutionResources:
    """Specifies resources usage or resource limits for execution.

    By default this class represents resource usage: any field left unset
    (``None``) reads as zero through the corresponding property. Use
    `for_limits` (or `inf`) to create an object that represents resource
    limits, where unset fields become infinity instead.
    """

    def __init__(
        self,
        cpu: Optional[float] = None,
        gpu: Optional[float] = None,
        object_store_memory: Optional[float] = None,
        memory: Optional[float] = None,
    ):
        """Initializes ExecutionResources.

        Args:
            cpu: Amount of logical CPU slots.
            gpu: Amount of logical GPU slots.
            object_store_memory: Amount of object store memory.
            memory: Amount of logical memory in bytes.
        """
        # NOTE: Ray Core allocates fractional resources in up to 5th decimal
        # digit, hence we round the values here up to it
        self._cpu: Optional[float] = safe_round(cpu, 5)
        self._gpu: Optional[float] = safe_round(gpu, 5)
        # Memory amounts are rounded with no `ndigits`, i.e. to whole units.
        self._object_store_memory: Optional[float] = safe_round(object_store_memory)
        self._memory: Optional[float] = safe_round(memory)

    @classmethod
    def from_resource_dict(
        cls,
        resource_dict: Dict[str, float],
    ) -> "ExecutionResources":
        """Create an ExecutionResources object from a resource dict.

        Both the ``"CPU"``/``"GPU"`` keys and the ``"num_cpus"``/``"num_gpus"``
        keys are accepted; the former take precedence when present. An explicit
        value of ``0`` is honored rather than being treated as missing.
        """
        return ExecutionResources(
            # `safe_or` (not `or`) so that an explicit 0 is kept instead of
            # falling through to the alternative key / None.
            cpu=safe_or(resource_dict.get("CPU"), resource_dict.get("num_cpus")),
            gpu=safe_or(resource_dict.get("GPU"), resource_dict.get("num_gpus")),
            object_store_memory=resource_dict.get("object_store_memory"),
            memory=resource_dict.get("memory"),
        )

    @classmethod
    def for_limits(
        cls,
        cpu: Optional[float] = None,
        gpu: Optional[float] = None,
        object_store_memory: Optional[float] = None,
        memory: Optional[float] = None,
    ) -> "ExecutionResources":
        """Create an ExecutionResources object that represents resource limits.

        Any argument left as ``None`` becomes ``float("inf")`` (no limit).

        Args:
            cpu: Amount of logical CPU slots.
            gpu: Amount of logical GPU slots.
            object_store_memory: Amount of object store memory.
            memory: Amount of logical memory in bytes.
        """
        return ExecutionResources(
            cpu=safe_or(cpu, float("inf")),
            gpu=safe_or(gpu, float("inf")),
            object_store_memory=safe_or(object_store_memory, float("inf")),
            memory=safe_or(memory, float("inf")),
        )

    @property
    def cpu(self) -> float:
        return self._cpu or 0.0

    @property
    def gpu(self) -> float:
        return self._gpu or 0.0

    @property
    def object_store_memory(self) -> float:
        return self._object_store_memory or 0

    @property
    def memory(self) -> float:
        return self._memory or 0

    def __repr__(self):
        return (
            f"ExecutionResources(cpu={self.cpu}, gpu={self.gpu}, "
            f"object_store_memory={self.object_store_memory_str()}, "
            f"memory={self.memory_str()})"
        )

    def __eq__(self, other: object) -> bool:
        # Defer to Python's default handling for foreign types instead of
        # raising AttributeError on `other.cpu`.
        if not isinstance(other, ExecutionResources):
            return NotImplemented
        return (
            self.cpu == other.cpu
            and self.gpu == other.gpu
            and self.object_store_memory == other.object_store_memory
            and self.memory == other.memory
        )

    @classmethod
    def zero(cls) -> "ExecutionResources":
        """Returns an ExecutionResources object with zero resources."""
        return ExecutionResources(0.0, 0.0, 0.0, 0.0)

    @classmethod
    def inf(cls) -> "ExecutionResources":
        """Returns an ExecutionResources object with infinite resources."""
        return ExecutionResources.for_limits()

    def is_zero(self) -> bool:
        """Returns True if all resources are zero."""
        return (
            self.cpu == 0.0
            and self.gpu == 0.0
            and self.object_store_memory == 0.0
            and self.memory == 0.0
        )

    def is_non_negative(self) -> bool:
        """Returns True if all resources are non-negative."""
        return (
            self.cpu >= 0
            and self.gpu >= 0
            and self.object_store_memory >= 0
            and self.memory >= 0
        )

    def object_store_memory_str(self) -> str:
        """Returns a human-readable string for the object store memory field."""
        if self.object_store_memory == float("inf"):
            return "inf"
        return memory_string(self.object_store_memory)

    def memory_str(self) -> str:
        """Returns a human-readable string for the memory field."""
        if self.memory == float("inf"):
            return "inf"
        return memory_string(self.memory)

    def copy(
        self,
        cpu: Optional[float] = None,
        gpu: Optional[float] = None,
        memory: Optional[float] = None,
        object_store_memory: Optional[float] = None,
    ) -> "ExecutionResources":
        """Returns a copy of this ExecutionResources object, allowing specific
        resources to be overridden as necessary.

        Fields that are neither overridden nor set on this object remain unset
        (``None``) in the copy rather than being materialized as ``0``.
        """
        # Read the raw fields, not the properties: the properties map None to
        # 0, which would silently turn "unset" into an explicit zero.
        return ExecutionResources(
            cpu=safe_or(cpu, self._cpu),
            gpu=safe_or(gpu, self._gpu),
            object_store_memory=safe_or(
                object_store_memory, self._object_store_memory
            ),
            memory=safe_or(memory, self._memory),
        )

    def add(self, other: "ExecutionResources") -> "ExecutionResources":
        """Adds execution resources.

        Returns:
            A new ExecutionResource object with summed resources.
        """
        return ExecutionResources(
            cpu=self.cpu + other.cpu,
            gpu=self.gpu + other.gpu,
            object_store_memory=self.object_store_memory + other.object_store_memory,
            memory=self.memory + other.memory,
        )

    def subtract(self, other: "ExecutionResources") -> "ExecutionResources":
        """Subtracts execution resources.

        Returns:
            A new ExecutionResource object with subtracted resources.
        """
        return ExecutionResources(
            cpu=self.cpu - other.cpu,
            gpu=self.gpu - other.gpu,
            object_store_memory=self.object_store_memory - other.object_store_memory,
            memory=self.memory - other.memory,
        )

    def max(self, other: "ExecutionResources") -> "ExecutionResources":
        """Returns the maximum for each resource type."""
        return ExecutionResources(
            cpu=max(self.cpu, other.cpu),
            gpu=max(self.gpu, other.gpu),
            object_store_memory=max(
                self.object_store_memory, other.object_store_memory
            ),
            memory=max(self.memory, other.memory),
        )

    def min(self, other: "ExecutionResources") -> "ExecutionResources":
        """Returns the minimum for each resource type."""
        return ExecutionResources(
            cpu=min(self.cpu, other.cpu),
            gpu=min(self.gpu, other.gpu),
            object_store_memory=min(
                self.object_store_memory, other.object_store_memory
            ),
            memory=min(self.memory, other.memory),
        )

    def satisfies_limit(
        self,
        limit: "ExecutionResources",
        *,
        ignore_object_store_memory=False,
    ) -> bool:
        """Return if this resource struct meets the specified limits.

        Comparisons go through the properties, so a field left unset (None) on
        ``limit`` is read as 0, not as "no limit". Build ``limit`` with
        ``for_limits`` so that unset fields become infinity.

        Args:
            limit: The resource limits to check against.
            ignore_object_store_memory: If True, ignore the object store memory
                limit when checking if this resource struct meets the limits.
        """
        return (
            self.cpu <= limit.cpu
            and self.gpu <= limit.gpu
            and (
                ignore_object_store_memory
                or self.object_store_memory <= limit.object_store_memory
            )
            and self.memory <= limit.memory
        )

    def scale(self, f: float) -> "ExecutionResources":
        """Return copy with all set values scaled by `f`.

        Raises:
            ValueError: If `f` is negative.
        """
        if f < 0:
            raise ValueError("Scaling factor must be non-negative.")
        if f == 0:
            # Explicitly handle the zero case, because `0 * inf` is undefined.
            return ExecutionResources.zero()
        return ExecutionResources(
            cpu=self.cpu * f,
            gpu=self.gpu * f,
            object_store_memory=self.object_store_memory * f,
            memory=self.memory * f,
        )
@DeveloperAPI
class ExecutionOptions:
    """Common options for execution.

    Some options may not be supported on all executors (e.g., resource limits).

    Attributes:
        resource_limits: Set a soft limit on the resource usage during execution.
            Autodetected by default.
        exclude_resources: Amount of resources to exclude from Ray Data.
            Set this if you have other workloads running on the same cluster.
            Note,
            - If using Ray Data with Ray Train, training resources will be
              automatically excluded.
            - For each resource type, resource_limits and exclude_resources can
              not be both set.
        locality_with_output: Set this to prefer running tasks on the same node
            as the output node (node driving the execution). It can also be set
            to a list of node ids to spread the outputs across those nodes. Off
            by default.
        preserve_order: Set this to preserve the ordering between blocks
            processed by operators. Off by default.
        actor_locality_enabled: Whether to enable locality-aware task dispatch
            to actors (on by default). This parameter applies to both stateful
            map and streaming_split operations.
        verbose_progress: Whether to report progress individually per operator.
            When disabled, only AllToAll operators and global progress is
            reported. This option is useful for performance debugging. On by
            default: when not passed explicitly, the value is read from the
            ``RAY_DATA_VERBOSE_PROGRESS`` environment variable (``"1"`` if
            unset).
    """

    def __init__(
        self,
        resource_limits: Optional[ExecutionResources] = None,
        exclude_resources: Optional[ExecutionResources] = None,
        locality_with_output: Union[bool, List[NodeIdStr]] = False,
        preserve_order: bool = False,
        actor_locality_enabled: bool = True,
        verbose_progress: Optional[bool] = None,
    ):
        if resource_limits is None:
            resource_limits = ExecutionResources.for_limits()
        self.resource_limits = resource_limits
        if exclude_resources is None:
            exclude_resources = ExecutionResources.zero()
        self.exclude_resources = exclude_resources
        self.locality_with_output = locality_with_output
        self.preserve_order = preserve_order
        self.actor_locality_enabled = actor_locality_enabled
        if verbose_progress is None:
            verbose_progress = bool(
                int(os.environ.get("RAY_DATA_VERBOSE_PROGRESS", "1"))
            )
        self.verbose_progress = verbose_progress

    def __repr__(self) -> str:
        return (
            f"ExecutionOptions(resource_limits={self.resource_limits}, "
            f"exclude_resources={self.exclude_resources}, "
            f"locality_with_output={self.locality_with_output}, "
            f"preserve_order={self.preserve_order}, "
            f"actor_locality_enabled={self.actor_locality_enabled}, "
            f"verbose_progress={self.verbose_progress})"
        )

    @property
    def resource_limits(self) -> ExecutionResources:
        return self._resource_limits

    @resource_limits.setter
    def resource_limits(self, value: ExecutionResources) -> None:
        # Re-wrap with `for_limits` so any field left unset (None) on `value`
        # becomes an infinite limit rather than reading as 0.
        self._resource_limits = ExecutionResources.for_limits(
            cpu=value._cpu,
            gpu=value._gpu,
            object_store_memory=value._object_store_memory,
            memory=value._memory,
        )

    def is_resource_limits_default(self):
        """Returns True if resource_limits is the default value."""
        return self._resource_limits == ExecutionResources.for_limits()

    def validate(self) -> None:
        """Validate the options.

        Raises:
            ValueError: If, for any resource type, ``resource_limits`` sets a
                finite limit while ``exclude_resources`` excludes a positive
                amount of that same resource.
        """
        # Cover every resource type tracked by ExecutionResources, including
        # `memory`, per the documented "for each resource type" invariant.
        for attr in ["cpu", "gpu", "object_store_memory", "memory"]:
            if (
                getattr(self.resource_limits, attr) != float("inf")
                and getattr(self.exclude_resources, attr, 0) > 0
            ):
                raise ValueError(
                    "resource_limits and exclude_resources cannot "
                    f"both be set for {attr} resource."
                )
def safe_or(value: Optional[Any], alt: Any) -> Any:
    """Return ``value`` unless it is ``None``, in which case return ``alt``.

    Unlike ``value or alt``, falsy-but-set values such as ``0`` are kept.
    """
    if value is None:
        return alt
    return value


def safe_round(
    value: Optional[float], ndigits: Optional[int] = None
) -> Optional[float]:
    """Round ``value`` like the builtin ``round``, passing through special values.

    ``None`` and infinities are returned unchanged; infinities are skipped
    because ``round(inf)`` with no ``ndigits`` raises ``OverflowError``.
    """
    if value is None or math.isinf(value):
        return value
    return round(value, ndigits)