Source code for ray.util.scheduling_strategies

from typing import Dict, Union, Optional, TYPE_CHECKING
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
    from ray.util.placement_group import PlacementGroup

# "DEFAULT": The default hybrid scheduling strategy
# based on config scheduler_spread_threshold.
# This disables any potential placement group capture.

# "SPREAD": Spread scheduling on a best effort basis.


[docs]@PublicAPI class PlacementGroupSchedulingStrategy: """Placement group based scheduling strategy. Attributes: placement_group: the placement group this actor belongs to, or None if it doesn't belong to any group. placement_group_bundle_index: the index of the bundle if the actor belongs to a placement group, which may be -1 to specify any available bundle. placement_group_capture_child_tasks: Whether or not children tasks of this actor should implicitly use the same placement group as its parent. It is False by default. """ def __init__( self, placement_group: "PlacementGroup", placement_group_bundle_index: int = -1, placement_group_capture_child_tasks: Optional[bool] = None, ): self.placement_group = placement_group self.placement_group_bundle_index = placement_group_bundle_index self.placement_group_capture_child_tasks = placement_group_capture_child_tasks
[docs]@PublicAPI class NodeAffinitySchedulingStrategy: """Static scheduling strategy used to run a task or actor on a particular node. Attributes: node_id: the hex id of the node where the task or actor should run. soft: whether the scheduler should run the task or actor somewhere else if the target node doesn't exist (e.g. the node dies) or is infeasible during scheduling. If the node exists and is feasible, the task or actor will only be scheduled there. This means if the node doesn't have the available resources, the task or actor will wait indefinitely until resources become available. If the node doesn't exist or is infeasible, the task or actor will fail if soft is False or be scheduled somewhere else if soft is True. """ def __init__( self, node_id: str, soft: bool, _spill_on_unavailable: bool = False, _fail_on_unavailable: bool = False, ): # This will be removed once we standardize on node id being hex string. if not isinstance(node_id, str): node_id = node_id.hex() self.node_id = node_id self.soft = soft self._spill_on_unavailable = _spill_on_unavailable self._fail_on_unavailable = _fail_on_unavailable
def _validate_label_match_operator_values(values, operator): if not values: raise ValueError( f"The variadic parameter of the {operator} operator" f' must be a non-empty tuple: e.g. {operator}("value1", "value2").' ) index = 0 for value in values: if not isinstance(value, str): raise ValueError( f"Type of value in position {index} for the {operator} operator " f'must be str (e.g. {operator}("value1", "value2")) ' f"but got {str(value)} of type {type(value)}." ) index = index + 1 @PublicAPI(stability="alpha") class In: def __init__(self, *values): _validate_label_match_operator_values(values, "In") self.values = list(values) @PublicAPI(stability="alpha") class NotIn: def __init__(self, *values): _validate_label_match_operator_values(values, "NotIn") self.values = list(values) @PublicAPI(stability="alpha") class Exists: def __init__(self): pass @PublicAPI(stability="alpha") class DoesNotExist: def __init__(self): pass class _LabelMatchExpression: """An expression used to select node by node's labels Attributes: key: the key of label operator: In、NotIn、Exists、DoesNotExist """ def __init__(self, key: str, operator: Union[In, NotIn, Exists, DoesNotExist]): self.key = key self.operator = operator LabelMatchExpressionsT = Dict[str, Union[In, NotIn, Exists, DoesNotExist]] @PublicAPI(stability="alpha") class NodeLabelSchedulingStrategy: """Label based node affinity scheduling strategy scheduling_strategy=NodeLabelSchedulingStrategy({ "region": In("us"), "gpu_type": Exists() }) """ def __init__( self, hard: LabelMatchExpressionsT, *, soft: LabelMatchExpressionsT = None ): self.hard = _convert_map_to_expressions(hard, "hard") self.soft = _convert_map_to_expressions(soft, "soft") self._check_usage() def _check_usage(self): if not (self.hard or self.soft): raise ValueError( "The `hard` and `soft` parameter " "of NodeLabelSchedulingStrategy cannot both be empty." ) def _convert_map_to_expressions(map_expressions: LabelMatchExpressionsT, param: str): expressions = [] if map_expressions is None: return expressions if not isinstance(map_expressions, Dict): raise ValueError( f'The {param} parameter must be a map (e.g. {{"key1": In("value1")}}) ' f"but got type {type(map_expressions)}." ) for key, value in map_expressions.items(): if not isinstance(key, str): raise ValueError( f"The map key of the {param} parameter must " f'be of type str (e.g. {{"key1": In("value1")}}) ' f"but got {str(key)} of type {type(key)}." ) if not isinstance(value, (In, NotIn, Exists, DoesNotExist)): raise ValueError( f"The map value for key {key} of the {param} parameter " f"must be one of the `In`, `NotIn`, `Exists` or `DoesNotExist` " f'operator (e.g. {{"key1": In("value1")}}) ' f"but got {str(value)} of type {type(value)}." ) expressions.append(_LabelMatchExpression(key, value)) return expressions SchedulingStrategyT = Union[ None, str, # Literal["DEFAULT", "SPREAD"] PlacementGroupSchedulingStrategy, NodeAffinitySchedulingStrategy, NodeLabelSchedulingStrategy, ]