Ray Package Reference

ray.init(address=None, redis_address=None, redis_port=None, num_cpus=None, num_gpus=None, memory=None, object_store_memory=None, resources=None, driver_object_store_memory=None, redis_max_memory=None, log_to_driver=True, node_ip_address='127.0.0.1', object_id_seed=None, local_mode=False, redirect_worker_output=None, redirect_output=None, ignore_reinit_error=False, num_redis_shards=None, redis_max_clients=None, redis_password='5241590000000000', plasma_directory=None, huge_pages=False, include_java=False, include_webui=None, webui_host='localhost', job_id=None, configure_logging=True, logging_level=20, logging_format='%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s', plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None, load_code_from_local=False, use_pickle=True, _internal_config=None, lru_evict=False)[source]

Connect to an existing Ray cluster or start one and connect to it.

This method handles two cases. Either a Ray cluster already exists and we just attach this driver to it, or we start all of the processes associated with a Ray cluster and attach to the newly started cluster.

To start Ray and all of the relevant processes, use this as follows:

ray.init()

To connect to an existing Ray cluster, use this as follows (substituting in the appropriate address):

ray.init(address="123.45.67.89:6379")
Parameters
  • address (str) – The address of the Ray cluster to connect to. If this address is not provided, then this command will start Redis, a raylet, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits.

  • redis_address (str) – Deprecated; same as address.

  • redis_port (int) – The port that the primary Redis shard should listen to. If None, then a random port will be chosen.

  • num_cpus (int) – Number of CPUs the user wishes all raylets to be configured with.

  • num_gpus (int) – Number of GPUs the user wishes all raylets to be configured with.

  • resources – A dictionary mapping the name of a resource to the quantity of that resource available.

  • memory – The amount of memory (in bytes) that is available for use by workers requesting memory resources. By default, this is autoset based on available system memory.

  • object_store_memory – The amount of memory (in bytes) to start the object store with. By default, this is autoset based on available system memory, subject to a 20GB cap.

  • redis_max_memory – The max amount of memory (in bytes) to allow each redis shard to use. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task, object, and profile tables). By default, this is autoset based on available system memory, subject to a 10GB cap.

  • log_to_driver (bool) – If true, then output from all of the worker processes on all nodes will be directed to the driver.

  • node_ip_address (str) – The IP address of the node that we are on.

  • object_id_seed (int) – Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the same driver in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different drivers.

  • local_mode (bool) – True if the code should be executed serially without Ray. This is useful for debugging.

  • driver_object_store_memory (int) – Limit the amount of memory the driver can use in the object store for creating objects. By default, this is autoset based on available system memory, subject to a 20GB cap.

  • ignore_reinit_error – True if we should suppress errors from calling ray.init() a second time.

  • num_redis_shards – The number of Redis shards to start in addition to the primary Redis shard.

  • redis_max_clients – If provided, attempt to configure Redis with this maxclients number.

  • redis_password (str) – Prevents external clients without the password from connecting to Redis if provided.

  • plasma_directory – A directory where the Plasma memory mapped files will be created.

  • huge_pages – Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory.

  • include_java – Boolean flag indicating whether to enable the Java worker.

  • include_webui – Boolean flag indicating whether to start the web UI, which displays the status of the Ray cluster. If this argument is None, then the UI will be started if the relevant dependencies are present.

  • webui_host – The host to bind the web UI server to. Can either be localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is set to localhost to prevent access from external machines.

  • job_id – The ID of this job.

  • configure_logging – True if Ray should configure logging here; set to False if you want to configure logging yourself.

  • logging_level – Logging level, default will be logging.INFO.

  • logging_format – Logging format, default contains a timestamp, filename, line number, and message. See ray_constants.py.

  • plasma_store_socket_name (str) – If provided, it will specify the socket name used by the plasma store.

  • raylet_socket_name (str) – If provided, it will specify the socket path used by the raylet process.

  • temp_dir (str) – If provided, it will specify the root temporary directory for the Ray process.

  • load_code_from_local – Whether code should be loaded from a local module or from the GCS.

  • use_pickle – Deprecated.

  • _internal_config (str) – JSON configuration for overriding RayConfig defaults. For testing purposes ONLY.

  • lru_evict (bool) – If True, when an object store is full, it will evict objects in LRU order to make more space and when under memory pressure, ray.UnreconstructableError may be thrown. If False, then reference counting will be used to decide which objects are safe to evict and when under memory pressure, ray.ObjectStoreFullError may be thrown.

