This example is adapted from the Continual AI Avalanche quick start: https://avalanche.continualai.org/

Incremental Learning with Ray AIR

In this example, we show how to use Ray AIR to incrementally train a simple image classification PyTorch model on a stream of incoming tasks.

Each task is a random permutation of the MNIST Dataset, which is a common benchmark used for continual training. After training on all the tasks, the model is expected to be able to make predictions on data from any task.

In this example, we use just a naive fine-tuning strategy, where the model is trained on each task in sequence, without any special methods to prevent catastrophic forgetting. Model performance is therefore expected to be poor.

More precisely, this example showcases domain incremental training: at prediction/testing time, the model is asked to predict on data from any of the tasks trained on so far, without being given the task ID. This is in contrast to task incremental training, where the task ID is provided at prediction/testing time.

For more information on the three different categories of incremental/continual learning, please see “Three scenarios for continual learning” by van de Ven and Tolias.

This example will cover the following:

  1. Loading a PyTorch Dataset into Ray Datasets.

  2. Creating an Iterator[ray.data.Dataset] abstraction to represent a stream of data to train on for incremental training.

  3. Implementing a custom Ray AIR preprocessor to preprocess the Dataset.

  4. Incrementally training a model using data parallel training.

  5. Using our trained model to perform batch prediction on test data.

  6. Incrementally deploying our trained model with Ray Serve and performing online prediction queries.

Step 1: Installations and Initializing Ray

To get started, let’s first install the necessary packages: Ray AIR, torch, and torchvision. Uncomment the below lines and run the cell to install the necessary packages.

# !pip install -q "ray[air]"
# !pip install -q torch
# !pip install -q torchvision

Then, let’s initialize Ray! We can just import and call ray.init(). If you are running on a Ray cluster, you can call ray.init("auto") to connect to the cluster instead of initializing a new local Ray instance.

import ray
ray.init()
# If running on a cluster, use the line below instead.
# ray.init("auto")
2022-07-20 21:47:49,873	INFO services.py:1483 -- View the Ray dashboard at http://127.0.0.1:8265

Ray

Python version: 3.7.10
Ray version: 2.0.0
Dashboard: http://127.0.0.1:8265

Step 2: Define our PyTorch Model

Now that we have the necessary installations, let’s define our PyTorch model. To classify MNIST images in this example, we will use a simple multi-layer perceptron.

import torch.nn as nn

class SimpleMLP(nn.Module):
    def __init__(self, num_classes=10, input_size=28 * 28):
        super(SimpleMLP, self).__init__()

        self.features = nn.Sequential(
          nn.Linear(input_size, 512),
          nn.ReLU(inplace=True),
          nn.Dropout(),
        )
        self.classifier = nn.Linear(512, num_classes)
        self._input_size = input_size

    def forward(self, x):
        x = x.contiguous()
        x = x.view(-1, self._input_size)
        x = self.features(x)
        x = self.classifier(x)
        return x
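
As a quick sanity check (purely illustrative and not needed for the rest of the example), we can run a random batch of fake images through the untrained model and confirm that the output shape is (batch_size, num_classes):

import torch

# Hypothetical smoke test: 8 random 28x28 "images" through the untrained model.
model = SimpleMLP(num_classes=10)
dummy_batch = torch.randn(8, 28, 28)
logits = model(dummy_batch)
print(logits.shape)  # Expected: torch.Size([8, 10])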

Step 3: Create the Stream of Tasks

We can now create a stream of tasks (where each task contains a dataset to train on). For this example, we will create an artificial stream of tasks consisting of permuted variations of MNIST, which is a classic benchmark in continual learning research.

For real-world scenarios, this step is not necessary as fresh data will already be arriving as a stream of tasks. It does not need to be artificially created.

3a: Load the MNIST Dataset into a Ray Dataset

Let’s first define a simple function that will return the original MNIST Dataset as a distributed Ray Dataset. Ray Datasets are the standard way to load and exchange data in Ray libraries and applications; read more about them here!

The function in the code snippet below does the following:

  1. Downloads the MNIST Dataset from torchvision in-memory.

  2. Loads the in-memory Torch Dataset into a Ray Dataset.

  3. Converts the Ray Dataset into Pandas format. Instead of iterating over tuples, the Ray Dataset will have two columns: “image” & “label”.

This will allow us to apply built-in preprocessors to the Ray Dataset and allow Ray Datasets to be used with Ray AIR Predictors.

For this example, since we are just working with the MNIST dataset, which is small, we use SimpleTorchDatasource, which loads the full MNIST dataset into memory.

To load larger datasets in a parallel fashion, you should use Ray Datasets’ additional read APIs to load data from Parquet, CSV, image files, and more!
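
For illustration, a couple of those read APIs look like the following. The paths are placeholders and are not used anywhere in this example.

# Hypothetical paths, shown only to illustrate the parallel read APIs.
# ds_from_parquet = ray.data.read_parquet("s3://my-bucket/my-dataset/")
# ds_from_csv = ray.data.read_csv("/path/to/large_dataset.csv")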

import numpy as np
import pandas as pd

import torchvision
from torchvision.transforms import RandomCrop

import ray
from ray.data.datasource.torch_datasource import SimpleTorchDatasource


def get_mnist_dataset(train: bool = True) -> ray.data.Dataset:
    """Returns MNIST Dataset as a ray.data.Dataset.
    
    Args:
        train: Whether to return the train dataset or test dataset.
    """

    def mnist_dataset_factory():
        if train:
            # Only perform random cropping on the Train dataset.
            transform = RandomCrop(28, padding=4)
        else:
            transform = None
        return torchvision.datasets.MNIST("./data", download=True, train=train, transform=transform)

    def convert_batch_to_pandas(batch):
        images = [np.array(item[0]) for item in batch]
        labels = [item[1] for item in batch]

        df = pd.DataFrame({"image": images, "label": labels})

        return df

    mnist_dataset = ray.data.read_datasource(
        SimpleTorchDatasource(), dataset_factory=mnist_dataset_factory
    )
    mnist_dataset = mnist_dataset.map_batches(convert_batch_to_pandas)
    return mnist_dataset
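
As a quick optional check that the function behaves as described, we can load the test split and look at a single row; the commented call below is just a sketch and is not needed for the rest of the example.

# Optional sanity check: the resulting Dataset should have "image" and "label" columns.
# test_ds = get_mnist_dataset(train=False)
# print(test_ds.take(1))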

3b: Create our Stream Abstraction

Now we can create our “stream” abstraction. This abstraction provides two methods (generate_train_stream and generate_test_stream), each of which returns an Iterator over Ray Datasets. Each item in this iterator contains a unique permutation of MNIST and is one task that we want to train on.

In this example, “the stream of tasks” is contrived, since all the data for all tasks already exists in an offline setting. For true online continual learning, you would want to implement a custom dataset iterator that reads from some streaming datasource to produce new tasks. The only abstraction that’s needed is Iterator[ray.data.Dataset].

Note that the test dataset stream uses the same permutations as the training dataset stream. In general for continual learning, the data distribution of the test/prediction data is expected to follow what the model was trained on. If you notice that the distribution of new prediction queries is drifting away from the distribution of the training data, then you should probably trigger training on a new task.
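
As a rough illustration of that Iterator[ray.data.Dataset] contract, a stream backed by real storage could look something like the sketch below. The Parquet paths are hypothetical, and nothing in this example relies on this function.

def generate_stream_from_storage(task_paths):
    """Hypothetical stream: yield one Ray Dataset per batch of newly arrived data."""
    for path in task_paths:
        # e.g. each task's data was previously written out as Parquet files.
        yield ray.data.read_parquet(path)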

from typing import Iterator, List
import random
import numpy as np

from ray.data import ActorPoolStrategy


class PermutedMNISTStream:
    """Generates streams of permuted MNIST Datasets.
    
    Example:
        
        permuted_mnist = PermutedMNISTStream(n_tasks=3)
        train_stream = permuted_mnist.generate_train_stream()
        
        # Iterate through the train_stream
        for train_dataset in train_stream:
            ...
    
    Args:
        n_tasks: The number of tasks to generate.
    """

    def __init__(self, n_tasks: int = 3):
        self.n_tasks = n_tasks
        self.permutations = [
            np.random.permutation(28 * 28) for _ in range(self.n_tasks)
        ]

        self.train_mnist_dataset = get_mnist_dataset(train=True)
        self.test_mnist_dataset = get_mnist_dataset(train=False)

    def random_permute_dataset(
        self, dataset: ray.data.Dataset, permutation: np.ndarray
    ):
        """Randomly permutes the pixels for each image in the dataset."""

        class PixelsPermutation(object):
            def __call__(self, batch):
                batch["image"] = batch["image"].map(lambda image: image.reshape(-1)[permutation].reshape(28, 28))
                return batch

        return dataset.map_batches(PixelsPermutation, compute=ActorPoolStrategy(), batch_format="pandas")

    def generate_train_stream(self) -> Iterator[ray.data.Dataset]:
        for permutation in self.permutations:
            permuted_mnist_dataset = self.random_permute_dataset(
                self.train_mnist_dataset, permutation
            )
            yield permuted_mnist_dataset

    def generate_test_stream(self) -> Iterator[ray.data.Dataset]:
        for permutation in self.permutations:
            permuted_mnist_dataset = self.random_permute_dataset(
                self.test_mnist_dataset, permutation
            )
            yield permuted_mnist_dataset

    def generate_test_samples(self, num_samples: int = 10) -> List[np.ndarray]:
        """Generates num_samples permuted MNIST images."""
        random_permutation = random.choice(self.permutations)
        return list(self.random_permute_dataset(
            self.test_mnist_dataset.random_shuffle().limit(num_samples),
            random_permutation,
        ).to_pandas()["image"].to_numpy())

Step 4: Define the Logic for Training and Inference/Prediction

Now that we can get an Iterator over Ray Datasets, we can incrementally train our model in a data parallel fashion via Ray Train, while incrementally deploying our model via Ray Serve. Let’s define some helper functions to allow us to do this!

If you are not familiar with data parallel training, it is a distributed training strategy in which we have multiple model replicas, and each replica trains on a different batch of data. After each batch, the gradients are synchronized across the replicas. This effectively allows us to train on more data in a shorter amount of time.
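
As a back-of-the-envelope sketch of what this buys us: with multiple workers, each synchronized update consumes num_workers times as many samples as a single worker would. The numbers below are illustrative only and are not the settings used later in this example.

# Illustrative arithmetic only.
num_data_parallel_workers = 4
per_worker_batch_size = 32
effective_batch_size = num_data_parallel_workers * per_worker_batch_size
print(effective_batch_size)  # 128 samples contribute to each synchronized gradient update.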

4a: Define our training logic for each Data Parallel worker

The first thing we need to do is to define the training loop that will be run on each training worker.

The training loop takes in a config Dict as an argument that we can use to pass in any configurations for training.

This is just standard PyTorch training, with the difference being that we can leverage Ray Train’s utility functions and the Ray AIR Session:

  • ray.train.torch.prepare_model(...): This will prepare the model for distributed training by wrapping it in PyTorch DistributedDataParallel and moving it to the correct accelerator device.

  • ray.air.session.get_dataset_shard(...): This will get the Ray Dataset shard for this particular Data Parallel worker.

  • ray.air.session.report({}, checkpoint=...): This will tell Ray Train to persist the provided Checkpoint object.

  • ray.air.session.get_checkpoint(): Returns a checkpoint to resume from. This is useful either for fault tolerance or, as in our case, to continue training the same model on a new incoming dataset.

from ray import train
from ray.air import session, Checkpoint

from torch.optim import SGD
from torch.nn import CrossEntropyLoss

from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present

def train_loop_per_worker(config: dict):
    num_epochs = config["num_epochs"]
    learning_rate = config["learning_rate"]
    momentum = config["momentum"]
    batch_size = config["batch_size"]

    model = SimpleMLP(num_classes=10)

    # Load model from checkpoint if there is a checkpoint to load from.
    checkpoint_to_load = session.get_checkpoint()
    if checkpoint_to_load:
        state_dict_to_resume_from = checkpoint_to_load.to_dict()["model"]
        model.load_state_dict(state_dict=state_dict_to_resume_from)

    model = train.torch.prepare_model(model)

    optimizer = SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    criterion = CrossEntropyLoss()

    # Get the Ray Dataset shard for this data parallel worker, and iterate over it in Torch-formatted batches.
    dataset_shard = session.get_dataset_shard("train").iter_torch_batches(
        batch_size=batch_size,
    )

    for epoch_idx in range(num_epochs):
        running_loss = 0
        for iteration, batch in enumerate(dataset_shard):
            optimizer.zero_grad()
            train_mb_x, train_mb_y = batch["image"], batch["label"]
            train_mb_x = train_mb_x.to(train.torch.get_device())
            train_mb_y = train_mb_y.to(train.torch.get_device())

            # Forward
            logits = model(train_mb_x)
            # Loss
            loss = criterion(logits, train_mb_y)
            # Backward
            loss.backward()
            # Update
            optimizer.step()

            running_loss += loss.item()
            if session.get_world_rank() == 0 and iteration % 500 == 0:
                print(f"loss: {loss.item():>7f}, epoch: {epoch_idx}, iteration: {iteration}")

        # Checkpoint model after every epoch.
        state_dict = model.state_dict()
        consume_prefix_in_state_dict_if_present(state_dict, "module.")
        checkpoint = Checkpoint.from_dict(dict(model=state_dict))
        session.report({"loss": running_loss}, checkpoint=checkpoint)

4b: Define our Preprocessor

Next, we define our Preprocessor to preprocess our data before training and prediction. Our preprocessor will normalize the MNIST images by the mean and standard deviation of the MNIST training dataset. This is a common operation to do on MNIST to improve training: https://discuss.pytorch.org/t/normalization-in-the-mnist-example/457
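
If you are curious where the constants 0.1307 and 0.3081 used below come from, the short sketch here (illustrative only, not part of the pipeline) computes them directly from the raw training pixels scaled to [0, 1]:

# Illustrative only: derive the MNIST normalization constants.
# import torchvision
# mnist_train = torchvision.datasets.MNIST("./data", train=True, download=True)
# pixels = mnist_train.data.float() / 255.0
# print(pixels.mean().item(), pixels.std().item())  # roughly 0.1307 and 0.3081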

from ray.data.preprocessors import BatchMapper

from torchvision import transforms

def preprocess_images(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess images by scaling each channel in the image."""

    torchvision_transforms = transforms.Compose(
      [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    df.loc[:, "image"] = [
        torchvision_transforms(image).numpy() for image in df["image"]
    ]
    return df

mnist_normalize_preprocessor = BatchMapper(fn=preprocess_images)
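
Note that a Preprocessor can also be applied to a Ray Dataset directly, outside of a Trainer. The commented call below is a sketch that assumes the standalone transform API of Ray AIR preprocessors; we do not rely on it in this example.

# Sketch of standalone usage (assumes Preprocessor.transform is available):
# normalized_test_ds = mnist_normalize_preprocessor.transform(get_mnist_dataset(train=False))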

4c: Define logic for Batch/Offline Prediction

After training on each task, we want to use our trained model to do batch (i.e. offline) inference on a test dataset.

To do this, we leverage the built-in BatchPredictor. We define a batch_predict function that takes in a Checkpoint and a test Dataset and outputs the accuracy our model achieves on that test dataset.

from ray.train.batch_predictor import BatchPredictor
from ray.train.torch import TorchPredictor

def batch_predict(checkpoint: ray.air.Checkpoint, test_dataset: ray.data.Dataset) -> float:
    """Perform batch prediction on the provided test dataset, and return accuracy results."""

    batch_predictor = BatchPredictor.from_checkpoint(checkpoint, predictor_cls=TorchPredictor, model=SimpleMLP(num_classes=10))
    model_output = batch_predictor.predict(
        data=test_dataset, feature_columns=["image"], keep_columns=["label"]
    )

    # Postprocess model outputs.
    # Convert logits output by the model into actual class predictions.
    def convert_logits_to_classes(df):
        best_class = df["predictions"].map(lambda x: np.array(x).argmax())
        df["predictions"] = best_class
        return df

    prediction_results = model_output.map_batches(convert_logits_to_classes, batch_format="pandas")

    # Then, for each prediction output, see if it matches with the ground truth
    # label.
    def calculate_prediction_scores(df):
        return pd.DataFrame({"correct": df["predictions"] == df["label"]})

    correct_dataset = prediction_results.map_batches(
        calculate_prediction_scores, batch_format="pandas"
    )

    return correct_dataset.sum(on="correct") / correct_dataset.count()

4d: Define logic for Deploying and Querying our model

In addition to batch inference, we also want to deploy our model so that we can submit live queries to it for online inference. We use Ray Serve’s PredictorDeployment utility to deploy our trained model.

Once we deploy the model, we can send HTTP requests to our deployment.

from typing import List
import requests
from requests import Response
import numpy as np

from ray.serve.http_adapters import NdArray


def deploy_model(checkpoint: ray.air.Checkpoint) -> str:
  """Deploys the model from the provided Checkpoint and returns the URL for the endpoint of the model deployment."""
  def json_to_pandas(payload: NdArray) -> pd.DataFrame:
      """Accepts an NdArray JSON from an HTTP body and converts it to a Pandas dataframe."""
      # Have to explicitly convert to float since np.array reads as a double.
      arr = np.array(payload.array, dtype=np.float32)
      # We have to specify an image column since our preprocessor requires it.
      df = pd.DataFrame({"image": [arr]})
      return df

  # Note: `task_idx` is a global defined in the training loop below; it is used to version the deployment for each task.
  deployment = PredictorDeployment.options(name="mnist_model", route_prefix="/mnist_predict", version=f"v{task_idx}", num_replicas=2)
  deployment.deploy(
    batching_params=dict(max_batch_size=10, batch_wait_timeout_s=5),
    http_adapter=json_to_pandas,
    predictor_cls=TorchPredictor,
    checkpoint=checkpoint,
    model=SimpleMLP(num_classes=10)
  )
  return deployment.url

# Function that queries our deployed model
def query_deployment(test_samples: List[np.ndarray], endpoint_uri: str) -> List[Response]:
  """Given a set of test samples, queries the model deployment at the provided endpoint and returns the results."""
  results = []
  # Have to convert to a Python list since NumPy arrays are not JSON serializable.
  for sample in test_samples:
    results.append(requests.post(endpoint_uri, json={"array": sample.tolist()}))
  # TODO: Figure out how Serve deals with Pandas DataFrame returned by Predictors.
  return results

Step 5: Putting it all together

Once we have defined our training logic and our preprocessor, we can put everything together!

For each dataset in our stream, we do the following:

  1. Train on the dataset in Data Parallel fashion. We create a TorchTrainer, specify the config for the training loop we defined above, the dataset to train on, and how much we want to scale. TorchTrainer also accepts a resume_from_checkpoint argument so that training continues from a previously saved checkpoint.

  2. Get the saved checkpoint from the training run.

  3. Test our trained model on a test set containing test data from all the tasks trained on so far.

  4. After training on each task, we deploy our model so we can query it for predictions.

In this example, the training and test data for each task is well-defined beforehand by the benchmark. For real-world scenarios, this probably will not be the case. It is very likely that the prediction requests after training on one task will become the training data for the next task.

from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
from ray.train.torch import TorchPredictor
from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import json_to_ndarray

# The number of tasks (i.e. datasets in our stream) that we want to use for this example.
n_tasks = 3

# Number of epochs to train each task for.
num_epochs = 4
# Batch size.
batch_size = 32
# Optimizer args.
learning_rate = 0.001
momentum = 0.9

# Number of data parallel workers to use for training.
num_workers = 1
# Whether to use GPU or not.
use_gpu = ray.available_resources().get("GPU", 0) > 0

permuted_mnist = PermutedMNISTStream(n_tasks=n_tasks)
train_stream = permuted_mnist.generate_train_stream()
test_stream = permuted_mnist.generate_test_stream()

latest_checkpoint = None

accuracy_for_all_tasks = []
task_idx = 0
all_test_datasets_seen_so_far = []
for train_dataset, test_dataset in zip(train_stream, test_stream):
  print(f"Starting training for task: {task_idx}")
  task_idx += 1

  # *********Training*****************

  trainer = TorchTrainer(
          train_loop_per_worker=train_loop_per_worker,
          train_loop_config={
              "num_epochs": num_epochs,
              "learning_rate": learning_rate,
              "momentum": momentum,
              "batch_size": batch_size,
          },
          # Have to specify trainer_resources as 0 so that the example works on Colab. 
          scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, trainer_resources={"CPU": 0}),
          datasets={"train": train_dataset},
          preprocessor=BatchMapper(fn=preprocess_images),
          resume_from_checkpoint=latest_checkpoint,
      )
  result = trainer.fit()
  latest_checkpoint = result.checkpoint

  # **************Batch Prediction**************************

  # We can do batch prediction on the test data for the tasks seen so far.
  # TODO: Fix type signature in Ray Datasets
  # TODO: Fix dataset.union when used with empty list.
  if len(all_test_datasets_seen_so_far) > 0:
    full_test_dataset = test_dataset.union(*all_test_datasets_seen_so_far)
  else:
    full_test_dataset = test_dataset

  all_test_datasets_seen_so_far.append(test_dataset)

  accuracy_for_this_task = batch_predict(latest_checkpoint, full_test_dataset)
  print(f"Accuracy for task {task_idx}: {accuracy_for_this_task}")
  accuracy_for_all_tasks.append(accuracy_for_this_task)

  # *************Model Deployment & Online Inference***************************
  
  # We can also deploy our model to do online inference with Ray Serve.
  # Start Ray Serve.
  serve.start()
  test_samples = permuted_mnist.generate_test_samples()
  endpoint_uri = deploy_model(latest_checkpoint)
  online_inference_results = query_deployment(test_samples, endpoint_uri)

  if ray.available_resources().get("CPU", 0) < num_workers+1:
    # If there are no more CPUs left, then shutdown the Serve replicas so we can continue training on the next task.
    serve.shutdown()

  
serve.shutdown()
Read->Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:06<00:00,  6.40s/it]
Read->Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  2.12it/s]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:02<00:00,  2.34s/it]
Read->Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  2.29it/s]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.33s/it]
Starting training for task: 0
== Status ==
Current time: 2022-07-20 21:48:52 (running for 00:00:39.66)
Memory usage on this node: 33.1/64.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/28.14 GiB heap, 0.0/2.0 GiB objects
Result logdir: /Users/jiaodong/ray_results/TorchTrainer_2022-07-20_21-48-13
Number of trials: 1/1 (1 TERMINATED)
Trial name                status      loc              iter  total time (s)  loss     _timestamp  _time_this_iter_s
TorchTrainer_53c58_00000  TERMINATED  127.0.0.1:39548     4         36.4582  824.229  1658378932            6.46339


2022-07-20 21:48:13,244	INFO plugin_schema_manager.py:52 -- Loading the default runtime env schemas: ['/Users/jiaodong/Workspace/ray/python/ray/_private/runtime_env/../../runtime_env/schemas/working_dir_schema.json', '/Users/jiaodong/Workspace/ray/python/ray/_private/runtime_env/../../runtime_env/schemas/pip_schema.json'].
(RayTrainWorker pid=39562) loss: 2.282040, epoch: 0, iteration: 0
(RayTrainWorker pid=39562) 2022-07-20 21:48:26,772	INFO train_loop_utils.py:298 -- Moving model to device: cpu
(RayTrainWorker pid=39562) loss: 1.521038, epoch: 0, iteration: 500
(RayTrainWorker pid=39562) loss: 1.169452, epoch: 0, iteration: 1000
(RayTrainWorker pid=39562) loss: 0.856338, epoch: 0, iteration: 1500
(RayTrainWorker pid=39562) loss: 0.788410, epoch: 1, iteration: 0
(RayTrainWorker pid=39562) loss: 0.854239, epoch: 1, iteration: 500
(RayTrainWorker pid=39562) loss: 0.533351, epoch: 1, iteration: 1000
(RayTrainWorker pid=39562) loss: 0.591339, epoch: 1, iteration: 1500
(RayTrainWorker pid=39562) loss: 0.457057, epoch: 2, iteration: 0
(RayTrainWorker pid=39562) loss: 0.594715, epoch: 2, iteration: 500
(RayTrainWorker pid=39562) loss: 0.477588, epoch: 2, iteration: 1000
(RayTrainWorker pid=39562) loss: 0.235412, epoch: 2, iteration: 1500
(RayTrainWorker pid=39562) loss: 0.507374, epoch: 3, iteration: 0
(RayTrainWorker pid=39562) loss: 0.447128, epoch: 3, iteration: 500
(RayTrainWorker pid=39562) loss: 0.381943, epoch: 3, iteration: 1000
(RayTrainWorker pid=39562) loss: 0.347877, epoch: 3, iteration: 1500
Result for TorchTrainer_53c58_00000:
  _time_this_iter_s: 6.463389873504639
  _timestamp: 1658378932
  _training_iteration: 4
  date: 2022-07-20_21-48-52
  done: true
  experiment_id: abc531ef544440268933d8221addeb9d
  experiment_tag: '0'
  hostname: Jiaos-MacBook-Pro-16-inch-2019
  iterations_since_restore: 4
  loss: 824.2287287414074
  node_ip: 127.0.0.1
  pid: 39548
  should_checkpoint: true
  time_since_restore: 36.45815992355347
  time_this_iter_s: 6.464020013809204
  time_total_s: 36.45815992355347
  timestamp: 1658378932
  timesteps_since_restore: 0
  training_iteration: 4
  trial_id: 53c58_00000
  warmup_time: 0.003597259521484375
  
2022-07-20 21:48:52,891	INFO tune.py:738 -- Total run time: 39.80 seconds (39.66 seconds for the tuning loop).
Map Progress (1 actors 1 pending):   0%|          | 0/1 [00:01<?, ?it/s](BlockWorker pid=39601) /Users/jiaodong/anaconda3/envs/ray3.7/lib/python3.7/site-packages/torchvision/transforms/functional.py:150: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_numpy.cpp:178.)
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:03<00:00,  3.01s/it]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  8.70it/s]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 76.13it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 82.57it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 134.32it/s]
Accuracy for task 1: 0.3767
(ServeController pid=39625) INFO 2022-07-20 21:48:57,458 controller 39625 checkpoint_path.py:17 - Using RayInternalKVStore for controller checkpoint and recovery.
(ServeController pid=39625) INFO 2022-07-20 21:48:57,460 controller 39625 http_state.py:126 - Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:oEzsmU:SERVE_PROXY_ACTOR-db68eafa3bbe9042df574f3c9974b40ce8d97728db90282feefb4690' on node 'db68eafa3bbe9042df574f3c9974b40ce8d97728db90282feefb4690' listening on '127.0.0.1:8000'
Shuffle Map:   0%|          | 0/1 [00:00<?, ?it/s](HTTPProxyActor pid=39628) INFO:     Started server process [39628]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  8.12it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  5.80it/s]
Map Progress (1 actors 0 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.16s/it]
/Users/jiaodong/anaconda3/envs/ray3.7/lib/python3.7/site-packages/ipykernel_launcher.py:25: UserWarning: From /var/folders/1s/wy6f3ytn3q726p5hl8fw8d780000gn/T/ipykernel_39344/1249059442.py:25: deploy (from ray.serve.deployment) is deprecated and will be removed in a future version Please see https://docs.ray.io/en/latest/serve/index.html
(ServeController pid=39625) INFO 2022-07-20 21:49:00,913 controller 39625 deployment_state.py:1281 - Adding 2 replicas to deployment 'mnist_model'.
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:02<00:00,  2.39s/it]
Read->Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  2.39it/s]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.37s/it]
Starting training for task: 1
== Status ==
Current time: 2022-07-20 21:50:36 (running for 00:00:37.98)
Memory usage on this node: 33.7/64.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/28.14 GiB heap, 0.0/2.0 GiB objects
Result logdir: /Users/jiaodong/ray_results/TorchTrainer_2022-07-20_21-49-58
Number of trials: 1/1 (1 TERMINATED)
Trial name                status      loc              iter  total time (s)  loss     _timestamp  _time_this_iter_s
TorchTrainer_92bcd_00000  TERMINATED  127.0.0.1:39736     4         34.1132  707.634  1658379035            6.45643


(TorchTrainer pid=39736) 2022-07-20 21:50:01,936	WARNING base_trainer.py:167 -- When passing `datasets` to a Trainer, it is recommended to reserve at least 20% of node CPUs for Dataset execution by setting `_max_cpu_fraction_per_node = 0.8` in the Trainer `scaling_config`. Not doing so can lead to resource contention or hangs. See https://docs.ray.io/en/master/data/key-concepts.html#example-datasets-in-tune for more info.
(RayTrainWorker pid=39752) 2022-07-20 21:50:09,489	INFO config.py:71 -- Setting up process group for: env:// [rank=0, world_size=1]
(RayTrainWorker pid=39752) [W ProcessGroupGloo.cpp:715] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())
(RayTrainWorker pid=39752) loss: 3.301114, epoch: 0, iteration: 0
(RayTrainWorker pid=39752) 2022-07-20 21:50:09,795	INFO train_loop_utils.py:298 -- Moving model to device: cpu
(RayTrainWorker pid=39752) /Users/jiaodong/Workspace/ray/python/ray/air/_internal/torch_utils.py:64: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_numpy.cpp:178.)
(RayTrainWorker pid=39752)   return torch.as_tensor(vals, dtype=dtype)
(RayTrainWorker pid=39752) loss: 1.075076, epoch: 0, iteration: 500
(RayTrainWorker pid=39752) loss: 0.536976, epoch: 0, iteration: 1000
(RayTrainWorker pid=39752) loss: 0.600182, epoch: 0, iteration: 1500
(RayTrainWorker pid=39752) loss: 0.546070, epoch: 1, iteration: 0
(RayTrainWorker pid=39752) loss: 0.448120, epoch: 1, iteration: 500
(RayTrainWorker pid=39752) loss: 0.392481, epoch: 1, iteration: 1000
(RayTrainWorker pid=39752) loss: 0.371981, epoch: 1, iteration: 1500
(RayTrainWorker pid=39752) loss: 0.521735, epoch: 2, iteration: 0
(RayTrainWorker pid=39752) loss: 0.635850, epoch: 2, iteration: 500
(RayTrainWorker pid=39752) loss: 0.395862, epoch: 2, iteration: 1000
(RayTrainWorker pid=39752) loss: 0.402500, epoch: 2, iteration: 1500
(RayTrainWorker pid=39752) loss: 0.236922, epoch: 3, iteration: 0
(RayTrainWorker pid=39752) loss: 0.528482, epoch: 3, iteration: 500
(RayTrainWorker pid=39752) loss: 0.372242, epoch: 3, iteration: 1000
(RayTrainWorker pid=39752) loss: 0.355759, epoch: 3, iteration: 1500
Result for TorchTrainer_92bcd_00000:
  _time_this_iter_s: 6.456433057785034
  _timestamp: 1658379035
  _training_iteration: 4
  date: 2022-07-20_21-50-36
  done: true
  experiment_id: 21820161d0a245428cf75b0b9b17fe6e
  experiment_tag: '0'
  hostname: Jiaos-MacBook-Pro-16-inch-2019
  iterations_since_restore: 4
  loss: 707.6341038495302
  node_ip: 127.0.0.1
  pid: 39736
  should_checkpoint: true
  time_since_restore: 34.11321783065796
  time_this_iter_s: 6.463765859603882
  time_total_s: 34.11321783065796
  timestamp: 1658379036
  timesteps_since_restore: 0
  training_iteration: 4
  trial_id: 92bcd_00000
  warmup_time: 0.005189180374145508
  
2022-07-20 21:50:36,835	INFO tune.py:738 -- Total run time: 38.13 seconds (37.98 seconds for the tuning loop).
Map Progress (1 actors 1 pending):   0%|          | 0/2 [00:01<?, ?it/s](BlockWorker pid=39801) /Users/jiaodong/anaconda3/envs/ray3.7/lib/python3.7/site-packages/torchvision/transforms/functional.py:150: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_numpy.cpp:178.)
Map Progress (2 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:03<00:00,  1.96s/it]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00,  5.28it/s]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00, 114.72it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00, 162.16it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 140.57it/s]
Accuracy for task 2: 0.36795
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  6.24it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  6.19it/s]
Map Progress (1 actors 0 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.18s/it]
(ServeController pid=39625) INFO 2022-07-20 21:50:42,924 controller 39625 deployment_state.py:1240 - Stopping 1 replicas of deployment 'mnist_model' with outdated versions.
(ServeController pid=39625) INFO 2022-07-20 21:50:45,044 controller 39625 deployment_state.py:1281 - Adding 1 replicas to deployment 'mnist_model'.
(ServeController pid=39625) INFO 2022-07-20 21:50:47,377 controller 39625 deployment_state.py:1240 - Stopping 1 replicas of deployment 'mnist_model' with outdated versions.
(ServeController pid=39625) INFO 2022-07-20 21:50:49,504 controller 39625 deployment_state.py:1281 - Adding 1 replicas to deployment 'mnist_model'.
Map Progress (2 actors 0 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:02<00:00,  2.36s/it]
Read->Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  2.04it/s]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.37s/it]
Starting training for task: 2
== Status ==
Current time: 2022-07-20 21:52:25 (running for 00:00:37.97)
Memory usage on this node: 34.0/64.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/28.14 GiB heap, 0.0/2.0 GiB objects
Result logdir: /Users/jiaodong/ray_results/TorchTrainer_2022-07-20_21-51-47
Number of trials: 1/1 (1 TERMINATED)
Trial name                status      loc              iter  total time (s)  loss     _timestamp  _time_this_iter_s
TorchTrainer_d37db_00000  TERMINATED  127.0.0.1:39948     4         34.0141  671.998  1658379144            6.59292


(TorchTrainer pid=39948) 2022-07-20 21:51:50,596	WARNING base_trainer.py:167 -- When passing `datasets` to a Trainer, it is recommended to reserve at least 20% of node CPUs for Dataset execution by setting `_max_cpu_fraction_per_node = 0.8` in the Trainer `scaling_config`. Not doing so can lead to resource contention or hangs. See https://docs.ray.io/en/master/data/key-concepts.html#example-datasets-in-tune for more info.
(RayTrainWorker pid=39968) 2022-07-20 21:51:58,118	INFO config.py:71 -- Setting up process group for: env:// [rank=0, world_size=1]
(RayTrainWorker pid=39968) [W ProcessGroupGloo.cpp:715] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())
(RayTrainWorker pid=39968) 2022-07-20 21:51:58,367	INFO train_loop_utils.py:298 -- Moving model to device: cpu
(RayTrainWorker pid=39968) loss: 4.062408, epoch: 0, iteration: 0
(RayTrainWorker pid=39968) loss: 0.970063, epoch: 0, iteration: 500
(RayTrainWorker pid=39968) loss: 0.658269, epoch: 0, iteration: 1000
(RayTrainWorker pid=39968) loss: 0.442650, epoch: 0, iteration: 1500
(RayTrainWorker pid=39968) loss: 0.603212, epoch: 1, iteration: 0
(RayTrainWorker pid=39968) loss: 0.534739, epoch: 1, iteration: 500
(RayTrainWorker pid=39968) loss: 0.420072, epoch: 1, iteration: 1000
(RayTrainWorker pid=39968) loss: 0.351545, epoch: 1, iteration: 1500
(RayTrainWorker pid=39968) loss: 0.347010, epoch: 2, iteration: 0
(RayTrainWorker pid=39968) loss: 0.419703, epoch: 2, iteration: 500
(RayTrainWorker pid=39968) loss: 0.350773, epoch: 2, iteration: 1000
(RayTrainWorker pid=39968) loss: 0.231652, epoch: 2, iteration: 1500
(RayTrainWorker pid=39968) loss: 0.343125, epoch: 3, iteration: 0
(RayTrainWorker pid=39968) loss: 0.547853, epoch: 3, iteration: 500
(RayTrainWorker pid=39968) loss: 0.353915, epoch: 3, iteration: 1000
(RayTrainWorker pid=39968) loss: 0.260028, epoch: 3, iteration: 1500
Result for TorchTrainer_d37db_00000:
  _time_this_iter_s: 6.5929179191589355
  _timestamp: 1658379144
  _training_iteration: 4
  date: 2022-07-20_21-52-24
  done: true
  experiment_id: 5d41bf13ba524c528faac8f64b13c7cc
  experiment_tag: '0'
  hostname: Jiaos-MacBook-Pro-16-inch-2019
  iterations_since_restore: 4
  loss: 671.9976235236973
  node_ip: 127.0.0.1
  pid: 39948
  should_checkpoint: true
  time_since_restore: 34.01405596733093
  time_this_iter_s: 6.590774774551392
  time_total_s: 34.01405596733093
  timestamp: 1658379144
  timesteps_since_restore: 0
  training_iteration: 4
  trial_id: d37db_00000
  warmup_time: 0.005116939544677734
  
2022-07-20 21:52:25,471	INFO tune.py:738 -- Total run time: 38.13 seconds (37.97 seconds for the tuning loop).
Map Progress (1 actors 1 pending):   0%|          | 0/3 [00:01<?, ?it/s](BlockWorker pid=40038) /Users/jiaodong/anaconda3/envs/ray3.7/lib/python3.7/site-packages/torchvision/transforms/functional.py:150: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_numpy.cpp:178.)
Map Progress (2 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:04<00:00,  1.62s/it]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:00<00:00,  7.77it/s]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:00<00:00, 136.51it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:00<00:00, 216.98it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 135.98it/s]
Accuracy for task 3: 0.3590333333333333
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  6.01it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  6.26it/s]
Map Progress (1 actors 0 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.17s/it]
(ServeController pid=39625) INFO 2022-07-20 21:52:32,498 controller 39625 deployment_state.py:1240 - Stopping 1 replicas of deployment 'mnist_model' with outdated versions.
(ServeController pid=39625) INFO 2022-07-20 21:52:34,634 controller 39625 deployment_state.py:1281 - Adding 1 replicas to deployment 'mnist_model'.
(ServeController pid=39625) INFO 2022-07-20 21:52:36,956 controller 39625 deployment_state.py:1240 - Stopping 1 replicas of deployment 'mnist_model' with outdated versions.
(ServeController pid=39625) INFO 2022-07-20 21:52:39,078 controller 39625 deployment_state.py:1281 - Adding 1 replicas to deployment 'mnist_model'.
(ServeController pid=39625) INFO 2022-07-20 21:53:31,642 controller 39625 deployment_state.py:1304 - Removing 2 replicas from deployment 'mnist_model'.

Now that we have finished all of our training, let’s see the accuracy of our model after training on each task.

We should see the accuracy decrease over time. This is expected, since we are using just a naive fine-tuning strategy and our model is therefore prone to catastrophic forgetting.

As we increase the number of tasks, the model performance on all the tasks trained on so far should decrease.

accuracy_for_all_tasks
[0.3767, 0.36795, 0.3590333333333333]
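
If you prefer a visual summary, a quick sketch like the one below plots the trend; it assumes matplotlib is installed, which is not otherwise required by this example.

# Optional visualization sketch (matplotlib assumed to be installed).
# import matplotlib.pyplot as plt
# plt.plot(range(1, len(accuracy_for_all_tasks) + 1), accuracy_for_all_tasks, marker="o")
# plt.xlabel("Number of tasks trained on")
# plt.ylabel("Accuracy on all tasks seen so far")
# plt.show()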

[Optional] Step 6: Compare against full training

We have now incrementally trained our simple multi-layer perceptron. Let’s compare the model trained incrementally via fine-tuning against a model that is trained on all the tasks up front.

Since we are using a naive fine-tuning strategy, we should expect that our incrementally trained model will perform worse than the one that is fully trained! However, various other strategies have been developed, and are actively being researched, to improve accuracy for incremental training. And overall, incremental/continual learning allows you to train in many real-world settings where the entire dataset is not available up front and new data arrives at a relatively high rate.

Let’s first combine the datasets for all of the tasks into a single, unified Dataset.

train_stream = permuted_mnist.generate_train_stream()

# Collect all datasets in the stream into a single dataset.
all_training_datasets = []
for train_dataset in train_stream:
  all_training_datasets.append(train_dataset)
combined_training_dataset = all_training_datasets[0].union(*all_training_datasets[1:])


combined_training_dataset = combined_training_dataset.random_shuffle()
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:02<00:00,  2.33s/it]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:02<00:00,  2.32s/it]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:02<00:00,  2.31s/it]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:01<00:00,  2.55it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:01<00:00,  2.55it/s]

Then, we train a new model on the unified Dataset using the same configurations as before.

# Now we do training with the same configurations as before
trainer = TorchTrainer(
            train_loop_per_worker=train_loop_per_worker,
            train_loop_config={
                "num_epochs": num_epochs,
                "learning_rate": learning_rate,
                "momentum": momentum,
                "batch_size": batch_size,
            },
            # Have to specify trainer_resources as 0 so that the example works on Colab. 
            scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, trainer_resources={"CPU": 0}),
            datasets={"train": combined_training_dataset},
            preprocessor=BatchMapper(fn=preprocess_images),
        )
result = trainer.fit()
full_training_checkpoint = result.checkpoint
2022-07-20 21:53:44,223	WARNING base_trainer.py:167 -- When passing `datasets` to a Trainer, it is recommended to reserve at least 20% of node CPUs for Dataset execution by setting `_max_cpu_fraction_per_node = 0.8` in the Trainer `scaling_config`. Not doing so can lead to resource contention or hangs. See https://docs.ray.io/en/master/data/key-concepts.html#example-datasets-in-tune for more info.
== Status ==
Current time: 2022-07-20 21:55:10 (running for 00:01:25.89)
Memory usage on this node: 34.4/64.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/28.14 GiB heap, 0.0/2.0 GiB objects
Result logdir: /Users/jiaodong/ray_results/TorchTrainer_2022-07-20_21-53-44
Number of trials: 1/1 (1 TERMINATED)
Trial name                status      loc              iter  total time (s)  loss    _timestamp  _time_this_iter_s
TorchTrainer_1923b_00000  TERMINATED  127.0.0.1:40228     4         82.7285  2328.8  1658379309            17.0239


(TorchTrainer pid=40228) 2022-07-20 21:53:47,328	WARNING base_trainer.py:167 -- When passing `datasets` to a Trainer, it is recommended to reserve at least 20% of node CPUs for Dataset execution by setting `_max_cpu_fraction_per_node = 0.8` in the Trainer `scaling_config`. Not doing so can lead to resource contention or hangs. See https://docs.ray.io/en/master/data/key-concepts.html#example-datasets-in-tune for more info.
(RayTrainWorker pid=40276) loss: 2.305423, epoch: 0, iteration: 0
(RayTrainWorker pid=40276) loss: 1.935424, epoch: 0, iteration: 500
(RayTrainWorker pid=40276) loss: 1.174222, epoch: 0, iteration: 5000
(RayTrainWorker pid=40276) loss: 0.776577, epoch: 0, iteration: 5500
(RayTrainWorker pid=40276) loss: 0.674814, epoch: 1, iteration: 0
(RayTrainWorker pid=40276) loss: 0.699747, epoch: 1, iteration: 500
(RayTrainWorker pid=40276) loss: 0.795673, epoch: 1, iteration: 5000
(RayTrainWorker pid=40276) loss: 0.651217, epoch: 1, iteration: 5500
(RayTrainWorker pid=40276) loss: 0.743072, epoch: 2, iteration: 0
(RayTrainWorker pid=40276) loss: 0.745054, epoch: 2, iteration: 500
(RayTrainWorker pid=40276) loss: 0.639829, epoch: 2, iteration: 5000
(RayTrainWorker pid=40276) loss: 0.682482, epoch: 2, iteration: 5500
(RayTrainWorker pid=40276) loss: 0.553197, epoch: 3, iteration: 0
(RayTrainWorker pid=40276) loss: 0.471037, epoch: 3, iteration: 500
(RayTrainWorker pid=40276) loss: 0.538055, epoch: 3, iteration: 5000
(RayTrainWorker pid=40276) loss: 0.534079, epoch: 3, iteration: 5500
Result for TorchTrainer_1923b_00000:
  _time_this_iter_s: 17.023871898651123
  _timestamp: 1658379309
  _training_iteration: 4
  date: 2022-07-20_21-55-10
  done: true
  experiment_id: d304983bfe3f4e269118f8618aa9b02f
  experiment_tag: '0'
  hostname: Jiaos-MacBook-Pro-16-inch-2019
  iterations_since_restore: 4
  loss: 2328.8038033917546
  node_ip: 127.0.0.1
  pid: 40228
  should_checkpoint: true
  time_since_restore: 82.72845268249512
  time_this_iter_s: 17.024354696273804
  time_total_s: 82.72845268249512
  timestamp: 1658379310
  timesteps_since_restore: 0
  training_iteration: 4
  trial_id: 1923b_00000
  warmup_time: 0.004433870315551758
  
2022-07-20 21:55:10,233	INFO tune.py:738 -- Total run time: 86.00 seconds (85.88 seconds for the tuning loop).

Then, let’s test the model that was trained on all the tasks up front.

# Then, we use the fully trained model to do batch prediction on the entire test set.

# `full_test_dataset` should already contain the combined test datasets.
fully_trained_accuracy = batch_predict(full_training_checkpoint, full_test_dataset)
Map Progress (1 actors 1 pending):   0%|          | 0/3 [00:01<?, ?it/s](BlockWorker pid=40400) /Users/jiaodong/anaconda3/envs/ray3.7/lib/python3.7/site-packages/torchvision/transforms/functional.py:150: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_numpy.cpp:178.)
Map Progress (2 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:04<00:00,  1.62s/it]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:00<00:00, 63.30it/s]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:00<00:00, 129.65it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 3/3 [00:00<00:00, 312.18it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 149.25it/s]

Finally, let’s compare the accuracies between the incrementally trained model and the fully trained model. We should see that the fully trained model performs better.

print("Fully trained model accuracy: ", fully_trained_accuracy)
print("Incrementally trained model accuracy: ", accuracy_for_all_tasks[-1])
Fully trained model accuracy:  0.38016666666666665
Incrementally trained model accuracy:  0.3590333333333333

Next Steps

Once you’ve completed this notebook, you should be set to experiment with scalable incremental training using Ray, either by trying fancier incremental learning algorithms than naive fine-tuning, or by attempting to scale out to larger datasets!

If you run into any issues or have any feature requests, please file an issue on the Ray GitHub.