Source code for ray.util.collective.collective

"""APIs exposed under the namespace ray.util.collective."""

import logging
import os
import socket
import threading
import time
from typing import List, Tuple

import numpy as np

import ray
import ray.experimental.internal_kv as _internal_kv
from . import types
from ray._common.network_utils import find_free_port, is_ipv6
from ray.util.collective.collective_group.torch_gloo_collective_group import (
    get_master_address_metadata_key as _get_master_addr_key,
)

logger = logging.getLogger(__name__)

try:
    from ray.util.collective.collective_group.nccl_collective_group import NCCLGroup

    _NCCL_AVAILABLE = True
    _LOG_NCCL_WARNING = False
except ImportError:
    _NCCL_AVAILABLE = False
    _LOG_NCCL_WARNING = True


try:
    from ray.util.collective.collective_group.torch_gloo_collective_group import (
        TorchGLOOGroup,
    )

    _TORCH_DISTRIBUTED_AVAILABLE = True
except ImportError:
    _TORCH_DISTRIBUTED_AVAILABLE = False


def nccl_available():
    global _LOG_NCCL_WARNING
    if ray.get_gpu_ids() and _LOG_NCCL_WARNING:
        logger.warning(
            "NCCL seems unavailable. Please install Cupy "
            "following the guide at: "
            "https://docs.cupy.dev/en/stable/install.html."
        )
        _LOG_NCCL_WARNING = False
    return _NCCL_AVAILABLE


def gloo_available():
    # Since we use torch_gloo as the backend for Gloo,
    # we can just return the availability of torch.distributed.
    return _TORCH_DISTRIBUTED_AVAILABLE


def torch_distributed_available():
    return _TORCH_DISTRIBUTED_AVAILABLE


def get_address_and_port() -> Tuple[str, int]:
    """Return the IP address and a free port on this node."""
    addr = ray.util.get_node_ip_address()
    port = find_free_port(socket.AF_INET6 if is_ipv6(addr) else socket.AF_INET)
    return addr, port


class GroupManager:
    """Manage the collective groups created so far.

    Each process has an instance of `GroupManager`. Each process can belong
    to multiple collective groups. The membership information and other
    metadata are stored in the global `_group_mgr` object.
    """

    def __init__(self):
        self._name_group_map = {}

    def create_collective_group(
        self, backend, world_size, rank, group_name, gloo_timeout
    ):
        """Create a new collective group and register it in the manager.

        Put the registration and the group information into the manager
        metadata as well.
        """
        backend = types.Backend(backend)
        if backend == types.Backend.GLOO:
            # Rendezvous: ensure a MASTER_ADDR:MASTER_PORT is published
            # in internal_kv.
            metadata_key = _get_master_addr_key(group_name)
            if rank == 0:
                addr, port = get_address_and_port()
                _internal_kv._internal_kv_put(metadata_key, f"{addr}:{port}")
            else:
                # Wait until rank 0 publishes the metadata or the timeout expires.
                deadline_s = time.time() + (
                    gloo_timeout / 1000.0 if gloo_timeout else 30.0
                )
                while True:
                    meta = _internal_kv._internal_kv_get(metadata_key)
                    if meta is not None:
                        break
                    if time.time() > deadline_s:
                        raise TimeoutError(
                            "Timed out waiting for GLOO rendezvous metadata "
                            f"for group '{group_name}'."
                        )
                    time.sleep(0.05)
            logger.debug(
                "Creating torch.distributed GLOO group: '{}'...".format(group_name)
            )
            g = TorchGLOOGroup(world_size, rank, group_name, gloo_timeout)
        elif backend == types.Backend.NCCL:
            _check_backend_availability(backend)
            logger.debug("Creating NCCL group: '{}'...".format(group_name))
            g = NCCLGroup(world_size, rank, group_name)
        else:
            raise RuntimeError(f"Unexpected backend: {backend}")

        self._name_group_map[group_name] = g
        return self._name_group_map[group_name]

    def is_group_exist(self, group_name):
        return group_name in self._name_group_map

    def get_group_by_name(self, group_name):
        """Get the collective group handle by its name."""
        if not self.is_group_exist(group_name):
            logger.warning("The group '{}' is not initialized.".format(group_name))
            return None
        return self._name_group_map[group_name]

    def destroy_collective_group(self, group_name):
        """Group destructor."""
        if not self.is_group_exist(group_name):
            logger.warning("The group '{}' does not exist.".format(group_name))
            return

        # Release the collective group resource.
        g = self._name_group_map[group_name]
        # Clean up the dict first.
        del self._name_group_map[group_name]
        # Release the communicator resources.
        g.destroy_group()

        # Release the detached actor spawned by `create_collective_group()`.
        name = "info_" + group_name
        try:
            store = ray.get_actor(name)
            ray.kill(store)
        except ValueError:
            pass


_group_mgr = GroupManager()

# This lock is used to make external calls to the _group_mgr thread-safe.
_group_mgr_lock = threading.Lock()


def is_group_initialized(group_name):
    """Check if the group is initialized in this process by the group name."""
    global _group_mgr
    global _group_mgr_lock
    with _group_mgr_lock:
        return _group_mgr.is_group_exist(group_name)


def init_collective_group(
    world_size: int,
    rank: int,
    backend=types.Backend.NCCL,
    group_name: str = "default",
    gloo_timeout: int = 30000,
):
    """Initialize a collective group inside an actor process.

    Args:
        world_size: the total number of processes in the group.
        rank: the rank of the current process.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.
        gloo_timeout: the rendezvous timeout for the GLOO backend,
            in milliseconds.

    Returns:
        None
    """
    _check_inside_actor()
    backend = types.Backend(backend)
    _check_backend_availability(backend)
    global _group_mgr
    global _group_mgr_lock
    # TODO(Hao): implement a group auto-counter.
    if not group_name:
        raise ValueError(
            "group_name '{}' needs to be a non-empty string.".format(group_name)
        )

    with _group_mgr_lock:
        if _group_mgr.is_group_exist(group_name):
            raise RuntimeError("Trying to initialize a group twice.")

        assert world_size > 0
        assert rank >= 0
        assert rank < world_size
        _group_mgr.create_collective_group(
            backend, world_size, rank, group_name, gloo_timeout
        )
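

# Illustrative usage sketch (not part of this module): every participant calls
# `init_collective_group` with the same world_size, backend, and group_name, and
# a distinct rank, before issuing any collectives. The actor class
# `_ExampleGlooWorker`, the group name "demo", and the two-actor setup are
# hypothetical; the GLOO backend is backed by torch.distributed, so torch is
# assumed to be installed.
#
# import torch
#
# @ray.remote
# class _ExampleGlooWorker:
#     def setup(self, world_size: int, rank: int):
#         init_collective_group(world_size, rank, backend="gloo", group_name="demo")
#
#     def reduce_ones(self):
#         t = torch.ones(4)
#         allreduce(t, group_name="demo")  # elementwise sum across the group
#         return t
#
# workers = [_ExampleGlooWorker.remote() for _ in range(2)]
# ray.get([w.setup.remote(2, i) for i, w in enumerate(workers)])
# ray.get([w.reduce_ones.remote() for w in workers])  # each tensor([2., 2., 2., 2.])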


def create_collective_group(
    actors,
    world_size: int,
    ranks: List[int],
    backend=types.Backend.NCCL,
    group_name: str = "default",
    gloo_timeout: int = 30000,
):
    """Declare a list of actors as a collective group.

    Note: This function should be called in a driver process.

    Args:
        actors: a list of actors to be set in a collective group.
        world_size: the total number of processes in the group.
        ranks (List[int]): the rank of each actor.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.
        gloo_timeout: the rendezvous timeout for the GLOO backend,
            in milliseconds.

    Returns:
        None
    """
    backend = types.Backend(backend)
    _check_backend_availability(backend)

    name = "info_" + group_name
    try:
        ray.get_actor(name)
        raise RuntimeError("Trying to initialize a group twice.")
    except ValueError:
        pass

    if len(ranks) != len(actors):
        raise RuntimeError(
            "Each actor should correspond to one rank. Got '{}' "
            "ranks but '{}' actors".format(len(ranks), len(actors))
        )

    if set(ranks) != set(range(len(ranks))):
        raise RuntimeError(
            "Ranks must be a permutation of 0 to '{}'. Got '{}'.".format(
                len(ranks) - 1, ", ".join([str(r) for r in ranks])
            )
        )

    if world_size <= 0:
        raise RuntimeError(
            "World size must be greater than zero. Got '{}'.".format(world_size)
        )
    if not all(r >= 0 for r in ranks):
        raise RuntimeError("Ranks must be non-negative.")
    if not all(r < world_size for r in ranks):
        raise RuntimeError("Ranks cannot be greater than or equal to world_size.")

    # Avoid a circular dependency.
    from ray.util.collective.util import Info

    # Store the information in a named actor that can be accessed later.
    actors_id = [a._ray_actor_id for a in actors]
    # TODO (Dacheng): how do we recycle this named actor?
    info = Info.options(name=name, lifetime="detached").remote()
    ray.get([info.set_info.remote(actors_id, world_size, ranks, backend, gloo_timeout)])
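

# Illustrative usage sketch (not part of this module): driver-side group
# declaration with `create_collective_group`. The driver declares the group once;
# each actor then joins lazily the first time it calls a collective (see
# `get_group_handle` below). The `Worker` actor class and the group name "demo"
# are hypothetical.
#
# @ray.remote
# class Worker:
#     def do_allreduce(self, t):
#         allreduce(t, group_name="demo")  # joins "demo" on first use
#         return t
#
# workers = [Worker.remote() for _ in range(4)]
# create_collective_group(
#     workers, world_size=4, ranks=[0, 1, 2, 3], backend="gloo", group_name="demo"
# )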


# TODO: we need a declarative destroy() API here.
def destroy_collective_group(group_name: str = "default") -> None:
    """Destroy a collective group given its group name."""
    _check_inside_actor()
    global _group_mgr
    global _group_mgr_lock
    with _group_mgr_lock:
        _group_mgr.destroy_collective_group(group_name)


def get_rank(group_name: str = "default") -> int:
    """Return the rank of this process in the given group.

    Args:
        group_name: the name of the group to query.

    Returns:
        The rank of this process in the named group, -1 if the group does
        not exist or the process does not belong to the group.
    """
    _check_inside_actor()
    global _group_mgr
    global _group_mgr_lock
    with _group_mgr_lock:
        if not _group_mgr.is_group_exist(group_name):
            return -1
        g = _group_mgr.get_group_by_name(group_name)
        return g.rank


def get_collective_group_size(group_name: str = "default") -> int:
    """Return the size of the collective group with the given name.

    Args:
        group_name: the name of the group to query.

    Returns:
        The world size of the collective group, -1 if the group does not
        exist or the process does not belong to the group.
    """
    _check_inside_actor()
    global _group_mgr
    global _group_mgr_lock
    with _group_mgr_lock:
        if not _group_mgr.is_group_exist(group_name):
            return -1
        g = _group_mgr.get_group_by_name(group_name)
        return g.world_size


def allreduce(tensor, group_name: str = "default", op=types.ReduceOp.SUM):
    """Collective allreduce the tensor across the group.

    Args:
        tensor: the tensor to be all-reduced on this process.
        group_name: the collective group name to perform allreduce.
        op: the reduce operation.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)
    opts = types.AllReduceOptions()
    opts.reduceOp = op
    g.allreduce([tensor], opts)
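

# Illustrative numerical sketch (assumptions: run inside one of the group's
# actors, a 2-process "gloo" group named "demo" is already initialized, and
# torch is installed). With ReduceOp.SUM, every rank ends up with the
# elementwise sum of all inputs:
#
# import torch
# t = torch.tensor([float(get_rank("demo"))])  # rank 0 holds [0.], rank 1 holds [1.]
# allreduce(t, group_name="demo", op=types.ReduceOp.SUM)
# # After the call, t == tensor([1.]) on both ranks (0 + 1), reduced in place.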


def allreduce_multigpu(
    tensor_list: list, group_name: str = "default", op=types.ReduceOp.SUM
):
    """Collective allreduce a list of tensors across the group.

    Args:
        tensor_list (List[tensor]): list of tensors to be allreduced,
            each on a GPU.
        group_name: the collective group name to perform allreduce.
        op: the reduce operation.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("Multi-GPU calls require NCCL and Cupy.")
    _check_tensor_list_input(tensor_list)
    g = get_group_handle(group_name)
    opts = types.AllReduceOptions()
    opts.reduceOp = op
    g.allreduce(tensor_list, opts)


def barrier(group_name: str = "default"):
    """Barrier all processes in the collective group.

    Args:
        group_name: the name of the group to barrier.

    Returns:
        None
    """
    g = get_group_handle(group_name)
    g.barrier()


def reduce(
    tensor, dst_rank: int = 0, group_name: str = "default", op=types.ReduceOp.SUM
):
    """Reduce the tensor across the group to the destination rank.

    Args:
        tensor: the tensor to be reduced on this process.
        dst_rank: the rank of the destination process.
        group_name: the collective group name to perform reduce.
        op: the reduce operation.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)

    # Check the destination rank.
    _check_rank_valid(g, dst_rank)
    opts = types.ReduceOptions()
    opts.reduceOp = op
    opts.root_rank = dst_rank
    opts.root_tensor = 0
    g.reduce([tensor], opts)


def reduce_multigpu(
    tensor_list: list,
    dst_rank: int = 0,
    dst_tensor: int = 0,
    group_name: str = "default",
    op=types.ReduceOp.SUM,
):
    """Reduce tensors across the group to the destination rank and tensor.

    Args:
        tensor_list: the list of tensors to be reduced on this process;
            each tensor is located on a GPU.
        dst_rank: the rank of the destination process.
        dst_tensor: the index of the GPU at the destination.
        group_name: the collective group name to perform reduce.
        op: the reduce operation.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("Multi-GPU calls require NCCL and Cupy.")
    _check_tensor_list_input(tensor_list)
    g = get_group_handle(group_name)

    # Check the destination rank and tensor.
    _check_rank_valid(g, dst_rank)
    _check_root_tensor_valid(len(tensor_list), dst_tensor)
    opts = types.ReduceOptions()
    opts.reduceOp = op
    opts.root_rank = dst_rank
    opts.root_tensor = dst_tensor
    g.reduce(tensor_list, opts)


def broadcast(tensor, src_rank: int = 0, group_name: str = "default"):
    """Broadcast the tensor from a source process to all others.

    Args:
        tensor: the tensor to be broadcast (on the source) or received
            (on the destinations).
        src_rank: the rank of the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)

    # Check the source rank.
    _check_rank_valid(g, src_rank)
    opts = types.BroadcastOptions()
    opts.root_rank = src_rank
    opts.root_tensor = 0
    g.broadcast([tensor], opts)
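

# Illustrative sketch (assumptions: run inside one of the group's actors, a
# 2-process "gloo" group named "demo" is already initialized, and torch is
# installed). Every rank passes a tensor of the same shape; after the call,
# all ranks hold rank 0's values:
#
# import torch
# t = torch.full((3,), float(get_rank("demo")))
# broadcast(t, src_rank=0, group_name="demo")
# # On every rank, t == tensor([0., 0., 0.]) (rank 0's data), written in place.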


def broadcast_multigpu(
    tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = "default"
):
    """Broadcast the tensor from a source GPU to all other GPUs.

    Args:
        tensor_list: the tensors to broadcast (src) or receive (dst).
        src_rank: the rank of the source process.
        src_tensor: the index of the source GPU on the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("Multi-GPU calls require NCCL and Cupy.")
    _check_tensor_list_input(tensor_list)
    g = get_group_handle(group_name)

    # Check the source rank and tensor.
    _check_rank_valid(g, src_rank)
    _check_root_tensor_valid(len(tensor_list), src_tensor)
    opts = types.BroadcastOptions()
    opts.root_rank = src_rank
    opts.root_tensor = src_tensor
    g.broadcast(tensor_list, opts)


def allgather(tensor_list: list, tensor, group_name: str = "default"):
    """Allgather tensors from each process of the group into a list.

    Args:
        tensor_list: the results, stored as a list of tensors.
        tensor: the tensor (to be gathered) in the current process.
        group_name: the name of the collective group.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    _check_tensor_list_input(tensor_list)
    g = get_group_handle(group_name)
    if len(tensor_list) != g.world_size:
        # Typically, a CCL library requires len(tensor_list) >= world_size;
        # here we are stricter: len(tensor_list) == world_size.
        raise RuntimeError(
            "The length of the tensor list operands to allgather "
            "must be equal to world_size."
        )
    opts = types.AllGatherOptions()
    g.allgather([tensor_list], [tensor], opts)
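

# Illustrative sketch (assumptions: run inside one of the group's actors, a
# 2-process "gloo" group named "demo" is already initialized, and torch is
# installed). The output list must be pre-allocated with exactly world_size
# tensors, each shaped like the local input:
#
# import torch
# local = torch.tensor([float(get_rank("demo"))])
# out = [torch.zeros(1) for _ in range(get_collective_group_size("demo"))]
# allgather(out, local, group_name="demo")
# # On every rank, out == [tensor([0.]), tensor([1.])], ordered by rank.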


def allgather_multigpu(
    output_tensor_lists: list, input_tensor_list: list, group_name: str = "default"
):
    """Allgather tensors from each GPU of the group into lists.

    Args:
        output_tensor_lists (List[List[tensor]]): the gathered results, with
            shape num_gpus * world_size * shape(tensor).
        input_tensor_list (List[tensor]): a list of tensors, with shape
            num_gpus * shape(tensor).
        group_name: the name of the collective group.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("Multi-GPU calls require NCCL and Cupy.")
    _check_tensor_lists_input(output_tensor_lists)
    _check_tensor_list_input(input_tensor_list)
    g = get_group_handle(group_name)
    opts = types.AllGatherOptions()
    g.allgather(output_tensor_lists, input_tensor_list, opts)


def reducescatter(
    tensor, tensor_list: list, group_name: str = "default", op=types.ReduceOp.SUM
):
    """Reducescatter a list of tensors across the group.

    Reduce the list of tensors across each process in the group, then
    scatter the reduced list of tensors -- one tensor for each process.

    Args:
        tensor: the resulting tensor on this process.
        tensor_list: the list of tensors to be reduced and scattered.
        group_name: the name of the collective group.
        op: the reduce operation.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    _check_tensor_list_input(tensor_list)
    g = get_group_handle(group_name)
    opts = types.ReduceScatterOptions()
    opts.reduceOp = op
    if len(tensor_list) != g.world_size:
        raise RuntimeError(
            "The length of the tensor list operands to reducescatter "
            "must be equal to world_size."
        )
    g.reducescatter([tensor], [tensor_list], opts)
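

# Illustrative sketch (assumptions: run inside one of the group's actors, a
# 2-process "gloo" group named "demo" is already initialized, and torch is
# installed). Each rank contributes world_size tensors; the i-th entries are
# reduced elementwise and the result for index `rank` lands in that rank's
# output tensor:
#
# import torch
# inputs = [torch.ones(2) for _ in range(get_collective_group_size("demo"))]
# result = torch.zeros(2)
# reducescatter(result, inputs, group_name="demo", op=types.ReduceOp.SUM)
# # On both ranks, result == tensor([2., 2.]) (one slot summed across 2 ranks).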


def reducescatter_multigpu(
    output_tensor_list,
    input_tensor_lists,
    group_name: str = "default",
    op=types.ReduceOp.SUM,
):
    """Reducescatter a list of tensors across all GPUs.

    Args:
        output_tensor_list: the resulting list of tensors, with shape
            num_gpus * shape(tensor).
        input_tensor_lists: the original tensors, with shape
            num_gpus * world_size * shape(tensor).
        group_name: the name of the collective group.
        op: the reduce operation.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("Multi-GPU calls require NCCL and Cupy.")
    _check_tensor_lists_input(input_tensor_lists)
    _check_tensor_list_input(output_tensor_list)
    g = get_group_handle(group_name)
    opts = types.ReduceScatterOptions()
    opts.reduceOp = op
    g.reducescatter(output_tensor_list, input_tensor_lists, opts)


def send(tensor, dst_rank: int, group_name: str = "default"):
    """Send a tensor to a remote process synchronously.

    Args:
        tensor: the tensor to send.
        dst_rank: the rank of the destination process.
        group_name: the name of the collective group.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)
    _check_rank_valid(g, dst_rank)
    if dst_rank == g.rank:
        raise RuntimeError("The destination rank '{}' is self.".format(dst_rank))
    opts = types.SendOptions()
    opts.dst_rank = dst_rank
    g.send([tensor], opts)


def send_multigpu(
    tensor,
    dst_rank: int,
    dst_gpu_index: int,
    group_name: str = "default",
    n_elements: int = 0,
):
    """Send a tensor to a remote GPU synchronously.

    The function assumes each process owns more than one GPU, and the
    sender and receiver processes have an equal number of GPUs.

    Args:
        tensor: the tensor to send, located on a GPU.
        dst_rank: the rank of the destination process.
        dst_gpu_index: the destination GPU index.
        group_name: the name of the collective group.
        n_elements: if specified, send the next n elements from the
            starting address of the tensor.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("send_multigpu call requires NCCL.")
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)
    _check_rank_valid(g, dst_rank)
    if dst_rank == g.rank:
        raise RuntimeError(
            "The dst_rank '{}' is self. Consider doing a "
            "GPU-to-GPU memcpy instead.".format(dst_rank)
        )
    if n_elements < 0:
        raise RuntimeError("The n_elements '{}' should be >= 0.".format(n_elements))
    opts = types.SendOptions()
    opts.dst_rank = dst_rank
    opts.dst_gpu_index = dst_gpu_index
    opts.n_elements = n_elements
    g.send([tensor], opts)


def recv(tensor, src_rank: int, group_name: str = "default"):
    """Receive a tensor from a remote process synchronously.

    Args:
        tensor: the received tensor.
        src_rank: the rank of the source process.
        group_name: the name of the collective group.

    Returns:
        None
    """
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)
    _check_rank_valid(g, src_rank)
    if src_rank == g.rank:
        raise RuntimeError("The source rank '{}' is self.".format(src_rank))
    opts = types.RecvOptions()
    opts.src_rank = src_rank
    g.recv([tensor], opts)
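

# Illustrative sketch (assumptions: run inside the group's actors, a 2-process
# "gloo" group named "demo" is already initialized, and torch is installed).
# `send` and `recv` are blocking and must be issued as a matched pair:
#
# import torch
# if get_rank("demo") == 0:
#     payload = torch.arange(4, dtype=torch.float32)
#     send(payload, dst_rank=1, group_name="demo")
# else:
#     buf = torch.empty(4)  # must match the sender's shape and dtype
#     recv(buf, src_rank=0, group_name="demo")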


def recv_multigpu(
    tensor,
    src_rank: int,
    src_gpu_index: int,
    group_name: str = "default",
    n_elements: int = 0,
):
    """Receive a tensor from a remote GPU synchronously.

    The function assumes each process owns more than one GPU, and the
    sender and receiver processes have an equal number of GPUs.

    Args:
        tensor: the received tensor, located on a GPU.
        src_rank: the rank of the source process.
        src_gpu_index: the index of the source GPU on the source process.
        group_name: the name of the collective group.
        n_elements: if specified, receive the next n elements into the
            starting address of the tensor.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("recv_multigpu call requires NCCL.")
    _check_single_tensor_input(tensor)
    g = get_group_handle(group_name)
    _check_rank_valid(g, src_rank)
    if src_rank == g.rank:
        raise RuntimeError(
            "The src_rank '{}' is self. Consider doing a "
            "GPU-to-GPU memcpy instead.".format(src_rank)
        )
    if n_elements < 0:
        raise RuntimeError("The n_elements '{}' should be >= 0.".format(n_elements))
    opts = types.RecvOptions()
    opts.src_rank = src_rank
    opts.src_gpu_index = src_gpu_index
    opts.n_elements = n_elements
    g.recv([tensor], opts)


def synchronize(gpu_id: int):
    """Synchronize the current process to a given GPU device.

    Args:
        gpu_id: the GPU device id to synchronize.

    Returns:
        None
    """
    if not types.cupy_available():
        raise RuntimeError("synchronize call requires CUDA and NCCL.")
    import cupy as cp

    cp.cuda.Device(gpu_id).synchronize()


def get_group_handle(group_name: str = "default"):
    """Check if the group is initialized and return the group handle.

    Args:
        group_name: the name of the collective group.

    Returns:
        The collective group handle.
    """
    _check_inside_actor()
    global _group_mgr
    global _group_mgr_lock
    with _group_mgr_lock:
        if not _group_mgr.is_group_exist(group_name):
            # Try loading the group information from the remote info store.
            try:
                # If the information is stored in an Info actor,
                # get it and create the group.
                name = "info_" + group_name
                mgr = ray.get_actor(name=name)
                ids, world_size, ranks, backend, gloo_timeout = ray.get(
                    mgr.get_info.remote()
                )
                worker = ray._private.worker.global_worker
                id_ = worker.core_worker.get_actor_id()
                r = ranks[ids.index(id_)]
                _group_mgr.create_collective_group(
                    backend, world_size, r, group_name, gloo_timeout
                )
            except ValueError as exc:
                # Check if this group was initialized using options().
                if (
                    "collective_group_name" in os.environ
                    and os.environ["collective_group_name"] == group_name
                ):
                    rank = int(os.environ["collective_rank"])
                    world_size = int(os.environ["collective_world_size"])
                    backend = os.environ["collective_backend"]
                    gloo_timeout = int(
                        os.environ.get("collective_gloo_timeout", 30000)
                    )
                    _group_mgr.create_collective_group(
                        backend, world_size, rank, group_name, gloo_timeout
                    )
                else:
                    raise RuntimeError(
                        "The collective group '{}' is not "
                        "initialized in the process.".format(group_name)
                    ) from exc
        g = _group_mgr.get_group_by_name(group_name)
    return g


def _check_single_tensor_input(tensor):
    """Check if the tensor is of a supported type."""
    if isinstance(tensor, np.ndarray):
        return
    if types.cupy_available():
        if isinstance(tensor, types.cp.ndarray):
            return
    if types.torch_available():
        if isinstance(tensor, types.th.Tensor):
            return
    raise RuntimeError(
        "Unrecognized tensor type '{}'. Supported types are: "
        "np.ndarray, torch.Tensor, cupy.ndarray.".format(type(tensor))
    )


def _check_backend_availability(backend: types.Backend):
    """Check whether the backend is available."""
    if backend == types.Backend.GLOO:
        # pygloo has been deprecated; torch_gloo is used in all cases.
        if not torch_distributed_available():
            raise RuntimeError("torch.distributed is not available.")
    elif backend == types.Backend.NCCL:
        if not nccl_available():
            raise RuntimeError("NCCL is not available.")


def _check_inside_actor():
    """Check that we are currently inside a Ray actor or task."""
    worker = ray._private.worker.global_worker
    if worker.mode == ray.WORKER_MODE:
        return
    else:
        raise RuntimeError(
            "The collective APIs may only be used inside a Ray actor or task."
        )


def _check_rank_valid(g, rank: int):
    """Check that the rank satisfies 0 <= rank < world_size."""
    if rank < 0:
        raise ValueError("rank '{}' is negative.".format(rank))
    if rank >= g.world_size:
        raise ValueError(
            "rank '{}' must be less than world size '{}'.".format(rank, g.world_size)
        )


def _check_tensor_list_input(tensor_list):
    """Check if the input is a list of supported tensor types."""
    if not isinstance(tensor_list, list):
        raise RuntimeError(
            "The input must be a list of tensors. "
            "Got '{}'.".format(type(tensor_list))
        )
    if not tensor_list:
        raise RuntimeError("Got an empty list of tensors.")
    for t in tensor_list:
        _check_single_tensor_input(t)


def _check_tensor_lists_input(tensor_lists):
    """Check if the input is a list of lists of supported tensor types."""
    if not isinstance(tensor_lists, list):
        raise RuntimeError(
            "The input must be a list of lists of tensors. "
            "Got '{}'.".format(type(tensor_lists))
        )
    if not tensor_lists:
        raise RuntimeError(f"Did not receive tensors. Got: {tensor_lists}")
    for t in tensor_lists:
        _check_tensor_list_input(t)


def _check_root_tensor_valid(length, root_tensor):
    """Check that the root tensor index satisfies 0 <= root_tensor < length."""
    if root_tensor < 0:
        raise ValueError("root_tensor '{}' is negative.".format(root_tensor))
    if root_tensor >= length:
        raise ValueError(
            "root_tensor '{}' is not less than the number of GPUs "
            "'{}'.".format(root_tensor, length)
        )