Utility Classes
Actor Pool
The ray.util module contains a utility class, ActorPool.
This class is similar to multiprocessing.Pool and lets you schedule Ray tasks over a fixed pool of actors.
import ray
from ray.util import ActorPool


@ray.remote
class Actor:
    def double(self, n):
        return n * 2


a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])

# pool.map(...) returns a Python generator that yields results in input order.
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
print(list(gen))
# [2, 4, 6, 8]
See the package reference for more information.
Actor pool hasn’t been implemented in Java or C++ yet.
Message passing using Ray Queue
Sometimes just using one signal to synchronize is not enough. If you need to send data among many tasks or actors, you can use ray.util.queue.Queue.
import ray
from ray.util.queue import Queue, Empty

ray.init()

# You can pass this object around to different tasks/actors.
queue = Queue(maxsize=100)


@ray.remote
def consumer(id, queue):
    try:
        while True:
            # Wait up to 1 second for an item; Empty is raised on timeout.
            next_item = queue.get(block=True, timeout=1)
            print(f"consumer {id} got work {next_item}")
    except Empty:
        pass


for i in range(10):
    queue.put(i)
print("Put work 0 - 9 to queue...")

consumers = [consumer.remote(id, queue) for id in range(2)]
ray.get(consumers)
Ray’s Queue API is similar to Python’s asyncio.Queue and queue.Queue.