Getting Started#

Use Ray to scale applications on your laptop or the cloud. Choose the right guide for your task.

Ray AI Libraries Quickstart#

Use individual libraries for ML workloads. Click on the dropdowns for your workload below.

ray Data: Scalable Datasets for ML

Scale offline inference and training ingest with Ray Data – a data processing library designed for ML.

To learn more, see Offline batch inference and Data preprocessing and ingest for ML training.

Note

To run this example, install Ray Data:

pip install -U "ray[data]"
from typing import Dict
import numpy as np
import ray

# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal length (cm)"]
    width = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = length * width
    return batch

transformed_ds = ds.map_batches(compute_area)

# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):
    print(batch)

# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")

Learn more about Ray Data

ray Train: Distributed Model Training

Ray Train abstracts away the complexity of setting up a distributed training system.

This example shows how you can use Ray Train with PyTorch.

To run this example install Ray Train and PyTorch packages:

Note

pip install -U "ray[train]" torch torchvision

Set up your dataset and model.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

def get_dataset():
    return datasets.FashionMNIST(
        root="/tmp/data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, inputs):
        inputs = self.flatten(inputs)
        logits = self.linear_relu_stack(inputs)
        return logits

Now define your single-worker PyTorch training function.

def train_func():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)

    model = NeuralNetwork()

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

This training function can be executed with:

    train_func()

Convert this to a distributed multi-worker training function.

Use the ray.train.torch.prepare_model and ray.train.torch.prepare_data_loader utility functions to set up your model and data for distributed training. This automatically wraps the model with DistributedDataParallel and places it on the right device, and adds DistributedSampler to the DataLoaders.

from ray import train

def train_func_distributed():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)
    dataloader = train.torch.prepare_data_loader(dataloader)

    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

Instantiate a TorchTrainer with 4 workers, and use it to run the new training function.

    from ray.train.torch import TorchTrainer
    from ray.train import ScalingConfig

    # For GPU Training, set `use_gpu` to True.
    use_gpu = False

    trainer = TorchTrainer(
        train_func_distributed,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu)
    )

    results = trainer.fit()

This example shows how you can use Ray Train to set up Multi-worker training with Keras.

To run this example install Ray Train and Tensorflow packages:

Note

pip install -U "ray[train]" tensorflow

Set up your dataset and model.

import numpy as np
import tensorflow as tf

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset


def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

Now define your single-worker TensorFlow training function.

def train_func():
    batch_size = 64
    single_worker_dataset = mnist_dataset(batch_size)
    single_worker_model = build_and_compile_cnn_model()
    single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

This training function can be executed with:

    train_func()

Now convert this to a distributed multi-worker training function.

  1. Set the global batch size - each worker processes the same size batch as in the single-worker code.

  2. Choose your TensorFlow distributed training strategy. This examples uses the MultiWorkerMirroredStrategy.

import json
import os

def train_func_distributed():
    per_worker_batch_size = 64
    # This environment variable will be set by Ray Train.
    tf_config = json.loads(os.environ['TF_CONFIG'])
    num_workers = len(tf_config['cluster']['worker'])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()

    multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Instantiate a TensorflowTrainer with 4 workers, and use it to run the new training function.

    from ray.train.tensorflow import TensorflowTrainer
    from ray.train import ScalingConfig

    # For GPU Training, set `use_gpu` to True.
    use_gpu = False

    trainer = TensorflowTrainer(train_func_distributed, scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu))

    trainer.fit()

Learn more about Ray Train

ray Tune: Hyperparameter Tuning at Scale

Tune is a library for hyperparameter tuning at any scale. With Tune, you can launch a multi-node distributed hyperparameter sweep in less than 10 lines of code. Tune supports any deep learning framework, including PyTorch, TensorFlow, and Keras.

Note

To run this example, install Ray Tune:

pip install -U "ray[tune]"

This example runs a small grid search with an iterative training function.

from ray import train, tune


def objective(config):  # ①
    score = config["a"] ** 2 + config["b"]
    return {"score": score}


search_space = {  # ②
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)  # ③

results = tuner.fit()
print(results.get_best_result(metric="score", mode="min").config)