Returns

Address information about the started processes.

Raises

Exception – An exception is raised if an inappropriate combination of arguments is passed in.

ray.is_initialized()[source]

Check if ray.init has been called yet.

Returns

True if ray.init has already been called and false otherwise.

ray.remote(*args, **kwargs)[source]

Define a remote function or an actor class.

This can be used with no arguments to define a remote function or actor as follows:

@ray.remote
def f():
    return 1

@ray.remote
class Foo:
    def method(self):
        return 1

It can also be used with specific keyword arguments:

  • num_return_vals: This is only for remote functions. It specifies the number of object IDs returned by the remote function invocation.

  • num_cpus: The quantity of CPU cores to reserve for this task or for the lifetime of the actor.

  • num_gpus: The quantity of GPUs to reserve for this task or for the lifetime of the actor.

  • resources: The quantity of various custom resources to reserve for this task or for the lifetime of the actor. This is a dictionary mapping strings (resource names) to numbers.

  • max_calls: Only for remote functions. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit (this can be used to address memory leaks in third-party libraries or to reclaim resources that cannot easily be released, e.g., GPU memory that was acquired by TensorFlow). By default this is infinite.

  • max_reconstructions: Only for actors. This specifies the maximum number of times that the actor should be reconstructed when it dies unexpectedly. The minimum valid value is 0 (default), which indicates that the actor doesn’t need to be reconstructed. And the maximum valid value is ray.ray_constants.INFINITE_RECONSTRUCTION.

  • max_retries: Only for remote functions. This specifies the maximum number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, the default is 4, and the maximum valid value is ray.ray_constants.INFINITE_RECONSTRUCTION.

This can be done as follows:

@ray.remote(num_gpus=1, max_calls=1, num_return_vals=2)
def f():
    return 1, 2

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1

Remote task and actor objects returned by @ray.remote can also be dynamically modified with the same arguments as above using .options() as follows:

@ray.remote(num_gpus=1, max_calls=1, num_return_vals=2)
def f():
    return 1, 2
g = f.options(num_gpus=2, max_calls=None)

@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo:
    def method(self):
        return 1
Bar = Foo.options(num_cpus=1, resources=None)

A running remote actor is terminated when its Python actor handle is deleted; the actor will complete any outstanding work and then shut down. If you want to kill the actor immediately, you can also call ray.kill(actor).

ray.get(object_ids, timeout=None)[source]

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ID is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_ids is a list, then the objects corresponding to each object in the list will be returned.

This method will issue a warning if called inside an async context; use await object_id instead of ray.get(object_id). For a list of object IDs, use await asyncio.gather(*object_ids).

Parameters
  • object_ids – Object ID of the object to get or a list of object IDs to get.

  • timeout (Optional[float]) – The maximum amount of time in seconds to wait before returning.

Returns

A Python object or a list of Python objects.

Raises
  • RayTimeoutError – A RayTimeoutError is raised if a timeout is set and the get takes longer than timeout to return.

  • Exception – An exception is raised if the task that created the object or that created one of the objects raised an exception.

ray.wait(object_ids, num_returns=1, timeout=None)[source]

Return a list of IDs that are ready and a list of IDs that are not.

If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object IDs.

This method returns two lists. The first list consists of object IDs that correspond to objects that are available in the object store. The second list corresponds to the rest of the object IDs (which may or may not be ready).

Ordering of the input list of object IDs is preserved. That is, if A precedes B in the input list, and both are in the ready list, then A will precede B in the ready list. This also holds true if A and B are both in the remaining list.

This method will issue a warning if it’s running inside an async context. Instead of ray.wait(object_ids), you can use await asyncio.wait(object_ids).

Parameters
  • object_ids (List[ObjectID]) – List of object IDs for objects that may or may not be ready. Note that these IDs must be unique.

  • num_returns (int) – The number of object IDs that should be returned.

  • timeout (float) – The maximum amount of time in seconds to wait before returning.

Returns

A list of object IDs that are ready and a list of the remaining object IDs.

ray.put(value, weakref=False)[source]

Store an object in the object store.

The object may not be evicted while a reference to the returned ID exists.

Parameters
  • value – The Python object to be stored.

  • weakref – If set, allows the object to be evicted while a reference to the returned ID exists. You might want to set this if putting a lot of objects that you might not need in the future.

Returns

The object ID assigned to this value.

