Pattern: Using asyncio to run actor methods concurrently#

By default, a Ray actor runs in a single thread and actor method calls are executed sequentially. This means that a long running method call blocks all the following ones. In this pattern, we use await to yield control from the long running method call so other method calls can run concurrently. Normally the control is yielded when the method is doing IO operations but you can also use await asyncio.sleep(0) to yield control explicitly.

Note

You can also use threaded actors to achieve concurrency.

Example use case#

You have an actor with a long polling method that continuously fetches tasks from the remote store and executes them. You also want to query the number of tasks executed while the long polling method is running.

With the default actor, the code will look like this:

import ray


@ray.remote
class TaskStore:
    def get_next_task(self):
        return "task"


@ray.remote
class TaskExecutor:
    def __init__(self, task_store):
        self.task_store = task_store
        self.num_executed_tasks = 0

    def run(self):
        while True:
            task = ray.get(task_store.get_next_task.remote())
            self._execute_task(task)

    def _execute_task(self, task):
        # Executing the task
        self.num_executed_tasks = self.num_executed_tasks + 1

    def get_num_executed_tasks(self):
        return self.num_executed_tasks


task_store = TaskStore.remote()
task_executor = TaskExecutor.remote(task_store)
task_executor.run.remote()
try:
    # This will timeout since task_executor.run occupies the entire actor thread
    # and get_num_executed_tasks cannot run.
    ray.get(task_executor.get_num_executed_tasks.remote(), timeout=5)
except ray.exceptions.GetTimeoutError:
    print("get_num_executed_tasks didn't finish in 5 seconds")

This is problematic because TaskExecutor.run method runs forever and never yield the control to run other methods. We can solve this problem by using async actors and use await to yield control:

@ray.remote
class AsyncTaskExecutor:
    def __init__(self, task_store):
        self.task_store = task_store
        self.num_executed_tasks = 0

    async def run(self):
        while True:
            # Here we use await instead of ray.get() to
            # wait for the next task and it will yield
            # the control while waiting.
            task = await task_store.get_next_task.remote()
            self._execute_task(task)

    def _execute_task(self, task):
        # Executing the task
        self.num_executed_tasks = self.num_executed_tasks + 1

    def get_num_executed_tasks(self):
        return self.num_executed_tasks


async_task_executor = AsyncTaskExecutor.remote(task_store)
async_task_executor.run.remote()
# We are able to run get_num_executed_tasks while run method is running.
num_executed_tasks = ray.get(async_task_executor.get_num_executed_tasks.remote())
print(f"num of executed tasks so far: {num_executed_tasks}")

Here, instead of using the blocking ray.get() to get the value of an ObjectRef, we use await so it can yield the control while we are waiting for the object to be fetched.