"""Example training a memory neural net on the bAbI dataset.
References Keras and is based off of https://keras.io/examples/babi_memnn/.
"""
from __future__ import print_function
import argparse
import os
import re
import sys
import tarfile
import numpy as np
from filelock import FileLock
from ray import train, tune
if sys.version_info >= (3, 12):
# Skip this test in Python 3.12+ because TensorFlow is not supported.
sys.exit(0)
else:
from tensorflow.keras.layers import (
LSTM,
Activation,
Dense,
Dropout,
Embedding,
Input,
Permute,
add,
concatenate,
dot,
)
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import get_file
def tokenize(sent):
"""Return the tokens of a sentence including punctuation.
>>> tokenize("Bob dropped the apple. Where is the apple?")
["Bob", "dropped", "the", "apple", ".", "Where", "is", "the", "apple", "?"]
"""
return [x.strip() for x in re.split(r"(\W+)?", sent) if x and x.strip()]
def parse_stories(lines, only_supporting=False):
"""Parse stories provided in the bAbi tasks format
If only_supporting is true, only the sentences
that support the answer are kept.
"""
data = []
story = []
for line in lines:
line = line.decode("utf-8").strip()
nid, line = line.split(" ", 1)
nid = int(nid)
if nid == 1:
story = []
if "\t" in line:
q, a, supporting = line.split("\t")
q = tokenize(q)
if only_supporting:
# Only select the related substory
supporting = map(int, supporting.split())
substory = [story[i - 1] for i in supporting]
else:
# Provide all the substories
substory = [x for x in story if x]
data.append((substory, q, a))
story.append("")
else:
sent = tokenize(line)
story.append(sent)
return data
def get_stories(f, only_supporting=False, max_length=None):
"""Given a file name, read the file,
retrieve the stories,
and then convert the sentences into a single story.
If max_length is supplied,
any stories longer than max_length tokens will be discarded.
"""
def flatten(data):
return sum(data, [])
data = parse_stories(f.readlines(), only_supporting=only_supporting)
data = [
(flatten(story), q, answer)
for story, q, answer in data
if not max_length or len(flatten(story)) < max_length
]
return data
def vectorize_stories(word_idx, story_maxlen, query_maxlen, data):
inputs, queries, answers = [], [], []
for story, query, answer in data:
inputs.append([word_idx[w] for w in story])
queries.append([word_idx[w] for w in query])
answers.append(word_idx[answer])
return (
pad_sequences(inputs, maxlen=story_maxlen),
pad_sequences(queries, maxlen=query_maxlen),
np.array(answers),
)
def read_data(finish_fast=False):
# Get the file
try:
path = get_file(
"babi-tasks-v1-2.tar.gz",
origin="https://s3.amazonaws.com/text-datasets/"
"babi_tasks_1-20_v1-2.tar.gz",
)
except Exception:
print(
"Error downloading dataset, please download it manually:\n"
"$ wget http://www.thespermwhale.com/jaseweston/babi/tasks_1-20_v1-2" # noqa: E501
".tar.gz\n"
"$ mv tasks_1-20_v1-2.tar.gz ~/.keras/datasets/babi-tasks-v1-2.tar.gz" # noqa: E501
)
raise
# Choose challenge
challenges = {
# QA1 with 10,000 samples
"single_supporting_fact_10k": "tasks_1-20_v1-2/en-10k/qa1_"
"single-supporting-fact_{}.txt",
# QA2 with 10,000 samples
"two_supporting_facts_10k": "tasks_1-20_v1-2/en-10k/qa2_"
"two-supporting-facts_{}.txt",
}
challenge_type = "single_supporting_fact_10k"
challenge = challenges[challenge_type]
with tarfile.open(path) as tar:
train_stories = get_stories(tar.extractfile(challenge.format("train")))
test_stories = get_stories(tar.extractfile(challenge.format("test")))
if finish_fast:
train_stories = train_stories[:64]
test_stories = test_stories[:64]
return train_stories, test_stories
class MemNNModel(tune.Trainable):
def build_model(self):
"""Helper method for creating the model"""
vocab = set()
for story, q, answer in self.train_stories + self.test_stories:
vocab |= set(story + q + [answer])
vocab = sorted(vocab)
# Reserve 0 for masking via pad_sequences
vocab_size = len(vocab) + 1
story_maxlen = max(len(x) for x, _, _ in self.train_stories + self.test_stories)
query_maxlen = max(len(x) for _, x, _ in self.train_stories + self.test_stories)
word_idx = {c: i + 1 for i, c in enumerate(vocab)}
self.inputs_train, self.queries_train, self.answers_train = vectorize_stories(
word_idx, story_maxlen, query_maxlen, self.train_stories
)
self.inputs_test, self.queries_test, self.answers_test = vectorize_stories(
word_idx, story_maxlen, query_maxlen, self.test_stories
)
# placeholders
input_sequence = Input((story_maxlen,))
question = Input((query_maxlen,))
# encoders
# embed the input sequence into a sequence of vectors
input_encoder_m = Sequential()
input_encoder_m.add(Embedding(input_dim=vocab_size, output_dim=64))
input_encoder_m.add(Dropout(self.config.get("dropout", 0.3)))
# output: (samples, story_maxlen, embedding_dim)
# embed the input into a sequence of vectors of size query_maxlen
input_encoder_c = Sequential()
input_encoder_c.add(Embedding(input_dim=vocab_size, output_dim=query_maxlen))
input_encoder_c.add(Dropout(self.config.get("dropout", 0.3)))
# output: (samples, story_maxlen, query_maxlen)
# embed the question into a sequence of vectors
question_encoder = Sequential()
question_encoder.add(
Embedding(input_dim=vocab_size, output_dim=64, input_length=query_maxlen)
)
question_encoder.add(Dropout(self.config.get("dropout", 0.3)))
# output: (samples, query_maxlen, embedding_dim)
# encode input sequence and questions (which are indices)
# to sequences of dense vectors
input_encoded_m = input_encoder_m(input_sequence)
input_encoded_c = input_encoder_c(input_sequence)
question_encoded = question_encoder(question)
# compute a "match" between the first input vector sequence
# and the question vector sequence
# shape: `(samples, story_maxlen, query_maxlen)`
match = dot([input_encoded_m, question_encoded], axes=(2, 2))
match = Activation("softmax")(match)
# add the match matrix with the second input vector sequence
response = add(
[match, input_encoded_c]
) # (samples, story_maxlen, query_maxlen)
response = Permute((2, 1))(response) # (samples, query_maxlen, story_maxlen)
# concatenate the match matrix with the question vector sequence
answer = concatenate([response, question_encoded])
# the original paper uses a matrix multiplication.
# we choose to use a RNN instead.
answer = LSTM(32)(answer) # (samples, 32)
# one regularization layer -- more would probably be needed.
answer = Dropout(self.config.get("dropout", 0.3))(answer)
answer = Dense(vocab_size)(answer) # (samples, vocab_size)
# we output a probability distribution over the vocabulary
answer = Activation("softmax")(answer)
# build the final model
model = Model([input_sequence, question], answer)
return model
def setup(self, config):
with FileLock(os.path.expanduser("~/.tune.lock")):
self.train_stories, self.test_stories = read_data(config["finish_fast"])
model = self.build_model()
rmsprop = RMSprop(
lr=self.config.get("lr", 1e-3), rho=self.config.get("rho", 0.9)
)
model.compile(
optimizer=rmsprop,
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
)
self.model = model
def step(self):
# train
self.model.fit(
[self.inputs_train, self.queries_train],
self.answers_train,
batch_size=self.config.get("batch_size", 32),
epochs=self.config.get("epochs", 1),
validation_data=([self.inputs_test, self.queries_test], self.answers_test),
verbose=0,
)
_, accuracy = self.model.evaluate(
[self.inputs_train, self.queries_train], self.answers_train, verbose=0
)
return {"mean_accuracy": accuracy}
def save_checkpoint(self, checkpoint_dir):
file_path = checkpoint_dir + "/model"
self.model.save(file_path)
def load_checkpoint(self, checkpoint_dir):
# See https://stackoverflow.com/a/42763323
del self.model
file_path = checkpoint_dir + "/model"
self.model = load_model(file_path)
if __name__ == "__main__":
import ray
from ray.tune.schedulers import PopulationBasedTraining
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing"
)
args, _ = parser.parse_known_args()
if args.smoke_test:
ray.init(num_cpus=2)
perturbation_interval = 2
pbt = PopulationBasedTraining(
perturbation_interval=perturbation_interval,
hyperparam_mutations={
"dropout": lambda: np.random.uniform(0, 1),
"lr": lambda: 10 ** np.random.randint(-10, 0),
"rho": lambda: np.random.uniform(0, 1),
},
)
tuner = tune.Tuner(
MemNNModel,
run_config=train.RunConfig(
name="pbt_babi_memnn",
stop={"training_iteration": 4 if args.smoke_test else 100},
checkpoint_config=train.CheckpointConfig(
checkpoint_frequency=perturbation_interval,
checkpoint_score_attribute="mean_accuracy",
num_to_keep=2,
),
),
tune_config=tune.TuneConfig(
scheduler=pbt,
metric="mean_accuracy",
mode="max",
num_samples=2,
reuse_actors=True,
),
param_space={
"finish_fast": args.smoke_test,
"batch_size": 32,
"epochs": 1,
"dropout": 0.3,
"lr": 0.01,
"rho": 0.9,
},
)
tuner.fit()