Serialization#

Since Ray processes do not share memory space, data transferred between workers and nodes will need to serialized and deserialized. Ray uses the Plasma object store to efficiently transfer objects across different processes and different nodes. Numpy arrays in the object store are shared between workers on the same node (zero-copy deserialization).

Overview#

Ray has decided to use a customized Pickle protocol version 5 backport to replace the original PyArrow serializer. This gets rid of several previous limitations (e.g. cannot serialize recursive objects).

Ray is currently compatible with Pickle protocol version 5, while Ray supports serialization of a wider range of objects (e.g. lambda & nested functions, dynamic classes) with the help of cloudpickle.

Plasma Object Store#

Plasma is an in-memory object store. It has been originally developed as part of Apache Arrow. Prior to Ray’s version 1.0.0 release, Ray forked Arrow’s Plasma code into Ray’s code base in order to disentangle and continue development with respect to Ray’s architecture and performance needs.

Plasma is used to efficiently transfer objects across different processes and different nodes. All objects in Plasma object store are immutable and held in shared memory. This is so that they can be accessed efficiently by many workers on the same node.

Each node has its own object store. When data is put into the object store, it does not get automatically broadcasted to other nodes. Data remains local to the writer until requested by another task or actor on another node.

Serializing ObjectRefs#

Explicitly serializing ObjectRefs using ray.cloudpickle should be used as a last resort. Passing ObjectRefs through Ray task arguments and return values is the recommended approach.

Ray ObjectRefs can be serialized using ray.cloudpickle. The ObjectRef can then be deserialized and accessed with ray.get(). Note that ray.cloudpickle must be used; other pickle tools are not guaranteed to work. Additionally, the process that deserializes the ObjectRef must be part of the same Ray cluster that serialized it.

When serialized, the ObjectRef’s value will remain pinned in Ray’s shared memory object store. The object must be explicitly freed by calling ray._private.internal_api.free(obj_ref).

Warning

ray._private.internal_api.free(obj_ref) is a private API and may be changed in future Ray versions.

This code example demonstrates how to serialize an ObjectRef, store it in external storage, deserialize and use it, and lastly free its object.

import ray
from ray import cloudpickle

FILE = "external_store.pickle"

ray.init()

my_dict = {"hello": "world"}

obj_ref = ray.put(my_dict)
with open(FILE, "wb+") as f:
    cloudpickle.dump(obj_ref, f)

# ObjectRef remains pinned in memory because
# it was serialized with ray.cloudpickle.
del obj_ref

with open(FILE, "rb") as f:
    new_obj_ref = cloudpickle.load(f)

# The deserialized ObjectRef works as expected.
assert ray.get(new_obj_ref) == my_dict

# Explicitly free the object.
ray._private.internal_api.free(new_obj_ref)

Numpy Arrays#

Ray optimizes for numpy arrays by using Pickle protocol 5 with out-of-band data. The numpy array is stored as a read-only object, and all Ray workers on the same node can read the numpy array in the object store without copying (zero-copy reads). Each numpy array object in the worker process holds a pointer to the relevant array held in shared memory. Any writes to the read-only object will require the user to first copy it into the local process memory.

Tip

You can often avoid serialization issues by using only native types (e.g., numpy arrays or lists/dicts of numpy arrays and other primitive types), or by using Actors hold objects that cannot be serialized.

Fixing “assignment destination is read-only”#

Because Ray puts numpy arrays in the object store, when deserialized as arguments in remote functions they will become read-only. For example, the following code snippet will crash:

import ray
import numpy as np


@ray.remote
def f(arr):
    # arr = arr.copy()  # Adding a copy will fix the error.
    arr[0] = 1


try:
    ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
    print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
#   File "test.py", line 6, in f
#     arr[0] = 1
# ValueError: assignment destination is read-only

To avoid this issue, you can manually copy the array at the destination if you need to mutate it (arr = arr.copy()). Note that this is effectively like disabling the zero-copy deserialization feature provided by Ray.

Serialization notes#

  • Ray is currently using Pickle protocol version 5. The default pickle protocol used by most python distributions is protocol 3. Protocol 4 & 5 are more efficient than protocol 3 for larger objects.

  • For non-native objects, Ray will always keep a single copy even it is referred multiple times in an object:

    import ray
    import numpy as np
    
    obj = [np.zeros(42)] * 99
    l = ray.get(ray.put(obj))
    assert l[0] is l[1]  # no problem!
    
  • Whenever possible, use numpy arrays or Python collections of numpy arrays for maximum performance.

  • Lock objects are mostly unserializable, because copying a lock is meaningless and could cause serious concurrency problems. You may have to come up with a workaround if your object contains a lock.

Customized Serialization#

