Memory Management

This page describes how memory management works in Ray.

Concepts

There are several ways that Ray applications use memory:

Ray system memory: this is memory used internally by Ray
  • Redis: memory used for storing the list of nodes and actors present in the cluster. The amount of memory used for these purposes is typically quite small.

  • Raylet: memory used by the C++ raylet process running on each node. This cannot be controlled, but is typically quite small.

Application memory: this is memory used by your application
  • Worker heap: memory used by your application (e.g., in Python code or TensorFlow), best measured as the resident set size (RSS) of your application minus its shared memory usage (SHR) in commands such as top. The reason you need to subtract SHR is that object store shared memory is reported by the OS as shared with each worker. Not subtracting SHR will result in double counting memory usage.

  • Object store memory: memory used when your application creates objects in the object store via ray.put and when returning values from remote functions. Objects are reference counted and evicted when they fall out of scope. There is an object store server running on each node. In Ray 1.3+, objects will be spilled to disk if the object store fills up.

  • Object store shared memory: memory used when your application reads objects via ray.get. Note that if an object is already present on the node, this does not cause additional allocations. This allows large objects to be efficiently shared among many actors and tasks.
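The RSS-minus-SHR rule for worker heap can be illustrated with a small calculation. This is only a sketch, not a Ray API: ProcessMemory, worker_heap_bytes, and the sample numbers are hypothetical, chosen to show how summing RSS across workers counts the shared object store once per worker.

```python
from dataclasses import dataclass

GiB = 1024 ** 3


@dataclass
class ProcessMemory:
    """RSS and SHR of one worker in bytes, as a tool such as top would report them."""
    rss_bytes: int
    shr_bytes: int


def worker_heap_bytes(proc: ProcessMemory) -> int:
    """Approximate heap usage: resident memory minus shared memory."""
    return max(proc.rss_bytes - proc.shr_bytes, 0)


# Hypothetical node: three workers, each mapping the same 1 GiB of object
# store shared memory and holding 0.5 GiB of private heap.
workers = [ProcessMemory(rss_bytes=3 * GiB // 2, shr_bytes=GiB) for _ in range(3)]
object_store_bytes = GiB

# Summing RSS counts the shared 1 GiB three times.
naive_total = sum(w.rss_bytes for w in workers)
# Summing heap only, then adding the object store once, avoids that.
adjusted_total = sum(worker_heap_bytes(w) for w in workers) + object_store_bytes

print(f"naive sum of RSS:    {naive_total / GiB:.1f} GiB")
print(f"heap + object store: {adjusted_total / GiB:.1f} GiB")
```

SHR can also include other shared pages, such as shared libraries, so the result is an approximation rather than an exact heap size.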

ObjectRef Reference Counting

Ray implements distributed reference counting so that any ObjectRef in scope in the cluster is pinned in the object store. This includes local Python references, arguments to pending tasks, and IDs serialized inside of other objects.

Debugging using ‘ray memory’

The ray memory command can be used to help track down what ObjectRef references are in scope and may be causing an ObjectStoreFullError.

Running ray memory from the command line while a Ray application is running will give you a dump of all of the ObjectRef references that are currently held by the driver, actors, and tasks in the cluster.

======== Object references status: 2021-02-23 22:02:22.072221 ========
Grouping by node address...        Sorting by object size...


--- Summary for node address: 192.168.0.15 ---
Mem Used by Objects  Local References  Pinned Count  Pending Tasks  Captured in Objects  Actor Handles
287 MiB              4                 0             0              1                    0

--- Object references for node address: 192.168.0.15 ---
IP Address    PID    Type    Object Ref                                                Size    Reference Type      Call Site
192.168.0.15  6465   Driver  ffffffffffffffffffffffffffffffffffffffff0100000001000000  15 MiB  LOCAL_REFERENCE     (put object)
                                                                                                                  | test.py:
                                                                                                                  <module>:17

192.168.0.15  6465   Driver  a67dc375e60ddd1affffffffffffffffffffffff0100000001000000  15 MiB  LOCAL_REFERENCE     (task call)
                                                                                                                  | test.py:
                                                                                                                  :<module>:18

192.168.0.15  6465   Driver  ffffffffffffffffffffffffffffffffffffffff0100000002000000  18 MiB  CAPTURED_IN_OBJECT  (put object)  |
                                                                                                                   test.py:
                                                                                                                  <module>:19

192.168.0.15  6465   Driver  ffffffffffffffffffffffffffffffffffffffff0100000004000000  21 MiB  LOCAL_REFERENCE     (put object)  |
                                                                                                                   test.py:
                                                                                                                  <module>:20

192.168.0.15  6465   Driver  ffffffffffffffffffffffffffffffffffffffff0100000003000000  218 MiB  LOCAL_REFERENCE     (put object)  |
                                                                                                                  test.py:
                                                                                                                  <module>:20

--- Aggregate object store stats across all nodes ---
Plasma memory usage 0 MiB, 4 objects, 0.0% full

Each entry in this output corresponds to an ObjectRef that is currently pinning an object in the object store. For each reference, the output shows the IP address and process ID of the process holding it, whether that process is the driver or a worker, the size of the object, the type of reference (see below for details on the types of references), and where in the application the reference was created.

ray memory comes with features to make memory debugging more effective. For example, you can add the arguments --sort-by=OBJECT_SIZE and --group-by=STACK_TRACE, which may be particularly helpful for tracking down the line of code where a memory leak occurs. You can see the full set of options by running ray memory --help.

There are five types of references that can keep an object pinned:

1. Local ObjectRef references

@ray.remote
def f(arg):
    return arg

a = ray.put(None)
b = f.remote(None)

In this example, we create references to two objects: one that is ray.put() in the object store and another that’s the return value from f.remote().

--- Summary for node address: 192.168.0.15 ---
Mem Used by Objects  Local References  Pinned Count  Pending Tasks  Captured in Objects  Actor Handles
30 MiB               2                 0             0              0                    0

--- Object references for node address: 192.168.0.15 ---
IP Address    PID    Type    Object Ref                                                Size    Reference Type      Call Site
192.168.0.15  6867   Driver  ffffffffffffffffffffffffffffffffffffffff0100000001000000  15 MiB  LOCAL_REFERENCE     (put object)  |
                                                                                                                  test.py:
                                                                                                                  <module>:12

192.168.0.15  6867   Driver  a67dc375e60ddd1affffffffffffffffffffffff0100000001000000  15 MiB  LOCAL_REFERENCE     (task call)
                                                                                                                  | test.py:
                                                                                                                  :<module>:13

In the output from ray memory, we can see that each of these is marked as a LOCAL_REFERENCE in the driver process, but the annotation in the “Call Site” column indicates that the first was created as a “put object” and the second from a “task call.”

2. Objects pinned in memory

import numpy as np

a = ray.put(np.zeros(1))
b = ray.get(a)
del a

In this example, we create a NumPy array and store it in the object store. Then, we fetch the same array from the object store and delete its ObjectRef. The object is still pinned in the object store because the deserialized copy (stored in b) points directly to the memory in the object store.

--- Summary for node address: 192.168.0.15 ---
Mem Used by Objects  Local References  Pinned Count  Pending Tasks  Captured in Objects  Actor Handles
243 MiB              0                 1             0              0                    0

--- Object references for node address: 192.168.0.15 ---
IP Address    PID    Type    Object Ref                                                Size    Reference Type      Call Site
192.168.0.15  7066   Driver  ffffffffffffffffffffffffffffffffffffffff0100000001000000  243 MiB  PINNED_IN_MEMORY   test.
                                                                                                                  py:<module>:19

The output from ray memory displays this as the object being PINNED_IN_MEMORY. If we del b, the reference can be freed.

3. Pending task references

@ray.remote
def f(arg):
    while True:
        pass

a = ray.put(None)
b = f.remote(a)

In this example, we first create an object via ray.put() and then submit a task that depends on the object.

--- Summary for node address: 192.168.0.15 ---
Mem Used by Objects  Local References  Pinned Count  Pending Tasks  Captured in Objects  Actor Handles
25 MiB               1                 1             1              0                    0

--- Object references for node address: 192.168.0.15 ---
IP Address    PID    Type    Object Ref                                                Size    Reference Type      Call Site
192.168.0.15  7207   Driver  a67dc375e60ddd1affffffffffffffffffffffff0100000001000000  ?       LOCAL_REFERENCE     (task call)
                                                                                                                    | test.py:
                                                                                                                  :<module>:29

192.168.0.15  7241   Worker  ffffffffffffffffffffffffffffffffffffffff0100000001000000  10 MiB  PINNED_IN_MEMORY    (deserialize task arg)
                                                                                                                    __main__.f

192.168.0.15  7207   Driver  ffffffffffffffffffffffffffffffffffffffff0100000001000000  15 MiB  USED_BY_PENDING_TASK  (put object)  |
                                                                                                                  test.py:
                                                                                                                  <module>:28

While the task is running, ray memory shows both a LOCAL_REFERENCE and a USED_BY_PENDING_TASK reference for the object in the driver process. The worker process also holds a reference to the object, shown as PINNED_IN_MEMORY: the Python argument directly references the memory in the plasma object store, so the object cannot be evicted.

4. Serialized ObjectRef references

@ray.remote
def f(arg):
    while True:
        pass

a = ray.put(None)
b = f.remote([a])

In this example, we again create an object via ray.put(), but then pass it to a task wrapped in another object (in this case, a list).

--- Summary for node address: 192.168.0.15 ---
Mem Used by Objects  Local References  Pinned Count  Pending Tasks  Captured in Objects  Actor Handles
15 MiB               2                 0             1              0                    0

--- Object references for node address: 192.168.0.15 ---
IP Address    PID    Type    Object Ref                                                Size    Reference Type      Call Site
192.168.0.15  7411   Worker  ffffffffffffffffffffffffffffffffffffffff0100000001000000  ?       LOCAL_REFERENCE     (deserialize task arg)
                                                                                                                    __main__.f

192.168.0.15  7373   Driver  a67dc375e60ddd1affffffffffffffffffffffff0100000001000000  ?       LOCAL_REFERENCE     (task call)
                                                                                                                  | test.py:
                                                                                                                  :<module>:38

192.168.0.15  7373   Driver  ffffffffffffffffffffffffffffffffffffffff0100000001000000  15 MiB  USED_BY_PENDING_TASK  (put object)
                                                                                                                  | test.py:
                                                                                                                  <module>:37

Now, both the driver and the worker process running the task hold a LOCAL_REFERENCE to the object, in addition to it being USED_BY_PENDING_TASK on the driver. If this were an actor task, the actor could even hold a LOCAL_REFERENCE after the task completes by storing the ObjectRef in a member variable.

5. Captured ObjectRef references

a = ray.put(None)
b = ray.put([a])
del a

In this example, we first create an object via ray.put(), then capture its ObjectRef inside of another ray.put() object, and delete the first ObjectRef. In this case, both objects are still pinned.

--- Summary for node address: 192.168.0.15 ---
Mem Used by Objects  Local References  Pinned Count  Pending Tasks  Captured in Objects  Actor Handles
233 MiB              1                 0             0              1                    0

--- Object references for node address: 192.168.0.15 ---
IP Address    PID    Type    Object Ref                                                Size    Reference Type      Call Site
192.168.0.15  7473   Driver  ffffffffffffffffffffffffffffffffffffffff0100000001000000  15 MiB  CAPTURED_IN_OBJECT  (put object)  |
                                                                                                                  test.py:
                                                                                                                  <module>:41

192.168.0.15  7473   Driver  ffffffffffffffffffffffffffffffffffffffff0100000002000000  218 MiB  LOCAL_REFERENCE     (put object)  |
                                                                                                                  test.py:
                                                                                                                  <module>:42

In the output of ray memory, we see that the second object displays as a normal LOCAL_REFERENCE, but the first object is listed as CAPTURED_IN_OBJECT.
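The five reference types above can be tied together with a toy model. This is a sketch for intuition only and not how Ray implements distributed reference counting: ToyRefTable and its methods are hypothetical names, and the model assumes an object stays pinned while it has at least one reference of any type. The scenario replays the captured-reference example.

```python
from collections import Counter

REFERENCE_TYPES = (
    "LOCAL_REFERENCE",
    "PINNED_IN_MEMORY",
    "USED_BY_PENDING_TASK",
    "CAPTURED_IN_OBJECT",
    "ACTOR_HANDLE",
)


class ToyRefTable:
    """Toy bookkeeping: an object is pinned while any reference to it remains."""

    def __init__(self):
        self._counts = {}    # object id -> Counter of reference types
        self._captured = {}  # outer object id -> ids captured inside it

    def add_ref(self, object_id, ref_type):
        if ref_type not in REFERENCE_TYPES:
            raise ValueError(f"unknown reference type: {ref_type}")
        self._counts.setdefault(object_id, Counter())[ref_type] += 1

    def remove_ref(self, object_id, ref_type):
        counts = self._counts.get(object_id, Counter())
        if counts[ref_type] <= 0:
            raise ValueError(f"{object_id} has no {ref_type} to remove")
        counts[ref_type] -= 1
        if not self.is_pinned(object_id):
            # Once the outer object is released, the refs it captured go too.
            for inner_id in self._captured.pop(object_id, []):
                self.remove_ref(inner_id, "CAPTURED_IN_OBJECT")

    def capture(self, outer_id, inner_id):
        """Record that outer_id contains a serialized reference to inner_id."""
        self.add_ref(inner_id, "CAPTURED_IN_OBJECT")
        self._captured.setdefault(outer_id, []).append(inner_id)

    def is_pinned(self, object_id):
        return sum(self._counts.get(object_id, Counter()).values()) > 0


table = ToyRefTable()
table.add_ref("a", "LOCAL_REFERENCE")     # a = ray.put(None)
table.add_ref("b", "LOCAL_REFERENCE")     # b = ray.put([a])
table.capture("b", "a")
table.remove_ref("a", "LOCAL_REFERENCE")  # del a
pinned_after_del_a = table.is_pinned("a")
table.remove_ref("b", "LOCAL_REFERENCE")  # del b
pinned_after_del_b = table.is_pinned("a")
print(pinned_after_del_a, pinned_after_del_b)
```

In this model, deleting a alone does not release the first object, because the copy of its ObjectRef inside b still counts as a reference. Releasing b releases both.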

Object Spilling

Ray 1.3+ spills objects to external storage once the object store is full. By default, objects are spilled to Ray’s temporary directory in the local filesystem.

Single node

Ray uses object spilling by default. With no configuration, objects are spilled to [temp_folder]/spill. On Linux and macOS, temp_folder is /tmp by default.

To configure the directory where objects are placed, use:

import json
import ray

ray.init(
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
        )
    },
)

You can also specify multiple directories for spilling to spread the IO load and disk space usage across multiple physical devices if needed (e.g., SSD devices):

import json
import ray

ray.init(
    _system_config={
        # More IO workers for local storage. Each IO worker tries to use a different directory.
        "max_io_workers": 4,
        "object_spilling_config": json.dumps(
            {
                "type": "filesystem",
                "params": {
                    # Each directory can be mounted on a different device.
                    "directory_path": [
                        "/tmp/spill",
                        "/tmp/spill_1",
                        "/tmp/spill_2",
                    ]
                },
            },
        )
    },
)
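Because object_spilling_config must be a JSON string rather than a nested dict, it can help to build the setting in one place. The helper below is a sketch: spilling_system_config is a hypothetical name, and the keys it emits are only those shown in the examples above.

```python
import json


def spilling_system_config(directories, max_io_workers=None):
    """Build a dict that could be passed as ray.init(_system_config=...)."""
    if not directories:
        raise ValueError("at least one spill directory is required")
    # One directory is passed as a string and several as a list,
    # matching the two forms used in the examples above.
    path = directories[0] if len(directories) == 1 else list(directories)
    config = {
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": path}}
        )
    }
    if max_io_workers is not None:
        config["max_io_workers"] = max_io_workers
    return config


config = spilling_system_config(["/tmp/spill", "/tmp/spill_1"], max_io_workers=4)
print(config)
```

The returned dict keeps max_io_workers as a plain integer, while the spilling settings are serialized once into a string.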

Note

For best performance with memory-intensive workloads, use SSDs rather than HDDs for object spilling.

To enable object spilling to remote storage (any URI supported by smart_open):

import json
import ray

ray.init(
    _system_config={
        "max_io_workers": 4,  # More IO workers for remote storage.
        "min_spilling_size": 100 * 1024 * 1024,  # Spill at least 100MB at a time.
        "object_spilling_config": json.dumps(
            {"type": "smart_open", "params": {"uri": "s3://bucket/path"}},
        )
    },
)

Remote storage support is still experimental.

Cluster mode

To enable object spilling in multi node clusters:

# Note that the value of `object_spilling_config` must itself be a JSON-formatted string.
ray start --head --system-config='{"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}'
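The escaped quotes in the command above come from encoding JSON twice: the spilling settings are serialized to a string, and that string becomes a value inside the outer system config JSON. As a sketch of how to generate the argument instead of escaping it by hand (system_config_flag is a hypothetical helper, not part of Ray):

```python
import json
import shlex


def system_config_flag(spill_directory):
    """Return a shell-safe --system-config argument for filesystem spilling."""
    # Inner layer: the spilling settings as a JSON string.
    inner = json.dumps(
        {"type": "filesystem", "params": {"directory_path": spill_directory}},
        separators=(",", ":"),
    )
    # Outer layer: the system config, whose value is the inner string.
    outer = json.dumps({"object_spilling_config": inner}, separators=(",", ":"))
    # shlex.quote wraps the result in single quotes for the shell.
    return "--system-config=" + shlex.quote(outer)


command = "ray start --head " + system_config_flag("/tmp/spill")
print(command)
```

Decoding the argument in reverse (shell unquoting, then json.loads twice) recovers the original settings, which is a quick way to check a hand-written command.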

Stats

When spilling is happening, the following INFO level messages will be printed to the raylet logs (e.g., /tmp/ray/session_latest/logs/raylet.out):

local_object_manager.cc:166: Spilled 50 MiB, 1 objects, write throughput 230 MiB/s
local_object_manager.cc:334: Restored 50 MiB, 1 objects, read throughput 505 MiB/s
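To track spilling over time, these messages can be extracted from the raylet log. The parser below is a sketch that assumes the message format matches the two sample lines above; the format may differ between Ray versions, and parse_spill_line is a hypothetical helper.

```python
import re

# Pattern assumed from the sample log lines; adjust it if your logs differ.
SPILL_LINE = re.compile(
    r"(?P<action>Spilled|Restored) (?P<mib>\d+) MiB, (?P<objects>\d+) objects, "
    r"(?:write|read) throughput (?P<throughput>\d+) MiB/s"
)


def parse_spill_line(line):
    """Return a dict for a spill or restore message, or None for other lines."""
    match = SPILL_LINE.search(line)
    if match is None:
        return None
    return {
        "action": match["action"],
        "mib": int(match["mib"]),
        "objects": int(match["objects"]),
        "throughput_mib_s": int(match["throughput"]),
    }


sample_lines = [
    "local_object_manager.cc:166: Spilled 50 MiB, 1 objects, write throughput 230 MiB/s",
    "local_object_manager.cc:334: Restored 50 MiB, 1 objects, read throughput 505 MiB/s",
    "some unrelated raylet message",
]
events = [event for event in map(parse_spill_line, sample_lines) if event is not None]
for event in events:
    print(event)
```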

You can also view cluster-wide spill stats by using the ray memory command:

--- Aggregate object store stats across all nodes ---
Plasma memory usage 50 MiB, 1 objects, 50.0% full
Spilled 200 MiB, 4 objects, avg write throughput 570 MiB/s
Restored 150 MiB, 3 objects, avg read throughput 1361 MiB/s

If you only want to display cluster-wide spill stats, use ray memory --stats-only.

Preallocating object store memory

By default, object store memory pages are lazily requested from the operating system. However, for more consistent performance, and to avoid SIGBUS errors when there is insufficient space in /dev/shm, you can tell Ray to preallocate all pages used by the object store on startup by setting the environment variable RAY_PREALLOCATE_PLASMA_MEMORY=1. Note that preallocation takes some time (it proceeds at a few GB/s), which may slow down Ray startup.
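As a sketch of how the variable might be set from Python, the helper below prepares an environment for a process that launches Ray. It assumes the Ray processes inherit the environment of whatever starts them. env_with_preallocation and estimated_preallocation_seconds are hypothetical helpers, and the 30 GB store size and 3 GB/s rate are made-up numbers standing in for "a few GB/s".

```python
import os


def env_with_preallocation(base_env=None):
    """Copy an environment and enable object store preallocation in the copy."""
    env = dict(os.environ if base_env is None else base_env)
    env["RAY_PREALLOCATE_PLASMA_MEMORY"] = "1"
    return env


def estimated_preallocation_seconds(object_store_bytes, bytes_per_second):
    """Rough startup delay: object store size divided by preallocation rate."""
    return object_store_bytes / bytes_per_second


env = env_with_preallocation({"PATH": "/usr/bin"})
# Assumed numbers: a 30 GB object store preallocated at 3 GB/s.
delay = estimated_preallocation_seconds(30 * 10**9, 3 * 10**9)
print(env["RAY_PREALLOCATE_PLASMA_MEMORY"], f"about {delay:.0f} s of extra startup time")
```

The resulting dict could then be passed as the env argument of subprocess.run when launching ray start.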

Memory Aware Scheduling

By default, Ray does not take into account the potential memory usage of a task or actor when scheduling. This is simply because it cannot estimate ahead of time how much memory is required. However, if you know how much memory a task or actor requires, you can specify it in the resource requirements of its ray.remote decorator to enable memory-aware scheduling:

Important

Specifying a memory requirement does NOT impose any limits on memory usage. The requirements are used for admission control during scheduling only (similar to how CPU scheduling works in Ray). It is up to the task itself to not use more memory than it requested.

To tell the Ray scheduler a task or actor requires a certain amount of available memory to run, set the memory argument. The Ray scheduler will then reserve the specified amount of available memory during scheduling, similar to how it handles CPU and GPU resources:

# reserve 500MiB of available memory to place this task
@ray.remote(memory=500 * 1024 * 1024)
def some_function(x):
    pass

# reserve 2500MiB (about 2.4GiB) of available memory to place this actor
@ray.remote(memory=2500 * 1024 * 1024)
class SomeActor(object):
    def __init__(self, a, b):
        pass

In the above example, the memory quota is specified statically by the decorator, but you can also set it dynamically at runtime using .options() as follows:

# override the memory quota to 100MiB when submitting the task
some_function.options(memory=100 * 1024 * 1024).remote(x=1)

# override the memory quota to 1000MiB (just under 1GiB) when creating the actor
SomeActor.options(memory=1000 * 1024 * 1024).remote(a=1, b=2)
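The admission-control behavior can be pictured with a toy model. This is a sketch for intuition, not Ray's scheduler: ToyNode is a hypothetical class that tracks only reservations, never actual usage. That mirrors the point that a memory requirement affects placement but does not limit what a task consumes. The constants also show how to write binary units exactly.

```python
MiB = 1024 ** 2
GiB = 1024 ** 3


class ToyNode:
    """Tracks reserved memory only; real usage is never measured or enforced."""

    def __init__(self, total_memory_bytes):
        self.available = total_memory_bytes

    def try_admit(self, requested_bytes):
        """Reserve memory for a task if enough is left, else refuse placement."""
        if requested_bytes > self.available:
            return False
        self.available -= requested_bytes
        return True

    def release(self, requested_bytes):
        """Return a finished task's reservation to the pool."""
        self.available += requested_bytes


node = ToyNode(total_memory_bytes=1 * GiB)
first = node.try_admit(500 * MiB)   # fits, 524 MiB left
second = node.try_admit(500 * MiB)  # fits, 24 MiB left
third = node.try_admit(100 * MiB)   # refused, only 24 MiB left
node.release(500 * MiB)             # one task finishes
fourth = node.try_admit(100 * MiB)  # fits now

# 2.5 GiB expressed exactly in bytes is 2560 MiB, not 2500 MiB.
two_and_a_half_gib = int(2.5 * GiB)
print(first, second, third, fourth, two_and_a_half_gib // MiB)
```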

Questions or Issues?

You can post questions, issues, or feedback through the following channels:

  1. Discussion Board: For questions about Ray usage or feature requests.

  2. GitHub Issues: For bug reports.

  3. Ray Slack: For getting in touch with Ray maintainers.

  4. StackOverflow: Use the [ray] tag for questions about Ray.