Pattern: Using pipelining to increase throughput

If you have multiple work items and each requires several steps to complete, you can use pipelining to improve cluster utilization and increase the throughput of your system.

Note

Pipelining is an important technique for improving performance and is heavily used by Ray libraries. See Ray Data for an example.

../../_images/pipelining.svg

Example use case

A component of your application needs to do both compute-intensive work and communicate with other processes. Ideally, you want to overlap computation and communication to saturate the CPU and increase the overall throughput.
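
As a rough back-of-the-envelope model (an idealization, not a measurement of Ray itself), the benefit of overlapping can be estimated as follows. The symbols are assumptions introduced here for illustration: $N$ is the number of work items, $t_{\text{comm}}$ is the communication time per item, and $t_{\text{comp}}$ is the compute time per item. Startup cost and scheduling overhead are ignored.

```latex
T_{\text{sequential}} \approx N \,\bigl(t_{\text{comm}} + t_{\text{comp}}\bigr),
\qquad
T_{\text{overlapped}} \approx N \,\max\bigl(t_{\text{comm}},\, t_{\text{comp}}\bigr)
```

Under this model the speedup is at most 2x, reached when $t_{\text{comm}} = t_{\text{comp}}$, and the CPU stays saturated only when $t_{\text{comp}} \ge t_{\text{comm}}$.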

Code example

import ray


@ray.remote
class WorkQueue:
    def __init__(self):
        self.queue = list(range(10))

    def get_work_item(self):
        if self.queue:
            return self.queue.pop(0)
        else:
            return None


@ray.remote
class WorkerWithoutPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
        print(work_item)

    def run(self):
        while True:
            # Get work from the remote queue.
            work_item = ray.get(self.work_queue.get_work_item.remote())

            if work_item is None:
                break

            # Do work.
            self.process(work_item)


@ray.remote
class WorkerWithPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
        print(work_item)

    def run(self):
        # Request the first work item before entering the loop.
        self.work_item_ref = self.work_queue.get_work_item.remote()

        while True:
            # Wait for the work item that was requested earlier.
            work_item = ray.get(self.work_item_ref)

            if work_item is None:
                break

            # Request the next work item before processing the current one.
            self.work_item_ref = self.work_queue.get_work_item.remote()

            # Do work while we are fetching the next work item.
            self.process(work_item)


work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())

work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())

In the example above, a worker actor pulls work items off a queue and then does some computation on each one. Without pipelining, the worker calls ray.get() immediately after requesting a work item, so it blocks while that RPC is in flight and the CPU sits idle. With pipelining, the worker instead requests the next work item before processing the current one, so it can use the CPU while the RPC is in flight, which increases CPU utilization.
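
The timing difference between the two loops can be illustrated without a cluster. The following is a minimal sketch that uses only the Python standard library, not Ray: a thread pool stands in for the remote queue, a Future plays the role of an object ref, and time.sleep() simulates both the RPC latency and the computation. The names SimulatedQueue, get_work_item_async, and timed, as well as the two latency constants, are hypothetical and chosen only for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

FETCH_LATENCY_S = 0.05  # assumed round-trip time of one simulated RPC
PROCESS_TIME_S = 0.05   # assumed compute time per work item


class SimulatedQueue:
    """Stands in for the remote WorkQueue actor; each fetch costs one round trip."""

    def __init__(self, num_items):
        self.items = list(range(num_items))
        # A single worker thread serves fetches one at a time, like actor calls.
        self.executor = ThreadPoolExecutor(max_workers=1)

    def _get_work_item(self):
        time.sleep(FETCH_LATENCY_S)
        return self.items.pop(0) if self.items else None

    def get_work_item_async(self):
        # Returns a Future, which plays the role of an object ref here.
        return self.executor.submit(self._get_work_item)


def process(work_item, results):
    time.sleep(PROCESS_TIME_S)  # sleep stands in for real computation
    results.append(work_item)


def run_without_pipelining(queue):
    results = []
    while True:
        # Block on every fetch before doing any work.
        work_item = queue.get_work_item_async().result()
        if work_item is None:
            return results
        process(work_item, results)


def run_with_pipelining(queue):
    results = []
    next_item = queue.get_work_item_async()
    while True:
        work_item = next_item.result()
        if work_item is None:
            return results
        # Prefetch the next item, then process the current one meanwhile.
        next_item = queue.get_work_item_async()
        process(work_item, results)


def timed(run, num_items=10):
    queue = SimulatedQueue(num_items)
    start = time.perf_counter()
    results = run(queue)
    elapsed = time.perf_counter() - start
    queue.executor.shutdown()
    return results, elapsed


sequential_results, sequential_time = timed(run_without_pipelining)
pipelined_results, pipelined_time = timed(run_with_pipelining)
print(f"without pipelining: {sequential_time:.2f}s")
print(f"with pipelining:    {pipelined_time:.2f}s")
```

With the assumed values, the loop without pipelining pays for a fetch and a processing step per item, while the pipelined loop pays roughly the larger of the two per item, so it should finish in about half the time. Because the computation here is simulated with sleep, the sketch shows only the scheduling effect; real measurements on a Ray cluster depend on actual RPC latency and workload.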