# Source code for ray.util.placement_group

import warnings
from typing import Dict, List, Optional, Union

import ray
from ray._private.auto_init_hook import auto_init_ray
from ray._private.client_mode_hook import client_mode_should_convert, client_mode_wrap
from ray._private.utils import hex_to_binary, get_ray_doc_version
from ray._raylet import PlacementGroupID
from ray.util.annotations import DeveloperAPI, PublicAPI
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
import ray._private.ray_constants as ray_constants

# Process-wide cache for the remote dummy task used by PlacementGroup.ready().
# Populated lazily by _export_bundle_reservation_check_method_if_needed() so
# the task is only exported when ready() is actually used.
bundle_reservation_check = None

# Strategy names accepted by placement_group()'s `strategy` argument; any
# other value is rejected by validate_placement_group().
VALID_PLACEMENT_GROUP_STRATEGIES = {
    "PACK",
    "SPREAD",
    "STRICT_PACK",
    "STRICT_SPREAD",
}


# We need to import this method to use for ready API.
# But ray.remote is only available in runtime, and
# if we define this method inside ready method, this function is
# exported whenever ready is called, which can impact performance,
# https://github.com/ray-project/ray/issues/6240.
def _export_bundle_reservation_check_method_if_needed():
    """Create and cache the remote dummy task backing PlacementGroup.ready().

    The remote function is defined at most once per process; subsequent
    calls are no-ops. See the comment above for why this is defined lazily
    here rather than inside ready().
    """
    global bundle_reservation_check
    if bundle_reservation_check is not None:
        # Already exported; nothing to do.
        return

    @ray.remote(num_cpus=0)
    def bundle_reservation_check_func(placement_group):
        return placement_group

    bundle_reservation_check = bundle_reservation_check_func


@PublicAPI
class PlacementGroup:
    """A handle to a placement group."""

    @staticmethod
    def empty() -> "PlacementGroup":
        """Return a sentinel handle whose ID is the nil placement group ID."""
        return PlacementGroup(PlacementGroupID.nil())

    def __init__(
        self,
        id: "ray._raylet.PlacementGroupID",
        bundle_cache: Optional[List[Dict]] = None,
    ):
        # The placement group ID, plus an optional pre-fetched list of
        # bundle specs (fetched lazily from GCS when not supplied).
        self.id = id
        self.bundle_cache = bundle_cache

    @property
    def is_empty(self):
        """Whether this handle refers to no placement group (nil ID)."""
        return self.id.is_nil()

    def ready(self) -> "ray._raylet.ObjectRef":
        """Returns an ObjectRef to check ready status.

        This API runs a small dummy task to wait for placement group creation.
        It is compatible to ray.get and ray.wait.

        Example:
            .. testcode::

                import ray

                pg = ray.util.placement_group([{"CPU": 1}])
                ray.get(pg.ready())

                pg = ray.util.placement_group([{"CPU": 1}])
                ray.wait([pg.ready()])
        """
        self._fill_bundle_cache_if_needed()

        _export_bundle_reservation_check_method_if_needed()

        assert len(self.bundle_cache) != 0, (
            "ready() cannot be called on placement group object with a "
            "bundle length == 0, current bundle length: "
            f"{len(self.bundle_cache)}"
        )

        # Schedule the dummy task inside this placement group; the returned
        # ObjectRef resolves once the group's bundles are reserved.
        strategy = PlacementGroupSchedulingStrategy(placement_group=self)
        return bundle_reservation_check.options(
            scheduling_strategy=strategy
        ).remote(self)

    def wait(self, timeout_seconds: Union[float, int] = 30) -> bool:
        """Wait for the placement group to be ready within the specified time.

        Args:
            timeout_seconds(float|int): Timeout in seconds.

        Returns:
            True if the placement group is created. False otherwise.
        """
        return _call_placement_group_ready(self.id, timeout_seconds)

    @property
    def bundle_specs(self) -> List[Dict]:
        """List[Dict]: Return bundles belonging to this placement group."""
        self._fill_bundle_cache_if_needed()
        return self.bundle_cache

    @property
    def bundle_count(self) -> int:
        """Number of bundles in this placement group."""
        self._fill_bundle_cache_if_needed()
        return len(self.bundle_cache)

    def _fill_bundle_cache_if_needed(self) -> None:
        # Fetch the bundle specs from GCS only once, on first use.
        if not self.bundle_cache:
            self.bundle_cache = _get_bundle_cache(self.id)

    def __eq__(self, other):
        # Two handles are equal iff they refer to the same placement group ID.
        if not isinstance(other, PlacementGroup):
            return False
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)
@client_mode_wrap
def _call_placement_group_ready(
    pg_id: PlacementGroupID, timeout_seconds: Union[float, int]
) -> bool:
    """Block until the placement group is ready, or the timeout expires.

    Args:
        pg_id: ID of the placement group to wait for.
        timeout_seconds: Maximum time to wait, in seconds. (Annotated as
            Union[float, int] to match PlacementGroup.wait, which forwards
            its argument here.)

    Returns:
        True if the placement group became ready within the timeout,
        False otherwise.
    """
    worker = ray._private.worker.global_worker
    worker.check_connected()

    return worker.core_worker.wait_placement_group_ready(pg_id, timeout_seconds)