If TensorBoard is installed, automatically visualize all trial results:

tensorboard --logdir ~/ray_results

Learn more about Ray Tune

ray Serve: Scalable Model Serving

Ray Serve is a scalable model-serving library built on Ray.

Note

To run this example, install Ray Serve and scikit-learn:

pip install -U "ray[serve]" scikit-learn

This example runs serves a scikit-learn gradient boosting classifier.

import requests
from starlette.requests import Request
from typing import Dict

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

from ray import serve


# Train model.
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])


@serve.deployment
class BoostingModel:
    def __init__(self, model):
        self.model = model
        self.label_list = iris_dataset["target_names"].tolist()

    async def __call__(self, request: Request) -> Dict:
        payload = (await request.json())["vector"]
        print(f"Received http request with data {payload}")

        prediction = self.model.predict([payload])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Deploy model.
serve.run(BoostingModel.bind(model), route_prefix="/iris")

# Query it!
sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
response = requests.get(
    "http://localhost:8000/iris", json=sample_request_input)
print(response.text)

As a result you will see {"result": "versicolor"}.

Learn more about Ray Serve

ray RLlib: Industry-Grade Reinforcement Learning

RLlib is an industry-grade library for reinforcement learning (RL) built on top of Ray. RLlib offers high scalability and unified APIs for a variety of industry- and research applications.

Note

To run this example, install rllib and either tensorflow or pytorch:

pip install -U "ray[rllib]" tensorflow  # or torch
import gymnasium as gym
from ray.rllib.algorithms.ppo import PPOConfig


# Define your problem using python and Farama-Foundation's gymnasium API:
class SimpleCorridor(gym.Env):
    """Corridor in which an agent must learn to move right to reach the exit.

    ---------------------
    | S | 1 | 2 | 3 | G |   S=start; G=goal; corridor_length=5
    ---------------------

    Possible actions to chose from are: 0=left; 1=right
    Observations are floats indicating the current field index, e.g. 0.0 for
    starting position, 1.0 for the field next to the starting position, etc..
    Rewards are -0.1 for all steps, except when reaching the goal (+1.0).
    """

    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = gym.spaces.Discrete(2)  # left and right
        self.observation_space = gym.spaces.Box(0.0, self.end_pos, shape=(1,))

    def reset(self, *, seed=None, options=None):
        """Resets the episode.

        Returns:
           Initial observation of the new episode and an info dict.
        """
        self.cur_pos = 0
        # Return initial observation.
        return [self.cur_pos], {}

    def step(self, action):
        """Takes a single step in the episode given `action`.

        Returns:
            New observation, reward, terminated-flag, truncated-flag, info-dict (empty).
        """
        # Walk left.
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        # Walk right.
        elif action == 1:
            self.cur_pos += 1
        # Set `terminated` flag when end of corridor (goal) reached.
        terminated = self.cur_pos >= self.end_pos
        truncated = False
        # +1 when goal reached, otherwise -1.
        reward = 1.0 if terminated else -0.1
        return [self.cur_pos], reward, terminated, truncated, {}


# Create an RLlib Algorithm instance from a PPOConfig object.
config = (
    PPOConfig().environment(
        # Env class to use (here: our gym.Env sub-class from above).
        env=SimpleCorridor,
        # Config dict to be passed to our custom env's constructor.
        # Use corridor with 20 fields (including S and G).
        env_config={"corridor_length": 28},
    )
    # Parallelize environment rollouts.
    .rollouts(num_rollout_workers=3)
)
# Construct the actual (PPO) algorithm object from the config.
algo = config.build()

# Train for n iterations and report results (mean episode rewards).
# Since we have to move at least 19 times in the env to reach the goal and
# each move gives us -0.1 reward (except the last move at the end: +1.0),
# we can expect to reach an optimal episode reward of -0.1*18 + 1.0 = -0.8
for i in range(5):
    results = algo.train()
    print(f"Iter: {i}; avg. reward={results['episode_reward_mean']}")

