Pattern: Using an actor to synchronize other tasks and actors#
When you have multiple tasks that need to wait on some condition or otherwise need to synchronize across tasks & actors on a cluster, you can use a central actor to coordinate among them.
Example use case#
You can use an actor to implement a distributed asyncio.Event
that multiple tasks can wait on.
Code example#
import asyncio
import ray
# We set num_cpus to zero because this actor will mostly just block on I/O.
@ray.remote(num_cpus=0)
class SignalActor:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self, clear=False):
self.ready_event.set()
if clear:
self.ready_event.clear()
async def wait(self, should_wait=True):
if should_wait:
await self.ready_event.wait()
@ray.remote
def wait_and_go(signal):
ray.get(signal.wait.remote())
print("go!")
signal = SignalActor.remote()
tasks = [wait_and_go.remote(signal) for _ in range(4)]
print("ready...")
# Tasks will all be waiting for the signals.
print("set..")
ray.get(signal.send.remote())
# Tasks are unblocked.
ray.get(tasks)
# Output is:
# ready...
# set..
# (wait_and_go pid=77366) go!
# (wait_and_go pid=77372) go!
# (wait_and_go pid=77367) go!
# (wait_and_go pid=77358) go!