Task Lifecycle#

This doc describes the lifecycle of a task in Ray Core: how tasks are defined, scheduled, and executed. We will use the following code as a running example; the internals described are based on Ray 2.48.

```python
import ray

@ray.remote
def my_task(arg):
    return f"Hello, {arg}!"

obj_ref = my_task.remote("Ray")
print(ray.get(obj_ref))  # Hello, Ray!
```

Defining a remote function#

The first step in the task lifecycle is defining a remote function with the ray.remote() decorator. ray.remote() wraps the Python function and returns an instance of RemoteFunction, which stores the underlying function together with all user-specified Ray task options such as num_cpus.
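Conceptually, the decorator can be sketched like this. This is a simplified stand-in, not Ray's actual RemoteFunction implementation; the real class also handles serialization, option validation, and task submission, and here `.remote()` just calls the function inline:

```python
# Simplified sketch of what @ray.remote conceptually produces.
class RemoteFunction:
    def __init__(self, func, options):
        self._function = func     # the underlying Python function
        self._options = options   # user-specified task options, e.g. num_cpus

    def remote(self, *args, **kwargs):
        # Real Ray submits a task here; this sketch just runs it inline.
        return self._function(*args, **kwargs)

def remote(**options):
    def decorator(func):
        return RemoteFunction(func, options)
    return decorator

@remote(num_cpus=1)
def my_task(arg):
    return f"Hello, {arg}!"

print(my_task.remote("Ray"))  # Hello, Ray!
```

Note that after decoration, `my_task` is no longer a plain function: calling `my_task("Ray")` directly would fail, which matches real Ray's requirement to invoke remote functions via `.remote()`.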

Invoking a remote function#

Once a remote function is defined, it can be invoked with the .remote() method; each invocation creates one Ray task. The method submits the task for execution and returns an object reference (ObjectRef) that can be used to retrieve the result later. Under the hood, .remote() does the following:

  1. Pickles the underlying function into bytes and stores them in the GCS key-value store under a key, so that the remote executor (the core worker process that will run the task) can later fetch the bytes, unpickle them, and execute the function. This happens once per remote function definition, not once per invocation.

  2. Calls the Cython submit_task, which prepares the arguments (there are three kinds) and calls the C++ CoreWorker::SubmitTask.

    1. Pass-by-reference argument: the argument is an ObjectRef.

    2. Pass-by-value inline argument: the argument is a small Python object and the running total size of such arguments is still below the threshold. In this case it is pickled, sent to the remote executor as part of the PushTask RPC, and unpickled there. This is called inlining, and the plasma store is not involved.

    3. Pass-by-value non-inline argument: the argument is a normal Python object that does not meet the inlining criteria (e.g. it is too large). It is put in the local plasma store and replaced by the generated ObjectRef, so it is effectively equivalent to .remote(ray.put(arg)).

  3. CoreWorker builds a TaskSpecification containing all the information about the task: the function ID, all user-specified options, and the arguments. This spec is later sent to the executor for execution.

  4. The TaskSpecification is submitted to NormalTaskSubmitter asynchronously: the .remote() call returns immediately while the task is scheduled and executed in the background.
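The three argument kinds above can be sketched as a classification step. The names and the inlining threshold here are illustrative; the actual bookkeeping lives in Ray's C++ core, and the real limit is a config value, not this constant:

```python
import pickle

INLINE_ARG_SIZE_LIMIT = 100 * 1024  # assumption: illustrative per-task budget


class ObjectRef:
    """Stand-in for ray.ObjectRef."""
    def __init__(self, object_id):
        self.object_id = object_id


def classify_args(args):
    """Classify each argument the way submit_task conceptually does."""
    inlined_bytes = 0
    prepared = []
    for arg in args:
        if isinstance(arg, ObjectRef):
            # Pass-by-reference: the executor resolves it before running.
            prepared.append(("by_reference", arg))
        else:
            data = pickle.dumps(arg)
            if inlined_bytes + len(data) <= INLINE_ARG_SIZE_LIMIT:
                # Pass-by-value inline: bytes travel in the PushTask RPC.
                inlined_bytes += len(data)
                prepared.append(("inline", data))
            else:
                # Pass-by-value non-inline: effectively ray.put(arg),
                # then pass the generated ObjectRef instead.
                prepared.append(("by_reference", ObjectRef(object_id=len(prepared))))
    return prepared
```

Note the threshold applies to the running total, not each argument individually, so many small arguments can still spill later ones into the plasma store.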

Scheduling a task#

Once the task is submitted to NormalTaskSubmitter, a worker process on some Ray node is selected to execute it; this selection process is called scheduling.

  1. NormalTaskSubmitter first waits for all ObjectRef arguments to become available. Available means the tasks that produce those ObjectRefs have finished executing and the data exists somewhere in the cluster.

    1. If the object pointed to by the ObjectRef is in the plasma store, the ObjectRef itself is sent to the executor and the executor will resolve the ObjectRef to the actual data (pull from remote plasma store if needed) before calling the user function.

    2. If the object pointed to by the ObjectRef is in the caller's in-process memory store, the data is inlined and sent to the executor as part of the PushTask RPC, just like other pass-by-value inline arguments.

  2. Once all arguments are available, NormalTaskSubmitter tries to find an idle worker to execute the task. It obtains workers from the raylet via a process called worker leasing, and this is where scheduling happens. Specifically, it sends a RequestWorkerLease RPC to a selected raylet (either the local raylet or a data-locality-favored one) asking for a worker lease.

  3. The raylet handles the RequestWorkerLease RPC and decides whether the task can run on a local worker or should be scheduled on another node.

  4. When the RequestWorkerLease RPC returns with a leased worker address in the response, a worker lease is granted to the caller and the task can be executed there. If the response instead contains another raylet's address, NormalTaskSubmitter requests a worker lease from that raylet. This process repeats until a worker lease is obtained.
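The lease loop above can be sketched as follows. FakeRaylet, LeaseResponse, and the field names are hypothetical stand-ins for the RequestWorkerLease RPC and its protobuf response, not Ray's actual C++ API:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LeaseResponse:
    worker_address: Optional[str] = None          # set when a lease is granted
    retry_at_raylet: Optional["FakeRaylet"] = None  # set when redirected


class FakeRaylet:
    """Stand-in raylet: either grants a worker or redirects the caller."""
    def __init__(self, grant=None, redirect=None):
        self._grant = grant
        self._redirect = redirect

    def request_worker_lease(self, task_spec):
        if self._grant is not None:
            return LeaseResponse(worker_address=self._grant)
        return LeaseResponse(retry_at_raylet=self._redirect)


def obtain_worker_lease(raylet, task_spec):
    """Keep asking raylets until one grants a worker lease."""
    while True:
        resp = raylet.request_worker_lease(task_spec)  # RequestWorkerLease RPC
        if resp.worker_address is not None:
            return resp.worker_address                 # lease granted
        raylet = resp.retry_at_raylet                  # redirected: ask that raylet


# The local raylet redirects to another node, which grants the lease.
granting_raylet = FakeRaylet(grant="node2:worker7")
local_raylet = FakeRaylet(redirect=granting_raylet)
worker = obtain_worker_lease(local_raylet, task_spec=None)
print(worker)  # node2:worker7
```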

Executing a task#

Once a leased worker is obtained, the task execution starts.

  1. NormalTaskSubmitter sends a PushTask RPC to the leased worker with the TaskSpecification to execute.

  2. The executor receives the PushTask RPC and executes the task in the following steps.

  3. The first step of executing the task is getting all the pass-by-reference arguments from the local plasma store (the data was already pulled from any remote plasma store into the local one during scheduling).

  4. Then the executor fetches the pickled function bytes from the GCS key-value store and unpickles them.

  5. The next step is unpickling the arguments.

  6. Finally, the user function is called.
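The executor-side steps above can be condensed into a sketch. The dicts stand in for the GCS key-value store and the plasma store; to keep the sketch runnable, the function registry holds the callable directly, whereas real Ray stores cloudpickled bytes and unpickles them (step 4 above):

```python
import pickle

gcs_functions = {}  # function_id -> function (real Ray: pickled bytes in GCS kv)
plasma = {}         # object_id -> pickled object bytes


def execute_task(function_id, prepared_args):
    # 1. Resolve pass-by-reference arguments from the local plasma store;
    #    inline arguments already carry their pickled bytes.
    raw = [plasma[payload] if kind == "by_reference" else payload
           for kind, payload in prepared_args]
    # 2. Look up the function (real Ray fetches and unpickles it from GCS).
    func = gcs_functions[function_id]
    # 3. Unpickle the arguments.
    args = [pickle.loads(r) for r in raw]
    # 4. Call the user function.
    return func(*args)


def greet(name):
    return f"Hello, {name}!"


gcs_functions["greet"] = greet
plasma["obj-0"] = pickle.dumps("Ray")  # as if ray.put("Ray") had run
result = execute_task("greet", [("by_reference", "obj-0")])
print(result)  # Hello, Ray!
```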

Getting the return value#

After the user function is executed, the caller can get the return values.

  1. After the user function returns, the executor stores all the return values. If a return value is a small object and the running total size of such return values is still below the threshold, it is returned directly to the caller as part of the PushTask RPC response. Otherwise it is put in the local plasma store and only a reference is returned to the caller.

  2. When the caller receives the PushTask RPC response, it stores the return values in its local memory store: the actual data if a return value is small, or a special marker indicating the data lives in the plasma store if it is big.

  3. Once the return value is added to the local memory store, ray.get() unblocks. It returns the value directly if the object is small; if the object is big, it fetches it from the local plasma store (pulling from a remote plasma store first if needed).
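The executor's small-vs-large decision in step 1 can be sketched like this. The 100 KiB threshold is an assumption for illustration (Ray's actual cutoff for returning objects in the RPC response is a configurable limit):

```python
import pickle

RETURN_INLINE_LIMIT = 100 * 1024  # assumption: illustrative size cutoff


def store_return_value(value, plasma_store):
    """Decide where a single return value goes."""
    data = pickle.dumps(value)
    if len(data) <= RETURN_INLINE_LIMIT:
        # Small: ship the bytes back in the PushTask RPC response; the
        # caller puts them straight into its local memory store.
        return ("in_reply", data)
    # Large: put the object in the local plasma store and reply with a
    # marker; the caller's memory store records that the data is in plasma.
    object_id = f"obj-{len(plasma_store)}"
    plasma_store[object_id] = data
    return ("in_plasma", object_id)


plasma_store = {}
small = store_return_value("ok", plasma_store)
large = store_return_value(b"y" * (200 * 1024), plasma_store)
```

This mirrors the argument-inlining rule on the submission side: small payloads ride along in the RPC, large ones go through the plasma store and travel only by reference.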