Pattern: Using pipelining to increase throughput
Pattern: Using pipelining to increase throughput#
If you have multiple work items and each requires several steps to complete, you can use the pipelining technique to improve the cluster utilization and increase the throughput of your system.
Pipelining is an important technique to improve the performance and is heavily used by Ray libraries. See Ray Dataset pipelines as an example.
Example use case#
A component of your application needs to do both compute-intensive work and communicate with other processes. Ideally, you want to overlap computation and communication to saturate the CPU and increase the overall throughput.
import ray @ray.remote class WorkQueue: def __init__(self): self.queue = list(range(10)) def get_work_item(self): if self.queue: return self.queue.pop(0) else: return None @ray.remote class WorkerWithoutPipelining: def __init__(self, work_queue): self.work_queue = work_queue def process(self, work_item): print(work_item) def run(self): while True: # Get work from the remote queue. work_item = ray.get(self.work_queue.get_work_item.remote()) if work_item is None: break # Do work. self.process(work_item) @ray.remote class WorkerWithPipelining: def __init__(self, work_queue): self.work_queue = work_queue def process(self, work_item): print(work_item) def run(self): self.work_item_ref = self.work_queue.get_work_item.remote() while True: # Get work from the remote queue. work_item = ray.get(self.work_item_ref) if work_item is None: break self.work_item_ref = self.work_queue.get_work_item.remote() # Do work while we are fetching the next work item. self.process(work_item) work_queue = WorkQueue.remote() worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue) ray.get(worker_without_pipelining.run.remote()) work_queue = WorkQueue.remote() worker_with_pipelining = WorkerWithPipelining.remote(work_queue) ray.get(worker_with_pipelining.run.remote())
In the example above, a worker actor pulls work off of a queue and then does some computation on it.
Without pipelining, we call
ray.get() immediately after requesting a work item, so we block while that RPC is in flight, causing idle CPU time.
With pipelining, we instead preemptively request the next work item before processing the current one, so we can use the CPU while the RPC is in flight which increases the CPU utilization.