ray.kill(actor)[source]

Kill an actor forcefully.

This will interrupt any running tasks on the actor, causing them to fail immediately. Any atexit handlers installed in the actor will still be run.

If you want to kill the actor but let pending tasks finish, you can call actor.__ray_terminate__.remote() instead to queue a termination task.

If this actor is reconstructable, reconstruction will be attempted.

Parameters

actor (ActorHandle) – Handle to the actor to kill.

ray.get_gpu_ids()[source]

Get the IDs of the GPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker started up, then the IDs returned by this method will be a subset of the IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has.

Returns

A list of GPU IDs.

ray.get_resource_ids()[source]

Get the IDs of the resources that are available to the worker.

Returns

A dictionary mapping the name of a resource to a list of pairs, where each pair consists of the ID of a resource and the fraction of that resource reserved for this worker.

ray.get_webui_url()[source]

Get the URL to access the web UI.

Note that the URL does not specify which node the web UI is on.

Returns

The URL of the web UI as a string.

ray.shutdown(exiting_interpreter=False)[source]

Disconnect the worker, and terminate processes started by ray.init().

This will automatically run at the end when a Python process that uses Ray exits. It is ok to run this twice in a row. The primary use case for this function is to clean up state between tests.

Note that this will clear any remote function definitions, actor definitions, and existing actors, so if you wish to use any previously defined remote functions or actors after calling ray.shutdown(), then you need to redefine them. If they were defined in an imported module, then you will need to reload the module.

Parameters

exiting_interpreter (bool) – True if this is called by the atexit hook and false otherwise. If we are exiting the interpreter, we will wait a little while to print any extra error messages.

ray.register_custom_serializer(cls, serializer, deserializer, use_pickle=False, use_dict=False, class_id=None)[source]

Registers custom functions for efficient object serialization.

The serializer and deserializer are used when transferring objects of cls across processes and nodes. This can be significantly faster than the Ray default fallbacks. Wraps register_custom_serializer underneath.

Parameters
  • cls (type) – The class that ray should use this custom serializer for.

  • serializer – The custom serializer that takes in a cls instance and outputs a serialized representation. use_pickle and use_dict must be False if provided.

  • deserializer – The custom deserializer that takes in a serialized representation of the cls and outputs a cls instance. use_pickle and use_dict must be False if provided.

  • use_pickle – Deprecated.

  • use_dict – Deprecated.

  • class_id (str) – Unique ID of the class. Autogenerated if None.

ray.profile(event_type, extra_data=None)[source]

Profile a span of time so that it appears in the timeline visualization.

Note that this only works in the raylet code path.

This function can be used as follows (both on the driver or within a task).

with ray.profile("custom event", extra_data={'key': 'value'}):
    # Do some computation here.
    pass

Optionally, a dictionary can be passed as the “extra_data” argument, and it can have keys “name” and “cname” if you want to override the default timeline display text and box color. Other values will appear at the bottom of the chrome tracing GUI when you click on the box corresponding to this profile span.

Parameters
  • event_type – A string describing the type of the event.

  • extra_data – This must be a dictionary mapping strings to strings. This data will be added to the json objects that are used to populate the timeline, so if you want to set a particular color, you can simply set the “cname” attribute to an appropriate color. Similarly, if you set the “name” attribute, then that will set the text displayed on the box in the timeline.

Returns

An object that can profile a span of time via a “with” statement.

ray.method(*args, **kwargs)[source]

Annotate an actor method.

@ray.remote
class Foo:
    @ray.method(num_return_vals=2)
    def bar(self):
        return 1, 2

f = Foo.remote()

_, _ = f.bar.remote()
Parameters

num_return_vals – The number of object IDs that should be returned by invocations of this actor method.

Inspect the Cluster State

ray.nodes()[source]

Get a list of the nodes in the cluster.

Returns

Information about the Ray clients in the cluster.

ray.tasks(task_id=None)[source]

Fetch and parse the task table information for one or more task IDs.

Parameters

task_id – A hex string of the task ID to fetch information about. If this is None, then the task object table is fetched.

Returns

Information from the task table.

ray.objects(object_id=None)[source]

Fetch and parse the object table info for one or more object IDs.

Parameters

object_id – An object ID to fetch information about. If this is None, then the entire object table is fetched.

Returns

Information from the object table.

ray.timeline(filename=None)[source]

Return a list of profiling events that can be viewed as a timeline.

