# Task Lifecycle
This doc describes the lifecycle of a task in Ray Core: how tasks are defined, scheduled, and executed. We will use the following code as a running example; the internals are based on Ray 2.48.
```python
import ray

@ray.remote
def my_task(arg):
    return f"Hello, {arg}!"

obj_ref = my_task.remote("Ray")
print(ray.get(obj_ref))
# Hello, Ray!
```
## Defining a remote function
The first step in the task lifecycle is defining a remote function using the `ray.remote()` decorator. `ray.remote()` wraps the Python function and returns an instance of `RemoteFunction`. `RemoteFunction` stores the underlying function and all the user-specified Ray task options such as `num_cpus`.
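A quick way to see this wrapping in action (the exact class path printed may differ across Ray versions):

```python
import ray

# The decorator returns a RemoteFunction wrapper that records the task
# options; it is no longer a plain Python function.
@ray.remote(num_cpus=2)
def my_task(arg):
    return f"Hello, {arg}!"

print(type(my_task))  # e.g. <class 'ray.remote_function.RemoteFunction'>

# Direct calls are rejected; remote functions must be invoked with .remote():
# my_task("Ray")  # raises TypeError
```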
## Invoking a remote function
Once a remote function is defined, it can be invoked using the `.remote()` method. Each invocation of a remote function creates a Ray task. This method submits the task for execution and returns an object reference (`ObjectRef`) that can be used to retrieve the result later.

Under the hood, `.remote()` does the following:
1. Pickles the underlying function into bytes and stores the bytes in the GCS key-value store under a key so that, later on, the remote executor (the core worker process that will execute the task) can fetch the bytes, unpickle them, and execute the function. This is done once per remote function definition, not once per invocation.
2. Calls the Cython `submit_task`, which prepares the arguments (three types, both mechanisms are sketched after this list) and calls the C++ `CoreWorker::SubmitTask`:
   - Pass-by-reference argument: the argument is an `ObjectRef`.
   - Pass-by-value inline argument: the argument is a small Python object and the total size of such arguments so far is below the threshold. In this case, it is pickled, sent to the remote executor (as part of the `PushTask` RPC), and unpickled there. This is called inlining, and the plasma store is not involved.
   - Pass-by-value non-inline argument: the argument is a normal Python object that doesn't meet the inline criteria (e.g., it is too big). It is put in the local plasma store and replaced by the generated `ObjectRef`, so it is effectively equivalent to `.remote(ray.put(arg))`.
3. `CoreWorker` builds a `TaskSpecification` that contains all the information about the task, including the ID of the function, all the user-specified options, and the arguments. This spec will be sent to the executor for execution.
4. The `TaskSpecification` is submitted to `NormalTaskSubmitter` asynchronously, so the `.remote()` call returns immediately and the task is scheduled and executed asynchronously.
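Step 1's one-time function export can be pictured with a toy model; the key format below is made up, and a plain dict stands in for the GCS key-value store:

```python
import cloudpickle

gcs_kv = {}  # stand-in for the GCS key-value store

def export_function(fn):
    # Hypothetical key format; the real key is derived from the
    # function's identity inside Ray.
    key = f"RemoteFunction:{fn.__module__}.{fn.__qualname__}"
    if key not in gcs_kv:                 # once per definition, not per call
        gcs_kv[key] = cloudpickle.dumps(fn)
    return key
```

The three argument types in step 2 can be observed with the real Ray API; the inline threshold (about 100 KB by default in recent Ray versions) is an implementation detail that may change:

```python
import numpy as np
import ray

@ray.remote
def consume(x):
    return np.size(x) if isinstance(x, np.ndarray) else len(x)

small = "hi"               # inline: pickled into the PushTask RPC
big = np.zeros(1_000_000)  # non-inline: put in plasma, replaced by an ObjectRef
ref = ray.put(np.ones(3))  # pass-by-reference: already an ObjectRef

print(ray.get(consume.remote(small)))  # 2
print(ray.get(consume.remote(big)))    # 1000000, ≈ consume.remote(ray.put(big))
print(ray.get(consume.remote(ref)))    # 3
```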
## Scheduling a task
Once the task is submitted to `NormalTaskSubmitter`, a worker process on some Ray node is selected to execute the task; this selection process is called scheduling.
`NormalTaskSubmitter` first waits for all the `ObjectRef` arguments to be available. Available means that the tasks producing those `ObjectRef`s have finished execution and the data is available somewhere in the cluster (see the example after this list).

- If the object pointed to by the `ObjectRef` is in the plasma store, the `ObjectRef` itself is sent to the executor, and the executor resolves the `ObjectRef` to the actual data (pulling from a remote plasma store if needed) before calling the user function.
- If the object pointed to by the `ObjectRef` is in the caller's in-process memory store, the data is inlined and sent to the executor as part of the `PushTask` RPC, just like other pass-by-value inline arguments.
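For example, using the real Ray API, passing an `ObjectRef` produced by one task into another task creates exactly this kind of dependency:

```python
import ray

@ray.remote
def produce():
    return "data"

@ray.remote
def consume(x):
    # By the time this runs, `x` has been resolved to the string "data".
    return x.upper()

ref = produce.remote()
out = consume.remote(ref)  # NormalTaskSubmitter waits until `ref` is available
print(ray.get(out))        # DATA
```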
Once all the arguments are available, `NormalTaskSubmitter` tries to find an idle worker to execute the task. It obtains workers for task execution from the raylet via a process called worker leasing, and this is where scheduling happens. Specifically, it sends a `RequestWorkerLease` RPC to a selected raylet (either the local raylet or a data-locality-favored raylet) to ask for a worker lease. The raylet handles the `RequestWorkerLease` RPC. When the `RequestWorkerLease` RPC returns with a leased worker address in the response, a worker lease is granted to the caller to execute the task. If the response contains another raylet address instead, `NormalTaskSubmitter` requests a worker lease from that raylet. This process continues until a worker lease is obtained; a minimal sketch of this retry loop follows.
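Here is a runnable toy model of that lease-and-spillback loop; the class and field names are made up, and the real implementation in `NormalTaskSubmitter` is asynchronous C++:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LeaseReply:
    worker_address: Optional[str] = None           # set when a lease is granted
    retry_at_raylet_address: Optional[str] = None  # set on spillback

class Raylet:
    def __init__(self, address, grant=None, spill_to=None):
        self.address, self.grant, self.spill_to = address, grant, spill_to

    def request_worker_lease(self, task_spec) -> LeaseReply:
        # Either grant a local worker or redirect to another raylet.
        if self.grant is not None:
            return LeaseReply(worker_address=self.grant)
        return LeaseReply(retry_at_raylet_address=self.spill_to)

def lease_worker(raylets, raylet, task_spec) -> str:
    while True:
        reply = raylet.request_worker_lease(task_spec)  # RequestWorkerLease RPC
        if reply.worker_address is not None:
            return reply.worker_address                 # lease granted
        raylet = raylets[reply.retry_at_raylet_address]  # retry at another raylet

raylets = {"node2": Raylet("node2", grant="worker@node2:10001")}
raylets["node1"] = Raylet("node1", spill_to="node2")
print(lease_worker(raylets, raylets["node1"], task_spec={}))  # worker@node2:10001
```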
## Executing a task
Once a leased worker is obtained, task execution starts. `NormalTaskSubmitter` sends a `PushTask` RPC to the leased worker with the `TaskSpecification` to execute. The executor receives the `PushTask` RPC and executes the task in the following steps (a simplified sketch follows the list):

1. Get all the pass-by-reference arguments from the local plasma store (the data was already pulled from remote plasma stores to the local plasma store during scheduling).
2. Get the pickled function bytes from the GCS key-value store and unpickle them.
3. Unpickle the arguments.
4. Call the user function.
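Here is a minimal, hypothetical sketch of these four steps, modeling the plasma store and the GCS key-value store as plain dicts of pickled bytes (the real executor is C++ and far more involved):

```python
import pickle
from dataclasses import dataclass
from typing import Optional

@dataclass
class Arg:
    object_id: Optional[str] = None       # set for pass-by-reference arguments
    inline_bytes: Optional[bytes] = None  # set for inlined arguments

def execute_task(function_id, args, plasma_store, gcs_kv_store):
    # 1. Resolve pass-by-reference arguments from the local plasma store
    #    (already pulled to this node during scheduling).
    raw = [plasma_store[a.object_id] if a.object_id else a.inline_bytes
           for a in args]
    # 2. Fetch the pickled function bytes from the GCS KV store and unpickle.
    fn = pickle.loads(gcs_kv_store[function_id])
    # 3. Unpickle the arguments.
    values = [pickle.loads(b) for b in raw]
    # 4. Call the user function.
    return fn(*values)

gcs_kv_store = {"f1": pickle.dumps(len)}
plasma_store = {"obj1": pickle.dumps([1, 2, 3])}
print(execute_task("f1", [Arg(object_id="obj1")], plasma_store, gcs_kv_store))  # 3
```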
## Getting the return value
After the user function is executed, the caller can get the return values:

- After the user function returns, the executor gets and stores all the return values. If a return value is a small object and the total size of such return values so far is below the threshold, it is returned directly to the caller as part of the `PushTask` RPC response. Otherwise, it is put in the local plasma store and a reference is returned to the caller.
- When the caller receives the `PushTask` RPC response, it stores the return values in the local memory store: the actual data if the return value is small, or a special value indicating that the data is in the plasma store if it is big.
- When the return value is added to the local memory store, `ray.get()` is unblocked. It returns the value directly if the object is small, or fetches it from the local plasma store (pulling from a remote plasma store first if needed) if the object is big. A toy model of this path follows.
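A runnable toy model of this last step, assuming a `MemoryStore` with blocking reads and a dict standing in for the plasma store (all names are hypothetical):

```python
import queue

IN_PLASMA = object()  # special value: "the data lives in the plasma store"

class MemoryStore:
    def __init__(self):
        self._slots = {}

    def _slot(self, ref):
        return self._slots.setdefault(ref, queue.Queue())

    def put(self, ref, value):   # called when the PushTask reply arrives
        self._slot(ref).put(value)

    def wait_for(self, ref):     # blocks until a value is added for `ref`
        return self._slot(ref).get()

def get(ref, memory_store, plasma_store):
    value = memory_store.wait_for(ref)  # ray.get unblocks here
    if value is IN_PLASMA:
        return plasma_store[ref]        # big object: read from plasma
    return value                        # small object: inlined in the RPC reply

mem, plasma = MemoryStore(), {"ref1": "big-object"}
mem.put("ref1", IN_PLASMA)              # executor returned a plasma reference
print(get("ref1", mem, plasma))         # big-object
```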