Utility Classes

Actor Pool

The ray.util module contains a utility class, ActorPool. This class is similar to multiprocessing.Pool and lets you schedule Ray tasks over a fixed pool of actors.

import ray
from ray.util import ActorPool


@ray.remote
class Actor:
    def double(self, n):
        return n * 2


a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])

# pool.map(...) returns a Python generator that yields results in input order
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
print(list(gen))
# [2, 4, 6, 8]

See the package reference for more information.

Actor pool hasn’t been implemented in Java or C++ yet.

Message passing using Ray Queue

Sometimes just using one signal to synchronize is not enough. If you need to send data among many tasks or actors, you can use ray.util.queue.Queue.

import ray
from ray.util.queue import Queue, Empty

ray.init()
# You can pass this object around to different tasks/actors
queue = Queue(maxsize=100)


@ray.remote
def consumer(id, queue):
    try:
        while True:
            next_item = queue.get(block=True, timeout=1)
            print(f"consumer {id} got work {next_item}")
    except Empty:
        pass


# Enqueue ten work items (0 through 9) before starting the consumers
for i in range(10):
    queue.put(i)
print("Put work 0 - 9 to queue...")

consumers = [consumer.remote(id, queue) for id in range(2)]
ray.get(consumers)

Ray’s Queue API is similar to Python’s asyncio.Queue and queue.Queue.
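To illustrate the resemblance, here is a rough single-process sketch of the same consumer pattern using only the standard library's queue.Queue and threads in place of Ray tasks. The names work, results, and consumer_id are illustrative, and the shorter timeout is chosen only to keep the example quick.

```python
import queue
import threading

work = queue.Queue(maxsize=100)
results = []
results_lock = threading.Lock()


def consumer(consumer_id):
    # Same shape as the Ray consumer: block on get() until the timeout
    # expires, then treat queue.Empty as "no more work".
    try:
        while True:
            item = work.get(block=True, timeout=0.1)
            with results_lock:
                results.append((consumer_id, item))
    except queue.Empty:
        pass


for i in range(10):
    work.put(i)

threads = [threading.Thread(target=consumer, args=(cid,)) for cid in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every item is consumed exactly once; which consumer got it may vary.
print(sorted(item for _, item in results))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The main difference is scope: queue.Queue is shared between threads in one process, whereas a ray.util.queue.Queue can be passed to tasks and actors running in other processes.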