To view this information as a timeline, dump it to a JSON file by passing in “filename” (or by using json.dump), then go to chrome://tracing in the Chrome web browser and load the dumped file.

Parameters

filename – If a filename is provided, the timeline is dumped to that file.

Returns

If filename is not provided, this returns a list of profiling events.

Each profile event is a dictionary.

ray.object_transfer_timeline(filename=None)[source]

Return a list of transfer events that can be viewed as a timeline.

To view this information as a timeline, dump it to a JSON file by passing in “filename” (or by using json.dump), then go to chrome://tracing in the Chrome web browser and load the dumped file. Make sure to enable “Flow events” in the “View Options” menu.

Parameters

filename – If a filename is provided, the timeline is dumped to that file.

Returns

If filename is not provided, this returns a list of profiling events.

Each profile event is a dictionary.

ray.cluster_resources()[source]

Get the current total cluster resources.

Note that this information can grow stale as nodes are added to or removed from the cluster.

Returns

A dictionary mapping resource name to the total quantity of that resource in the cluster.

ray.available_resources()[source]

Get the current available cluster resources.

This is different from cluster_resources in that this will return idle (available) resources rather than total resources.

Note that this information can grow stale as tasks start and finish.

Returns

A dictionary mapping resource name to the quantity of that resource that is currently available in the cluster.

ray.errors(all_jobs=False)[source]

Get error messages from the cluster.

Parameters

all_jobs – False if we should only include error messages for this specific job, or True if we should include error messages for all jobs.

Returns

Error messages pushed from the cluster. This will be a single list if all_jobs is False, or a dictionary mapping from job ID to a list of error messages for that job if all_jobs is True.

Experimental APIs

ray.experimental.get(object_ids)[source]

Get a single or a collection of remote objects from the object store.

This method is identical to ray.get except it adds support for tuples, ndarrays and dictionaries.

Parameters

object_ids – Object ID of the object to get, a list, tuple, ndarray of object IDs to get or a dict of {key: object ID}.

Returns

A Python object, a list of Python objects, or a dict of {key: object}.

ray.experimental.wait(object_ids, num_returns=1, timeout=None)[source]

Return a list of IDs that are ready and a list of IDs that are not.

This method is identical to ray.wait except it adds support for tuples and ndarrays.

Parameters
  • object_ids (List[ObjectID], Tuple(ObjectID), np.array(ObjectID)) – List like of object IDs for objects that may or may not be ready. Note that these IDs must be unique.

  • num_returns (int) – The number of object IDs that should be returned.

  • timeout (float) – The maximum amount of time in seconds to wait before returning.

Returns

A list of object IDs that are ready and a list of the remaining object IDs.

ray.experimental.set_resource(resource_name, capacity, client_id=None)[source]

Set a resource to a specified capacity.

This creates, updates, or deletes a custom resource for a target client. If the resource already exists, its capacity is updated to the new value. If the capacity is set to 0, the resource is deleted. If client_id is not specified or set to None, the resource is created on the local client where the caller is running.

Parameters
  • resource_name (str) – Name of the resource to be created.

  • capacity (int) – Capacity of the new resource. Resource is deleted if capacity is 0.

  • client_id (str) – The ClientId of the node where the resource is to be set.

Returns

None

Raises

ValueError – This exception is raised when a negative capacity is specified.

The Ray Command Line API

ray start

ray start [OPTIONS]

Options

--node-ip-address <node_ip_address>

the IP address of this node

--redis-address <redis_address>

same as --address

--address <address>

the address to use for Ray

--redis-port <redis_port>

the port to use for starting Redis

--num-redis-shards <num_redis_shards>

the number of additional Redis shards to use in addition to the primary Redis shard

--redis-max-clients <redis_max_clients>

If provided, attempt to configure Redis with this maximum number of clients.

--redis-password <redis_password>

If provided, secure Redis ports with this password

--redis-shard-ports <redis_shard_ports>

the port to use for the Redis shards other than the primary Redis shard

--object-manager-port <object_manager_port>

the port to use for starting the object manager

--node-manager-port <node_manager_port>

the port to use for starting the node manager

--memory <memory>

The amount of memory (in bytes) to make available to workers. By default, this is set to the available memory on the node.

--object-store-memory <object_store_memory>

The amount of memory (in bytes) to start the object store with. By default, this is capped at 20GB but can be set higher.

--redis-max-memory <redis_max_memory>