# Perform inference (action computations) based on given env observations.
# Note that we are using a slightly different env here (len 10 instead of 20),
# however, this should still work as the agent has (hopefully) learned
# to "just always walk right!"
env = SimpleCorridor({"corridor_length": 10})
# Get the initial observation (should be: [0.0] for the starting position).
obs, info = env.reset()
terminated = truncated = False
total_reward = 0.0
# Play one episode.
while not terminated and not truncated:
    # Compute a single action, given the current observation
    # from the environment.
    action = algo.compute_single_action(obs)
    # Apply the computed action in the environment.
    obs, reward, terminated, truncated, info = env.step(action)
    # Sum up rewards for reporting purposes.
    total_reward += reward
# Report results.
print(f"Played 1 episode; total-reward={total_reward}")

Learn more about Ray RLlib

Ray Core Quickstart#

Turn functions and classes easily into Ray tasks and actors, for Python and Java, with simple primitives for building and running distributed applications.

ray Core: Parallelizing Functions with Ray Tasks

Note

To run this example install Ray Core:

pip install -U "ray"

Import Ray and and initialize it with ray.init(). Then decorate the function with @ray.remote to declare that you want to run this function remotely. Lastly, call the function with .remote() instead of calling it normally. This remote call yields a future, a Ray object reference, that you can then fetch with ray.get.

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]

Note

To run this example, add the ray-api and ray-runtime dependencies in your project.

Use Ray.init to initialize Ray runtime. Then use Ray.task(...).remote() to convert any Java static method into a Ray task. The task runs asynchronously in a remote worker process. The remote method returns an ObjectRef, and you can fetch the actual result with get.

import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;

public class RayDemo {

    public static int square(int x) {
        return x * x;
    }

    public static void main(String[] args) {
        // Intialize Ray runtime.
        Ray.init();
        List<ObjectRef<Integer>> objectRefList = new ArrayList<>();
        // Invoke the `square` method 4 times remotely as Ray tasks.
        // The tasks will run in parallel in the background.
        for (int i = 0; i < 4; i++) {
            objectRefList.add(Ray.task(RayDemo::square, i).remote());
        }
        // Get the actual results of the tasks.
        System.out.println(Ray.get(objectRefList));  // [0, 1, 4, 9]
    }
}

In the above code block we defined some Ray Tasks. While these are great for stateless operations, sometimes you must maintain the state of your application. You can do that with Ray Actors.

Learn more about Ray Core

ray Core: Parallelizing Classes with Ray Actors

Ray provides actors to allow you to parallelize an instance of a class in Python or Java. When you instantiate a class that is a Ray actor, Ray will start a remote instance of that class in the cluster. This actor can then execute remote method calls and maintain its own internal state.

Note

To run this example install Ray Core:

pip install -U "ray"
import ray
ray.init() # Only call this once.

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]

Note

To run this example, add the ray-api and ray-runtime dependencies in your project.

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class RayDemo {

    public static class Counter {

        private int value = 0;

        public void increment() {
            this.value += 1;
        }

        public int read() {
            return this.value;
        }
    }

    public static void main(String[] args) {
        // Intialize Ray runtime.
        Ray.init();
        List<ActorHandle<Counter>> counters = new ArrayList<>();
        // Create 4 actors from the `Counter` class.
        // They will run in remote worker processes.
        for (int i = 0; i < 4; i++) {
            counters.add(Ray.actor(Counter::new).remote());
        }

        // Invoke the `increment` method on each actor.
        // This will send an actor task to each remote actor.
        for (ActorHandle<Counter> counter : counters) {
            counter.task(Counter::increment).remote();
        }
        // Invoke the `read` method on each actor, and print the results.
        List<ObjectRef<Integer>> objectRefList = counters.stream()
            .map(counter -> counter.task(Counter::read).remote())
            .collect(Collectors.toList());
        System.out.println(Ray.get(objectRefList));  // [1, 1, 1, 1]
    }
}

Learn more about Ray Core

Ray Cluster Quickstart#

Deploy your applications on Ray clusters on AWS, GCP, Azure, and more, often with minimal code changes to your existing code.

ray Clusters: Launching a Ray Cluster on AWS

Ray programs can run on a single machine, or seamlessly scale to large clusters.

Note

To run this example install the following:

pip install -U "ray[default]" boto3

If you haven’t already, configure your credentials as described in the https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#guide-credentials[documentation for boto3].