@client_mode_wrap
def _get_bundle_cache(pg_id: PlacementGroupID) -> List[Dict]:
    """Fetch the bundle specs of a placement group from the GCS.

    Args:
        pg_id: ID of the placement group to look up.

    Returns:
        The list of bundle resource dicts recorded for the group.
    """
    worker = ray._private.worker.global_worker
    worker.check_connected()

    return list(
        ray._private.state.state.placement_group_table(pg_id)["bundles"].values()
    )
@PublicAPI
@client_mode_wrap
def placement_group(
    bundles: List[Dict[str, float]],
    strategy: str = "PACK",
    name: str = "",
    lifetime: Optional[str] = None,
    _max_cpu_fraction_per_node: float = 1.0,
    _soft_target_node_id: Optional[str] = None,
) -> PlacementGroup:
    """Asynchronously creates a PlacementGroup.

    Args:
        bundles: A list of bundles which represent the resources requirements.
        strategy: The strategy to create the placement group.

         - "PACK": Packs Bundles into as few nodes as possible.
         - "SPREAD": Places Bundles across distinct nodes as even as possible.
         - "STRICT_PACK": Packs Bundles into one node. The group is
           not allowed to span multiple nodes.
         - "STRICT_SPREAD": Packs Bundles across distinct nodes.

        name: The name of the placement group.
        lifetime: Either `None`, which defaults to the placement group
            will fate share with its creator and will be deleted once its
            creator is dead, or "detached", which means the placement group
            will live as a global object independent of the creator.
        _max_cpu_fraction_per_node: (Experimental) Disallow placing bundles on
            nodes if it would cause the fraction of CPUs used by bundles from
            *any* placement group on the node to exceed this fraction. This
            effectively sets aside CPUs that placement groups cannot occupy on
            nodes. when `max_cpu_fraction_per_node < 1.0`, at least 1 CPU will
            be excluded from placement group scheduling. Note: This feature is
            experimental and is not recommended for use with autoscaling
            clusters (scale-up will not trigger properly).
        _soft_target_node_id: (Private, Experimental) Soft hint where bundles
            of this placement group should be placed. The target node is
            specified by it's hex ID. If the target node has no available
            resources or died, bundles can be placed elsewhere. This currently
            only works with STRICT_PACK pg.

    Raises:
        ValueError if bundle type is not a list.
        ValueError if empty bundle or empty resource bundles are given.
        ValueError if the wrong lifetime arguments are given.

    Returns:
        PlacementGroup: Placement group object.
    """
    worker = ray._private.worker.global_worker
    worker.check_connected()

    validate_placement_group(
        bundles=bundles,
        strategy=strategy,
        lifetime=lifetime,
        _max_cpu_fraction_per_node=_max_cpu_fraction_per_node,
        _soft_target_node_id=_soft_target_node_id,
    )

    # Only "detached" groups outlive their creator; every other (validated)
    # value of `lifetime` fate-shares with it.
    detached = lifetime == "detached"

    placement_group_id = worker.core_worker.create_placement_group(
        name,
        bundles,
        strategy,
        detached,
        _max_cpu_fraction_per_node,
        _soft_target_node_id,
    )

    return PlacementGroup(placement_group_id)
@PublicAPI
@client_mode_wrap
def remove_placement_group(placement_group: PlacementGroup) -> None:
    """Asynchronously remove placement group.

    Args:
        placement_group: The placement group to delete.
    """
    assert placement_group is not None

    worker = ray._private.worker.global_worker
    worker.check_connected()

    # The removal is carried out by the core worker; this call returns
    # without waiting for the group to actually be removed.
    worker.core_worker.remove_placement_group(placement_group.id)
