Accelerator Support#
Accelerators like GPUs are critical for many machine learning applications. Ray Core natively supports many accelerators as pre-defined resource types and allows tasks and actors to specify their accelerator resource requirements.
The accelerators natively supported by Ray Core are:
| Accelerator | Ray Resource Name | Support Level |
|---|---|---|
| Nvidia GPU | GPU | Fully tested, supported by the Ray team |
| AMD GPU | GPU | Experimental, supported by the community |
| Intel GPU | GPU | Experimental, supported by the community |
| AWS Neuron Core | neuron_cores | Experimental, supported by the community |
| Google TPU | TPU | Experimental, supported by the community |
| Intel Gaudi | HPU | Experimental, supported by the community |
| Huawei Ascend | NPU | Experimental, supported by the community |
Starting Ray nodes with accelerators#
By default, Ray sets the quantity of each accelerator resource on a node to the number of physical accelerators that Ray auto-detects. If you need to, you can override this.
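For example, a minimal sketch of overriding the auto-detected count when starting Ray from Python (the value 2 here is arbitrary):

import ray

# Treat this node as having 2 GPUs, regardless of what Ray auto-detects.
ray.init(num_gpus=2)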
Tip
You can set the CUDA_VISIBLE_DEVICES
environment variable before starting a Ray node
to limit the Nvidia GPUs that are visible to Ray.
For example, CUDA_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2
lets Ray only see devices 1 and 3.
Tip
You can set the ROCR_VISIBLE_DEVICES
environment variable before starting a Ray node
to limit the AMD GPUs that are visible to Ray.
For example, ROCR_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2
lets Ray only see devices 1 and 3.
Tip
You can set the ONEAPI_DEVICE_SELECTOR
environment variable before starting a Ray node
to limit the Intel GPUs that are visible to Ray.
For example, ONEAPI_DEVICE_SELECTOR=1,3 ray start --head --num-gpus=2
lets Ray only see devices 1 and 3.
Tip
You can set the NEURON_RT_VISIBLE_CORES
environment variable before starting a Ray node
to limit the AWS Neuron Cores that are visible to Ray.
For example, NEURON_RT_VISIBLE_CORES=1,3 ray start --head --resources='{"neuron_cores": 2}'
lets Ray only see devices 1 and 3.
See the Amazon documentation (https://awslabs.github.io/data-on-eks/docs/category/inference-on-eks)
for more examples of Ray on Neuron with EKS as an orchestration substrate.
Tip
You can set the TPU_VISIBLE_CHIPS
environment variable before starting a Ray node
to limit the Google TPUs that are visible to Ray.
For example, TPU_VISIBLE_CHIPS=1,3 ray start --head --resources='{"TPU": 2}'
lets Ray only see devices 1 and 3.
Tip
You can set the HABANA_VISIBLE_MODULES
environment variable before starting a Ray node
to limit the Intel Gaudi HPUs that are visible to Ray.
For example, HABANA_VISIBLE_MODULES=1,3 ray start --head --resources='{"HPU": 2}'
lets Ray only see devices 1 and 3.
Tip
You can set the ASCEND_RT_VISIBLE_DEVICES
environment variable before starting a Ray node
to limit the Huawei Ascend NPUs that are visible to Ray.
For example, ASCEND_RT_VISIBLE_DEVICES=1,3 ray start --head --resources='{"NPU": 2}'
lets Ray only see devices 1 and 3.
Note
There's nothing preventing you from specifying a larger number of
accelerator resources (e.g., num_gpus) than the true number of accelerators on the machine, because Ray resources are logical.
In this case, Ray acts as if the machine has the number of accelerators you specified
for the purposes of scheduling tasks and actors that require accelerators.
Trouble only occurs if those tasks and actors
attempt to actually use accelerators that don’t exist.
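As a sketch of what this means in practice (assuming a machine with fewer than four physical GPUs; the task name and body are illustrative):

import ray

# Declare 4 logical GPUs even if fewer physical GPUs exist.
ray.init(num_gpus=4)

@ray.remote(num_gpus=1)
def logical_gpu_task():
    # Fine as long as the task never touches a nonexistent physical device.
    return "scheduled against a logical GPU"

# All four tasks can be scheduled, one logical GPU each.
ray.get([logical_gpu_task.remote() for _ in range(4)])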
Using accelerators in Tasks and Actors#
If a task or actor requires accelerators, you can specify the corresponding resource requirements (e.g., @ray.remote(num_gpus=1)).
Ray then schedules the task or actor to a node that has enough free accelerator resources
and assigns accelerators to the task or actor by setting the corresponding environment variable (e.g., CUDA_VISIBLE_DEVICES) before running the task or actor code.
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())

# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) CUDA_VISIBLE_DEVICES: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) CUDA_VISIBLE_DEVICES: 1
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("ROCR_VISIBLE_DEVICES: {}".format(os.environ["ROCR_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("ROCR_VISIBLE_DEVICES: {}".format(os.environ["ROCR_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())

# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) ROCR_VISIBLE_DEVICES: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) ROCR_VISIBLE_DEVICES: 1
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("ONEAPI_DEVICE_SELECTOR: {}".format(os.environ["ONEAPI_DEVICE_SELECTOR"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("ONEAPI_DEVICE_SELECTOR: {}".format(os.environ["ONEAPI_DEVICE_SELECTOR"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())

# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) ONEAPI_DEVICE_SELECTOR: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) ONEAPI_DEVICE_SELECTOR: 1
import os
import ray

ray.init(resources={"neuron_cores": 2})

@ray.remote(resources={"neuron_cores": 1})
class NeuronCoreActor:
    def ping(self):
        print("Neuron Core IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["neuron_cores"]))
        print("NEURON_RT_VISIBLE_CORES: {}".format(os.environ["NEURON_RT_VISIBLE_CORES"]))

@ray.remote(resources={"neuron_cores": 1})
def neuron_core_task():
    print("Neuron Core IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["neuron_cores"]))
    print("NEURON_RT_VISIBLE_CORES: {}".format(os.environ["NEURON_RT_VISIBLE_CORES"]))

neuron_core_actor = NeuronCoreActor.remote()
ray.get(neuron_core_actor.ping.remote())

# The actor uses the first Neuron Core so the task uses the second one.
ray.get(neuron_core_task.remote())
(NeuronCoreActor pid=52420) Neuron Core IDs: [0]
(NeuronCoreActor pid=52420) NEURON_RT_VISIBLE_CORES: 0
(neuron_core_task pid=51830) Neuron Core IDs: [1]
(neuron_core_task pid=51830) NEURON_RT_VISIBLE_CORES: 1
import os
import ray

ray.init(resources={"TPU": 2})

@ray.remote(resources={"TPU": 1})
class TPUActor:
    def ping(self):
        print("TPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["TPU"]))
        print("TPU_VISIBLE_CHIPS: {}".format(os.environ["TPU_VISIBLE_CHIPS"]))

@ray.remote(resources={"TPU": 1})
def tpu_task():
    print("TPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["TPU"]))
    print("TPU_VISIBLE_CHIPS: {}".format(os.environ["TPU_VISIBLE_CHIPS"]))

tpu_actor = TPUActor.remote()
ray.get(tpu_actor.ping.remote())

# The actor uses the first TPU so the task uses the second one.
ray.get(tpu_task.remote())
(TPUActor pid=52420) TPU IDs: [0]
(TPUActor pid=52420) TPU_VISIBLE_CHIPS: 0
(tpu_task pid=51830) TPU IDs: [1]
(tpu_task pid=51830) TPU_VISIBLE_CHIPS: 1
import os
import ray

ray.init(resources={"HPU": 2})

@ray.remote(resources={"HPU": 1})
class HPUActor:
    def ping(self):
        print("HPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["HPU"]))
        print("HABANA_VISIBLE_MODULES: {}".format(os.environ["HABANA_VISIBLE_MODULES"]))

@ray.remote(resources={"HPU": 1})
def hpu_task():
    print("HPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["HPU"]))
    print("HABANA_VISIBLE_MODULES: {}".format(os.environ["HABANA_VISIBLE_MODULES"]))

hpu_actor = HPUActor.remote()
ray.get(hpu_actor.ping.remote())

# The actor uses the first HPU so the task uses the second one.
ray.get(hpu_task.remote())
(HPUActor pid=52420) HPU IDs: [0]
(HPUActor pid=52420) HABANA_VISIBLE_MODULES: 0
(hpu_task pid=51830) HPU IDs: [1]
(hpu_task pid=51830) HABANA_VISIBLE_MODULES: 1
import os
import ray

ray.init(resources={"NPU": 2})

@ray.remote(resources={"NPU": 1})
class NPUActor:
    def ping(self):
        print("NPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["NPU"]))
        print("ASCEND_RT_VISIBLE_DEVICES: {}".format(os.environ["ASCEND_RT_VISIBLE_DEVICES"]))

@ray.remote(resources={"NPU": 1})
def npu_task():
    print("NPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["NPU"]))
    print("ASCEND_RT_VISIBLE_DEVICES: {}".format(os.environ["ASCEND_RT_VISIBLE_DEVICES"]))

npu_actor = NPUActor.remote()
ray.get(npu_actor.ping.remote())

# The actor uses the first NPU so the task uses the second one.
ray.get(npu_task.remote())
(NPUActor pid=52420) NPU IDs: [0]
(NPUActor pid=52420) ASCEND_RT_VISIBLE_DEVICES: 0
(npu_task pid=51830) NPU IDs: [1]
(npu_task pid=51830) ASCEND_RT_VISIBLE_DEVICES: 1
Inside a task or actor, ray.get_runtime_context().get_accelerator_ids() returns a list of accelerator IDs that are available to the task or actor.
Typically, it is not necessary to call get_accelerator_ids() because Ray automatically sets the corresponding environment variable (e.g., CUDA_VISIBLE_DEVICES),
which most ML frameworks respect for purposes of accelerator assignment.
Note: The remote function or actor defined above doesn't actually use any accelerators. Ray schedules it on a node that has at least one accelerator and reserves one accelerator for it while it executes; however, it is up to the function to actually make use of the accelerator, typically through an external library like TensorFlow. Here is an example that actually uses accelerators. In order for this example to work, you need to install the GPU version of TensorFlow.
@ray.remote(num_gpus=1)
def gpu_task():
    import tensorflow as tf
    # Create a TensorFlow session. TensorFlow restricts itself to the
    # GPUs specified by the CUDA_VISIBLE_DEVICES environment variable.
    tf.Session()
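As an alternative, here is a rough sketch that actually places work on the assigned GPU using PyTorch; it assumes a CUDA-enabled PyTorch build is installed, and the function name is illustrative rather than part of the example above:

import ray
import torch

@ray.remote(num_gpus=1)
def torch_gpu_task():
    # PyTorch only sees the GPU that Ray assigned via CUDA_VISIBLE_DEVICES,
    # so "cuda:0" refers to the assigned device.
    assert torch.cuda.is_available()
    x = torch.ones(1000, device="cuda:0")
    return float(x.sum())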
Note: It is certainly possible for a task or actor to ignore its assigned accelerators
and use all of the accelerators on the machine. Ray does not prevent this from happening,
and it can lead to too many tasks or actors using the same accelerator at the same time.
However, Ray does automatically set the environment variable (e.g., CUDA_VISIBLE_DEVICES),
which restricts the accelerators used by most deep learning frameworks, assuming the user doesn't override it.
Fractional Accelerators#
Ray supports fractional resource requirements so multiple tasks and actors can share the same accelerator.
ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time
    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
AWS Neuron Core, Google TPU, and Intel Gaudi don't support fractional resources. Huawei Ascend NPUs do, for example:
ray.init(num_cpus=4, resources={"NPU": 1})

@ray.remote(resources={"NPU": 0.25})
def f():
    import time
    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same NPU.
ray.get([f.remote() for _ in range(4)])
Note: It is the user's responsibility to make sure that individual tasks don't use more than their share of the accelerator memory. PyTorch and TensorFlow can be configured to limit their memory usage.
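For example, a rough sketch of capping per-process GPU memory with PyTorch; the 0.25 fraction mirrors a num_gpus=0.25 share and is an assumption, not something Ray enforces:

import torch

# Limit this process's CUDA allocations to 25% of device 0's memory.
# Requires a CUDA-enabled PyTorch build; TensorFlow has analogous options.
torch.cuda.set_per_process_memory_fraction(0.25, device=0)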
When Ray assigns accelerators of a node to tasks or actors with fractional resource requirements, it packs one accelerator before moving on to the next one to avoid fragmentation.
ray.init(num_gpus=3)

@ray.remote(num_gpus=0.5)
class FractionalGPUActor:
    def ping(self):
        print("GPU id: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))

fractional_gpu_actors = [FractionalGPUActor.remote() for _ in range(3)]

# Ray tries to pack GPUs if possible.
[ray.get(fractional_gpu_actors[i].ping.remote()) for i in range(3)]
(FractionalGPUActor pid=57417) GPU id: [0]
(FractionalGPUActor pid=57416) GPU id: [0]
(FractionalGPUActor pid=57418) GPU id: [1]
Workers not Releasing GPU Resources#
Currently, when a worker executes a task that uses a GPU (e.g.,
through TensorFlow), the task may allocate memory on the GPU and may not release
it when the task finishes executing. This can lead to problems the next time a
task tries to use the same GPU. To address the problem, Ray disables worker
process reuse between GPU tasks by default, so that GPU resources are released after
the task process exits. Since this adds overhead to GPU task scheduling,
you can re-enable worker reuse by setting max_calls=0 in the ray.remote decorator.
# By default, Ray does not reuse workers for GPU tasks to prevent
# GPU resource leakage.
@ray.remote(num_gpus=1)
def leak_gpus():
    import tensorflow as tf
    # This task allocates memory on the GPU and then never releases it.
    tf.Session()
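If the scheduling overhead matters more than the risk of leaked GPU memory, worker reuse can be switched back on with the max_calls option mentioned above; a minimal sketch (the function name is illustrative):

# Opt back into worker reuse for this GPU task, accepting that any GPU
# memory it leaks persists across invocations in the same worker process.
@ray.remote(num_gpus=1, max_calls=0)
def reusable_gpu_task():
    pass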
Accelerator Types#
Ray supports resource-specific accelerator types. The accelerator_type option can be used to force a task or actor to run on a node with a specific type of accelerator.
Under the hood, the accelerator type option is implemented as a custom resource requirement of "accelerator_type:<type>": 0.001.
This forces the task or actor to be placed on a node with that particular accelerator type available.
This also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator.
from ray.util.accelerators import NVIDIA_TESLA_V100

@ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
def train(data):
    return "This function was run on a node with a Tesla V100 GPU"

ray.get(train.remote(1))
See ray.util.accelerators for available accelerator types.
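For illustration only, the custom resource form described above can also be requested directly; this sketch assumes the V100 type maps to the "accelerator_type:V100" resource string:

# Roughly equivalent to accelerator_type=NVIDIA_TESLA_V100: request a tiny
# amount of the implicit custom resource advertised by nodes with that GPU type.
@ray.remote(num_gpus=1, resources={"accelerator_type:V100": 0.001})
def train_on_v100(data):
    return "This function was run on a node with a Tesla V100 GPU"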