Take this simple example that waits for individual nodes to join the cluster.

example.py
import sys
import time
from collections import Counter

import ray


@ray.remote
def get_host_name(x):
    import platform
    import time

    time.sleep(0.01)
    return x + (platform.node(),)


def wait_for_nodes(expected):
    # Wait for all nodes to join the cluster.
    while True:
        num_nodes = len(ray.nodes())
        if num_nodes < expected:
            print(
                "{} nodes have joined so far, waiting for {} more.".format(
                    num_nodes, expected - num_nodes
                )
            )
            sys.stdout.flush()
            time.sleep(1)
        else:
            break


def main():
    wait_for_nodes(4)

    # Check that objects can be transferred from each node to each other node.
    for i in range(10):
        print("Iteration {}".format(i))
        results = [get_host_name.remote(get_host_name.remote(())) for _ in range(100)]
        print(Counter(ray.get(results)))
        sys.stdout.flush()

    print("Success!")
    sys.stdout.flush()
    time.sleep(20)


if __name__ == "__main__":
    ray.init(address="localhost:6379")
    main()

You can also download this example from the GitHub repository. Store it locally in a file called example.py.

To execute this script in the cloud, download this configuration file, or copy it here:

cluster.yaml
# An unique identifier for the head node and workers of this cluster.
cluster_name: aws-example-minimal

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2

# The maximum number of workers nodes to launch in addition to the head
# node.
max_workers: 3

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g., instance type. By default
        # Ray auto-configures unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large
    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 3
        # The maximum number of worker nodes of this type to launch.
        # This parameter takes precedence over min_workers.
        max_workers: 3
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g., instance type. By default
        # Ray auto-configures unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large

Assuming you have stored this configuration in a file called cluster.yaml, you can now launch an AWS cluster as follows:

ray submit cluster.yaml example.py --start

Learn more about launching Ray Clusters on AWS, GCP, Azure, and more

Debugging and Monitoring Quickstart#

Use built-in observability tools to monitor and debug Ray applications and clusters.

ray Ray Dashboard: Web GUI to monitor and debug Ray

Ray dashboard provides a visual interface that displays real-time system metrics, node-level resource monitoring, job profiling, and task visualizations. The dashboard is designed to help users understand the performance of their Ray applications and identify potential issues.

https://raw.githubusercontent.com/ray-project/Images/master/docs/new-dashboard/Dashboard-overview.png

Note

To get started with the dashboard, install the default installation as follows:

pip install -U "ray[default]"

Access the dashboard through the default URL, http://localhost:8265.

Learn more about Ray Dashboard

ray Ray State APIs: CLI to access cluster states

Ray state APIs allow users to conveniently access the current state (snapshot) of Ray through CLI or Python SDK.

Note

To get started with the state API, install the default installation as follows:

pip install -U "ray[default]"

Run the following code.

    import ray
    import time

    ray.init(num_cpus=4)

    @ray.remote
    def task_running_300_seconds():
        print("Start!")
        time.sleep(300)

    @ray.remote
    class Actor:
        def __init__(self):
            print("Actor created")

    # Create 2 tasks
    tasks = [task_running_300_seconds.remote() for _ in range(2)]

    # Create 2 actors
    actors = [Actor.remote() for _ in range(2)]

    ray.get(tasks)

See the summarized statistics of Ray tasks using ray summary tasks.

    ray summary tasks
    ======== Tasks Summary: 2022-07-22 08:54:38.332537 ========
    Stats:
    ------------------------------------
    total_actor_scheduled: 2
    total_actor_tasks: 0
    total_tasks: 2


    Table (group by func_name):
    ------------------------------------
        FUNC_OR_CLASS_NAME        STATE_COUNTS    TYPE
    0   task_running_300_seconds  RUNNING: 2      NORMAL_TASK
    1   Actor.__init__            FINISHED: 2     ACTOR_CREATION_TASK

Learn more about Ray State APIs

Learn More#

Here are some talks, papers, and press coverage involving Ray and its libraries. Please raise an issue if any of the below links are broken, or if you’d like to add your own talk!

Blog and Press#

Talks (Videos)#

Slides#

Papers#