GPU Support#
GPUs are critical for many machine learning applications. Ray natively supports GPU as a pre-defined resource type and allows tasks and actors to specify their GPU resource requirements.
Starting Ray Nodes with GPUs#
By default, Ray sets the GPU resource quantity of a node to the number of physical GPUs that Ray automatically detects. If you need to, you can override this.
Note
Because Ray resources are logical, nothing prevents you from specifying a larger value of num_gpus than the true number of GPUs on the machine. In this case, Ray acts as if the machine has the number of GPUs you specified for the purposes of scheduling tasks and actors that require GPUs. Trouble only occurs if those tasks and actors attempt to actually use GPUs that don't exist.
Tip
You can set the CUDA_VISIBLE_DEVICES environment variable before starting a Ray node to limit the GPUs that are visible to Ray. For example, CUDA_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2 will let Ray only see devices 1 and 3.
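If you want to override the auto-detected quantity from Python instead of the command line, you can pass num_gpus to ray.init. A minimal sketch (the value 2 here is arbitrary):

import ray

# Tell Ray to treat this node as having 2 logical GPUs for scheduling,
# regardless of how many physical GPUs auto-detection would report.
ray.init(num_gpus=2)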
Using GPUs in Tasks and Actors#
If a task or actor requires GPUs, you can specify the corresponding resource requirements (e.g. @ray.remote(num_gpus=1)). Ray will then schedule the task or actor to a node that has enough free GPU resources and assign GPUs to the task or actor by setting the CUDA_VISIBLE_DEVICES environment variable before running the task or actor code.
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
        print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def use_gpu():
    print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
    print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task will use the second one.
ray.get(use_gpu.remote())

# (GPUActor pid=52420) ray.get_gpu_ids(): [0]
# (GPUActor pid=52420) CUDA_VISIBLE_DEVICES: 0
# (use_gpu pid=51830) ray.get_gpu_ids(): [1]
# (use_gpu pid=51830) CUDA_VISIBLE_DEVICES: 1
Inside a task or actor, ray.get_gpu_ids() will return a list of GPU IDs that are available to the task or actor. Typically, it is not necessary to call ray.get_gpu_ids() because Ray will automatically set the CUDA_VISIBLE_DEVICES environment variable, which most ML frameworks will respect for purposes of GPU assignment.
Note: The use_gpu function defined above doesn't actually use any GPUs. Ray will schedule it on a node which has at least one GPU and will reserve one GPU for it while it is being executed; however, it is up to the function to actually make use of the GPU, typically through an external library like TensorFlow. Here is an example that actually uses GPUs. In order for this example to work, you will need to install the GPU version of TensorFlow.
@ray.remote(num_gpus=1)
def use_gpu():
    import tensorflow as tf

    # Create a TensorFlow session. TensorFlow will restrict itself to use the
    # GPUs specified by the CUDA_VISIBLE_DEVICES environment variable.
    tf.Session()
Note: It is certainly possible for the person implementing use_gpu to ignore ray.get_gpu_ids() and use all of the GPUs on the machine. Ray does not prevent this from happening, and it can lead to too many tasks or actors using the same GPU at the same time. However, Ray does automatically set the CUDA_VISIBLE_DEVICES environment variable, which will restrict the GPUs used by most deep learning frameworks, assuming the user doesn't override it.
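As an illustration of a framework picking up the assignment automatically, here is a hedged sketch that assumes PyTorch with CUDA support is installed (PyTorch is not otherwise used in this guide). Because Ray sets CUDA_VISIBLE_DEVICES, the single GPU assigned to the task is visible to the framework as device 0:

import torch

@ray.remote(num_gpus=1)
def use_torch_gpu():
    # Ray has already set CUDA_VISIBLE_DEVICES for this worker process,
    # so the one assigned GPU appears to PyTorch as "cuda:0".
    device = torch.device("cuda:0")
    x = torch.ones((2, 2), device=device)
    return ray.get_gpu_ids(), str(x.device)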
Fractional GPUs#
Ray supports fractional resource requirements so multiple tasks and actors can share the same GPU.
ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
Note: It is the user’s responsibility to make sure that the individual tasks don’t use more than their share of the GPU memory. TensorFlow can be configured to limit its memory usage.
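For instance, with TensorFlow 2 you could cap per-process GPU memory so that several fractional-GPU tasks fit on one device. This is only a sketch, and the 256 MB limit is an arbitrary illustration:

@ray.remote(num_gpus=0.25)
def limited_memory_task():
    import tensorflow as tf

    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        # Restrict this process to a small slice of the GPU's memory so that
        # other tasks sharing the same GPU have room for their own slices.
        tf.config.set_logical_device_configuration(
            gpus[0], [tf.config.LogicalDeviceConfiguration(memory_limit=256)]
        )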
When Ray assigns GPUs of a node to tasks or actors with fractional resource requirements, it will pack one GPU before moving on to the next one to avoid fragmentation.
ray.init(num_gpus=3)

@ray.remote(num_gpus=0.5)
class FractionalGPUActor:
    def ping(self):
        print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))

fractional_gpu_actors = [FractionalGPUActor.remote() for _ in range(3)]
# Ray will try to pack GPUs if possible.
[ray.get(fractional_gpu_actors[i].ping.remote()) for i in range(3)]

# (FractionalGPUActor pid=57417) ray.get_gpu_ids(): [0]
# (FractionalGPUActor pid=57416) ray.get_gpu_ids(): [0]
# (FractionalGPUActor pid=57418) ray.get_gpu_ids(): [1]
Workers not Releasing GPU Resources#
Currently, when a worker executes a task that uses a GPU (e.g., through TensorFlow), the task may allocate memory on the GPU and may not release it when the task finishes executing. This can lead to problems the next time a task tries to use the same GPU. To address the problem, Ray disables worker process reuse between GPU tasks by default: the GPU resources are released after the task process exits. Since this adds overhead to GPU task scheduling, you can re-enable worker reuse by setting max_calls=0 in the ray.remote decorator.
# By default, ray will not reuse workers for GPU tasks to prevent
# GPU resource leakage.
@ray.remote(num_gpus=1)
def leak_gpus():
    import tensorflow as tf

    # This task will allocate memory on the GPU and then never release it.
    tf.Session()
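Conversely, if a GPU task reliably releases its GPU memory before returning, you can opt back into worker reuse with max_calls=0 to avoid the process-restart overhead:

# max_calls=0 re-enables indefinite worker reuse for this task. Only do this
# if the task frees all of its GPU memory before it returns.
@ray.remote(num_gpus=1, max_calls=0)
def well_behaved_gpu_task():
    pass  # do GPU work and release GPU memory before returning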
Accelerator Types#
Ray supports resource specific accelerator types. The accelerator_type
option can be used to force to a task or actor to run on a node with a specific type of accelerator.
Under the hood, the accelerator type option is implemented as a custom resource requirement of "accelerator_type:<type>": 0.001
.
This forces the task or actor to be placed on a node with that particular accelerator type available.
This also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator.
from ray.util.accelerators import NVIDIA_TESLA_V100

@ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
def train(data):
    return "This function was run on a node with a Tesla V100 GPU"

ray.get(train.remote(1))
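For reference, the accelerator_type option above is roughly equivalent to requesting the underlying custom resource directly; a sketch, assuming the V100 type maps to the resource name "accelerator_type:V100" as described above:

# Approximately what accelerator_type=NVIDIA_TESLA_V100 expands to: a tiny
# custom resource requirement satisfiable only on nodes with that GPU type.
@ray.remote(num_gpus=1, resources={"accelerator_type:V100": 0.001})
def train_on_v100(data):
    return "This function was run on a node with a Tesla V100 GPU"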
See ray.util.accelerators for the available accelerator types. Currently, the automatically detected accelerator types include Nvidia GPUs and AWS Neuron Cores.
AWS Neuron Core Accelerator (Experimental)#
Similar to Nvidia GPUs, Ray auto-detects AWS Neuron Cores by default. You can specify resources={"neuron_cores": some_number} in a task or actor's resource requirements to have Ray assign the Neuron Core(s).
Note
Ray supports a heterogeneous cluster of GPUs and Neuron Cores but doesn't allow specifying resource requirements of num_gpus and neuron_cores together for a task or actor.
import ray
import os
from ray.util.accelerators import AWS_NEURON_CORE

# On trn1.2xlarge instance, there will be 2 neuron cores.
ray.init(resources={"neuron_cores": 2})

@ray.remote(resources={"neuron_cores": 1})
class NeuronCoreActor:
    def info(self):
        ids = ray.get_runtime_context().get_resource_ids()
        print("neuron_core_ids: {}".format(ids["neuron_cores"]))
        print(f"NEURON_RT_VISIBLE_CORES: {os.environ['NEURON_RT_VISIBLE_CORES']}")

@ray.remote(resources={"neuron_cores": 1}, accelerator_type=AWS_NEURON_CORE)
def use_neuron_core_task():
    ids = ray.get_runtime_context().get_resource_ids()
    print("neuron_core_ids: {}".format(ids["neuron_cores"]))
    print(f"NEURON_RT_VISIBLE_CORES: {os.environ['NEURON_RT_VISIBLE_CORES']}")

neuron_core_actor = NeuronCoreActor.remote()
ray.get(neuron_core_actor.info.remote())
ray.get(use_neuron_core_task.remote())