Anti-pattern: Forking new processes in application code#
Summary: Don’t fork new processes in Ray application code—for example, in driver, tasks or actors. Instead, use the “spawn” method to start new processes or use Ray tasks and actors to parallelize your workload
Ray manages the lifecycle of processes for you. Ray Objects, Tasks, and Actors manage sockets to communicate with the Raylet and the GCS. If you fork new processes in your application code, the processes could share the same sockets without any synchronization. This can lead to corrupted messages and unexpected behavior.
The solution is to: 1. use the “spawn” method to start new processes so that the parent process’s memory space is not copied to the child processes or 2. use Ray tasks and actors to parallelize your workload and let Ray manage the lifecycle of the processes for you.
Code example#
import os
os.environ["RAY_DEDUP_LOGS"] = "0"
import ray
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
import numpy as np
@ray.remote
def generate_response(request):
print(request)
array = np.ones(100000)
return array
def process_response(response, idx):
print(f"Processing response {idx}")
return response
def main():
ray.init()
responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)])
# Better approach: Set the start method to "spawn"
multiprocessing.set_start_method("spawn", force=True)
with ProcessPoolExecutor(max_workers=4) as executor:
future_to_task = {}
for idx, response in enumerate(responses):
future_to_task[executor.submit(process_response, response, idx)] = idx
for future in as_completed(future_to_task):
idx = future_to_task[future]
response_entry = future.result()
print(f"Response {idx} processed: {response_entry}")
ray.shutdown()
if __name__ == "__main__":
main()