Example: Large-scale ML Ingest
In this example, you will learn how to build, deploy and scale up a machine learning shuffle ingestion pipeline using Ray Dataset and Dataset Pipelines.
In particular, we will show you:
How to build a shuffle ingestion pipeline that loads, shuffles and feeds data into distributed trainers in a few lines of code;
How to scale the pipeline from ingesting 100MiB of data to 500GiB of data.
Python Setup
First, we'll import all of the libraries we'll be using. This step also helps us verify that the environment is configured correctly. If any of the imports are missing, an exception will be raised.
import argparse
import tempfile
import time
from typing import List
import pandas
import pyarrow
import ray
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.datasource.datasource import RandomIntRowDatasource
Build shuffle ingestion pipeline
A typical machine learning ingestion pipeline consists of the following 4 steps:
Load the training data from external storage;
Iterate over the data for multiple epochs;
In each epoch, apply a global shuffle to decorrelate the data;
In each epoch, split the shuffled data into shards and feed the shards to distributed trainers.
Let's see how we implement such a pipeline using Ray Dataset:
def create_shuffle_pipeline(
    training_data_dir: str, num_epochs: int, num_shards: int
) -> List[DatasetPipeline]:
    return (
        ray.data.read_parquet(training_data_dir)
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )
We've now defined a create_shuffle_pipeline function that creates an ingestion pipeline. It reads training_data_dir, iterates for num_epochs, and in each epoch shuffles and splits the training data into num_shards.
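As a quick, hypothetical sanity check (the temporary directory, the column name, the dataset size, and the epoch/shard counts below are illustrative only, not part of the original example), you could exercise the function on a tiny local Parquet dataset and confirm that it returns one shard per trainer:

# Hypothetical sanity check of create_shuffle_pipeline on a tiny local dataset.
# The directory, column name, and sizes here are illustrative only.
ray.init(ignore_reinit_error=True)

tiny_dir = tempfile.mkdtemp()
# Write a small tabular dataset as Parquet so read_parquet() has input.
ray.data.from_pandas([pandas.DataFrame({"value": range(1000)})]).write_parquet(tiny_dir)

splits = create_shuffle_pipeline(tiny_dir, num_epochs=2, num_shards=2)
assert len(splits) == 2  # one DatasetPipeline shard per trainer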
Feed the pipeline into trainers
Let's also implement a TrainingWorker which consumes the shuffled data from each shard.
For simplicity, we will define a Ray Actor that emulates training workers. Specifically:
It takes one shard of the shuffle pipeline for training;
It iterates over the shard to get a training dataset per epoch;
It then consumes the dataset in batches.
@ray.remote
class TrainingWorker:
    def __init__(self, rank: int, shard: DatasetPipeline):
        self.rank = rank
        self.shard = shard

    def train(self):
        for epoch, training_dataset in enumerate(self.shard.iter_epochs()):
            # The following code emulates epoch-based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            for i, batch in enumerate(training_dataset.iter_batches()):
                # TODO: replace this with real training code.
                pass
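In a real trainer you would usually also choose a batch size and batch format when calling iter_batches. The sketch below shows what such a consumption loop might look like; the consume_shard helper, the batch_size of 1024, and the pandas batch format are illustrative choices, not part of the original example.

# Illustrative consumption loop for a single shard (a DatasetPipeline).
# The batch_size and batch_format values below are example choices.
def consume_shard(shard: DatasetPipeline) -> None:
    for training_dataset in shard.iter_epochs():
        for batch in training_dataset.iter_batches(batch_size=1024, batch_format="pandas"):
            # Each `batch` is a pandas.DataFrame here; feed it to a real training step.
            pass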
Let's run it
Now let's run the data pipeline end-to-end:
First, let's parse some arguments.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--large-scale-test",
    action="store_true",
    help="Run large scale test (500GiB of data).",
)
args, _ = parser.parse_known_args()
After that, let's generate 100MiB of Parquet files, create the shuffle pipeline by reading those generated Parquet files, and use training workers to consume the pipeline.
if not args.large_scale_test:
    NUM_TRAINING_WORKERS = 4
    NUM_EPOCHS = 5
    NUM_COLUMNS = 10
    SIZE_100MiB = 100 * 1024 * 1024

    # create a local ray cluster.
    ray.init()

    def generate_example_files(size_bytes: int) -> str:
        tmpdir = tempfile.mkdtemp()
        ray.data.read_datasource(
            RandomIntRowDatasource(),
            n=size_bytes // 8 // NUM_COLUMNS,
            num_columns=NUM_COLUMNS,
        ).write_parquet(tmpdir)
        return tmpdir

    example_files_dir = generate_example_files(SIZE_100MiB)

    splits = create_shuffle_pipeline(
        example_files_dir, NUM_EPOCHS, NUM_TRAINING_WORKERS
    )

    training_workers = [
        TrainingWorker.remote(rank, shard) for rank, shard in enumerate(splits)
    ]

    # Let's run the e2e pipeline
    start = time.time()
    ray.get([worker.train.remote() for worker in training_workers])
    print(f"total ingestion time: {int(time.time() - start)}s")
# -> Write Progress: 100%|████████████████████| 201/201 [00:00<00:00, 228.67it/s]
# -> Stage 0: 0%| | 0/5 [00:00<?, ?it/s]
# -> Stage 0: 40%|████ | 2/5 [00:11<00:17, 5.75s/it]
# -> Stage 0: 60%|██████ | 3/5 [00:23<00:16, 8.15s/it]
# -> ...
# -> (TrainingWorker pid=1651600) Training... worker: 2, epoch: 0
# -> Stage 0: 80%|████████ | 4/5 [00:35<00:09, 9.59s/it]
# -> ...
# -> (TrainingWorker pid=1651599) Training... worker: 0, epoch: 1
# -> Stage 0: 100%|██████████| 5/5 [00:46<00:00, 10.34s/it]
# -> ...
# -> (TrainingWorker pid=1651387) Training... worker: 3, epoch: 4
# -> total ingestion time: 61s
Scale the shuffle ingestion pipeline
Scaling the shuffle ingestion pipeline is simple. With Ray, we can linearly scale the pipeline from ingesting 100MiB of data to 500GiB of data by adding more machines.
To ingest 500GiB of data, we'll set up a Ray Cluster. The provided big_data_ingestion.yaml cluster config can be used to set up an AWS cluster with 70 CPU nodes and 16 GPU nodes. Use the following commands to bring up the Ray cluster.
pip install ray boto3
ray up big_data_ingestion.yaml
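Before moving on, you may want to confirm the cluster is reachable. A minimal sanity check, assuming you run it on the head node (for example after `ray attach big_data_ingestion.yaml`), is to attach to the running cluster and print the resources it currently reports:

# Hypothetical sanity check, meant to run on the cluster's head node:
# attach to the running Ray cluster and print its aggregate resources.
import ray

ray.init(address="auto")
print(ray.cluster_resources())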
After the cluster is started, let's implement our large-scale ingestion test:
First, since we are running on a cluster, let's create the pipeline from RandomIntRowDatasource directly. This way we don't need to set up S3 for storing the generated data.
def create_large_shuffle_pipeline(
    data_size_bytes: int, num_epochs: int, num_columns: int, num_shards: int
) -> List[DatasetPipeline]:
    return (
        ray.data.read_datasource(
            RandomIntRowDatasource(),
            n=data_size_bytes // 8 // num_columns,
            num_columns=num_columns,
        )
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )
Now, it's time to implement the 500GiB shuffle ingestion pipeline.
if args.large_scale_test:
    NUM_TRAINING_WORKERS = 16
    NUM_EPOCHS = 5
    NUM_COLUMNS = 10
    GiB = 1024 * 1024 * 1024
    SIZE_500GiB = 500 * GiB
    TOTAL_NUM_NODES = 70 + 16 + 1

    # use the AWS cluster we just set up.
    ray.init()

    # waiting for cluster nodes to come up.
    while len(ray.nodes()) < TOTAL_NUM_NODES:
        print(f"waiting for nodes to start up: {len(ray.nodes())}/{TOTAL_NUM_NODES}")
        time.sleep(5)

    splits = create_large_shuffle_pipeline(
        SIZE_500GiB, NUM_EPOCHS, NUM_COLUMNS, NUM_TRAINING_WORKERS
    )

    # Note we set num_gpus=1 for workers so that
    # the workers will only run on GPU nodes.
    training_workers = [
        TrainingWorker.options(num_gpus=1).remote(rank, shard)
        for rank, shard in enumerate(splits)
    ]

    start = time.time()

    # Let's run the large scale test.
    ray.get([worker.train.remote() for worker in training_workers])
    print(f"total ingestion time: {int(time.time() - start)}s")
    throughput = SIZE_500GiB * NUM_EPOCHS / (time.time() - start) / GiB
    print("throughput: {0:0.2f}GiB/s".format(throughput))
Finally, let's run our pipeline on the cluster we just started:
$ ray submit ./big_data_ingestion.yaml ./big_data_ingestion.py --large-scale-test
# -> Connecting to existing Ray cluster at address: 172.31.47.38:6379
# -> waiting for nodes to start up: 1/87
# -> ...
# -> waiting for nodes to start up: 87/87
# -> Stage 0: 0%| | 0/5 [00:00<?, ?it/s]
# -> Stage 0: 20%|██ | 1/5 [00:00<00:02, 1.77it/s]
# -> Stage 0: 40%|████ | 2/5 [00:38<00:35, 11.67s/it]
# -> Stage 0: 60%|██████ | 3/5 [01:13<00:37, 18.83s/it]
# -> ...
# -> (TrainingWorker pid=5084, ip=172.31.35.245) Training... worker: 12, epoch: 0
# -> Stage 0: 80%|████████ | 4/5 [03:15<00:49, 49.63s/it]
# -> ...
# -> (TrainingWorker pid=5076, ip=172.31.40.190) Training... worker: 9, epoch: 1
# -> Stage 0: 100%|██████████| 5/5 [05:02<00:00, 67.01s/it]
# -> ...
# -> (TrainingWorker pid=5074, ip=172.31.40.190) Training... worker: 0, epoch: 4
# -> total ingestion time: 291s
# -> throughput: 8.56GiB/s
Note: The pipeline can also be submitted using Ray Job Submission, which is in beta starting with Ray 1.12. Try it out!
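As a hedged sketch of that path (the dashboard address, working directory, and entrypoint below are placeholders to adjust for your cluster), a submission through the Jobs API could look roughly like this:

# Hypothetical Ray Job Submission sketch; replace the address and paths with
# values for your cluster. The Jobs API is in beta starting with Ray 1.12.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")  # head node dashboard address
job_id = client.submit_job(
    entrypoint="python big_data_ingestion.py --large-scale-test",
    runtime_env={"working_dir": "."},
)
print(client.get_job_status(job_id))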