Example: Large-scale ML Ingest

In this example, you will learn how to build, deploy and scale up a machine learning shuffle ingestion pipeline using Ray Dataset and Dataset Pipelines.

In particular, we will show you:

  • How to build a shuffle ingestion pipeline that loads, shuffles and feeds data into distributed trainers in a few lines of code;

  • How to scale the pipeline from ingesting 100MiB of data to 500GiB of data.

[Figure: a dataset pipeline that repeats and shuffles the data each epoch]

Python Setup

First, we’ll import all of the libraries we’ll be using. This step also helps us verify that the environment is configured correctly. If any of the imports are missing, an exception will be raised.

import argparse
import tempfile
import time
from typing import List

import pandas
import pyarrow

import ray
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.datasource.datasource import RandomIntRowDatasource

Build shuffle ingestion pipeline

A typical machine learning ingestion pipeline consists of the following 4 steps:

  1. Load the training data from external storage;

  2. Iterate over the data for multiple epochs;

  3. In each epoch, apply a global shuffle to decorrelate the data;

  4. In each epoch, split the shuffled data into shards and feed the shards to distributed trainers.

Let’s see how to implement such a pipeline using Ray Dataset:

def create_shuffle_pipeline(training_data_dir: str, num_epochs: int,
                            num_shards: int) -> List[DatasetPipeline]:

    return ray.data.read_parquet(training_data_dir) \
        .repeat(num_epochs) \
        .random_shuffle_each_window() \
        .split(num_shards, equal=True)

We’ve now defined a create_shuffle_pipeline function that creates an ingestion pipeline. It reads the Parquet files in training_data_dir, repeats the data for num_epochs epochs, and in each epoch globally shuffles the data and splits it into num_shards shards.
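
For reference, a call to this function might look like the following; the local path, epoch count, and shard count below are placeholders for illustration only, not part of the example script:

pipelines = create_shuffle_pipeline(
    "/tmp/training_data", num_epochs=2, num_shards=4)
# split() returns one DatasetPipeline per distributed trainer.
assert len(pipelines) == 4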

Feed the pipeline into trainers

Let’s also implement a TrainingWorker which consumes the shuffled data from each shard.

For simplicity, we will define a Ray Actor that emulates training workers. Specifically,

  1. It takes one shard of the shuffle pipeline for training;

  2. It iterates over the shard to get a training dataset per epoch;

  3. It then consumes the dataset in batches.

@ray.remote
class TrainingWorker:
    def __init__(self, rank: int, shard: DatasetPipeline):
        self.rank = rank
        self.shard = shard

    def train(self):
        for epoch, training_dataset in enumerate(self.shard.iter_datasets()):
            # The following code emulates epoch-based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            for i, batch in enumerate(training_dataset.iter_batches()):
                # TODO: replace this with real training code.
                pass
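
In real training code, the inner loop would typically request fixed-size batches in a specific format rather than iterating with the defaults. Below is a minimal sketch of how such a consumption loop could look; the batch size of 1024 and the pandas batch format are illustrative choices, not part of the example script:

def consume_shard(shard: DatasetPipeline) -> None:
    # Illustrative only: one dataset per epoch, pulled in fixed-size
    # pandas batches.
    for epoch, training_dataset in enumerate(shard.iter_datasets()):
        for batch in training_dataset.iter_batches(
                batch_size=1024, batch_format="pandas"):
            # `batch` is a pandas.DataFrame; a real trainer would run an
            # SGD step on it here.
            pass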

Let’s run it

Now let’s run the data pipeline end-to-end:

First, let’s parse some arguments.

parser = argparse.ArgumentParser()
parser.add_argument(
    "--large-scale-test",
    action="store_true",
    help="Run large scale test (500GiB of data).")

args, _ = parser.parse_known_args()

After that, let’s generate 100MiB of Parquet files, create the shuffle pipeline by reading those generated Parquet files, and use training workers to consume the pipeline.

if not args.large_scale_test:

    NUM_TRAINING_WORKERS = 4
    NUM_EPOCHS = 5
    NUM_COLUMNS = 10
    SIZE_100MiB = 100 * 1024 * 1024

    # create a local ray cluster.
    ray.init()

    def generate_example_files(size_bytes: int) -> str:
        tmpdir = tempfile.mkdtemp()
        ray.data.read_datasource(
            RandomIntRowDatasource(),
            n=size_bytes // 8 // NUM_COLUMNS,
            num_columns=NUM_COLUMNS).write_parquet(tmpdir)
        return tmpdir

    example_files_dir = generate_example_files(SIZE_100MiB)

    splits = create_shuffle_pipeline(example_files_dir, NUM_EPOCHS,
                                     NUM_TRAINING_WORKERS)

    training_workers = [
        TrainingWorker.remote(rank, shard) for rank, shard in enumerate(splits)
    ]

    # Let's run the e2e pipeline
    start = time.time()
    ray.get([worker.train.remote() for worker in training_workers])
    print(f"total ingestion time: {int(time.time() - start)}s")

    # -> Write Progress: 100%|████████████████████| 201/201 [00:00<00:00, 228.67it/s]
    # -> Stage 0:   0%|          | 0/5 [00:00<?, ?it/s]
    # -> Stage 0:  40%|████      | 2/5 [00:11<00:17,  5.75s/it]
    # -> Stage 0:  60%|██████    | 3/5 [00:23<00:16,  8.15s/it]
    # -> ...
    # -> (TrainingWorker pid=1651600) Training... worker: 2, epoch: 0
    # -> Stage 0:  80%|████████  | 4/5 [00:35<00:09,  9.59s/it]
    # -> ...
    # -> (TrainingWorker pid=1651599) Training... worker: 0, epoch: 1
    # -> Stage 0: 100%|██████████| 5/5 [00:46<00:00, 10.34s/it]
    # -> ...
    # -> (TrainingWorker pid=1651387) Training... worker: 3, epoch: 4
    # -> total ingestion time: 61s

Scale the shuffle ingestion pipeline

Scaling the shuffle ingestion pipeline is simple. With Ray, we can linearly scale the pipeline from ingesting 100MiB of data to 500GiB of data by adding more machines.

To ingest 500GiB of data, we’ll set up a Ray Cluster. The provided big_data_ingestion.yaml cluster config can be used to set up an AWS cluster with 70 CPU nodes and 16 GPU nodes. Use the following commands to bring up the Ray cluster.

$ pip install ray boto3
$ ray up big_data_ingestion.yaml

After the cluster is started, let’s implement our large-scale ingestion test.

First, since we are running on a cluster, let’s create the pipeline from the RandomIntRowDatasource directly. This way, we don’t need to set up S3 to store the generated data.

def create_large_shuffle_pipeline(data_size_bytes: int, num_epochs: int,
                                  num_columns: int,
                                  num_shards: int) -> List[DatasetPipeline]:
    # _spread_resource_prefix is used to ensure tasks are evenly spread to all
    # CPU nodes.
    return ray.data.read_datasource(
            RandomIntRowDatasource(), n=data_size_bytes // 8 // num_columns,
            num_columns=num_columns,
            _spread_resource_prefix="node:") \
        .repeat(num_epochs) \
        .random_shuffle_each_window(_spread_resource_prefix="node:") \
        .split(num_shards, equal=True)

Now, it’s time to implement the 500GiB shuffle ingestion pipeline.

if args.large_scale_test:
    NUM_TRAINING_WORKERS = 16
    NUM_EPOCHS = 5
    NUM_COLUMNS = 10
    GiB = 1024 * 1024 * 1024
    SIZE_500GiB = 500 * GiB
    TOTAL_NUM_NODES = 70 + 16 + 1

    # use the AWS cluster we just set up.
    ray.init(address="auto")

    # waiting for cluster nodes to come up.
    while len(ray.nodes()) < TOTAL_NUM_NODES:
        print(
            f"waiting for nodes to start up: {len(ray.nodes())}/{TOTAL_NUM_NODES}"
        )
        time.sleep(5)

    splits = create_large_shuffle_pipeline(SIZE_500GiB, NUM_EPOCHS,
                                           NUM_COLUMNS, NUM_TRAINING_WORKERS)

    # Note we set num_gpus=1 for workers so that
    # the workers will only run on GPU nodes.
    training_workers = [
        TrainingWorker.options(num_gpus=1) \
            .remote(rank, shard) for rank, shard in enumerate(splits)
    ]

    start = time.time()

    # Let's run the large scale test.
    ray.get([worker.train.remote() for worker in training_workers])
    print(f"total ingestion time: {int(time.time() - start)}s")
    throughput = SIZE_500GiB * NUM_EPOCHS / (time.time() - start) / GiB
    print("throughput: {0:0.2f}GiB/s".format(throughput))

Finally, let’s run our pipeline on the cluster we just started:

$ ray submit ./big_data_ingestion.yaml ./big_data_ingestion.py --large-scale-test
# -> Connecting to existing Ray cluster at address: 172.31.47.38:6379
# -> waiting for nodes to start up: 1/87
# -> ...
# -> waiting for nodes to start up: 87/87
# -> Stage 0:   0%|          | 0/5 [00:00<?, ?it/s]
# -> Stage 0:  20%|██        | 1/5 [00:00<00:02,  1.77it/s]
# -> Stage 0:  40%|████      | 2/5 [00:38<00:35, 11.67s/it]
# -> Stage 0:  60%|██████    | 3/5 [01:13<00:37, 18.83s/it]
# -> ...
# -> (TrainingWorker pid=5084, ip=172.31.35.245) Training... worker: 12, epoch: 0
# -> Stage 0:  80%|████████  | 4/5 [03:15<00:49, 49.63s/it]
# -> ...
# -> (TrainingWorker pid=5076, ip=172.31.40.190) Training... worker: 9, epoch: 1
# -> Stage 0: 100%|██████████| 5/5 [05:02<00:00, 67.01s/it]
# -> ...
# -> (TrainingWorker pid=5074, ip=172.31.40.190) Training... worker: 0, epoch: 4
# -> total ingestion time: 291s
# -> throughput: 8.56GiB/s
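
Note that the reported throughput counts every epoch: 500GiB × 5 epochs = 2500GiB of data shuffled and consumed over the total ingestion time, which is how the run above arrives at roughly 8.6GiB/s.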
