API and Package Reference

Python API

ray.init

ray.init(address=None, *, num_cpus=None, num_gpus=None, resources=None, object_store_memory=None, local_mode=False, ignore_reinit_error=False, include_dashboard=None, dashboard_host='127.0.0.1', dashboard_port=8265, job_config=None, configure_logging=True, logging_level=20, logging_format='%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s', log_to_driver=True, _enable_object_reconstruction=False, _redis_max_memory=None, _plasma_directory=None, _node_ip_address='127.0.0.1', _driver_object_store_memory=None, _memory=None, _redis_password='5241590000000000', _java_worker_options=None, _code_search_path=None, _temp_dir=None, _load_code_from_local=False, _lru_evict=False, _metrics_export_port=None, _system_config=None)

Connect to an existing Ray cluster or start one and connect to it.

This method handles two cases: either a Ray cluster already exists and this driver simply attaches to it, or Ray starts all of the processes associated with a Ray cluster and attaches the driver to the newly started cluster.

To start Ray and all of the relevant processes, use this as follows:

ray.init()

To connect to an existing Ray cluster, use this as follows (substituting in the appropriate address):

ray.init(address="123.45.67.89:6379")

You can also define an environment variable called RAY_ADDRESS in the same format as the address parameter to connect to an existing cluster with ray.init().

Parameters
  • address (str) – The address of the Ray cluster to connect to. If this address is not provided, then this command will start Redis, a raylet, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits. If the driver is running on a node in a Ray cluster, passing "auto" as the value tells the driver to detect the cluster, removing the need to specify a specific node address.

  • num_cpus (int) – Number of CPUs the user wishes to assign to each raylet. By default, this is set based on virtual cores.

  • num_gpus (int) – Number of GPUs the user wishes to assign to each raylet. By default, this is set based on detected GPUs.

  • resources – A dictionary mapping the names of custom resources to the quantities of them that are available.

  • object_store_memory – The amount of memory (in bytes) to start the object store with. By default, this is automatically set based on available system memory.

  • local_mode (bool) – If true, the code will be executed serially. This is useful for debugging.

  • ignore_reinit_error – If true, Ray suppresses errors from calling ray.init() a second time. Ray won’t be restarted.

  • include_dashboard – Boolean flag indicating whether or not to start the Ray dashboard, which displays the status of the Ray cluster. If this argument is None, then the dashboard will be started if the relevant dependencies are present.

  • dashboard_host – The host to bind the dashboard server to. Can either be localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is set to localhost to prevent access from external machines.

  • dashboard_port – The port to bind the dashboard server to. Defaults to 8265.

  • job_config (ray.job_config.JobConfig) – The job configuration.

  • configure_logging – True (default) if configuration of logging is allowed here. Otherwise, the user may want to configure it separately.

  • logging_level – Logging level, defaults to logging.INFO. Ignored unless “configure_logging” is true.

  • logging_format – Logging format; defaults to a string containing a timestamp, log level, filename, line number, and message. See the source file ray_constants.py for details. Ignored unless “configure_logging” is true.

  • log_to_driver (bool) – If true, the output from all of the worker processes on all nodes will be directed to the driver.

  • _enable_object_reconstruction (bool) – If True, when an object stored in the distributed plasma store is lost due to node failure, Ray will attempt to reconstruct the object by re-executing the task that created the object. Arguments to the task will be recursively reconstructed. If False, then ray.ObjectLostError will be thrown.

  • _redis_max_memory – Redis max memory.

  • _plasma_directory – Override the plasma mmap file directory.

  • _node_ip_address (str) – The IP address of the node that we are on.

  • _driver_object_store_memory (int) – Limit the amount of memory the driver can use in the object store for creating objects.

  • _memory – Amount of reservable memory resource to create.

  • _redis_password (str) – If provided, prevents external clients without the password from connecting to Redis.

  • _temp_dir (str) – If provided, specifies the root temporary directory for the Ray process. Defaults to an OS-specific conventional location, e.g., “/tmp/ray”.

  • _load_code_from_local – Whether code should be loaded from a local module or from the GCS.

  • _java_worker_options – Override the options used to start Java workers.

  • _code_search_path (list) – Java classpath or python import path.

  • _lru_evict (bool) – If True, when the object store is full, objects are evicted in LRU order to make more space, and under memory pressure ray.ObjectLostError may be thrown. If False, reference counting is used to decide which objects are safe to evict, and under memory pressure ray.ObjectStoreFullError may be thrown.

  • _metrics_export_port (int) – Port number on which Ray exposes system metrics through a Prometheus endpoint. This option is under active development, and the API is subject to change.

  • _system_config (dict) – Configuration for overriding RayConfig defaults. For testing purposes ONLY.
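The default logging_level=20 is the numeric value of logging.INFO, and the default logging_format is an ordinary format string for Python's logging module. As a minimal sketch, the standard-library-only snippet below (it does not call Ray, and the logger name "demo" is hypothetical) configures a logger with those two defaults to show which records pass the level filter and how an emitted line is laid out.

```python
import io
import logging

# Defaults copied from the ray.init signature above.
LOGGING_LEVEL = 20  # numeric value of logging.INFO
LOGGING_FORMAT = "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s"


def make_logger(stream):
    """Build a stdlib logger that uses Ray's default level and format."""
    logger = logging.getLogger("demo")  # hypothetical logger name
    logger.setLevel(LOGGING_LEVEL)
    logger.propagate = False  # keep output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOGGING_FORMAT))
    logger.handlers = [handler]
    return logger


buf = io.StringIO()
log = make_logger(buf)
log.debug("hidden")  # DEBUG (10) is below INFO (20), so this record is dropped
log.info("started")  # INFO passes the level filter
print(buf.getvalue(), end="")
```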

Returns

Address information about the started processes.

Raises

Exception – An exception is raised if an inappropriate combination of arguments is passed in.

ray.is_initialized

ray.is_initialized()

Check if ray.init has been called yet.

Returns

True if ray.init has already been called and False otherwise.

ray.remote

ray.remote(*args, **kwargs)

Defines a remote function or an actor class.

This can be used with no arguments to define a remote function or actor as follows:

@ray.remote
def f():
    return 1

@ray.remote
class Foo:
    def method(self):
        return 1

It can also be used with specific keyword arguments as follows:

@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1

Remote task and actor objects returned by @ray.remote can also be dynamically modified with the same arguments as above using .options() as follows:

@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2
g = f.options(num_gpus=2, max_calls=None)

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1
Bar = Foo.options(num_cpus=1, resources=None)

Running remote actors will be terminated when the actor handle to them in Python is deleted, which will cause them to complete any outstanding work and then shut down. If you want to kill them immediately, you can also call ray.kill(actor).

Parameters
  • num_returns (int) – This is only for remote functions. It specifies the number of object refs returned by the remote function invocation.

  • num_cpus (float) – The quantity of CPU cores to reserve for this task or for the lifetime of the actor.

  • num_gpus (int) – The quantity of GPUs to reserve for this task or for the lifetime of the actor.

  • resources (Dict[str, float]) – The quantity of various custom resources to reserve for this task or for the lifetime of the actor. This is a dictionary mapping strings (resource names) to floats.

  • accelerator_type – If specified, requires that the task or actor run on a node with the specified type of accelerator. See ray.accelerators for accelerator types.

  • max_calls (int) – Only for remote functions. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit (this can be used to address memory leaks in third-party libraries or to reclaim resources that cannot easily be released, e.g., GPU memory that was acquired by TensorFlow). By default this is infinite.

  • max_restarts (int) – Only for actors. This specifies the maximum number of times that the actor should be restarted when it dies unexpectedly. The minimum valid value is 0 (default), which indicates that the actor doesn’t need to be restarted. A value of -1 indicates that an actor should be restarted indefinitely.

  • max_task_retries (int) – Only for actors. How many times to retry an actor task if the task fails due to a system error, e.g., the actor has died. If set to -1, the system will retry the failed task until the task succeeds, or the actor has reached its max_restarts limit. If set to n > 0, the system will retry the failed task up to n times, after which the task will throw a RayActorError exception upon ray.get. Note that Python exceptions are not considered system errors and will not trigger retries.

  • max_retries (int) – Only for remote functions. This specifies the maximum number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, the default is 4, and a value of -1 indicates infinite retries.

  • override_environment_variables (Dict[str, str]) – This specifies environment variables to override for the actor or task. The overrides are propagated to all child actors and tasks. This is a dictionary mapping variable names to their values. Existing variables can be overridden, new ones can be created, and an existing variable can be unset by setting it to an empty string.
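The retry and environment-override rules above can be summarized in a small pure-Python model. This is a sketch of the documented semantics only, not Ray's implementation: WorkerCrash, run_with_retries, and apply_env_overrides are hypothetical names, WorkerCrash stands in for a system error such as a dead worker or actor, and the model ignores how max_task_retries interacts with max_restarts.

```python
import itertools


class WorkerCrash(Exception):
    """Hypothetical stand-in for a system error, e.g. a dead worker or actor."""


def run_with_retries(task, max_retries):
    """Model of max_retries / max_task_retries (not Ray's implementation).

    -1 retries forever; n >= 0 allows up to n retries after the first attempt.
    Only WorkerCrash (a system error) is retried; ordinary Python exceptions
    raised by the task propagate immediately.
    """
    attempts = itertools.count() if max_retries == -1 else range(max_retries + 1)
    last_error = None
    for _ in attempts:
        try:
            return task()
        except WorkerCrash as err:
            last_error = err  # eligible for another attempt
    # Retries exhausted; Ray would surface an error from ray.get at this point.
    raise last_error


def apply_env_overrides(base, overrides):
    """Model of override_environment_variables: override, add, or unset ("")."""
    env = dict(base)
    for key, value in overrides.items():
        if value == "":
            env.pop(key, None)  # empty string unsets the variable
        else:
            env[key] = value  # overrides an existing variable or creates a new one
    return env


calls = []


def flaky():
    # Hypothetical task that crashes twice, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise WorkerCrash("worker died")
    return "ok"


print(run_with_retries(flaky, max_retries=4))  # three attempts in total
print(apply_env_overrides({"A": "1", "B": "2"}, {"A": "x", "B": "", "C": "3"}))
```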

ray.remote_function.RemoteFunction.options(self, args=None, kwargs=None, num_returns=None, num_cpus=None, num_gpus=None, memory=None, object_store_memory=None, accelerator_type=None, resources=None, max_retries=None, placement_group=None, placement_group_bundle_index=-1, placement_group_capture_child_tasks=None, override_environment_variables=None, name='')

Configures and overrides the task invocation parameters.

The arguments are the same as those that can be passed to ray.remote.

Examples:

@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
def f():
    return 1, 2
# Task f will require 2 gpus instead of 1.
g = f.options(num_gpus=2, max_calls=None)

ray.actor.ActorClass.options(self, args=None, kwargs=None, num_cpus=None, num_gpus=None, memory=None, object_store_memory=None, resources=None, accelerator_type=None, max_concurrency=None, max_restarts=None, max_task_retries=None, name=None, lifetime=None, placement_group=None, placement_group_bundle_index=-1, placement_group_capture_child_tasks=None, override_environment_variables=None)

Configures and overrides the actor instantiation parameters.

The arguments are the same as those that can be passed to ray.remote.

Examples:

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1
# Class Foo will require 1 cpu instead of 2.
# It will also require no custom resources.
Bar = Foo.options(num_cpus=1, resources=None)

ray.get

ray.get(object_refs, *, timeout=None)

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ref is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_refs is a list, then the objects corresponding to each object ref in the list will be returned.

This method will issue a warning if it is running inside an async context. In that case, use await object_ref instead of ray.get(object_ref); for a list of object refs, use await asyncio.gather(*object_refs).

Parameters
  • object_refs – Object ref of the object to get or a list of object refs to get.

  • timeout (Optional[float]) – The maximum amount of time in seconds to wait before returning.

Returns

A Python object or a list of Python objects.

Raises
  • GetTimeoutError – A GetTimeoutError is raised if a timeout is set and the get takes longer than timeout to return.

  • Exception – An exception is raised if the task that created the object or that created one of the objects raised an exception.

ray.wait

ray.wait(object_refs, *, num_returns=1, timeout=None)

Return a list of IDs that are ready and a list of IDs that are not.

If timeout is set, the function returns either when the requested number of IDs is ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object refs.

This method returns two lists. The first list consists of object refs that correspond to objects that are available in the object store. The second list corresponds to the rest of the object refs (which may or may not be ready).

Ordering of the input list of object refs is preserved. That is, if A precedes B in the input list, and both are in the ready list, then A will precede B in the ready list. This also holds true if A and B are both in the remaining list.
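The readiness split and ordering guarantee can be illustrated with the standard library's concurrent.futures, used here purely as a stand-in for object refs. wait_model is a hypothetical helper that models the contract described above; it is not how Ray implements ray.wait, and rejecting a num_returns larger than the input list is this sketch's own choice.

```python
import concurrent.futures as cf
import time


def wait_model(futures, num_returns=1, timeout=None):
    """Model of the ray.wait contract using stdlib futures (not Ray's code).

    Blocks until num_returns futures are done or timeout seconds pass, then
    returns (ready, remaining); both lists keep the input order.
    """
    if len({id(f) for f in futures}) != len(futures):
        raise ValueError("futures must be unique")
    if num_returns > len(futures):
        raise ValueError("num_returns exceeds the number of futures")
    deadline = None if timeout is None else time.monotonic() + timeout
    while sum(f.done() for f in futures) < num_returns:
        left = None if deadline is None else deadline - time.monotonic()
        if left is not None and left <= 0:
            break  # timed out: return whatever is ready so far
        pending = [f for f in futures if not f.done()]
        # Sleep until at least one more future finishes (or time runs out).
        cf.wait(pending, timeout=left, return_when=cf.FIRST_COMPLETED)
    ready = [f for f in futures if f.done()][:num_returns]
    remaining = [f for f in futures if all(f is not r for r in ready)]
    return ready, remaining


with cf.ThreadPoolExecutor(max_workers=3) as pool:
    slow = pool.submit(time.sleep, 0.5)
    fast_a = pool.submit(pow, 2, 3)
    fast_b = pool.submit(pow, 3, 2)
    ready, remaining = wait_model([slow, fast_a, fast_b], num_returns=2)
    print([f.result() for f in ready], len(remaining))
```

Even though slow was submitted first, it lands in the remaining list, while fast_a and fast_b appear in the ready list in their original relative order.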

This method will issue a warning if it’s running inside an async context. Instead of ray.wait(object_refs), you can use await asyncio.wait(object_refs).

Parameters
  • object_refs (List[ObjectRef]) – List of object refs for objects that may or may not be ready. Note that these IDs must be unique.

  • num_returns (int) – The number of object refs that should be returned.

  • timeout (float) – The maximum amount of time in seconds to wait before returning.

Returns

A list of object refs that are ready and a list of the remaining object IDs.

ray.put

ray.put(value)

Store an object in the object store.

The object may not be evicted while a reference to the returned ID exists.

Parameters

value – The Python object to be stored.

Returns

The object ref assigned to this value.

ray.kill

ray.kill(actor, *, no_restart=True)

Kill an actor forcefully.

This will interrupt any running tasks on the actor, causing them to fail immediately. Any atexit handlers installed in the actor will still be run.

If you want to kill the actor but let pending tasks finish, you can call actor.__ray_terminate__.remote() instead to queue a termination task.

If the actor is a detached actor, subsequent calls to get its handle via ray.get_actor will fail.

Parameters
  • actor (ActorHandle) – Handle to the actor to kill.

  • no_restart (bool) – If True (the default), the actor will not be restarted, even if it is a restartable actor.

ray.cancel

ray.cancel(object_ref, *, force=False, recursive=True)

Cancels a task according to the following conditions.

If the specified task is pending execution, it will not be executed. If the task is currently executing, the behavior depends on the force flag. When force=False, a KeyboardInterrupt will be raised in Python, and when force=True, the worker executing the task will immediately exit. If the task is already finished, nothing will happen.

Only non-actor tasks can be canceled. Canceled tasks will not be retried (max_retries will not be respected).

Calling ray.get on a canceled task will raise a TaskCancelledError or a WorkerCrashedError if force=True.

Parameters
  • object_ref (ObjectRef) – ObjectRef returned by the task that should be canceled.

  • force (boolean) – Whether to force-kill a running task by killing the worker that is running the task.

  • recursive (boolean) – Whether to try to cancel tasks submitted by the task specified.

Raises

TypeError – This is also raised for actor tasks.

ray.get_gpu_ids

ray.get_gpu_ids()

Get the IDs of the GPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker started up, then the IDs returned by this method will be a subset of the IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has.
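A sketch of the rule above for deriving the pool of candidate GPU IDs, assuming CUDA_VISIBLE_DEVICES holds comma-separated integer indices (it can also hold device UUIDs, which this sketch does not handle). visible_gpu_ids is a hypothetical helper, not a Ray function; ray.get_gpu_ids itself returns the IDs from this pool that are available to the calling worker.

```python
import os


def visible_gpu_ids(num_gpus, environ=None):
    """Hypothetical helper mirroring the CUDA_VISIBLE_DEVICES rule described above.

    Assumes the variable, when set, holds comma-separated integer indices.
    """
    environ = os.environ if environ is None else environ
    value = environ.get("CUDA_VISIBLE_DEVICES")
    if value is None:
        # Variable unset: IDs fall in [0, num_gpus - 1].
        return list(range(num_gpus))
    # Variable set: IDs come from the listed devices (empty string means none).
    return [int(part) for part in value.split(",") if part.strip()]


print(visible_gpu_ids(2, environ={}))
print(visible_gpu_ids(4, environ={"CUDA_VISIBLE_DEVICES": "2,3"}))
```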

Returns

A list of GPU IDs.

ray.shutdown

ray.shutdown(_exiting_interpreter=False)

Disconnect the worker, and terminate processes started by ray.init().

This will automatically run at the end when a Python process that uses Ray exits. It is OK to run this twice in a row. The primary use case for this function is to clean up state between tests.

Note that this will clear any remote function definitions, actor definitions, and existing actors, so if you wish to use any previously defined remote functions or actors after calling ray.shutdown(), then you need to redefine them. If they were defined in an imported module, then you will need to reload the module.

Parameters

_exiting_interpreter (bool) – True if this is called by the atexit hook and false otherwise. If we are exiting the interpreter, we will wait a little while to print any extra error messages.

ray.method

ray.method(*args, **kwargs)

Annotate an actor method.

@ray.remote
class Foo:
    @ray.method(num_returns=2)
    def bar(self):
        return 1, 2

f = Foo.remote()

_, _ = f.bar.remote()

Parameters

num_returns – The number of object refs that should be returned by invocations of this actor method.

ray.util.ActorPool

class ray.util.ActorPool(actors)

Utility class to operate on a fixed pool of actors.

Parameters

actors (list) – List of Ray actor handles to use in this pool.

Examples

>>> a1, a2 = Actor.remote(), Actor.remote()
>>> pool = ActorPool([a1, a2])
>>> print(list(pool.map(lambda a, v: a.double.remote(v),
...                     [1, 2, 3, 4])))
[2, 4, 6, 8]

map(fn, values)

Apply the given function in parallel over the actors and values.

This returns an ordered iterator that will return results of the map as they finish. Note that you must iterate over the iterator to force the computation to finish.

Parameters
  • fn (func) – Function that takes (actor, value) as argument and returns an ObjectRef computing the result over the value. The actor will be considered busy until the ObjectRef completes.

  • values (list) – List of values that fn(actor, value) should be applied to.

Returns

Iterator over results from applying fn to the actors and values.

Examples

>>> pool = ActorPool(...)
>>> print(list(pool.map(lambda a, v: a.double.remote(v),
...                     [1, 2, 3, 4])))
[2, 4, 6, 8]

map_unordered(fn, values)

Similar to map(), but returning an unordered iterator.

This returns an unordered iterator that will return results of the map as they finish. This can be more efficient than map() if some results take longer to compute than others.
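The difference between map() and map_unordered() is the difference between waiting on results in submission order and taking them as they finish. The sketch below shows that contrast with a standard-library ThreadPoolExecutor standing in for the actor pool; it does not use ActorPool, and double_slowly is a hypothetical task whose larger inputs finish sooner, mimicking uneven actor workloads.

```python
import concurrent.futures as cf
import time


def double_slowly(v):
    # Hypothetical stand-in for a.double.remote(v); larger inputs finish sooner.
    time.sleep(0.05 * (5 - v))
    return 2 * v


values = [1, 2, 3, 4]
with cf.ThreadPoolExecutor(max_workers=len(values)) as pool:
    # Like map(): results come back in submission order, whatever finishes first.
    futures = [pool.submit(double_slowly, v) for v in values]
    ordered = [f.result() for f in futures]

    # Like map_unordered(): results come back as soon as each one finishes.
    futures = [pool.submit(double_slowly, v) for v in values]
    unordered = [f.result() for f in cf.as_completed(futures)]

print(ordered)    # always [2, 4, 6, 8]
print(unordered)  # the same values, in completion order
```

With the ordered form, a slow first item holds back every later result even if they are already finished; the unordered form hands each result over as soon as it exists.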

Parameters
  • fn (func) – Function that takes (actor, value) as argument and returns an ObjectRef computing the result over the value. The actor will be considered busy until the ObjectRef completes.

  • values (list) – List of values that fn(actor, value) should be applied to.

Returns

Iterator over results from applying fn to the actors and values.

Examples

>>> pool = ActorPool(...)
>>> print(list(pool.map_unordered(lambda a, v: a.double.remote(v),
...                               [1, 2, 3, 4])))
[6, 2, 4, 8]

submit(fn, value)

Schedule a single task to run in the pool.

This has the same argument semantics as map(), but takes a single value instead of a list of values. The result can be retrieved using get_next() / get_next_unordered().

Parameters
  • fn (func) – Function that takes (actor, value) as argument and returns an ObjectRef computing the result over the value. The actor will be considered busy until the ObjectRef completes.

  • value (object) – Value to compute a result for.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> pool.submit(lambda a, v: a.double.remote(v), 2)
>>> print(pool.get_next(), pool.get_next())
2 4

has_next()

Returns whether there are any pending results to return.

Returns

True if there are any pending results not yet returned.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.has_next())
True
>>> print(pool.get_next())
2
>>> print(pool.has_next())
False

get_next(timeout=None)

Returns the next pending result in order.

This returns the next result produced by submit(), blocking for up to the specified timeout until it is available.

Returns

The next result.

Raises

TimeoutError if the timeout is reached.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> print(pool.get_next())
2

get_next_unordered(timeout=None)

Returns any of the next pending results.

This returns some result produced by submit(), blocking for up to the specified timeout until it is available. Unlike get_next(), the results are not always returned in the same order as submitted, which can improve performance.

Returns

The next result.

Raises

TimeoutError if the timeout is reached.

Examples

>>> pool = ActorPool(...)
>>> pool.submit(lambda a, v: a.double.remote(v), 1)
>>> pool.submit(lambda a, v: a.double.remote(v), 2)
>>> print(pool.get_next_unordered())
4
>>> print(pool.get_next_unordered())
2

ray.util.queue.Queue

class ray.util.queue.Queue(maxsize=0)

Queue implementation on Ray.

Parameters

maxsize (int) – maximum size of the queue. If zero, size is unbounded.

size()

The size of the queue.

qsize()

The size of the queue.

empty()

Whether the queue is empty.

full()

Whether the queue is full.

put(item, block=True, timeout=None)

Adds an item to the queue.

There is no guarantee of order if multiple producers put to the same full queue.

Raises
  • Full if the queue is full and blocking is False.

  • Full if the queue is full, blocking is True, and it timed out.

  • ValueError if timeout is negative.

get(block=True, timeout=None)

Gets an item from the queue.

There is no guarantee of order if multiple consumers get from the same empty queue.

Returns

The next item in the queue.

Raises
  • Empty if the queue is empty and blocking is False.

  • Empty if the queue is empty, blocking is True, and it timed out.

  • ValueError if timeout is negative.

put_nowait(item)

Equivalent to put(item, block=False).

Raises

Full if the queue is full.

get_nowait()

Equivalent to get(block=False).

Raises

Empty if the queue is empty.
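The blocking rules for put() and get() follow the same pattern as Python's standard queue.Queue. The sketch below uses the stdlib class as a stand-in, so the exceptions shown are queue.Full and queue.Empty rather than the ones Ray's queue raises.

```python
import queue

q = queue.Queue(maxsize=1)  # stdlib stand-in for ray.util.queue.Queue(maxsize=1)
q.put_nowait("a")  # fills the only slot

full_nonblocking = full_timed_out = empty_nonblocking = False
try:
    q.put("b", block=False)  # same as put_nowait: fails at once when full
except queue.Full:
    full_nonblocking = True
try:
    q.put("b", timeout=0.01)  # blocks for up to 10 ms, then gives up
except queue.Full:
    full_timed_out = True

item = q.get_nowait()  # removes "a"
try:
    q.get(block=False)  # the queue is now empty
except queue.Empty:
    empty_nonblocking = True

print(item, full_nonblocking, full_timed_out, empty_nonblocking)
```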

ray.nodes

ray.nodes()

Get a list of the nodes in the cluster (for debugging only).

Returns

Information about the Ray clients in the cluster.

ray.timeline

ray.timeline(filename=None)

Return a list of profiling events that can be viewed as a timeline.

To view this information as a timeline, dump it as a JSON file, either by passing in “filename” or by using json.dump, and then go to chrome://tracing in the Chrome web browser and load the dumped file.
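A minimal sketch of the json.dump route. The two events are hand-written and hypothetical; their fields follow the Chrome trace-event format understood by chrome://tracing, which is an assumption about the shape of the dictionaries Ray returns. With a running cluster, the list would instead come from ray.timeline().

```python
import json
import os
import tempfile

# Hypothetical events in Chrome trace-event format: "ph": "X" marks a complete
# event, and "ts" (start) and "dur" (duration) are in microseconds.
events = [
    {"name": "task:f", "ph": "X", "ts": 0, "dur": 1500, "pid": 1, "tid": 1},
    {"name": "task:g", "ph": "X", "ts": 500, "dur": 800, "pid": 1, "tid": 2},
]

path = os.path.join(tempfile.mkdtemp(), "timeline.json")
with open(path, "w") as f:
    json.dump(events, f)  # then open chrome://tracing and load this file

print("wrote", path)
```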

Parameters

filename – If a filename is provided, the timeline is dumped to that file.

Returns

If filename is not provided, this returns a list of profiling events.

Each profile event is a dictionary.

ray.cluster_resources

ray.cluster_resources()

Get the current total cluster resources.

Note that this information can grow stale as nodes are added to or removed from the cluster.

Returns

A dictionary mapping resource name to the total quantity of that resource in the cluster.

ray.available_resources

ray.available_resources()

Get the current available cluster resources.

This is different from cluster_resources in that this will return idle (available) resources rather than total resources.

Note that this information can grow stale as tasks start and finish.

Returns

A dictionary mapping resource name to the available quantity of that resource in the cluster.

ray.cross_language

ray.java_function(class_name, function_name)

Define a Java function.

Parameters
  • class_name (str) – Java class name.

  • function_name (str) – Java function name.

ray.java_actor_class(class_name)

Define a Java actor class.

Parameters

class_name (str) – Java class name.

Placement Group APIs

placement_group

ray.util.placement_group.placement_group(bundles: List[Dict[str, float]], strategy: str = 'PACK', name: str = 'unnamed_group') → ray.util.placement_group.PlacementGroup

Asynchronously creates a PlacementGroup.

Parameters
  • bundles (List[Dict]) – A list of bundles which represent the resource requirements.

  • strategy (str) –

    The strategy to create the placement group.

    • “PACK”: Packs Bundles into as few nodes as possible.

    • “SPREAD”: Places Bundles across distinct nodes as evenly as possible.

    • “STRICT_PACK”: Packs Bundles into one node. The group is not allowed to span multiple nodes.

    • “STRICT_SPREAD”: Places Bundles across distinct nodes. No two bundles may share a node.

  • name (str) – The name of the placement group.
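The strict strategies are hard constraints, while PACK and SPREAD are best-effort preferences. A hypothetical checker that tests a bundle-to-node assignment against each strategy makes the distinction concrete; it models the rules listed above and is not part of Ray's API.

```python
def satisfies_strategy(assignment, strategy):
    """Hypothetical checker: does a bundle-to-node assignment obey the strategy?

    assignment[i] is the ID of the node hosting bundle i.
    """
    nodes = set(assignment)
    if strategy == "STRICT_PACK":
        return len(nodes) <= 1  # every bundle on a single node
    if strategy == "STRICT_SPREAD":
        return len(nodes) == len(assignment)  # no two bundles share a node
    if strategy in ("PACK", "SPREAD"):
        return True  # best-effort: any placement is allowed, some are preferred
    raise ValueError(f"unknown strategy: {strategy!r}")


print(satisfies_strategy(["n1", "n1", "n2"], "STRICT_PACK"))    # False
print(satisfies_strategy(["n1", "n2", "n3"], "STRICT_SPREAD"))  # True
print(satisfies_strategy(["n1", "n1", "n2"], "PACK"))           # True
```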

Returns

Placement group object.

Return type

PlacementGroup

PlacementGroup (class)

class ray.util.placement_group.PlacementGroup(id: PlacementGroupID)

A handle to a placement group.

ready() → ObjectRef

Returns an ObjectRef to check ready status.

This API runs a small dummy task to wait for placement group creation. It is compatible with ray.get and ray.wait.

Example:

>>> pg = placement_group([{"CPU": 1}])
>>> ray.get(pg.ready())
>>> pg = placement_group([{"CPU": 1}])
>>> ray.wait([pg.ready()], timeout=0)

property bundle_specs

Return bundles belonging to this placement group.

Type

List[Dict]

placement_group_table

ray.util.placement_group.placement_group_table(placement_group: ray.util.placement_group.PlacementGroup = None) → list

Get the state of the placement group from GCS.

Parameters

placement_group (PlacementGroup) – The placement group whose state to retrieve.

remove_placement_group

ray.util.placement_group.remove_placement_group(placement_group: ray.util.placement_group.PlacementGroup)

Asynchronously remove placement group.

Parameters

placement_group (PlacementGroup) – The placement group to delete.

get_current_placement_group

ray.util.placement_group.get_current_placement_group() → Optional[ray.util.placement_group.PlacementGroup]

Get the current placement group which a task or actor is using.

It returns None if there’s no current placement group for the worker. For example, if you call this method in your driver, it returns None (because drivers never belong to any placement group).

Examples

>>> @ray.remote
... def f():
...     # This will return the placement group the task f belongs to.
...     # It means this pg will be identical to the pg created below.
...     pg = get_current_placement_group()
>>> pg = placement_group([{"CPU": 2}])
>>> f.options(placement_group=pg).remote()
>>> # New script.
>>> ray.init()
>>> # New script doesn't belong to any placement group,
>>> # so it returns None.
>>> assert get_current_placement_group() is None

Returns

Placement group object.

None if the current task or actor wasn’t created with any placement group.

Return type

PlacementGroup

Custom Metrics APIs

Metric

class ray.util.metrics.Metric(name: str, description: str = '', tag_keys: Optional[Tuple[str]] = None)

The parent class of custom metrics.

Ray’s custom metrics APIs derive from this class and share the same public methods.

set_default_tags(default_tags: Dict[str, str])

Set default tags of metrics.

Example

>>> # Note that set_default_tags returns the instance itself.
>>> counter = Counter("name")
>>> counter2 = counter.set_default_tags({"a": "b"})
>>> assert counter is counter2
>>> # this means you can instantiate it in this way.
>>> counter = Counter("name").set_default_tags({"a": "b"})

Parameters

default_tags (dict) – Default tags that are used for every record method.

Returns

The instance itself.

Return type

Metric

record(value: float, tags: dict = None) → None

Record the metric point of the metric.

Parameters

value (float) – The value to be recorded as a metric point.

property info

Return the information of this metric.

Example

>>> counter = Counter("name", description="desc")
>>> print(counter.info)
{
    "name": "name",
    "description": "desc",
    "tag_keys": ("ray.key",),
    "default_tags": {"ray.key": "abc"}
}

Count

class ray.util.metrics.Count(name: str, description: str = '', tag_keys: Optional[Tuple[str]] = None)

The count of the number of metric points.

This corresponds to the Prometheus Counter metric.

Parameters
  • name (str) – Name of the metric.

  • description (str) – Description of the metric.

  • tag_keys (tuple) – Tag keys of the metric.

Gauge

class ray.util.metrics.Gauge(name: str, description: str = '', tag_keys: Optional[Tuple[str]] = None)

Gauge keeps the last recorded value and drops everything before it.

This corresponds to the Prometheus Gauge metric.

Parameters
  • name (str) – Name of the metric.

  • description (str) – Description of the metric.

  • tag_keys (tuple) – Tag keys of the metric.

Histogram

class ray.util.metrics.Histogram(name: str, description: str = '', boundaries: List[float] = None, tag_keys: Optional[Tuple[str]] = None)

Histogram distribution of metric points.

This corresponds to the Prometheus Histogram metric. Recording metrics with a histogram enables you to obtain the min, mean, max, and the 25th, 50th, 95th, and 99th percentile latencies.

Parameters
  • name (str) – Name of the metric.

  • description (str) – Description of the metric.

  • boundaries (list) – Boundaries of histogram buckets.

  • tag_keys (tuple) – Tag keys of the metric.
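A pure-Python model of how boundaries partition recorded values into buckets, assuming Prometheus-style buckets in which each boundary is an inclusive upper bound, boundaries are sorted, and one extra bucket collects values above the last boundary. bucket_counts and cumulative are hypothetical helpers, not Ray functions.

```python
import bisect


def bucket_counts(boundaries, values):
    """Count values per bucket; bucket i holds (boundaries[i-1], boundaries[i]].

    Assumes boundaries is sorted ascending. The extra last bucket collects
    values above the final boundary.
    """
    counts = [0] * (len(boundaries) + 1)
    for v in values:
        # bisect_left finds the first boundary >= v, so boundaries are inclusive.
        counts[bisect.bisect_left(boundaries, v)] += 1
    return counts


def cumulative(counts):
    """Prometheus-style 'le' buckets: each count includes all lower buckets."""
    running, out = 0, []
    for c in counts:
        running += c
        out.append(running)
    return out


latencies_ms = [5, 10, 20, 75, 150]
per_bucket = bucket_counts([10, 50, 100], latencies_ms)
print(per_bucket)              # [2, 1, 1, 1]
print(cumulative(per_bucket))  # [2, 3, 4, 5]
```

Percentiles computed from such buckets are only as precise as the bucket widths, so boundaries are worth placing densely around the latencies you care about.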

property info

Return information about histogram metric.

Debugger APIs

ray.util.pdb.set_trace()

Interrupt the flow of the program and drop into the Ray debugger.

Can be used within a Ray task or actor.

Experimental APIs

ray.experimental.set_resource(resource_name, capacity, client_id=None)

Set a resource to a specified capacity.

This creates, updates, or deletes a custom resource for a target NodeID. If the resource already exists, its capacity is updated to the new value. If the capacity is set to 0, the resource is deleted. If client_id is not specified or set to None, the resource is created on the local client where the actor is running.

Parameters
  • resource_name (str) – Name of the resource to be created

  • capacity (int) – Capacity of the new resource. Resource is deleted if capacity is 0.

  • client_id (str) – The NodeID of the node where the resource is to be set.

Returns

None

Raises

ValueError – This exception is raised when a negative capacity is specified.

ray.experimental.force_spill_objects(object_refs)

Force spilling objects to external storage.

Parameters

object_refs – Object refs of the objects to be spilled.

The Ray Command Line API

ray start

Start Ray processes manually on the local machine.

ray start [OPTIONS]

Options

--node-ip-address <node_ip_address>

the IP address of this node

--address <address>

the address to use for Ray

--port <port>

the port of the head ray process. If not provided, defaults to 6379; if port is set to 0, we will allocate an available port.

--redis-shard-ports <redis_shard_ports>

the port to use for the Redis shards other than the primary Redis shard

--object-manager-port <object_manager_port>

the port to use for starting the object manager

--node-manager-port <node_manager_port>

the port to use for starting the node manager

--gcs-server-port <gcs_server_port>

Port number for the GCS server.

--min-worker-port <min_worker_port>

the lowest port number that workers will bind on. If not set, random ports will be chosen.

--max-worker-port <max_worker_port>

the highest port number that workers will bind on. If set, ‘--min-worker-port’ must also be set.

--worker-port-list <worker_port_list>

a comma-separated list of open ports for workers to bind on. Overrides ‘--min-worker-port’ and ‘--max-worker-port’.

--object-store-memory <object_store_memory>

The amount of memory (in bytes) to start the object store with. By default, this is capped at 20GB but can be set higher.

--num-cpus <num_cpus>

the number of CPUs on this node

--num-gpus <num_gpus>

the number of GPUs on this node

--resources <resources>

a JSON serialized dictionary mapping resource name to resource quantity

--head

provide this argument for the head node

--include-dashboard <include_dashboard>

provide this argument to start the Ray dashboard GUI

--dashboard-host <dashboard_host>

the host to bind the dashboard server to, either localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is localhost.

--dashboard-port <dashboard_port>

the port to bind the dashboard server to; defaults to 8265

--block

provide this argument to block forever in this command

--plasma-directory <plasma_directory>

object store directory for memory mapped files

--autoscaling-config <autoscaling_config>

the file that contains the autoscaling config

--no-redirect-worker-output

do not redirect worker stdout and stderr to files

--no-redirect-output

do not redirect non-worker stdout and stderr to files

--plasma-store-socket-name <plasma_store_socket_name>

manually specify the socket name of the plasma store

--raylet-socket-name <raylet_socket_name>

manually specify the socket path of the raylet process

--load-code-from-local

Specify whether to load code from a local file or from GCS serialization.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose
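Several ray start options above have formatting or pairing rules: --resources takes a JSON-serialized dictionary, --worker-port-list takes a comma-separated list and overrides the min/max range, and --max-worker-port requires --min-worker-port. The sketch below assembles an argument list that respects those rules. The helper build_ray_start_cmd is hypothetical; it only emits flags listed in this reference and does not launch anything.

```python
# Hypothetical helper that builds a `ray start` argument list.
# Only flags documented in this reference are emitted; nothing is executed.
import json
from typing import Dict, List, Optional, Sequence


def build_ray_start_cmd(
    head: bool = False,
    address: Optional[str] = None,
    port: Optional[int] = None,
    num_cpus: Optional[int] = None,
    resources: Optional[Dict[str, float]] = None,
    min_worker_port: Optional[int] = None,
    max_worker_port: Optional[int] = None,
    worker_port_list: Optional[Sequence[int]] = None,
) -> List[str]:
    # Documented constraint: --max-worker-port requires --min-worker-port.
    if max_worker_port is not None and min_worker_port is None:
        raise ValueError("--max-worker-port requires --min-worker-port")
    cmd = ["ray", "start"]
    if head:
        cmd.append("--head")
    if address is not None:
        cmd += ["--address", address]
    if port is not None:
        cmd += ["--port", str(port)]
    if num_cpus is not None:
        cmd += ["--num-cpus", str(num_cpus)]
    if resources:
        # --resources expects a JSON-serialized dict of name -> quantity.
        cmd += ["--resources", json.dumps(resources)]
    if worker_port_list:
        # Comma-separated list; it overrides the min/max port range, so the
        # range flags are omitted when a list is given.
        cmd += ["--worker-port-list", ",".join(str(p) for p in worker_port_list)]
    else:
        if min_worker_port is not None:
            cmd += ["--min-worker-port", str(min_worker_port)]
        if max_worker_port is not None:
            cmd += ["--max-worker-port", str(max_worker_port)]
    return cmd
```

The resulting list can be passed to subprocess.run(cmd, check=True) on a machine where Ray is installed. Building an argument list instead of a shell string avoids quoting problems with the JSON value of --resources.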

ray stop

Stop Ray processes manually on the local machine.

ray stop [OPTIONS]

Options

-f, --force

If set, Ray will send SIGKILL instead of SIGTERM.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose

ray up

Create or update a Ray cluster.

ray up [OPTIONS] CLUSTER_CONFIG_FILE

Options

--min-workers <min_workers>

Override the configured min worker node count for the cluster.

--max-workers <max_workers>

Override the configured max worker node count for the cluster.

--no-restart

Whether to skip restarting Ray services during the update. This avoids interrupting running jobs.

--restart-only

Whether to skip running setup commands and only restart Ray. This cannot be used with ‘--no-restart’.

-y, --yes

Don’t ask for confirmation.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

--redirect-command-output

Whether to redirect command output to a file.

--use-login-shells, --use-normal-shells

Ray uses login shells (bash --login -i) to run cluster commands by default. If your workflow is compatible with normal shells, this can be disabled for a better user experience.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument
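The ray up options can be combined as in the hypothetical helper below, which rejects the documented invalid combination of --no-restart and --restart-only before building the command. It is an illustrative sketch: the function name is an assumption, and only flags from this reference are emitted.

```python
# Hypothetical helper that builds a `ray up` argument list.
from typing import List, Optional


def build_ray_up_cmd(
    cluster_config_file: str,
    min_workers: Optional[int] = None,
    max_workers: Optional[int] = None,
    no_restart: bool = False,
    restart_only: bool = False,
    yes: bool = False,
    cluster_name: Optional[str] = None,
) -> List[str]:
    # Documented constraint: --restart-only cannot be combined with --no-restart.
    if no_restart and restart_only:
        raise ValueError("--restart-only cannot be used with --no-restart")
    cmd = ["ray", "up"]
    if min_workers is not None:
        cmd += ["--min-workers", str(min_workers)]
    if max_workers is not None:
        cmd += ["--max-workers", str(max_workers)]
    if no_restart:
        cmd.append("--no-restart")
    if restart_only:
        cmd.append("--restart-only")
    if yes:
        cmd.append("--yes")
    if cluster_name is not None:
        cmd += ["--cluster-name", cluster_name]
    # The positional CLUSTER_CONFIG_FILE argument is required.
    cmd.append(cluster_config_file)
    return cmd
```

For instance, build_ray_up_cmd("cluster.yaml", max_workers=4, no_restart=True, yes=True) describes an update that leaves running jobs uninterrupted and skips the confirmation prompt.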

ray down

Tear down a Ray cluster.

ray down [OPTIONS] CLUSTER_CONFIG_FILE

Options

-y, --yes

Don’t ask for confirmation.

--workers-only

Only destroy the workers.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--keep-min-workers

Retain the minimum number of workers specified in the config.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray exec

Execute a command via SSH on a Ray cluster.

ray exec [OPTIONS] CLUSTER_CONFIG_FILE CMD

Options

--run-env <run_env>

Choose whether to execute this command in a container or directly on the cluster head node. Only applies when Docker is configured in the cluster YAML.

Options

auto|host|docker

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen session.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

CMD

Required argument
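Because -p/--port-forward is repeated once per port and --run-env accepts only auto, host, or docker, a small builder can make those rules explicit. The helper below is a hypothetical sketch that returns an argument list for ray exec; it does not connect to any cluster.

```python
# Hypothetical helper that builds a `ray exec` argument list.
from typing import Iterable, List, Optional

# Allowed values for --run-env, as listed in this reference.
RUN_ENVS = ("auto", "host", "docker")


def build_ray_exec_cmd(
    cluster_config_file: str,
    command: str,
    port_forwards: Iterable[int] = (),
    run_env: Optional[str] = None,
    tmux: bool = False,
) -> List[str]:
    if run_env is not None and run_env not in RUN_ENVS:
        raise ValueError(f"run_env must be one of {RUN_ENVS}")
    cmd = ["ray", "exec"]
    if run_env is not None:
        cmd += ["--run-env", run_env]
    if tmux:
        cmd.append("--tmux")
    for port in port_forwards:
        # --port-forward is repeated once per forwarded port.
        cmd += ["--port-forward", str(port)]
    # CMD stays a single argv element, even if it contains spaces.
    cmd += [cluster_config_file, command]
    return cmd
```

Passing the list to subprocess.run keeps CMD as a single argument, so a command containing spaces (for example ls -la) reaches the cluster intact.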

ray submit

Uploads and runs a script on the specified cluster.

The script is automatically synced to the following location:

os.path.join("~", os.path.basename(script))

Example:

>>> ray submit [CLUSTER.YAML] experiment.py -- --smoke-test

ray submit [OPTIONS] CLUSTER_CONFIG_FILE SCRIPT [SCRIPT_ARGS]...

Options

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen session.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--args <args>

(deprecated) Use ‘-- --arg1 --arg2’ for script args.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

SCRIPT

Required argument

SCRIPT_ARGS

Optional argument(s)
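The submit behaviour described above can be sketched in two parts: computing where the script lands on the cluster, using the expression given in this reference, and placing script arguments after -- so they are not parsed as ray submit options. Both helper names are hypothetical.

```python
# Hypothetical helpers illustrating `ray submit` path and argument handling.
import os
from typing import List, Sequence


def remote_script_path(script: str) -> str:
    # Mirrors the documented sync location for the uploaded script.
    return os.path.join("~", os.path.basename(script))


def build_ray_submit_cmd(
    cluster_config_file: str, script: str, script_args: Sequence[str] = ()
) -> List[str]:
    cmd = ["ray", "submit", cluster_config_file, script]
    if script_args:
        # "--" ends option parsing, so flags like --smoke-test reach the script.
        cmd += ["--", *script_args]
    return cmd
```

With these helpers, submitting experiments/experiment.py uploads it to ~/experiment.py on the cluster, and build_ray_submit_cmd("cluster.yaml", "experiment.py", ["--smoke-test"]) reproduces the example invocation shown earlier.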

ray attach

Create or attach to an SSH session on a Ray cluster.

ray attach [OPTIONS] CLUSTER_CONFIG_FILE

Options

--start

Start the cluster if needed.

--screen

Run the command in screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

--no-config-cache

Disable the local cluster config cache.

-N, --new

Force creation of a new screen.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--log-style <log_style>

If ‘pretty’, outputs with formatting and color. If ‘record’, outputs record-style without formatting. ‘auto’ defaults to ‘pretty’, and disables pretty logging if stdin is not a TTY.

Options

auto|record|pretty

--log-color <log_color>

Use color logging. Auto enables color logging if stdout is a TTY.

Options

auto|false|true

-v, --verbose

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray get_head_ip

Return the head node IP of a Ray cluster.

ray get_head_ip [OPTIONS] CLUSTER_CONFIG_FILE

Options

-n, --cluster-name <cluster_name>

Override the configured cluster name.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray stack

Take a stack dump of all Python workers on the local machine.

ray stack [OPTIONS]

ray memory

Print object references held in a Ray cluster.

ray memory [OPTIONS]

Options

--address <address>

Override the address to connect to.

--redis_password <redis_password>

Connect to Ray using the given Redis password.

ray timeline

Take a Chrome tracing timeline for a Ray cluster.

ray timeline [OPTIONS]

Options

--address <address>

Override the redis address to connect to.

ray debug

Show all active breakpoints and exceptions in the Ray debugger.

ray debug [OPTIONS]

Options

--address <address>

Override the address to connect to.