Anti-pattern: Forking new processes in application code#

Summary: Don’t fork new processes in Ray application code-for example, in driver, tasks or actors. Instead, use “spawn” method to start new processes or use Ray tasks and actors to parallelize your workload

Ray manages the lifecycle of processes for you. Ray Objects, Tasks, and Actors manages sockets to communicate with the Raylet and the GCS. If you fork new processes in your application code, the processes could share the same sockets without any synchronization. This can lead to corrupted message and unexpected behavior.

The solution is to: 1. use “spawn” method to start new processes so that parent process’s memory space isn’t copied to the child processes or 2. use Ray tasks and actors to parallelize your workload and let Ray to manage the lifecycle of the processes for you.

Code example#

import os

os.environ["RAY_DEDUP_LOGS"] = "0"

import ray
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
import numpy as np


@ray.remote
def generate_response(request):
    print(request)
    array = np.ones(100000)
    return array


def process_response(response, idx):
    print(f"Processing response {idx}")
    return response


def main():
    ray.init()
    responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)])

    # Better approach: Set the start method to "spawn"
    multiprocessing.set_start_method("spawn", force=True)

    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_task = {}
        for idx, response in enumerate(responses):
            future_to_task[executor.submit(process_response, response, idx)] = idx

        for future in as_completed(future_to_task):
            idx = future_to_task[future]
            response_entry = future.result()
            print(f"Response {idx} processed: {response_entry}")

    ray.shutdown()


if __name__ == "__main__":
    main()