@PublicAPI
@client_mode_wrap
def get_placement_group(placement_group_name: str) -> PlacementGroup:
    """Get a placement group object with a global name.

    Args:
        placement_group_name: The global name of the placement group.

    Returns:
        The placement group object with the given global name.

    Raises:
        ValueError: If `placement_group_name` is empty, or if no placement
            group with that name can be found.
    """
    if not placement_group_name:
        raise ValueError("Please supply a non-empty value to get_placement_group")
    worker = ray._private.worker.global_worker
    worker.check_connected()
    placement_group_info = ray._private.state.state.get_placement_group_by_name(
        placement_group_name, worker.namespace
    )
    if placement_group_info is None:
        raise ValueError(
            f"Failed to look up placement group with name: {placement_group_name}"
        )
    # The GCS returns the ID as a hex string; convert it back to binary to
    # build the handle.
    return PlacementGroup(
        PlacementGroupID(hex_to_binary(placement_group_info["placement_group_id"]))
    )
@DeveloperAPI
@client_mode_wrap
def placement_group_table(placement_group: Optional[PlacementGroup] = None) -> dict:
    """Get the state of the placement group from GCS.

    Args:
        placement_group: The placement group to query, or None to fetch the
            table entry for all placement groups.

    Returns:
        A dict describing placement group state, as returned by the GCS.
    """
    worker = ray._private.worker.global_worker
    worker.check_connected()
    # A None ID asks the GCS for the whole table rather than a single entry.
    placement_group_id = placement_group.id if (placement_group is not None) else None
    return ray._private.state.state.placement_group_table(placement_group_id)
@PublicAPI
def get_current_placement_group() -> Optional[PlacementGroup]:
    """Get the current placement group which a task or actor is using.

    It returns None if there's no current placement group for the worker.
    For example, if you call this method in your driver, it returns None
    (because drivers never belong to any placement group).

    Examples:
        .. testcode::

            import ray
            from ray.util.placement_group import get_current_placement_group
            from ray.util.scheduling_strategies import (
                PlacementGroupSchedulingStrategy
            )

            @ray.remote
            def f():
                # This returns the placement group the task f belongs to.
                # It means this pg is identical to the pg created below.
                return get_current_placement_group()

            pg = ray.util.placement_group([{"CPU": 2}])
            assert ray.get(f.options(
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=pg)).remote()) == pg

            # Driver doesn't belong to any placement group,
            # so it returns None.
            assert get_current_placement_group() is None

    Returns:
        PlacementGroup: Placement group object.
            None if the current task or actor wasn't
            created with any placement group.
    """
    auto_init_ray()
    if client_mode_should_convert():
        # Client mode is only a driver.
        return None
    worker = ray._private.worker.global_worker
    worker.check_connected()
    pg_id = worker.placement_group_id
    # A nil ID means the worker was not scheduled inside any placement group.
    return None if pg_id.is_nil() else PlacementGroup(pg_id)
