{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Learning to Play Pong\n\nIn this example, we'll train a **very simple** neural network to play Pong using\nthe OpenAI Gym.\n\nAt a high level, we will use multiple Ray actors to obtain simulation rollouts\nand calculate gradients simultaneously. We will then centralize these\ngradients and use them to update the neural network. The updated neural network will\nthen be passed back to each Ray actor for more gradient calculation.\n\nThis application is adapted, with minimal modifications, from\nAndrej Karpathy's source code (see the accompanying blog post).\n\nTo run the application, first install some dependencies.\n\n```bash\npip install gym[atari]\n```\n\nAt the moment, on a large machine with 64 physical cores, computing an update\nwith a batch of size 1 takes about 1 second, and a batch of size 10 takes about 2.5\nseconds. A batch of size 60 takes about 3 seconds. On a cluster with 11 nodes,\neach with 18 physical cores, a batch of size 300 takes about 10 seconds. If the\nnumbers you see differ from these by much, take a look at the\n**Troubleshooting** section at the bottom of this page and consider submitting\nan issue.\n\n**Note** that these times depend on how long the rollouts take, which in turn\ndepends on how well the policy is doing. For example, a really bad policy will\nlose very quickly. As the policy learns, we should expect these numbers to\nincrease.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nimport os\nimport ray\nimport time\n\nimport gym"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Hyperparameters\n\nHere we define the hyperparameters used throughout this example.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"H = 200 # The number of hidden layer neurons.\ngamma = 0.99 # The discount factor for reward.\ndecay_rate = 0.99 # The decay factor for RMSProp leaky sum of grad^2.\nD = 80 * 80 # The input dimensionality: 80x80 grid.\nlearning_rate = 1e-4 # Magnitude of the update."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Helper Functions\n\nWe first define a few helper functions:\n\n1. Preprocessing: The `preprocess` function converts the original 210x160x3\nuint8 frame into a one-dimensional 6400-element float vector.\n\n2. Reward Processing: The `process_rewards` function calculates\na discounted reward. This formula states that the \"value\" of a\nsampled action is the weighted sum of all rewards afterwards,\nbut later rewards are exponentially less important.\n\n3. Rollout: The `rollout` function plays an entire game of Pong (until\neither the computer or the RL agent loses).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def preprocess(img):\n    # Crop the image.\n    img = img[35:195]\n    # Downsample by factor of 2.\n    img = img[::2, ::2, 0]\n    # Erase background (background type 1).\n    img[img == 144] = 0\n    # Erase background (background type 2).\n    img[img == 109] = 0\n    # Set everything else (paddles, ball) to 1.\n    img[img != 0] = 1\n    # Use the built-in float (the np.float alias was removed in NumPy 1.24).\n    return img.astype(float).ravel()\n\n\ndef process_rewards(r):\n    \"\"\"Compute discounted reward from a vector of rewards.\"\"\"\n    discounted_r = np.zeros_like(r)\n    running_add = 0\n    for t in reversed(range(0, r.size)):\n        # Reset the sum, since this was a game boundary (pong specific!).\n        if r[t] != 0:\n            running_add = 0\n        running_add = running_add * gamma + r[t]\n        discounted_r[t] = running_add\n    return discounted_r\n\n\ndef rollout(model, env):\n    \"\"\"Evaluates env and model until the env returns \"Done\".\n\n    Returns:\n        xs: A list of observations\n        hs: A list of model hidden states per observation\n        dlogps: A list of gradients\n        drs: A list of rewards.\n\n    \"\"\"\n    # Reset the game.\n    observation = env.reset()\n    # Note that prev_x is used in computing the difference frame.\n    prev_x = None\n    xs, hs, dlogps, drs = [], [], [], []\n    done = False\n    while not done:\n        cur_x = preprocess(observation)\n        x = cur_x - prev_x if prev_x is not None else np.zeros(D)\n        prev_x = cur_x\n\n        aprob, h = model.policy_forward(x)\n        # Sample an action.\n        action = 2 if np.random.uniform() < aprob else 3\n\n        # The observation.\n        xs.append(x)\n        # The hidden state.\n        hs.append(h)\n        y = 1 if action == 2 else 0  # A \"fake label\".\n        # The gradient that encourages the action that was taken to be\n        # taken (see http://cs231n.github.io/neural-networks-2/#losses if\n        # confused).\n        dlogps.append(y - aprob)\n\n        observation, reward, done, info = env.step(action)\n\n        # Record reward (has to be done after we call step() to get reward\n        # for previous action).\n        drs.append(reward)\n    return xs, hs, dlogps, drs"
]
},
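{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see the discounting in action, here is a small sanity check (a sketch using the `process_rewards` function and `gamma` defined above). A single +1 reward at the end of a three-step game is propagated backwards, shrinking by a factor of `gamma` at each earlier step.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# A toy reward vector: no reward for two steps, then a win (+1).\nr = np.array([0.0, 0.0, 1.0])\n# With gamma = 0.99, earlier steps receive exponentially discounted\n# credit: [0.99**2, 0.99, 1.0].\nprint(process_rewards(r))"
]
},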
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Neural Network\nHere, a neural network is used to define a \"policy\"\nfor playing Pong (that is, a function that chooses an action given a state).\n\nTo implement a neural network in NumPy, we need to provide helper functions\nfor calculating updates and computing the output of the neural network\ngiven an input, which in our case is an observation.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class Model(object):\n    \"\"\"This class holds the neural network weights.\"\"\"\n\n    def __init__(self):\n        # \"Xavier\"-style initialization: scale each layer by 1/sqrt(fan-in).\n        self.weights = {}\n        self.weights[\"W1\"] = np.random.randn(H, D) / np.sqrt(D)\n        self.weights[\"W2\"] = np.random.randn(H) / np.sqrt(H)\n\n    def policy_forward(self, x):\n        h = np.dot(self.weights[\"W1\"], x)\n        h[h < 0] = 0  # ReLU nonlinearity.\n        logp = np.dot(self.weights[\"W2\"], h)\n        # Sigmoid: squash the logit into a probability.\n        p = 1.0 / (1.0 + np.exp(-logp))\n        # Return probability of taking action 2, and hidden state.\n        return p, h\n\n    def policy_backward(self, eph, epx, epdlogp):\n        \"\"\"Backward pass to calculate gradients.\n\n        Arguments:\n            eph: Array of intermediate hidden states.\n            epx: Array of experiences (observations).\n            epdlogp: Array of log-probability gradients (the output of the\n                last layer before the sigmoid, modulated by advantage).\n\n        \"\"\"\n        dW2 = np.dot(eph.T, epdlogp).ravel()\n        dh = np.outer(epdlogp, self.weights[\"W2\"])\n        # Backprop relu.\n        dh[eph <= 0] = 0\n        dW1 = np.dot(dh.T, epx)\n        return {\"W1\": dW1, \"W2\": dW2}\n\n    def update(self, grad_buffer, rmsprop_cache, lr, decay):\n        \"\"\"Applies the gradients to the model parameters with RMSProp.\"\"\"\n        for k, v in self.weights.items():\n            g = grad_buffer[k]\n            rmsprop_cache[k] = decay * rmsprop_cache[k] + (1 - decay) * g**2\n            self.weights[k] += lr * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)\n\n\ndef zero_grads(grad_buffer):\n    \"\"\"Reset the batch gradient buffer.\"\"\"\n    for k, v in grad_buffer.items():\n        grad_buffer[k] = np.zeros_like(v)"
]
},
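{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a sketch using the `Model` class above), we can push a random observation through the forward pass: the result is a single probability in (0, 1) together with an `H`-dimensional hidden state.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"model_check = Model()\nx_check = np.random.randn(D)\naprob, h = model_check.policy_forward(x_check)\n# aprob is the probability of taking action 2 (move up);\n# h has H = 200 entries.\nprint(aprob, h.shape)"
]
},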
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallelizing Gradients\nWe define an **actor**, which is responsible for taking a model and an env\nand performing a rollout + computing a gradient update.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ray.init()\n\n\n@ray.remote\nclass RolloutWorker(object):\n def __init__(self):\n # Tell numpy to only use one core. If we don't do this, each actor may\n # try to use all of the cores and the resulting contention may result\n # in no speedup over the serial version. Note that if numpy is using\n # OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you\n # probably need to do it from the command line (so it happens before\n # numpy is imported).\n os.environ[\"MKL_NUM_THREADS\"] = \"1\"\n self.env = gym.make(\"Pong-v0\")\n\n def compute_gradient(self, model):\n # Compute a simulation episode.\n xs, hs, dlogps, drs = rollout(model, self.env)\n reward_sum = sum(drs)\n # Vectorize the arrays.\n epx = np.vstack(xs)\n eph = np.vstack(hs)\n epdlogp = np.vstack(dlogps)\n epr = np.vstack(drs)\n\n # Compute the discounted reward backward through time.\n discounted_epr = process_rewards(epr)\n # Standardize the rewards to be unit normal (helps control the gradient\n # estimator variance).\n discounted_epr -= np.mean(discounted_epr)\n discounted_epr /= np.std(discounted_epr)\n # Modulate the gradient with advantage (the policy gradient magic\n # happens right here).\n epdlogp *= discounted_epr\n return model.policy_backward(eph, epx, epdlogp), reward_sum"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running\n\nThis example is easy to parallelize because the network can play several games\nin parallel and no information needs to be shared between the games.\n\nIn the loop, the network repeatedly plays games of Pong and\nrecords a gradient from each game. Every `batch_size` games, the gradients are\ncombined together and used to update the network.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"iterations = 20\nbatch_size = 4\nmodel = Model()\nactors = [RolloutWorker.remote() for _ in range(batch_size)]\n\nrunning_reward = None\n# Update buffers that add up gradients over a batch.\ngrad_buffer = {k: np.zeros_like(v) for k, v in model.weights.items()}\n# Update the rmsprop memory.\nrmsprop_cache = {k: np.zeros_like(v) for k, v in model.weights.items()}\n\nfor i in range(1, 1 + iterations):\n    model_id = ray.put(model)\n    # Launch tasks to compute gradients from multiple rollouts in parallel.\n    start_time = time.time()\n    gradient_ids = [\n        actor.compute_gradient.remote(model_id) for actor in actors\n    ]\n    for batch in range(batch_size):\n        [grad_id], gradient_ids = ray.wait(gradient_ids)\n        grad, reward_sum = ray.get(grad_id)\n        # Accumulate the gradient over the batch.\n        for k in model.weights:\n            grad_buffer[k] += grad[k]\n        running_reward = (reward_sum if running_reward is None else\n                          running_reward * 0.99 + reward_sum * 0.01)\n    end_time = time.time()\n    print(\"Batch {} computed {} rollouts in {} seconds, \"\n          \"running mean is {}\".format(i, batch_size, end_time - start_time,\n                                      running_reward))\n    model.update(grad_buffer, rmsprop_cache, learning_rate, decay_rate)\n    zero_grads(grad_buffer)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}