API and Package Reference

Python API

ray.init

ray.init(address=None, *, num_cpus=None, num_gpus=None, resources=None, object_store_memory=None, local_mode=False, ignore_reinit_error=False, include_dashboard=None, dashboard_host='127.0.0.1', dashboard_port=None, job_config=None, configure_logging=True, logging_level=20, logging_format='%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s', log_to_driver=True, namespace=None, runtime_env=None, internal_config=None, _enable_object_reconstruction=False, _redis_max_memory=None, _plasma_directory=None, _node_ip_address='127.0.0.1', _driver_object_store_memory=None, _memory=None, _redis_password='5241590000000000', _temp_dir=None, _lru_evict=False, _metrics_export_port=None, _system_config=None, _tracing_startup_hook=None, **kwargs)

Connect to an existing Ray cluster or start one and connect to it.

This method handles two cases: either a Ray cluster already exists and we just attach this driver to it, or we start all of the processes associated with a Ray cluster and attach to the newly started cluster.

To start Ray and all of the relevant processes, use this as follows:

ray.init()

To connect to an existing Ray cluster, use this as follows (substituting in the appropriate address):

ray.init(address="123.45.67.89:6379")

You can also define an environment variable called RAY_ADDRESS in the same format as the address parameter to connect to an existing cluster with ray.init() or ray.init(address="auto").
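The address rules described here and under the address parameter can be summarized as a small decision function. The sketch below is a hypothetical model (resolve_address and its mode labels are illustrative names, not Ray APIs) that encodes only the documented precedence: RAY_ADDRESS is used when address is None or “auto”, a “ray://” prefix selects Ray Client, and no address at all means starting a new local cluster. It does not model how Ray actually locates a running cluster for “auto”.

```python
from typing import Mapping, Optional, Tuple


def resolve_address(address: Optional[str], env: Mapping[str, str]) -> Tuple[Optional[str], str]:
    """Hypothetical model of how ray.init interprets `address` (not Ray's code).

    Returns the effective address plus a label describing what ray.init would do.
    """
    # RAY_ADDRESS is consulted only when address is None or "auto" and the variable is set.
    if address in (None, "auto") and env.get("RAY_ADDRESS"):
        address = env["RAY_ADDRESS"]
    if address is None:
        # No address anywhere: start all cluster processes locally.
        return None, "start-new-cluster"
    if address.startswith("ray://"):
        # A "ray://" prefix selects the Ray Client connection path.
        return address, "ray-client"
    # Anything else ("auto" or host:port) attaches to an existing cluster.
    return address, "connect-existing"
```

Under this model, resolve_address(None, os.environ) reports what a bare ray.init() would do in the current shell.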

Parameters
  • address (str) – The address of the Ray cluster to connect to. If this address is not provided, then this command will start Redis, a raylet, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits. If the driver is running on a node in a Ray cluster, using “auto” as the value tells the driver to detect the cluster, removing the need to specify a specific node address. If the environment variable RAY_ADDRESS is defined and the address is None or “auto”, Ray will set address to RAY_ADDRESS. Addresses can be prefixed with a protocol to connect using Ray Client. For example, passing in the address “ray://123.45.67.89:50005” will connect to the cluster at the given address using the Ray client.

  • num_cpus (int) – Number of CPUs the user wishes to assign to each raylet. By default, this is set based on virtual cores.

  • num_gpus (int) – Number of GPUs the user wishes to assign to each raylet. By default, this is set based on detected GPUs.

  • resources – A dictionary mapping the names of custom resources to the quantities of those resources that are available.

  • object_store_memory – The amount of memory (in bytes) to start the object store with. By default, this is automatically set based on available system memory.

  • local_mode (bool) – If true, the code will be executed serially. This is useful for debugging.

  • ignore_reinit_error – If true, Ray suppresses errors from calling ray.init() a second time. Ray won’t be restarted.

  • include_dashboard – Boolean flag indicating whether or not to start the Ray dashboard, which displays the status of the Ray cluster. If this argument is None, then the UI will be started if the relevant dependencies are present.

  • dashboard_host – The host to bind the dashboard server to. Can either be localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is set to localhost to prevent access from external machines.

  • dashboard_port (int, None) – The port to bind the dashboard server to. Defaults to 8265 and Ray will automatically find a free port if 8265 is not available.

  • job_config (ray.job_config.JobConfig) – The job configuration.

  • configure_logging – True (default) if configuration of logging is allowed here. Otherwise, the user may want to configure it separately.

  • logging_level – Logging level, defaults to logging.INFO. Ignored unless “configure_logging” is true.

  • logging_format – Logging format, defaults to string containing a timestamp, filename, line number, and message. See the source file ray_constants.py for details. Ignored unless “configure_logging” is true.

  • log_to_driver (bool) – If true, the output from all of the worker processes on all nodes will be directed to the driver.

  • namespace (str) – The namespace to use.

  • runtime_env (dict) – The runtime environment to use.

  • internal_config (dict) – Dictionary mapping names of unstable parameters to values, e.g. {“redis_password”: “1234”}. This is only used for initializing a local client (ray.init(local://…)). Values in this dictionary will be used to configure the local cluster. Parameter names should exclude the underscore prefix.

  • _enable_object_reconstruction (bool) – If True, when an object stored in the distributed plasma store is lost due to node failure, Ray will attempt to reconstruct the object by re-executing the task that created the object. Arguments to the task will be recursively reconstructed. If False, then ray.ObjectLostError will be raised.

  • _redis_max_memory – Redis max memory.

  • _plasma_directory – Override the plasma mmap file directory.

  • _node_ip_address (str) – The IP address of the node that we are on.

  • _driver_object_store_memory (int) – Deprecated.

  • _memory – Amount of reservable memory resource to create.

  • _redis_password (str) – Prevents external clients without the password from connecting to Redis if provided.

  • _temp_dir (str) – If provided, specifies the root temporary directory for the Ray process. Defaults to an OS-specific conventional location, e.g., “/tmp/ray”.

  • _metrics_export_port (int) – Port number through which Ray exposes system metrics via a Prometheus endpoint. It is currently under active development, and the API is subject to change.

  • _system_config (dict) – Configuration for overriding RayConfig defaults. For testing purposes ONLY.

  • _tracing_startup_hook (str) – If provided, turns on and sets up tracing for Ray. Must be the name of a function that takes no arguments and sets up a Tracer Provider, Remote Span Processors, and (optional) additional instruments. See more at docs.ray.io/tracing.html. It is currently under active development, and the API is subject to change.
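When configure_logging=False, Ray leaves logging setup to you. As a sketch, assuming you want the driver's own log lines to match Ray's default look, the standard logging module can reuse the default logging_format string from the signature above. The helper configure_driver_logging and the logger name are hypothetical.

```python
import logging

# The default value of ray.init's logging_format parameter, copied from the signature above.
DEFAULT_RAY_LOG_FORMAT = "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s"


def configure_driver_logging(name: str = "driver", level: int = logging.INFO) -> logging.Logger:
    """Attach a stream handler that uses Ray's default format to a named logger."""
    logger = logging.getLogger(name)
    logger.setLevel(level)  # logging.INFO == 20, the default logging_level
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(DEFAULT_RAY_LOG_FORMAT))
    logger.addHandler(handler)
    return logger
```

You would call it after ray.init(configure_logging=False), so that Ray does not configure logging itself.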

Returns

If the provided address included a protocol (e.g. “ray://1.2.3.4”), then a ClientContext is returned with information such as settings, server versions for ray and python, and the dashboard_url. Otherwise, returns address information about the started processes.

Raises

Exception – An exception is raised if an inappropriate combination of arguments is passed in.

ray.is_initialized

ray.is_initialized()

Check if ray.init has been called yet.

Returns

True if ray.init has already been called and False otherwise.

ray.remote

ray.remote(*args, **kwargs)

Defines a remote function or an actor class.

This can be used with no arguments to define a remote function or actor as follows:

@ray.remote
def f():
    return 1

@ray.remote
class Foo:
    def method(self):
        return 1

It can also be used with specific keyword arguments as follows:

@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1

Remote task and actor objects returned by @ray.remote can also be dynamically modified with the same arguments as above using .options() as follows:

@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2
g = f.options(num_gpus=2)

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1
Bar = Foo.options(num_cpus=1, resources=None)

Running remote actors will be terminated when the actor handle to them in Python is deleted, which will cause them to complete any outstanding work and then shut down. If you want to kill them immediately, you can also call ray.kill(actor).

Parameters
  • num_returns (int) – This is only for remote functions. It specifies the number of object refs returned by the remote function invocation.

  • num_cpus (float) – The quantity of CPU cores to reserve for this task or for the lifetime of the actor.

  • num_gpus (int) – The quantity of GPUs to reserve for this task or for the lifetime of the actor.

  • resources (Dict[str, float]) – The quantity of various custom resources to reserve for this task or for the lifetime of the actor. This is a dictionary mapping strings (resource names) to floats.

  • accelerator_type – If specified, requires that the task or actor run on a node with the specified type of accelerator. See ray.accelerators for accelerator types.

  • max_calls (int) – Only for remote functions. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit (this can be used to address memory leaks in third-party libraries or to reclaim resources that cannot easily be released, e.g., GPU memory that was acquired by TensorFlow). By default this is infinite.

  • max_restarts (int) – Only for actors. This specifies the maximum number of times that the actor should be restarted when it dies unexpectedly. The minimum valid value is 0 (default), which indicates that the actor doesn’t need to be restarted. A value of -1 indicates that an actor should be restarted indefinitely.

  • max_task_retries (int) – Only for actors. How many times to retry an actor task if the task fails due to a system error, e.g., the actor has died. If set to -1, the system will retry the failed task until the task succeeds, or the actor has reached its max_restarts limit. If set to n > 0, the system will retry the failed task up to n times, after which the task will throw a RayActorError exception upon ray.get. Note that Python exceptions are not considered system errors and will not trigger retries.

  • max_retries (int) – Only for remote functions. This specifies the maximum number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, the default is 4, and a value of -1 indicates infinite retries.

  • runtime_env (Dict[str, Any]) – Specifies the runtime environment for this actor or task and its children. See Runtime Environments (Experimental) for detailed documentation.

  • override_environment_variables (Dict[str, str]) – (Deprecated in Ray 1.4.0 and will be removed in Ray 1.6; please use the env_vars field of Runtime Environments (Experimental) instead.) This specifies environment variables to override for the actor or task. The overrides are propagated to all child actors and tasks. This is a dictionary mapping variable names to their values. Existing variables can be overridden, new ones can be created, and an existing variable can be unset by setting it to an empty string. Note: can only be set via .options().
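The retry semantics of max_retries (and, for actor tasks, max_task_retries) can be modeled in plain Python. In this hypothetical sketch, SimulatedWorkerCrash stands in for a system failure such as a worker crash; it is not a Ray exception, and the loop is not Ray's implementation. Ordinary Python exceptions propagate without a retry, matching the note above that they are not considered system errors.

```python
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


class SimulatedWorkerCrash(Exception):
    """Hypothetical stand-in for a system failure such as a worker crash."""


def run_with_retries(fn: Callable[[], T], max_retries: int) -> Tuple[T, int]:
    """Model of max_retries: return (result, retries_used).

    Only SimulatedWorkerCrash triggers a retry; any other exception propagates
    immediately. max_retries == -1 means retry indefinitely.
    """
    if max_retries < -1:
        raise ValueError("max_retries must be >= 0, or -1 for infinite retries")
    retries = 0
    while True:
        try:
            return fn(), retries
        except SimulatedWorkerCrash:
            if max_retries != -1 and retries >= max_retries:
                raise  # retry budget exhausted: surface the failure
            retries += 1
```

With max_retries=0 the first crash is surfaced directly, which corresponds to “the task doesn't need to be retried”.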

ray.remote_function.RemoteFunction.options(self, args=None, kwargs=None, num_returns=None, num_cpus=None, num_gpus=None, memory=None, object_store_memory=None, accelerator_type=None, resources=None, max_retries=None, placement_group='default', placement_group_bundle_index=-1, placement_group_capture_child_tasks=None, runtime_env=None, override_environment_variables=None, name='')

Configures and overrides the task invocation parameters.

The arguments are the same as those that can be passed to ray.remote.

Examples:

@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2
# Task f will require 2 gpus instead of 1.
g = f.options(num_gpus=2)

ray.actor.ActorClass.options(self, args=None, kwargs=None, num_cpus=None, num_gpus=None, memory=None, object_store_memory=None, resources=None, accelerator_type=None, max_concurrency=None, max_restarts=None, max_task_retries=None, name=None, lifetime=None, placement_group='default', placement_group_bundle_index=-1, placement_group_capture_child_tasks=None, runtime_env=None, override_environment_variables=None)

Configures and overrides the actor instantiation parameters.

The arguments are the same as those that can be passed to ray.remote.

Examples:

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1
# Class Foo will require 1 cpu instead of 2.
# It will also require no custom resources.
Bar = Foo.options(num_cpus=1, resources=None)

ray.get

ray.get(object_refs, *, timeout=None)

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ref is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_refs is a list, then the objects corresponding to each object in the list will be returned.

Ordering for an input list of object refs is preserved for each object returned. That is, if an object ref to A precedes an object ref to B in the input list, then A will precede B in the returned list.

This method will issue a warning if it’s running inside an async context. Instead of ray.get(object_ref), you can use await object_ref. For a list of object refs, you can use await asyncio.gather(*object_refs).
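ObjectRefs can be awaited directly in async code, and asyncio.gather preserves input order in the same way that ray.get does for a list. The sketch below uses plain coroutines (the hypothetical delayed helper) as stand-ins for ObjectRefs so it runs without a cluster: the first awaitable finishes last, yet the results come back in input order.

```python
import asyncio
from typing import List


async def delayed(value: int, delay: float) -> int:
    """Stand-in for an ObjectRef that becomes ready after `delay` seconds."""
    await asyncio.sleep(delay)
    return value


async def gather_in_input_order() -> List[int]:
    # Completion order is 3, 2, 1, but gather returns results in argument order.
    return await asyncio.gather(delayed(1, 0.03), delayed(2, 0.02), delayed(3, 0.01))


results = asyncio.run(gather_in_input_order())
```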

Parameters
  • object_refs – Object ref of the object to get or a list of object refs to get.

  • timeout (Optional[float]) – The maximum amount of time in seconds to wait before returning.

Returns

A Python object or a list of Python objects.

Raises
  • GetTimeoutError – A GetTimeoutError is raised if a timeout is set and the get takes longer than timeout to return.

  • Exception – An exception is raised if the task that created the object or that created one of the objects raised an exception.

ray.wait

ray.wait(object_refs, *, num_returns=1, timeout=None, fetch_local=True)

Return a list of IDs that are ready and a list of IDs that are not.

If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object refs.

This method returns two lists. The first list consists of object refs that correspond to objects that are available in the object store. The second list corresponds to the rest of the object refs (which may or may not be ready).

Ordering of the input list of object refs is preserved. That is, if A precedes B in the input list, and both are in the ready list, then A will precede B in the ready list. This also holds true if A and B are both in the remaining list.
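The split that ray.wait performs can be modeled without any blocking. This hypothetical wait_model ignores timeout and fetch_local and takes a caller-supplied is_ready predicate. When more refs are ready than num_returns, it keeps the earliest ones in input order; that is a choice of this sketch, not something the reference above specifies.

```python
from typing import Callable, Hashable, List, Sequence, Tuple


def wait_model(refs: Sequence[Hashable], is_ready: Callable[[Hashable], bool],
               num_returns: int = 1) -> Tuple[List[Hashable], List[Hashable]]:
    """Pure-Python model of ray.wait's (ready, remaining) split.

    Returns at most num_returns ready refs; every other ref, ready or not,
    goes to the remaining list. Both lists keep the input order.
    """
    if len(set(refs)) != len(refs):
        # Mirrors the documented requirement that the refs be unique.
        raise ValueError("object refs passed to wait must be unique")
    ready = [r for r in refs if is_ready(r)][:num_returns]
    chosen = set(ready)
    remaining = [r for r in refs if r not in chosen]
    return ready, remaining
```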

This method will issue a warning if it’s running inside an async context. Instead of ray.wait(object_refs), you can use await asyncio.wait(object_refs).

Parameters
  • object_refs (List[ObjectRef]) – List of object refs for objects that may or may not be ready. Note that these IDs must be unique.

  • num_returns (int) – The number of object refs that should be returned.

  • timeout (float) – The maximum amount of time in seconds to wait before returning.

  • fetch_local (bool) – If True, wait for the object to be downloaded onto the local node before returning it as ready. If False, ray.wait() will not trigger fetching of objects to the local node and will return immediately once the object is available anywhere in the cluster.

Returns

A list of object refs that are ready and a list of the remaining object IDs.

ray.put

ray.put(value)

Store an object in the object store.

The object may not be evicted while a reference to the returned ID exists.

Parameters

value – The Python object to be stored.

Returns

The object ref assigned to this value.

ray.kill

ray.kill(actor, *, no_restart=True)

Kill an actor forcefully.

This will interrupt any running tasks on the actor, causing them to fail immediately. atexit handlers installed in the actor will not be run.

If you want to kill the actor but let pending tasks finish, you can call actor.__ray_terminate__.remote() instead to queue a termination task. Any atexit handlers installed in the actor will be run in this case.

If the actor is a detached actor, subsequent calls to get its handle via ray.get_actor will fail.

Parameters
  • actor (ActorHandle) – Handle to the actor to kill.

  • no_restart (bool) – Whether or not this actor should be restarted if it’s a restartable actor.

ray.cancel

ray.cancel(object_ref, *, force=False, recursive=True)

Cancels a task according to the following conditions.

If the specified task is pending execution, it will not be executed. If the task is currently executing, the behavior depends on the force flag. When force=False, a KeyboardInterrupt will be raised in Python and when force=True, the executing task will immediately exit. If the task is already finished, nothing will happen.

Only non-actor tasks can be canceled. Canceled tasks will not be retried (max_retries will not be respected).

Calling ray.get on a canceled task will raise a TaskCancelledError or a WorkerCrashedError if force=True.

Parameters
  • object_ref (ObjectRef) – ObjectRef returned by the task that should be canceled.

  • force (boolean) – Whether to force-kill a running task by killing the worker that is running the task.

  • recursive (boolean) – Whether to try to cancel tasks submitted by the task specified.

Raises

TypeError – Also raised for actor tasks, which cannot be canceled.

ray.get_actor

ray.get_actor(name)

Get a handle to a named actor.

Gets a handle to an actor with the given name. The actor must have been created with Actor.options(name="name").remote(). This works for both detached and non-detached actors.

Returns

ActorHandle to the actor.

Raises

ValueError if the named actor does not exist.

ray.get_gpu_ids

ray.get_gpu_ids()

Get the IDs of the GPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker started up, then the IDs returned by this method will be a subset of the IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has.

Returns

A list of GPU IDs.

ray.shutdown

ray.shutdown(_exiting_interpreter=False)

Disconnect the worker, and terminate processes started by ray.init().

This will automatically run at the end when a Python process that uses Ray exits. It is OK to run this twice in a row. The primary use case for this function is to clean up state between tests.

Note that this will clear any remote function definitions, actor definitions, and existing actors, so if you wish to use any previously defined remote functions or actors after calling ray.shutdown(), then you need to redefine them. If they were defined in an imported module, then you will need to reload the module.

Parameters

_exiting_interpreter (bool) – True if this is called by the atexit hook and false otherwise. If we are exiting the interpreter, we will wait a little while to print any extra error messages.

ray.method

ray.method(*args, **kwargs)

Annotate an actor method.

@ray.remote
class Foo:
    @ray.method(num_returns=2)
    def bar(self):
        return 1, 2

f = Foo.remote()

_, _ = f.bar.remote()

Parameters

num_returns – The number of object refs that should be returned by invocations of this actor method.

ray.util.ActorPool

class ray.util.ActorPool(actors)

Utility class to operate on a fixed pool of actors.

Parameters

actors (list) – List of Ray actor handles to use in this pool.

Examples

>>> a1, a2 = Actor.remote(), Actor.remote()
>>> pool = ActorPool([a1, a2])
>>> print(list(pool.map(lambda a, v: a.double.remote(v),
...                     [1, 2, 3, 4])))
[2, 4, 6, 8]

map(fn, values)

Apply the given function in parallel over the actors and values.

This returns an ordered iterator that yields the results of the map in the same order as values. Note that you must iterate over the iterator to force the computation to finish.

Parameters
  • fn (func) – Function that takes (actor, value) as arguments and returns an ObjectRef computing the result over the value. The actor will be considered busy until the ObjectRef completes.

  • values (list) – List of values that fn(actor, value) should be applied to.

Returns

Iterator over results from applying fn to the actors and values.

Examples

>>> pool = ActorPool(...)
>>> print(list(pool.map(lambda a, v: a.double.remote(v),
...                     [1, 2, 3, 4])))
[2, 4, 6, 8]

map_unordered(fn, values)

Similar to map(), but returning an unordered iterator.

This returns an unordered iterator that will return results of the map as they finish. This can be more efficient than map() if some results take longer to compute than others.
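The difference between map() and map_unordered() is the same as the one between Executor.map and as_completed in Python's standard concurrent.futures module. The sketch below relies on that analogy and does not use Ray: a thread pool with two workers stands in for a pool of two actors, and the hypothetical double_slowly stands in for actor.double.remote.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def double_slowly(v: int) -> int:
    """Stand-in for actor.double.remote(v); larger inputs finish sooner."""
    time.sleep(0.02 * (5 - v))
    return 2 * v


values = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=2) as executor:  # two workers, like two actors
    # Like ActorPool.map: results are yielded in input order.
    ordered = list(executor.map(double_slowly, values))
    # Like ActorPool.map_unordered: results are yielded as they finish.
    futures = [executor.submit(double_slowly, v) for v in values]
    unordered = [f.result() for f in as_completed(futures)]
```

The ordered list is always [2, 4, 6, 8]; the unordered one holds the same values in completion order, which can vary between runs.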

Parameters
  • fn (func) – Function that takes (actor, value) as arguments and returns an ObjectRef computing the result over the value. The actor will be considered busy until the ObjectRef completes.

  • values (list) – List of values that fn(actor, value) should be applied to.

Returns

Iterator over results from applying fn to the actors and values.

Examples

>>> pool = ActorPool(...)
>>> print(list(pool.map_unordered(lambda a, v: a.double.remote(v),
...                               [1, 2, 3, 4])))
[6, 2, 4, 8]

submit(fn, value)

Schedule a single task to run in the pool.

This has the same argument semantics as map(), but takes a single value instead of a list of values. The result can be retrieved using get_next() / get_next_unordered().
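The bookkeeping behind submit(), get_next(), has_next(), and has_free() can be sketched as a small synchronous class. MiniPool is hypothetical and is not ActorPool's implementation: its “actors” are plain objects, fn returns a value instead of an ObjectRef, and work runs only when get_next() collects it. It shows how a submit waits in a pending queue when no actor is idle, and how results come back in submission order.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Tuple

Task = Tuple[Callable[[Any, Any], Any], Any]  # (fn, value)


class MiniPool:
    """Hypothetical, synchronous model of ActorPool-style bookkeeping."""

    def __init__(self, actors: List[Any]) -> None:
        self._idle: List[Any] = list(actors)
        self._pending: Deque[Task] = deque()                # submits waiting for a free actor
        self._in_flight: Dict[int, Tuple[Any, Task]] = {}   # submit index -> (actor, task)
        self._next_index = 0                                # index of the next dispatched task
        self._next_return = 0                               # index get_next() returns next

    def submit(self, fn: Callable[[Any, Any], Any], value: Any) -> None:
        if self._idle:
            actor = self._idle.pop()
            self._in_flight[self._next_index] = (actor, (fn, value))
            self._next_index += 1
        else:
            self._pending.append((fn, value))

    def has_next(self) -> bool:
        return bool(self._in_flight)

    def has_free(self) -> bool:
        # Idle actors exist and nothing is queued ahead of a new submit.
        return bool(self._idle) and not self._pending

    def get_next(self) -> Any:
        if not self.has_next():
            raise StopIteration("No more results to get")  # this model's choice
        actor, (fn, value) = self._in_flight.pop(self._next_return)
        self._next_return += 1
        result = fn(actor, value)
        # The actor is idle again; the oldest pending submit (if any) takes it.
        self._idle.append(actor)
        if self._pending:
            self.submit(*self._pending.popleft())
        return result
```

With a single actor, the second submit waits until get_next() frees that actor, which is why has_free() is False right after both submits.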

Parameters
  • fn (func) – Function that takes (actor, value) as arguments and returns an ObjectRef computing the result over the value. The actor will be considered busy until the ObjectRef completes.

  • value (object) – Value to compute a result for.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> pool.submit(lambda a, v: a.double.remote(v), 2)
>>> print(pool.get_next(), pool.get_next())
2 4

has_next()

Returns whether there are any pending results to return.

Returns

True if there are any pending results not yet returned.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.has_next())
True
>>> print(pool.get_next())
2
>>> print(pool.has_next())
False

get_next(timeout=None)

Returns the next pending result in order.

This returns the next result produced by submit(), blocking for up to the specified timeout until it is available.

Returns

The next result.

Raises

TimeoutError if the timeout is reached.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.get_next())
2

get_next_unordered(timeout=None)

Returns any of the next pending results.

This returns some result produced by submit(), blocking for up to the specified timeout until it is available. Unlike get_next(), the results are not always returned in the same order as submitted, which can improve performance.

Returns

The next result.

Raises

TimeoutError if the timeout is reached.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> pool.submit(lambda a, v: a.double.remote(v), 2)
>>> print(pool.get_next_unordered())
4
>>> print(pool.get_next_unordered())
2

has_free()

Returns whether there are any idle actors available.

Returns

True if there are any idle actors and no pending submits.

Examples

>>> a1 = Actor.remote()
>>> pool = ActorPool([a1])
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.has_free())
False
>>> print(pool.get_next())
2
>>> print(pool.has_free())
True

pop_idle()

Removes an idle actor from the pool.

Returns

An idle actor if one is available. None if no actor was free to be removed.

Examples

>>> a1 = Actor.remote()
>>> pool = ActorPool([a1])
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.pop_idle())
None
>>> print(pool.get_next())
2
>>> print(pool.pop_idle())
<ptr to a1>

push(actor)

Pushes a new actor into the current list of idle actors.

Examples

>>> a1, b1 = Actor.remote(), Actor.remote()
>>> pool = ActorPool([a1])
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.get_next())
2
>>> pool2 = ActorPool([b1])
>>> pool2.push(pool.pop_idle())

ray.util.queue.Queue

class ray.util.queue.Queue(maxsize: int = 0, actor_options: Optional[Dict] = None)

A first-in, first-out queue implementation on Ray.

The behavior and use cases are similar to those of the asyncio.Queue class.

Features both sync and async put and get methods. Provides the option to block until space is available when calling put on a full queue, or to block until items are available when calling get on an empty queue.

Optionally supports batched put and get operations to minimize serialization overhead.

Parameters
  • maxsize (optional, int) – maximum size of the queue. If zero, size is unbounded.

  • actor_options (optional, Dict) – Dictionary of options to pass into the QueueActor during creation. These are directly passed into QueueActor.options(…). This could be useful if you need to pass in custom resource requirements, for example.

Examples

>>> q = Queue()
>>> items = list(range(10))
>>> for item in items:
...     q.put(item)
>>> for item in items:
...     assert item == q.get()
>>> # Create Queue with the underlying actor reserving 1 CPU.
>>> q = Queue(actor_options={"num_cpus": 1})

size() → int

The size of the queue.

qsize() → int

The size of the queue.

empty() → bool

Whether the queue is empty.

full() → bool

Whether the queue is full.

put(item: Any, block: bool = True, timeout: Optional[float] = None) → None

Adds an item to the queue.

If block is True and the queue is full, blocks until the queue is no longer full or until timeout.

There is no guarantee of order if multiple producers put to the same full queue.

Raises
  • Full – if the queue is full and blocking is False.

  • Full – if the queue is full, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

async put_async(item: Any, block: bool = True, timeout: Optional[float] = None) → None

Adds an item to the queue.

If block is True and the queue is full, blocks until the queue is no longer full or until timeout.

There is no guarantee of order if multiple producers put to the same full queue.

Raises
  • Full – if the queue is full and blocking is False.

  • Full – if the queue is full, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

get(block: bool = True, timeout: Optional[float] = None) → Any

Gets an item from the queue.

If block is True and the queue is empty, blocks until the queue is no longer empty or until timeout.

There is no guarantee of order if multiple consumers get from the same empty queue.

Returns

The next item in the queue.

Raises
  • Empty – if the queue is empty and blocking is False.

  • Empty – if the queue is empty, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

async get_async(block: bool = True, timeout: Optional[float] = None) → Any

Gets an item from the queue.

There is no guarantee of order if multiple consumers get from the same empty queue.

Returns

The next item in the queue.

Raises
  • Empty – if the queue is empty and blocking is False.

  • Empty – if the queue is empty, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

put_nowait(item: Any) → None

Equivalent to put(item, block=False).

Raises

Full – if the queue is full.

put_nowait_batch(items: collections.abc.Iterable) → None

Takes in a list of items and puts them into the queue in order.

Raises

Full – if the items will not fit in the queue

get_nowait() → Any

Equivalent to get(block=False).

Raises

Empty – if the queue is empty.

get_nowait_batch(num_items: int) → List[Any]

Gets items from the queue and returns them in a list in order.

Raises

Empty – if the queue does not contain the desired number of items
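The batch methods can be approximated with the standard library's queue.Queue, which raises the same kind of Full and Empty conditions. In this sketch, put_nowait_batch and get_nowait_batch are hypothetical free functions, not Ray's code. They check capacity up front so that a failing call changes nothing, which is an assumption beyond what the reference above states, and the check-then-act pattern is only safe with one caller at a time.

```python
import queue
from typing import Any, Iterable, List


def put_nowait_batch(q: queue.Queue, items: Iterable[Any]) -> None:
    """All-or-nothing batch put: raise queue.Full before inserting anything."""
    items = list(items)
    if q.maxsize > 0 and q.qsize() + len(items) > q.maxsize:
        raise queue.Full(f"cannot add {len(items)} items: only {q.maxsize - q.qsize()} slots free")
    for item in items:
        q.put_nowait(item)  # keeps the input order


def get_nowait_batch(q: queue.Queue, num_items: int) -> List[Any]:
    """All-or-nothing batch get: raise queue.Empty unless num_items are queued."""
    if num_items > q.qsize():
        raise queue.Empty(f"cannot get {num_items} items: only {q.qsize()} queued")
    return [q.get_nowait() for _ in range(num_items)]
```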

shutdown(force: bool = False, grace_period_s: int = 5) → None

Terminates the underlying QueueActor.

All of the resources reserved by the queue will be released.

Parameters
  • force (bool) – If True, forcefully kill the actor, causing an immediate failure. If False, graceful actor termination will be attempted first, before falling back to a forceful kill.

  • grace_period_s (int) – If force is False, how long in seconds to wait for graceful termination before falling back to forceful kill.

ray.nodes

ray.nodes()

Get a list of the nodes in the cluster (for debugging only).

Returns

Information about the Ray clients in the cluster.

ray.timeline

ray.timeline(filename=None)

Return a list of profiling events that can be viewed as a timeline.

Ray profiling must be enabled by setting the RAY_PROFILING=1 environment variable prior to starting Ray.

To view this information as a timeline, dump it to a JSON file, either by passing in “filename” or by using json.dump, then go to chrome://tracing in the Chrome web browser and load the dumped file.
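The dump-and-load workflow needs nothing beyond json.dump. In this sketch, events is a hypothetical, hand-written list in the Chrome Trace Event format (complete events with "ph": "X" and microsecond timestamps); with Ray you would instead dump the list returned by ray.timeline(), or pass a filename to ray.timeline directly. The output path is arbitrary.

```python
import json
import os
import tempfile

# Hypothetical events for illustration; real ones would come from ray.timeline().
events = [
    {"name": "task::f", "ph": "X", "ts": 0, "dur": 1500, "pid": 1, "tid": 1},
    {"name": "task::g", "ph": "X", "ts": 500, "dur": 800, "pid": 1, "tid": 2},
]

path = os.path.join(tempfile.gettempdir(), "ray_timeline_example.json")
with open(path, "w") as f:
    json.dump(events, f)  # then load this file in chrome://tracing
```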

Parameters

filename – If a filename is provided, the timeline is dumped to that file.

Returns

If filename is not provided, this returns a list of profiling events.

Each profile event is a dictionary.

ray.cluster_resources

ray.cluster_resources()

Get the current total cluster resources.

Note that this information can grow stale as nodes are added to or removed from the cluster.

Returns

A dictionary mapping resource name to the total quantity of that resource in the cluster.

ray.available_resources

ray.available_resources()

Get the current available cluster resources.

This is different from cluster_resources in that this will return idle (available) resources rather than total resources.

Note that this information can grow stale as tasks start and finish.

Returns

A dictionary mapping resource name to the available quantity of that resource in the cluster.

ray.cross_language

ray.java_function(class_name, function_name)

Define a Java function.

Parameters
  • class_name (str) – Java class name.

  • function_name (str) – Java function name.

ray.java_actor_class(class_name)

Define a Java actor class.

Parameters

class_name (str) – Java class name.

Placement Group APIs

placement_group

ray.util.placement_group.placement_group(bundles: List[Dict[str, float]], strategy: str = 'PACK', name: str = '', lifetime=None) → ray.util.placement_group.PlacementGroup

Asynchronously creates a PlacementGroup.

Parameters
  • bundles (List[Dict]) – A list of bundles which represent the resources requirements.

  • strategy (str) –

    The strategy to create the placement group.

    • “PACK”: Packs bundles into as few nodes as possible.

    • “SPREAD”: Places bundles across distinct nodes as evenly as possible.

    • “STRICT_PACK”: Packs bundles into one node. The group is not allowed to span multiple nodes.

    • “STRICT_SPREAD”: Places each bundle on a distinct node.

  • name (str) – The name of the placement group.

  • lifetime (str) – Either None (the default), which means the placement group will fate-share with its creator and will be deleted once its creator is dead, or “detached”, which means the placement group will live as a global object independent of the creator.
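To make the STRICT_PACK and STRICT_SPREAD definitions concrete, here is a hypothetical brute-force feasibility check against a fixed list of node capacities, using the same List[Dict[str, float]] shape as bundles. It models only the constraints stated above; it is not Ray's scheduling algorithm and ignores resources already in use on each node.

```python
from itertools import permutations
from typing import Dict, List

Resources = Dict[str, float]


def fits(demand: Resources, capacity: Resources) -> bool:
    """True if capacity covers every resource in demand."""
    return all(capacity.get(name, 0.0) >= qty for name, qty in demand.items())


def total(bundles: List[Resources]) -> Resources:
    """Sum a list of bundles resource by resource."""
    out: Resources = {}
    for bundle in bundles:
        for name, qty in bundle.items():
            out[name] = out.get(name, 0.0) + qty
    return out


def strict_pack_feasible(bundles: List[Resources], nodes: List[Resources]) -> bool:
    # STRICT_PACK: all bundles must land together on a single node.
    need = total(bundles)
    return any(fits(need, node) for node in nodes)


def strict_spread_feasible(bundles: List[Resources], nodes: List[Resources]) -> bool:
    # STRICT_SPREAD: each bundle on a different node, i.e. an injective assignment.
    # Trying every ordering of distinct nodes is fine for a handful of nodes.
    if len(bundles) > len(nodes):
        return False
    return any(
        all(fits(bundle, node) for bundle, node in zip(bundles, chosen))
        for chosen in permutations(nodes, len(bundles))
    )
```

For example, with one 4-CPU node and one 2-CPU node, three 1-CPU bundles fit under STRICT_PACK but not under STRICT_SPREAD.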

Returns

Placement group object.

Return type

PlacementGroup

PlacementGroup (class)

class ray.util.placement_group.PlacementGroup(id: PlacementGroupID, bundle_cache: Optional[List[Dict]] = None)

A handle to a placement group.

ready() → <Mock name=’mock.ObjectRef’ id=’139970942985104’>[source]

Returns an ObjectRef to check ready status.

This API runs a small dummy task to wait for placement group creation. It is compatible with ray.get and ray.wait.

Example:

>>> pg = placement_group([{"CPU": 1}])
>>> ray.get(pg.ready())
>>> pg = placement_group([{"CPU": 1}])
>>> ray.wait([pg.ready()], timeout=0)

wait(timeout_seconds: Union[float, int]) → bool[source]

Wait for the placement group to be ready within the specified time.

Parameters

timeout_seconds (float|int) – Timeout in seconds.

Returns

True if the placement group is created. False otherwise.

property bundle_specs

Return bundles belonging to this placement group.

Type

List[Dict]

to_dict() → dict[source]

Convert this placement group into a dict for purposes of json serialization.

Used when passing a placement group as an option to a Ray client remote function. See set_task_options in util/client/common.py.

Returns

A JSON-serializable dictionary representing the placement group.

static from_dict(pg_dict: dict) → ray.util.placement_group.PlacementGroup[source]

Instantiate and return a PlacementGroup from its json-serializable dict representation.

Used by Ray Client on server-side to deserialize placement group option. See decode_options in util/client/server/server.py.

Parameters

pg_dict (dict) – Dictionary representing a placement group.

Returns

A placement group made from the data in the input dict.

placement_group_table

ray.util.placement_group.placement_group_table(placement_group: ray.util.placement_group.PlacementGroup = None) → dict[source]

Get the state of the placement group from GCS.

Parameters

placement_group (PlacementGroup) – The placement group whose state to return. If None, the states of all placement groups are returned.

remove_placement_group

ray.util.placement_group.remove_placement_group(placement_group: ray.util.placement_group.PlacementGroup) → None[source]

Asynchronously remove placement group.

Parameters

placement_group (PlacementGroup) – The placement group to delete.

get_current_placement_group

ray.util.placement_group.get_current_placement_group() → Optional[ray.util.placement_group.PlacementGroup][source]

Get the current placement group which a task or actor is using.

It returns None if there’s no current placement group for the worker. For example, if you call this method in your driver, it returns None (because drivers never belong to any placement group).

Examples

>>> import ray
>>> from ray.util.placement_group import (
...     placement_group, get_current_placement_group)
>>> ray.init()
>>> @ray.remote
... def f():
...     # Returns the placement group that task f belongs to,
...     # i.e., the same group as pg created below.
...     return get_current_placement_group()
>>> pg = placement_group([{"CPU": 2}])
>>> ray.get(pg.ready())
>>> assert ray.get(f.options(placement_group=pg).remote()).id == pg.id
>>> # The driver doesn't belong to any placement group,
>>> # so this returns None.
>>> assert get_current_placement_group() is None

Returns

Placement group object.

None if the current task or actor wasn’t created with any placement group.

Return type

PlacementGroup

Custom Metrics APIs

Counter

class ray.util.metrics.Counter(name: str, description: str = '', tag_keys: Optional[Tuple[str]] = None)[source]

A cumulative metric that is monotonically increasing.

This corresponds to Prometheus’ counter metric: https://prometheus.io/docs/concepts/metric_types/#counter

Parameters
  • name (str) – Name of the metric.

  • description (str) – Description of the metric.

  • tag_keys (tuple) – Tag keys of the metric.

inc(value: Union[int, float] = 1.0, tags: Dict[str, str] = None)[source]

Increment the counter by value (defaults to 1).

Tags passed in will take precedence over the metric’s default tags.

Parameters
  • value (int, float) – Value to increment the counter by (default=1).

  • tags (Dict[str, str]) – Tags to set or override for this counter.

Gauge

class ray.util.metrics.Gauge(name: str, description: str = '', tag_keys: Optional[Tuple[str]] = None)[source]

Gauges keep the last recorded value and drop everything before.

Unlike counters, gauges can go up or down over time.

This corresponds to Prometheus’ gauge metric: https://prometheus.io/docs/concepts/metric_types/#gauge

Parameters
  • name (str) – Name of the metric.

  • description (str) – Description of the metric.

  • tag_keys (tuple) – Tag keys of the metric.

set(value: Union[int, float], tags: Dict[str, str] = None)[source]

Set the gauge to the given value.

Tags passed in will take precedence over the metric’s default tags.

Parameters
  • value (int, float) – Value to set the gauge to.

  • tags (Dict[str, str]) – Tags to set or override for this gauge.

Histogram

class ray.util.metrics.Histogram(name: str, description: str = '', boundaries: List[float] = None, tag_keys: Optional[Tuple[str]] = None)[source]

Tracks the size and number of events in buckets.

Histograms allow you to calculate aggregate quantiles such as the 25th, 50th, 95th, and 99th percentile latency of an RPC.

This corresponds to Prometheus’ histogram metric: https://prometheus.io/docs/concepts/metric_types/#histogram

Parameters
  • name (str) – Name of the metric.

  • description (str) – Description of the metric.

  • boundaries (list) – Boundaries of histogram buckets.

  • tag_keys (tuple) – Tag keys of the metric.

observe(value: Union[int, float], tags: Dict[str, str] = None)[source]

Observe a given value and add it to the appropriate bucket.

Tags passed in will take precedence over the metric’s default tags.

Parameters
  • value (int, float) – Value to observe.

  • tags (Dict[str, str]) – Tags to set or override for this histogram.

property info

Return information about histogram metric.

Runtime Context APIs

ray.runtime_context.get_runtime_context()[source]

Get the runtime context of the current driver/worker.

Example:

>>> ray.get_runtime_context().job_id # Get the job id.
>>> ray.get_runtime_context().get() # Get all the metadata.
class ray.runtime_context.RuntimeContext(worker)[source]

A class used for getting runtime context.

get()[source]

Get a dictionary of the current context.

Returns

Dictionary of the current context.

Return type

dict

property job_id

Get current job ID for this worker or driver.

Job ID is the ID of the Ray driver that creates tasks or actors.

Returns

If called by a driver, this returns the job ID. If called in a task, it returns the job ID of the associated driver.

property node_id

Get current node ID for this worker or driver.

Node ID is the ID of the node that your driver, task, or actor runs on.

Returns

The node ID of this worker or driver.

property task_id

Get current task ID for this worker or driver.

Task ID is the id of a Ray task. This shouldn’t be used in a driver process.

Example

>>> import ray
>>> ray.init()
>>> @ray.remote
... class Actor:
...     def ready(self):
...         return True
>>> @ray.remote
... def f():
...     return True
>>> # All the below code will generate different task IDs.
>>> # Task IDs are available for actor creation.
>>> a = Actor.remote()
>>> # Task IDs are available for actor tasks.
>>> a.ready.remote()
>>> # Task IDs are available for normal tasks.
>>> f.remote()

Returns

The current worker’s task id. None if there’s no task id.

property actor_id

Get the current actor ID in this worker.

ID of the actor of the current process. This shouldn’t be used in a driver process.

Returns

The current actor id in this worker. None if there’s no actor id.

property was_current_actor_reconstructed

Check whether this actor has been restarted.

Returns

Whether this actor has ever been restarted.

property current_placement_group_id

Get the current placement group ID of this worker.

Returns

The current placement group id of this worker.

property should_capture_child_tasks_in_placement_group

Get whether the current task should capture its parent's placement group.

This returns True if it is called inside a driver.

Returns

True if the current task should implicitly capture the parent placement group.

property runtime_env

Get the runtime env passed to job_config.

Returns

The runtime env currently used by this worker.

Debugging APIs

ray.util.pdb.set_trace(breakpoint_uuid=None)

Interrupt the flow of the program and drop into the Ray debugger.

Can be used within a Ray task or actor.

ray.util.inspect_serializability(base_obj: Any, name: Optional[str] = None, depth: int = 3, _parent: Optional[Any] = None, _failure_set: Optional[set] = None) → Tuple[bool, Set[ray.util.check_serialize.FailureTuple]][source]

Identifies what objects are preventing serialization.

Parameters
  • base_obj – Object to be serialized.

  • name – Optional name for base_obj, used when printing the inspection results.

  • depth – Depth of the scope stack to walk through. Defaults to 3.

Returns

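A tuple (serializable, failures): True if base_obj is serializable, and a set of FailureTuple entries describing the unserializable objects that were found.

Return type

Tuple[bool, Set[FailureTuple]]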

New in version 1.1.0.

Experimental APIs

ray.experimental.load_package(config_path: str) → ray.experimental.packaging.load_package._RuntimePackage[source]

Load the code package given its config path.

Parameters

config_path (str) – The path to the configuration YAML that defines the package. For documentation on the packaging format, see the example YAML in example_pkg/ray_pkg.yaml.

Examples

>>> # Load from local.
>>> my_pkg = load_package("~/path/to/my_pkg.yaml")
>>> # Load from GitHub.
>>> my_pkg = ray.experimental.load_package(
...   "https://raw.githubusercontent.com/user/repo/refspec"
...   "/path/to/package/my_pkg.yaml")
>>> # Inspect the package runtime env.
>>> print(my_pkg._runtime_env)
{"conda": {...},
 "docker": "anyscale-ml/ray-ml:nightly-py38-cpu",
 "working_dir": "https://github.com/demo/foo/blob/v3.0/project/"}
>>> # Run remote functions from the package.
>>> my_pkg.my_func.remote(1, 2)
>>> # Create actors from the package.
>>> actor = my_pkg.MyActor.remote(3, 4)
>>> # Create new remote funcs in the same env as a package.
>>> @ray.remote(runtime_env=my_pkg._runtime_env)
... def f(): ...

ray.experimental.get_object_locations(obj_refs: List[ObjectRef], timeout_ms: int = -1) → Dict[ObjectRef, Dict[str, Any]][source]

Lookup the locations for a list of objects.

It returns a dict that maps each object to its location. The dict excludes objects whose location lookup failed.

Parameters
  • obj_refs (List[ObjectRef]) – List of object refs.

  • timeout_ms (int) – The maximum amount of time in milliseconds to wait before returning. Wait indefinitely if it's negative.

Returns

A dict that maps each object to its location. The dict excludes objects whose location lookup failed.

The location is stored as a dict with the following keys:

  • node_ids (List[str]): The hex IDs of the nodes that have a copy of this object.

  • object_size (int): The size of data + metadata in bytes.

Raises
  • RuntimeError – if the processes were not started by ray.init().

  • ray.exceptions.GetTimeoutError – if it couldn’t finish the request in time.

ClientBuilder API

ray.client(address: Optional[str] = None) → ray.client_builder.ClientBuilder[source]

Creates a ClientBuilder based on the provided address. The address can be of the following forms:

  • None: Connects to or creates a local cluster and connects to it.

  • "local": Creates a new cluster locally and connects to it.

  • "IP:Port": Connects to a Ray Client Server at the given address.

  • "module://inner_address": Loads module.ClientBuilder and passes inner_address to it.

class ray.ClientBuilder(address: Optional[str])[source]

Builder for a Ray Client connection. This class can be subclassed by custom builder classes to modify connection behavior to include additional features or altered semantics. One example is the _LocalClientBuilder.

env(env: Dict[str, Any]) → ray.client_builder.ClientBuilder[source]

Set an environment for the session.

Parameters

env (Dict[str, Any]) – A runtime environment to use for this connection. See Runtime Environments (Experimental) for what values are accepted in this dict.

namespace(namespace: str) → ray.client_builder.ClientBuilder[source]

Sets the namespace for the session.

Parameters

namespace (str) – Namespace to use.

connect() → ray.client_builder.ClientContext[source]

Begin a connection to the address passed in via ray.client(…).

Returns

Dataclass with information about the connection. This includes the server's version of Python & Ray as well as the dashboard_url.

Return type

ClientContext

The Ray Command Line API

ray start

Start Ray processes manually on the local machine.

ray start [OPTIONS]

Options

--node-ip-address <node_ip_address>

the IP address of this node

--address <address>

the address of the head node of an existing Ray cluster for this node to join

--port <port>

the port of the head ray process. If not provided, defaults to 6379; if port is set to 0, we will allocate an available port.

--object-manager-port <object_manager_port>

the port to use for starting the object manager

--node-manager-port <node_manager_port>

the port to use for starting the node manager

--gcs-server-port <gcs_server_port>

Port number for the GCS server.

--min-worker-port <min_worker_port>

the lowest port number that workers will bind on. If not set, random ports will be chosen.

--max-worker-port <max_worker_port>

the highest port number that workers will bind on. If set, --min-worker-port must also be set.

--worker-port-list <worker_port_list>

a comma-separated list of open ports for workers to bind on. Overrides --min-worker-port and --max-worker-port.

--ray-client-server-port <ray_client_server_port>

the port number the ray client server will bind on. If not set, the ray client server will not be started.

--object-store-memory <object_store_memory>

The amount of memory (in bytes) to start the object store with. By default, this is capped at 20GB but can be set higher.

--num-cpus <num_cpus>

the number of CPUs on this node

--num-gpus <num_gpus>

the number of GPUs on this node

--resources <resources>

a JSON serialized dictionary mapping resource name to resource quantity

--head

provide this argument for the head node

--include-dashboard <include_dashboard>

provide this argument to start the Ray dashboard GUI

--dashboard-host <dashboard_host>

the host to bind the dashboard server to, either localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is localhost.

--dashboard-port <dashboard_port>

the port to bind the dashboard server to; defaults to 8265

--block

provide this argument to block forever in this command

--plasma-directory <plasma_directory>

object store directory for memory mapped files

--autoscaling-config <autoscaling_config>

the file that contains the autoscaling config

--no-redirect-worker-output

do not redirect worker stdout and stderr to files

--no-redirect-output

do not redirect non-worker stdout and stderr to files

--plasma-store-socket-name <plasma_store_socket_name>

manually specify the socket name of the plasma store

--raylet-socket-name <raylet_socket_name>

manually specify the socket path of the raylet process

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose
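The --resources value must reach ray start as a single JSON argument, which is easy to break with shell quoting. A small Python sketch that builds the command line safely with json.dumps and shlex.join; the custom resource names are hypothetical and the command is only printed, not run.

```python
import json
import shlex

# Hypothetical custom resources advertised by this node.
resources = {"special_hardware": 1, "memory_tier_high": 2}

cmd = [
    "ray", "start", "--head", "--port=6379",
    "--num-cpus=4",
    "--resources=" + json.dumps(resources),
]
# shlex.join quotes the JSON so the shell passes it through as one argument.
command_line = shlex.join(cmd)
print(command_line)
```

Tasks can then request these resources with, for example, @ray.remote(resources={"special_hardware": 1}).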

ray stop

Stop Ray processes manually on the local machine.

ray stop [OPTIONS]

Options

-f, --force

If set, ray will send SIGKILL instead of SIGTERM.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose

ray up

Create or update a Ray cluster.

ray up [OPTIONS] CLUSTER_CONFIG_FILE

Options

--min-workers <min_workers>

Override the configured min worker node count for the cluster.

--max-workers <max_workers>

Override the configured max worker node count for the cluster.

--no-restart

Whether to skip restarting Ray services during the update. This avoids interrupting running jobs.

--restart-only

Whether to skip running setup commands and only restart Ray. This cannot be used with --no-restart.

-y, --yes

Don’t ask for confirmation.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

--redirect-command-output

Whether to redirect command output to a file.

--use-login-shells, --use-normal-shells

Ray uses login shells (bash --login -i) to run cluster commands by default. If your workflow is compatible with normal shells, this can be disabled for a better user experience.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray down

Tear down a Ray cluster.

ray down [OPTIONS] CLUSTER_CONFIG_FILE

Options

-y, --yes

Don’t ask for confirmation.

--workers-only

Only destroy the workers.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--keep-min-workers

Retain the minimal amount of workers specified in the config.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray exec

Execute a command via SSH on a Ray cluster.

ray exec [OPTIONS] CLUSTER_CONFIG_FILE CMD

Options

--run-env <run_env>

Choose whether to execute this command in a container or directly on the cluster head. Only applies when docker is configured in the YAML.

Options

auto | host | docker

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

CMD

Required argument

ray submit

Uploads and runs a script on the specified cluster.

The script is automatically synced to the following location:

os.path.join("~", os.path.basename(script))
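A quick way to see what this rule implies: only the script's file name is kept, so directory structure is dropped and two scripts with the same name land on the same remote path. The paths below are hypothetical, and the sketch assumes a POSIX client, where os.path is posixpath.

```python
import posixpath

# Hypothetical local script paths passed to `ray submit`.
scripts = ["experiments/train.py", "baselines/train.py"]

# Apply the documented rule; only the file name survives.
remote_paths = [posixpath.join("~", posixpath.basename(s)) for s in scripts]
print(remote_paths)  # ['~/train.py', '~/train.py']
```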

Example:

ray submit [CLUSTER.YAML] experiment.py -- --smoke-test

ray submit [OPTIONS] CLUSTER_CONFIG_FILE SCRIPT [SCRIPT_ARGS]...

Options

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--args <args>

(deprecated) Use '-- --arg1 --arg2' for script args.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

SCRIPT

Required argument

SCRIPT_ARGS

Optional argument(s)

ray attach

Create or attach to an SSH session to a Ray cluster.

ray attach [OPTIONS] CLUSTER_CONFIG_FILE

Options

--start

Start the cluster if needed.

--screen

Run the command in screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

-N, --new

Force creation of a new screen.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto | record | pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto | false | true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray get_head_ip

Return the head node IP of a Ray cluster.

ray get_head_ip [OPTIONS] CLUSTER_CONFIG_FILE

Options

-n, --cluster-name <cluster_name>

Override the configured cluster name.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray stack

Take a stack dump of all Python workers on the local machine.

ray stack [OPTIONS]

ray memory

Print object references held in a Ray cluster.

ray memory [OPTIONS]

Options

--address <address>

Override the address to connect to.

--redis_password <redis_password>

Connect to ray with redis_password.

--group-by <group_by>

Group object references by a GroupByType (e.g. NODE_ADDRESS or STACK_TRACE).

Options

NODE_ADDRESS | STACK_TRACE

--sort-by <sort_by>

Sort object references in ascending order by a SortingType (e.g. PID, OBJECT_SIZE, or REFERENCE_TYPE).

Options

PID | OBJECT_SIZE | REFERENCE_TYPE

--units <units>

Specify unit metrics for displaying object sizes (e.g. B, KB, MB, GB).

Options

B | KB | MB | GB

--no-format

Display unformatted results. Defaults to true when terminal width is less than 137 characters.

--stats-only

Display plasma store stats only.

--num-entries, --n <num_entries>

Specify number of sorted entries per group.

ray timeline

Take a Chrome tracing timeline for a Ray cluster.

ray timeline [OPTIONS]

Options

--address <address>

Override the redis address to connect to.

ray debug

Show all active breakpoints and exceptions in the Ray debugger.

ray debug [OPTIONS]

Options

--address <address>

Override the address to connect to.