"""APIs exposed under the namespace ray.util.collective."""
import logging
import os
from typing import List
import numpy as np
import ray
from ray.util.collective import types
_NCCL_AVAILABLE = True
_GLOO_AVAILABLE = True
logger = logging.getLogger(__name__)
try:
from ray.util.collective.collective_group.nccl_collective_group import NCCLGroup
except ImportError:
_NCCL_AVAILABLE = False
    logger.warning(
        "NCCL seems unavailable. Please install CuPy "
        "following the guide at: "
        "https://docs.cupy.dev/en/stable/install.html."
    )
try:
from ray.util.collective.collective_group.gloo_collective_group import GLOOGroup
except ImportError:
_GLOO_AVAILABLE = False
def nccl_available():
return _NCCL_AVAILABLE
def gloo_available():
return _GLOO_AVAILABLE
class GroupManager(object):
"""Use this class to manage the collective groups we created so far.
Each process will have an instance of `GroupManager`. Each process
could belong to multiple collective groups. The membership information
and other metadata are stored in the global `_group_mgr` object.
"""
def __init__(self):
self._name_group_map = {}
self._group_name_map = {}
def create_collective_group(self, backend, world_size, rank, group_name):
"""The entry to create new collective groups in the manager.
Put the registration and the group information into the manager
metadata as well.
"""
backend = types.Backend(backend)
if backend == types.Backend.MPI:
raise RuntimeError("Ray does not support MPI.")
elif backend == types.Backend.GLOO:
logger.debug("Creating GLOO group: '{}'...".format(group_name))
g = GLOOGroup(
world_size,
rank,
group_name,
store_type="ray_internal_kv",
device_type="tcp",
)
self._name_group_map[group_name] = g
self._group_name_map[g] = group_name
elif backend == types.Backend.NCCL:
logger.debug("Creating NCCL group: '{}'...".format(group_name))
g = NCCLGroup(world_size, rank, group_name)
self._name_group_map[group_name] = g
self._group_name_map[g] = group_name
return self._name_group_map[group_name]
def is_group_exist(self, group_name):
return group_name in self._name_group_map
def get_group_by_name(self, group_name):
"""Get the collective group handle by its name."""
if not self.is_group_exist(group_name):
logger.warning("The group '{}' is not initialized.".format(group_name))
return None
return self._name_group_map[group_name]
def destroy_collective_group(self, group_name):
"""Group destructor."""
if not self.is_group_exist(group_name):
logger.warning("The group '{}' does not exist.".format(group_name))
return
# release the collective group resource
g = self._name_group_map[group_name]
# clean up the dicts
del self._group_name_map[g]
del self._name_group_map[group_name]
# Release the communicator resources
g.destroy_group()
# Release the detached actors spawned by `create_collective_group()`
name = "info_" + group_name
try:
store = ray.get_actor(name)
ray.kill(store)
except ValueError:
pass
_group_mgr = GroupManager()
def is_group_initialized(group_name):
"""Check if the group is initialized in this process by the group name."""
return _group_mgr.is_group_exist(group_name)
def init_collective_group(
world_size: int, rank: int, backend=types.Backend.NCCL, group_name: str = "default"
):
"""Initialize a collective group inside an actor process.
Args:
world_size: the total number of processes in the group.
rank: the rank of the current process.
backend: the CCL backend to use, NCCL or GLOO.
group_name: the name of the collective group.
Returns:
None
"""
_check_inside_actor()
backend = types.Backend(backend)
_check_backend_availability(backend)
global _group_mgr
# TODO(Hao): implement a group auto-counter.
if not group_name:
raise ValueError("group_name '{}' needs to be a string.".format(group_name))
if _group_mgr.is_group_exist(group_name):
raise RuntimeError("Trying to initialize a group twice.")
assert world_size > 0
assert rank >= 0
assert rank < world_size
_group_mgr.create_collective_group(backend, world_size, rank, group_name)
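
# Illustrative usage sketch (not part of the public API): one way to initialize
# a group from inside actors, assuming a Ray cluster with at least `num_workers`
# GPUs and CuPy installed. The `Worker` class and the
# `_example_init_collective_group` helper are hypothetical names used only here.
def _example_init_collective_group(num_workers: int = 2):
    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            import cupy as cp
            self.buffer = cp.ones((10,), dtype=cp.float32)

        def setup(self, world_size, rank):
            # Each actor declares its own membership in the "default" group.
            init_collective_group(world_size, rank, backend="nccl")
            return True

    workers = [Worker.remote() for _ in range(num_workers)]
    # Block until every member has joined the group.
    ray.get([w.setup.remote(num_workers, i) for i, w in enumerate(workers)])
    return workers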
def create_collective_group(
actors,
world_size: int,
ranks: List[int],
backend=types.Backend.NCCL,
group_name: str = "default",
):
"""Declare a list of actors as a collective group.
Note: This function should be called in a driver process.
Args:
actors: a list of actors to be set in a collective group.
world_size: the total number of processes in the group.
ranks (List[int]): the rank of each actor.
backend: the CCL backend to use, NCCL or GLOO.
group_name: the name of the collective group.
Returns:
None
"""
backend = types.Backend(backend)
_check_backend_availability(backend)
name = "info_" + group_name
try:
ray.get_actor(name)
raise RuntimeError("Trying to initialize a group twice.")
except ValueError:
pass
if len(ranks) != len(actors):
raise RuntimeError(
"Each actor should correspond to one rank. Got '{}' "
"ranks but '{}' actors".format(len(ranks), len(actors))
)
if set(ranks) != set(range(len(ranks))):
raise RuntimeError(
"Ranks must be a permutation from 0 to '{}'. Got '{}'.".format(
len(ranks), "".join([str(r) for r in ranks])
)
)
if world_size <= 0:
raise RuntimeError(
"World size must be greater than zero. Got '{}'.".format(world_size)
)
    if not all(r >= 0 for r in ranks):
        raise RuntimeError("Ranks must be non-negative.")
    if not all(r < world_size for r in ranks):
        raise RuntimeError("Ranks must be smaller than world_size.")
# avoid a circular dependency
from ray.util.collective.util import Info
# store the information into a NamedActor that can be accessed later.
name = "info_" + group_name
actors_id = [a._ray_actor_id for a in actors]
    # TODO (Dacheng): how do we recycle this named actor?
info = Info.options(name=name, lifetime="detached").remote()
ray.get([info.set_info.remote(actors_id, world_size, ranks, backend)])
# TODO: we need a declarative destroy() API here.
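
# Illustrative usage sketch (not part of the public API): declaring a group
# from the driver instead of inside the actors. `workers` is assumed to be a
# list of two already-created actor handles (e.g. the hypothetical `Worker`
# actors from the example above); ranks are assigned in list order, and the
# group itself is materialized lazily on the first collective call the actors
# make.
def _example_create_collective_group(workers):
    create_collective_group(
        workers, world_size=2, ranks=[0, 1], backend="nccl", group_name="default"
    )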
def destroy_collective_group(group_name: str = "default") -> None:
"""Destroy a collective group given its group name."""
_check_inside_actor()
global _group_mgr
_group_mgr.destroy_collective_group(group_name)
def get_rank(group_name: str = "default") -> int:
"""Return the rank of this process in the given group.
Args:
group_name: the name of the group to query
Returns:
the rank of this process in the named group,
-1 if the group does not exist or the process does
not belong to the group.
"""
_check_inside_actor()
if not is_group_initialized(group_name):
return -1
g = _group_mgr.get_group_by_name(group_name)
return g.rank
def get_collective_group_size(group_name: str = "default") -> int:
"""Return the size of the collective group with the given name.
Args:
group_name: the name of the group to query
Returns:
The world size of the collective group, -1 if the group does
not exist or the process does not belong to the group.
"""
_check_inside_actor()
if not is_group_initialized(group_name):
return -1
g = _group_mgr.get_group_by_name(group_name)
return g.world_size
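
# Illustrative sketch (not part of the public API): a helper that an actor
# which has joined the "default" group could call to report its membership;
# both queries return -1 in a process that is not part of the group.
def _example_report_membership():
    return get_rank("default"), get_collective_group_size("default")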
def allreduce(tensor, group_name: str = "default", op=types.ReduceOp.SUM):
"""Collective allreduce the tensor across the group.
Args:
tensor: the tensor to be all-reduced on this process.
group_name: the collective group name to perform allreduce.
op: The reduce operation.
Returns:
None
"""
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
    opts = types.AllReduceOptions()
opts.reduceOp = op
g.allreduce([tensor], opts)
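
# Illustrative usage sketch (not part of the public API): an actor method body
# that allreduces a GPU buffer in place, assuming the actor has already joined
# the "default" NCCL group (see the init_collective_group example above).
def _example_allreduce(buffer):
    # The buffer is overwritten with the elementwise sum across all members.
    allreduce(buffer, group_name="default", op=types.ReduceOp.SUM)
    return buffer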
def allreduce_multigpu(
tensor_list: list, group_name: str = "default", op=types.ReduceOp.SUM
):
"""Collective allreduce a list of tensors across the group.
Args:
tensor_list (List[tensor]): list of tensors to be allreduced,
each on a GPU.
        group_name: the collective group name to perform allreduce.
        op: The reduce operation.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
_check_tensor_list_input(tensor_list)
g = _check_and_get_group(group_name)
    opts = types.AllReduceOptions()
opts.reduceOp = op
g.allreduce(tensor_list, opts)
def barrier(group_name: str = "default"):
"""Barrier all processes in the collective group.
Args:
group_name: the name of the group to barrier.
Returns:
None
"""
g = _check_and_get_group(group_name)
g.barrier()
def reduce(
tensor, dst_rank: int = 0, group_name: str = "default", op=types.ReduceOp.SUM
):
"""Reduce the tensor across the group to the destination rank.
Args:
tensor: the tensor to be reduced on this process.
dst_rank: the rank of the destination process.
group_name: the collective group name to perform reduce.
op: The reduce operation.
Returns:
None
"""
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
# check dst rank
_check_rank_valid(g, dst_rank)
opts = types.ReduceOptions()
opts.reduceOp = op
opts.root_rank = dst_rank
opts.root_tensor = 0
g.reduce([tensor], opts)
def reduce_multigpu(
tensor_list: list,
dst_rank: int = 0,
dst_tensor: int = 0,
group_name: str = "default",
op=types.ReduceOp.SUM,
):
"""Reduce the tensor across the group to the destination rank
and destination tensor.
Args:
tensor_list: the list of tensors to be reduced on this process;
each tensor located on a GPU.
dst_rank: the rank of the destination process.
dst_tensor: the index of GPU at the destination.
group_name: the collective group name to perform reduce.
op: The reduce operation.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
_check_tensor_list_input(tensor_list)
g = _check_and_get_group(group_name)
# check dst rank
_check_rank_valid(g, dst_rank)
_check_root_tensor_valid(len(tensor_list), dst_tensor)
opts = types.ReduceOptions()
opts.reduceOp = op
opts.root_rank = dst_rank
opts.root_tensor = dst_tensor
g.reduce(tensor_list, opts)
def broadcast(tensor, src_rank: int = 0, group_name: str = "default"):
"""Broadcast the tensor from a source process to all others.
Args:
        tensor: the tensor to be broadcast (src) or received (destination).
src_rank: the rank of the source process.
group_name: the collective group name to perform broadcast.
Returns:
None
"""
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
# check src rank
_check_rank_valid(g, src_rank)
opts = types.BroadcastOptions()
opts.root_rank = src_rank
opts.root_tensor = 0
g.broadcast([tensor], opts)
def broadcast_multigpu(
tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = "default"
):
"""Broadcast the tensor from a source GPU to all other GPUs.
Args:
tensor_list: the tensors to broadcast (src) or receive (dst).
src_rank: the rank of the source process.
src_tensor: the index of the source GPU on the source process.
group_name: the collective group name to perform broadcast.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
_check_tensor_list_input(tensor_list)
g = _check_and_get_group(group_name)
# check src rank
_check_rank_valid(g, src_rank)
_check_root_tensor_valid(len(tensor_list), src_tensor)
opts = types.BroadcastOptions()
opts.root_rank = src_rank
opts.root_tensor = src_tensor
g.broadcast(tensor_list, opts)
def allgather(tensor_list: list, tensor, group_name: str = "default"):
"""Allgather tensors from each process of the group into a list.
Args:
tensor_list: the results, stored as a list of tensors.
tensor: the tensor (to be gathered) in the current process
group_name: the name of the collective group.
Returns:
None
"""
_check_single_tensor_input(tensor)
_check_tensor_list_input(tensor_list)
g = _check_and_get_group(group_name)
if len(tensor_list) != g.world_size:
        # Typically a CCL library requires len(tensor_list) >= world_size;
# Here we make it more strict: len(tensor_list) == world_size.
raise RuntimeError(
"The length of the tensor list operands to allgather "
"must be equal to world_size."
)
opts = types.AllGatherOptions()
g.allgather([tensor_list], [tensor], opts)
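
# Illustrative usage sketch (not part of the public API): gathering one NumPy
# tensor from every member of a GLOO group into a pre-allocated output list,
# assuming the calling actor has already joined the "default" group.
def _example_allgather(local_tensor):
    world_size = get_collective_group_size("default")
    # One output buffer per rank, matching the shape and dtype of the input.
    outputs = [np.empty_like(local_tensor) for _ in range(world_size)]
    allgather(outputs, local_tensor, group_name="default")
    return outputs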
def allgather_multigpu(
output_tensor_lists: list, input_tensor_list: list, group_name: str = "default"
):
"""Allgather tensors from each gpus of the group into lists.
Args:
output_tensor_lists (List[List[tensor]]): gathered results, with shape
must be num_gpus * world_size * shape(tensor).
input_tensor_list: (List[tensor]): a list of tensors, with shape
num_gpus * shape(tensor).
group_name: the name of the collective group.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
_check_tensor_lists_input(output_tensor_lists)
_check_tensor_list_input(input_tensor_list)
g = _check_and_get_group(group_name)
opts = types.AllGatherOptions()
g.allgather(output_tensor_lists, input_tensor_list, opts)
def reducescatter(
tensor, tensor_list: list, group_name: str = "default", op=types.ReduceOp.SUM
):
"""Reducescatter a list of tensors across the group.
    Reduce the list of tensors across all processes in the group, then
    scatter the reduced tensors -- one tensor for each process.
Args:
        tensor: the resulting tensor on this process.
tensor_list: The list of tensors to be reduced and scattered.
group_name: the name of the collective group.
op: The reduce operation.
Returns:
None
"""
_check_single_tensor_input(tensor)
_check_tensor_list_input(tensor_list)
g = _check_and_get_group(group_name)
if len(tensor_list) != g.world_size:
raise RuntimeError(
"The length of the tensor list operands to reducescatter "
"must not be equal to world_size."
)
opts = types.ReduceScatterOptions()
opts.reduceOp = op
g.reducescatter([tensor], [tensor_list], opts)
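
# Illustrative usage sketch (not part of the public API): reduce-scattering a
# list of world_size NumPy tensors so that this member receives one reduced
# shard, assuming the calling actor has already joined the "default" group.
def _example_reducescatter(shards):
    # `shards` must contain exactly world_size tensors; the shard reduced
    # for this rank is written into `result`.
    result = np.empty_like(shards[0])
    reducescatter(result, shards, group_name="default", op=types.ReduceOp.SUM)
    return result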
def reducescatter_multigpu(
output_tensor_list,
input_tensor_lists,
group_name: str = "default",
op=types.ReduceOp.SUM,
):
"""Reducescatter a list of tensors across all GPUs.
Args:
        output_tensor_list: the resulting list of tensors, with
shape: num_gpus * shape(tensor).
input_tensor_lists: the original tensors, with shape:
num_gpus * world_size * shape(tensor).
group_name: the name of the collective group.
op: The reduce operation.
Returns:
None.
"""
if not types.cupy_available():
raise RuntimeError("Multigpu calls requires NCCL and Cupy.")
_check_tensor_lists_input(input_tensor_lists)
_check_tensor_list_input(output_tensor_list)
g = _check_and_get_group(group_name)
opts = types.ReduceScatterOptions()
opts.reduceOp = op
g.reducescatter(output_tensor_list, input_tensor_lists, opts)
def send(tensor, dst_rank: int, group_name: str = "default"):
"""Send a tensor to a remote process synchronously.
Args:
tensor: the tensor to send.
dst_rank: the rank of the destination process.
group_name: the name of the collective group.
Returns:
None
"""
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
_check_rank_valid(g, dst_rank)
if dst_rank == g.rank:
raise RuntimeError("The destination rank '{}' is self.".format(dst_rank))
opts = types.SendOptions()
opts.dst_rank = dst_rank
g.send([tensor], opts)
def send_multigpu(
tensor,
dst_rank: int,
dst_gpu_index: int,
group_name: str = "default",
n_elements: int = 0,
):
"""Send a tensor to a remote GPU synchronously.
    The function assumes that each process owns more than one GPU, and that
    the sender and the receiver have an equal number of GPUs.
Args:
tensor: the tensor to send, located on a GPU.
dst_rank: the rank of the destination process.
dst_gpu_index: the destination gpu index.
group_name: the name of the collective group.
n_elements: if specified, send the next n elements
from the starting address of tensor.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("send_multigpu call requires NCCL.")
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
_check_rank_valid(g, dst_rank)
if dst_rank == g.rank:
raise RuntimeError(
"The dst_rank '{}' is self. Considering "
"doing GPU to GPU memcpy instead?".format(dst_rank)
)
if n_elements < 0:
raise RuntimeError("The n_elements '{}' should >= 0.".format(n_elements))
opts = types.SendOptions()
opts.dst_rank = dst_rank
opts.dst_gpu_index = dst_gpu_index
opts.n_elements = n_elements
g.send([tensor], opts)
def recv(tensor, src_rank: int, group_name: str = "default"):
"""Receive a tensor from a remote process synchronously.
Args:
tensor: the received tensor.
src_rank: the rank of the source process.
group_name: the name of the collective group.
Returns:
None
"""
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
_check_rank_valid(g, src_rank)
if src_rank == g.rank:
raise RuntimeError("The destination rank '{}' is self.".format(src_rank))
opts = types.RecvOptions()
opts.src_rank = src_rank
g.recv([tensor], opts)
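
# Illustrative usage sketch (not part of the public API): a point-to-point
# exchange in the "default" group where rank 0 sends its buffer to rank 1,
# which receives it in place.
def _example_send_recv(buffer):
    rank = get_rank("default")
    if rank == 0:
        send(buffer, dst_rank=1, group_name="default")
    elif rank == 1:
        recv(buffer, src_rank=0, group_name="default")
    return buffer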
def recv_multigpu(
tensor,
src_rank: int,
src_gpu_index: int,
group_name: str = "default",
n_elements: int = 0,
):
"""Receive a tensor from a remote GPU synchronously.
    The function assumes that each process owns more than one GPU, and that
    the sender and the receiver have an equal number of GPUs.
Args:
tensor: the received tensor, located on a GPU.
src_rank: the rank of the source process.
src_gpu_index (int): the index of the source gpu on the src process.
        group_name: the name of the collective group.
        n_elements: if specified, receive the next n elements
            into the starting address of tensor.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("recv_multigpu call requires NCCL.")
_check_single_tensor_input(tensor)
g = _check_and_get_group(group_name)
_check_rank_valid(g, src_rank)
if src_rank == g.rank:
raise RuntimeError(
"The dst_rank '{}' is self. Considering "
"doing GPU to GPU memcpy instead?".format(src_rank)
)
if n_elements < 0:
raise RuntimeError("The n_elements '{}' should be >= 0.".format(n_elements))
opts = types.RecvOptions()
opts.src_rank = src_rank
opts.src_gpu_index = src_gpu_index
opts.n_elements = n_elements
g.recv([tensor], opts)
def synchronize(gpu_id: int):
"""Synchronize the current process to a give device.
Args:
gpu_id: the GPU device id to synchronize.
Returns:
None
"""
if not types.cupy_available():
raise RuntimeError("synchronize call requires CUDA and NCCL.")
import cupy as cp
cp.cuda.Device(gpu_id).synchronize()
def _check_and_get_group(group_name):
"""Check the existence and return the group handle."""
_check_inside_actor()
global _group_mgr
if not is_group_initialized(group_name):
# try loading from remote info store
try:
# if the information is stored in an Info object,
# get and create the group.
name = "info_" + group_name
mgr = ray.get_actor(name=name)
ids, world_size, rank, backend = ray.get(mgr.get_info.remote())
worker = ray._private.worker.global_worker
id_ = worker.core_worker.get_actor_id()
r = rank[ids.index(id_)]
_group_mgr.create_collective_group(backend, world_size, r, group_name)
except ValueError as exc:
# check if this group is initialized using options()
if (
"collective_group_name" in os.environ
and os.environ["collective_group_name"] == group_name
):
rank = int(os.environ["collective_rank"])
world_size = int(os.environ["collective_world_size"])
backend = os.environ["collective_backend"]
_group_mgr.create_collective_group(
backend, world_size, rank, group_name
)
else:
raise RuntimeError(
"The collective group '{}' is not "
"initialized in the process.".format(group_name)
) from exc
g = _group_mgr.get_group_by_name(group_name)
return g
def _check_single_tensor_input(tensor):
"""Check if the tensor is with a supported type."""
if isinstance(tensor, np.ndarray):
return
if types.cupy_available():
if isinstance(tensor, types.cp.ndarray):
return
if types.torch_available():
if isinstance(tensor, types.th.Tensor):
return
raise RuntimeError(
"Unrecognized tensor type '{}'. Supported types are: "
"np.ndarray, torch.Tensor, cupy.ndarray.".format(type(tensor))
)
def _check_backend_availability(backend: types.Backend):
"""Check whether the backend is available."""
if backend == types.Backend.GLOO:
if not gloo_available():
raise RuntimeError("GLOO is not available.")
elif backend == types.Backend.NCCL:
if not nccl_available():
raise RuntimeError("NCCL is not available.")
def _check_inside_actor():
"""Check if currently it is inside a Ray actor/task."""
worker = ray._private.worker.global_worker
if worker.mode == ray.WORKER_MODE:
return
else:
raise RuntimeError(
"The collective APIs shall be only used inside a Ray actor or task."
)
def _check_rank_valid(g, rank: int):
"""Check the rank: 0 <= rank < world_size."""
if rank < 0:
raise ValueError("rank '{}' is negative.".format(rank))
if rank >= g.world_size:
raise ValueError(
"rank '{}' must be less than world size '{}'".format(rank, g.world_size)
)
def _check_tensor_list_input(tensor_list):
"""Check if the input is a list of supported tensor types."""
if not isinstance(tensor_list, list):
raise RuntimeError(
"The input must be a list of tensors. "
"Got '{}'.".format(type(tensor_list))
)
if not tensor_list:
raise RuntimeError("Got an empty list of tensors.")
for t in tensor_list:
_check_single_tensor_input(t)
def _check_tensor_lists_input(tensor_lists):
"""Check if the input is a list of lists of supported tensor types."""
if not isinstance(tensor_lists, list):
raise RuntimeError(
"The input must be a list of lists of tensors. "
"Got '{}'.".format(type(tensor_lists))
)
if not tensor_lists:
raise RuntimeError(f"Did not receive tensors. Got: {tensor_lists}")
for t in tensor_lists:
_check_tensor_list_input(t)
def _check_root_tensor_valid(length, root_tensor):
"""Check the root_tensor device is 0 <= root_tensor < length"""
if root_tensor < 0:
raise ValueError("root_tensor '{}' is negative.".format(root_tensor))
if root_tensor >= length:
raise ValueError(
"root_tensor '{}' is greater than the number of GPUs: "
"'{}'".format(root_tensor, length)
)