Sometimes you may want to customize your serialization process because the default serializer used by Ray (pickle5 + cloudpickle) does not work for you (fail to serialize some objects, too slow for certain objects, etc.).

There are at least 3 ways to define your custom serialization process:

  1. If you want to customize the serialization of a type of objects, and you have access to the code, you can define __reduce__ function inside the corresponding class. This is commonly done by most Python libraries. Example code:

    import ray
    import sqlite3
    
    class DBConnection:
        def __init__(self, path):
            self.path = path
            self.conn = sqlite3.connect(path)
    
        # without '__reduce__', the instance is unserializable.
        def __reduce__(self):
            deserializer = DBConnection
            serialized_data = (self.path,)
            return deserializer, serialized_data
    
    original = DBConnection("/tmp/db")
    print(original.conn)
    
    copied = ray.get(ray.put(original))
    print(copied.conn)
    
<sqlite3.Connection object at ...>
<sqlite3.Connection object at ...>
  1. If you want to customize the serialization of a type of objects, but you cannot access or modify the corresponding class, you can register the class with the serializer you use:

    import ray
    import threading
    
    class A:
        def __init__(self, x):
            self.x = x
            self.lock = threading.Lock()  # could not be serialized!
    
    try:
      ray.get(ray.put(A(1)))  # fail!
    except TypeError:
      pass
    
    def custom_serializer(a):
        return a.x
    
    def custom_deserializer(b):
        return A(b)
    
    # Register serializer and deserializer for class A:
    ray.util.register_serializer(
      A, serializer=custom_serializer, deserializer=custom_deserializer)
    ray.get(ray.put(A(1)))  # success!
    
    # You can deregister the serializer at any time.
    ray.util.deregister_serializer(A)
    try:
      ray.get(ray.put(A(1)))  # fail!
    except TypeError:
      pass
    
    # Nothing happens when deregister an unavailable serializer.
    ray.util.deregister_serializer(A)
    

    NOTE: Serializers are managed locally for each Ray worker. So for every Ray worker, if you want to use the serializer, you need to register the serializer. Deregister a serializer also only applies locally.

    If you register a new serializer for a class, the new serializer would replace the old serializer immediately in the worker. This API is also idempotent, there are no side effects caused by re-registering the same serializer.

  2. We also provide you an example, if you want to customize the serialization of a specific object:

    import threading
    
    class A:
        def __init__(self, x):
            self.x = x
            self.lock = threading.Lock()  # could not serialize!
    
    try:
       ray.get(ray.put(A(1)))  # fail!
    except TypeError:
       pass
    
    class SerializationHelperForA:
        """A helper class for serialization."""
        def __init__(self, a):
            self.a = a
    
        def __reduce__(self):
            return A, (self.a.x,)
    
    ray.get(ray.put(SerializationHelperForA(A(1))))  # success!
    # the serializer only works for a specific object, not all A
    # instances, so we still expect failure here.
    try:
       ray.get(ray.put(A(1)))  # still fail!
    except TypeError:
       pass
    

Troubleshooting#

Use ray.util.inspect_serializability to identify tricky pickling issues. This function can be used to trace a potential non-serializable object within any Python object – whether it be a function, class, or object instance.

Below, we demonstrate this behavior on a function with a non-serializable object (threading lock):

from ray.util import inspect_serializability
import threading

lock = threading.Lock()

def test():
    print(lock)

inspect_serializability(test, name="test")

The resulting output is:

  =============================================================
  Checking Serializability of <function test at 0x7ff130697e50>
  =============================================================
  !!! FAIL serialization: cannot pickle '_thread.lock' object
  Detected 1 global variables. Checking serializability...
      Serializing 'lock' <unlocked _thread.lock object at 0x7ff1306a9f30>...
      !!! FAIL serialization: cannot pickle '_thread.lock' object
      WARNING: Did not find non-serializable object in <unlocked _thread.lock object at 0x7ff1306a9f30>. This may be an oversight.
  =============================================================
  Variable:

      FailTuple(lock [obj=<unlocked _thread.lock object at 0x7ff1306a9f30>, parent=<function test at 0x7ff130697e50>])

  was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
  Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
  =============================================================
  Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
  If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
  =============================================================

For even more detailed information, set environmental variable RAY_PICKLE_VERBOSE_DEBUG='2' before importing Ray. This enables serialization with python-based backend instead of C-Pickle, so you can debug into python code at the middle of serialization. However, this would make serialization much slower.

Known Issues#

Users could experience memory leak when using certain python3.8 & 3.9 versions. This is due to a bug in python’s pickle module.

This issue has been solved for Python 3.8.2rc1, Python 3.9.0 alpha 4 or late versions.