def check_placement_group_index( placement_group: PlacementGroup, bundle_index: int ) -> None: assert placement_group is not None if placement_group.id.is_nil(): if bundle_index != -1: raise ValueError( "If placement group is not set, " "the value of bundle index must be -1." ) elif bundle_index >= placement_group.bundle_count or bundle_index < -1: raise ValueError( f"placement group bundle index {bundle_index} " f"is invalid. Valid placement group indexes: " f"0-{placement_group.bundle_count}" ) def validate_placement_group( bundles: List[Dict[str, float]], strategy: str = "PACK", lifetime: Optional[str] = None, _max_cpu_fraction_per_node: float = 1.0, _soft_target_node_id: Optional[str] = None, ) -> bool: """Validates inputs for placement_group. Raises ValueError if inputs are invalid. """ assert _max_cpu_fraction_per_node is not None if _max_cpu_fraction_per_node <= 0 or _max_cpu_fraction_per_node > 1: raise ValueError( "Invalid argument `_max_cpu_fraction_per_node`: " f"{_max_cpu_fraction_per_node}. " "_max_cpu_fraction_per_node must be a float between 0 and 1. " ) if _soft_target_node_id and strategy != "STRICT_PACK": raise ValueError( "_soft_target_node_id currently only works " f"with STRICT_PACK but got {strategy}" ) if _soft_target_node_id and ray.NodeID.from_hex(_soft_target_node_id).is_nil(): raise ValueError( f"Invalid hex ID of _soft_target_node_id, got {_soft_target_node_id}" ) _validate_bundles(bundles) if strategy not in VALID_PLACEMENT_GROUP_STRATEGIES: raise ValueError( f"Invalid placement group strategy {strategy}. " f"Supported strategies are: {VALID_PLACEMENT_GROUP_STRATEGIES}." ) if lifetime not in [None, "detached"]: raise ValueError( "Placement group `lifetime` argument must be either `None` or " f"'detached'. Got {lifetime}." 
) def _validate_bundles(bundles: List[Dict[str, float]]): """Validates each bundle and raises a ValueError if any bundle is invalid.""" if not isinstance(bundles, list): raise ValueError( "Placement group bundles must be a list, " f"got {type(bundles)}." ) if len(bundles) == 0: raise ValueError( "Bundles must be a non-empty list of resource " 'dictionaries. For example: `[{"CPU": 1.0}, {"GPU": 1.0}]`. ' "Got empty list instead." ) for bundle in bundles: if ( not isinstance(bundle, dict) or not all(isinstance(k, str) for k in bundle.keys()) or not all(isinstance(v, (int, float)) for v in bundle.values()) ): raise ValueError( "Bundles must be a non-empty list of " "resource dictionaries. For example: " '`[{"CPU": 1.0}, {"GPU": 1.0}]`.' ) if len(bundle) == 0 or all( resource_value == 0 for resource_value in bundle.values() ): raise ValueError( "Bundles cannot be an empty dictionary or " f"resources with only 0 values. Bundles: {bundles}" ) if "object_store_memory" in bundle.keys(): warnings.warn( "Setting 'object_store_memory' for" " bundles is deprecated since it doesn't actually" " reserve the required object store memory." f" Use object spilling that's enabled by default (https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/objects/object-spilling.html) " # noqa: E501 "instead to bypass the object store memory size limitation.", DeprecationWarning, stacklevel=1, ) def _valid_resource_shape(resources, bundle_specs): """ If the resource shape cannot fit into every bundle spec, return False """ for bundle in bundle_specs: fit_in_bundle = True for resource, requested_val in resources.items(): # Skip "bundle" resource as it is automatically added # to all nodes with bundles by the placement group. if resource == ray_constants.PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME: continue if bundle.get(resource, 0) < requested_val: fit_in_bundle = False break if fit_in_bundle: # If resource request fits in any bundle, it is valid. 
return True return False def _validate_resource_shape( placement_group, resources, placement_resources, task_or_actor_repr ): bundles = placement_group.bundle_specs resources_valid = _valid_resource_shape(resources, bundles) placement_resources_valid = _valid_resource_shape(placement_resources, bundles) if not resources_valid: raise ValueError( f"Cannot schedule {task_or_actor_repr} with " "the placement group because the resource request " f"{resources} cannot fit into any bundles for " f"the placement group, {bundles}." ) if not placement_resources_valid: # Happens for the default actor case. # placement_resources is not an exposed concept to users, # so we should write more specialized error messages. raise ValueError( f"Cannot schedule {task_or_actor_repr} with " "the placement group because the actor requires " f"{placement_resources.get('CPU', 0)} CPU for " "creation, but it cannot " f"fit into any bundles for the placement group, " f"{bundles}. Consider " "creating a placement group with CPU resources." ) def _configure_placement_group_based_on_context( placement_group_capture_child_tasks: bool, bundle_index: int, resources: Dict, placement_resources: Dict, task_or_actor_repr: str, placement_group: Union[PlacementGroup, str, None] = "default", ) -> PlacementGroup: """Configure the placement group based on the given context. Based on the given context, this API returns the placement group instance for task/actor scheduling. Params: placement_group_capture_child_tasks: Whether or not the placement group needs to be captured from the global context. bundle_index: The bundle index for tasks/actor scheduling. resources: The scheduling resources. placement_resources: The scheduling placement resources for actors. task_or_actor_repr: The repr of task or actor function/class descriptor. placement_group: The placement group instance. - "default": Default placement group argument. 
Currently, the default behavior is to capture the parent task' placement group if placement_group_capture_child_tasks is set. - None: means placement group is explicitly not configured. - Placement group instance: In this case, do nothing. Returns: Placement group instance based on the given context. Raises: ValueError: If the bundle index is invalid for the placement group or the requested resources shape doesn't fit to any bundles. """ # Validate inputs. assert placement_group_capture_child_tasks is not None assert resources is not None # Validate and get the PlacementGroup instance. # Placement group could be None, default, or placement group. # Default behavior is "do not capture child tasks". if placement_group != "default": if not placement_group: placement_group = PlacementGroup.empty() elif placement_group == "default": if placement_group_capture_child_tasks: placement_group = get_current_placement_group() else: placement_group = PlacementGroup.empty() if not placement_group: placement_group = PlacementGroup.empty() assert isinstance(placement_group, PlacementGroup) # Validate the index. check_placement_group_index(placement_group, bundle_index) # Validate the shape. if not placement_group.is_empty: _validate_resource_shape( placement_group, resources, placement_resources, task_or_actor_repr ) return placement_group