The max amount of memory (in bytes) to allow redis to use. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task, object, and profile tables). By default this is capped at 10GB but can be set higher.

--num-cpus <num_cpus>

the number of CPUs on this node

--num-gpus <num_gpus>

the number of GPUs on this node

--resources <resources>

a JSON serialized dictionary mapping resource name to resource quantity

--head

provide this argument for the head node

--include-webui <include_webui>

provide this argument if the UI should be started

--webui-host <webui_host>

The host to bind the web UI server to. Can either be localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces). By default, this is set to localhost to prevent access from external machines.

--block

provide this argument to block forever in this command

--plasma-directory <plasma_directory>

object store directory for memory mapped files

--huge-pages

enable support for huge pages in the object store

--autoscaling-config <autoscaling_config>

the file that contains the autoscaling config

--no-redirect-worker-output

do not redirect worker stdout and stderr to files

--no-redirect-output

do not redirect non-worker stdout and stderr to files

--plasma-store-socket-name <plasma_store_socket_name>

manually specify the socket name of the plasma store

--raylet-socket-name <raylet_socket_name>

manually specify the socket path of the raylet process

--temp-dir <temp_dir>

manually specify the root temporary dir of the Ray process

--include-java

Enable Java worker support.

--java-worker-options <java_worker_options>

Overwrite the options to start Java workers.

--internal-config <internal_config>

Do NOT use this. This is for debugging/development purposes ONLY.

--load-code-from-local

Specify whether to load code from a local file or from GCS serialization.

ray stop

ray stop [OPTIONS]

Options

-f, --force

If set, ray will send SIGKILL instead of SIGTERM.

-v, --verbose

If set, ray prints out more information about processes to kill.

ray up

Create or update a Ray cluster.

ray up [OPTIONS] CLUSTER_CONFIG_FILE

Options

--no-restart

Whether to skip restarting Ray services during the update. This avoids interrupting running jobs.

--restart-only

Whether to skip running setup commands and only restart Ray. This cannot be used with ‘no-restart’.

--min-workers <min_workers>

Override the configured min worker node count for the cluster.

--max-workers <max_workers>

Override the configured max worker node count for the cluster.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

-y, --yes

Don’t ask for confirmation.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray down

Tear down the Ray cluster.

ray down [OPTIONS] CLUSTER_CONFIG_FILE

Options

--workers-only

Only destroy the workers.

--keep-min-workers

Retain the minimal amount of workers specified in the config.

-y, --yes

Don’t ask for confirmation.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray exec

ray exec [OPTIONS] CLUSTER_CONFIG_FILE CMD

Options

--docker

Runs command in the docker container specified in cluster_config.

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

Arguments

CLUSTER_CONFIG_FILE

Required argument

CMD

Required argument

ray submit

Uploads and runs a script on the specified cluster.

The script is automatically synced to the following location:

os.path.join("~", os.path.basename(script))

Example:
>>> ray submit [CLUSTER.YAML] experiment.py --args="--smoke-test"
ray submit [OPTIONS] CLUSTER_CONFIG_FILE SCRIPT

Options

--docker

Runs command in the docker container specified in cluster_config.

--stop

Stop the cluster after the command finishes running.

--start

Start the cluster if needed.

--screen

Run the command in a screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

--args <args>

Script args.

Arguments

CLUSTER_CONFIG_FILE

Required argument

SCRIPT

Required argument

ray attach

ray attach [OPTIONS] CLUSTER_CONFIG_FILE

Options

--start

Start the cluster if needed.

--screen

Run the command in screen.

--tmux

Run the command in tmux.

-n, --cluster-name <cluster_name>

Override the configured cluster name.

-N, --new

Force creation of a new screen.

-p, --port-forward <port_forward>

Port to forward. Use this multiple times to forward multiple ports.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray get_head_ip

ray get_head_ip [OPTIONS] CLUSTER_CONFIG_FILE

Options

-n, --cluster-name <cluster_name>

Override the configured cluster name.

Arguments

CLUSTER_CONFIG_FILE

Required argument

ray stack

ray stack [OPTIONS]

ray stat

ray stat [OPTIONS]

Options

--address <address>

Override the address to connect to.

ray memory

ray memory [OPTIONS]

Options

--address <address>

Override the address to connect to.

ray globalgc

ray globalgc [OPTIONS]

Options

--address <address>

Override the address to connect to.

ray timeline

ray timeline [OPTIONS]

Options

--address <address>

Override the redis address to connect to.