{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed Synchronous Value Iteration\n", "***\n", "\n", "The goal of this assignment is to implement both single-core and distributed versions of synchronous value iteration (VI). In particular, VI will be applied to Markov Decision Processes (MDPs) in order to compute policies that optimize expected infinite-horizon discounted cumulative reward. \n", "\n", "The relevant material on MDPs and VI is in the following course notes from CS533. \n", "\n", "https://oregonstate.instructure.com/courses/1719746/files/74716197/download?wrap=1\n", "https://oregonstate.instructure.com/courses/1719746/files/74828408/download?wrap=1\n", "\n", "\n", "### Synchronous Value Iteration Recap\n", "\n", "Below is a review of the synchronous value iteration algorithm. The algorithm is iterative, and each iteration produces a newly updated value function $V_{new}$ based on the value function from the previous iteration, $V_{curr}$. This is done by applying the Bellman backup operator to $V_{curr}$ at each state. That is, \n", "\\begin{equation}\n", "V_{new}(s) = \\max_{a\\in A} R(s,a) + \\beta \\sum_{s'\\in S} T(s,a,s') V_{curr}(s')\n", "\\end{equation}\n", "where $\\beta \\in [0,1)$ is the discount factor, $R$ is the reward function, and $T$ is the transition function. \n", "\n", "The algorithm also maintains the greedy policy $\\pi$ at each iteration, which is based on a one-step lookahead operator: \n", "\\begin{equation}\n", "\\pi_{curr}(s) = \\arg\\max_{a\\in A} R(s,a) + \\beta \\sum_{s'\\in S} T(s,a,s') V_{curr}(s')\n", "\\end{equation}\n", "\n", "After an update, we define the Bellman error of that iteration as $\\max_s |V_{new}(s)-V_{curr}(s)|$. In the notes, it is shown that this error allows us to bound the difference between the value function of $\\pi_{curr}$ and the optimal value function $V^{*}$. 
Thus, a typical stopping condition for VI is to iterate until the Bellman error is below a specified threshold $\\epsilon$. Putting everything together, the overall algorithm is as follows:\n", "\n", "- Start with $V_{curr}(s) = 0$ for all $s$\n", "- error = $\\infty$\n", "- While error > $\\epsilon$ \n", "    - For each state $s$ \n", "        - $V_{new}(s) = \\max_{a\\in A} R(s,a) + \\beta \\sum_{s'\\in S} T(s,a,s') V_{curr}(s')$\n", "        - $\\pi_{curr}(s) = \\arg\\max_{a\\in A} R(s,a) + \\beta \\sum_{s'\\in S} T(s,a,s') V_{curr}(s')$\n", "    - error = $\\max_s |V_{new}(s)-V_{curr}(s)|$ ;; could do this incrementally \n", "    - $V_{curr} = V_{new}$\n", "\n", "We refer to this version of VI as synchronous because it maintains both a current and a new value function, and all values of the new value function are computed from the fixed current value function. That is, each iteration updates all states based on the value function of the previous iteration. \n", "\n", "To simplify this first assignment, we focus on Synchronous VI and investigate how best to create a distributed implementation using the Ray framework. In particular, a distributed version of Synchronous VI should still produce a sequence of value functions and policies equivalent to those produced by a single-core version, but ideally much faster. The remainder of this notebook guides you through some of the MDP mechanics and algorithm implementations. The grand finale of this first assignment is a competition in which you will try to develop the fastest distributed implementation you can. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# You will need to uncomment the following pip commands if the libraries need to be installed. 
\n", "# You may get some errors related to readchar, but they should not break the project.\n", "\n", "#!pip install --user readchar\n", "#!pip install --user gym" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import ray\n", "import time\n", "from copy import deepcopy\n", "import matplotlib.pyplot as plt\n", "from random import randint, choice\n", "%matplotlib inline\n", "import pickle" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## FrozenLake\n", "\n", "We will use the FrozenLake environment as the MDP environment for this experiment. This is a type of gridworld environment whose size (number of states) can be controlled by adjusting the grid dimensions. The environment is intended to model the process of navigating a frozen lake while avoiding holes, with the objective of reaching a goal location. \n", "\n", "The environment is defined as follows:\n", "\n", "- The environment is a rectangular grid of states/cells. There are four different types of cells, indicated by the following cell labels: \n", "\n", "    - S labels the starting/initial cell, always in the top-left corner\n", " \n", "    - F labels frozen cells that are safe to step on\n", "\n", "    - H labels holes; if the agent enters a hole cell, there is a penalty of -1000 and the episode ends\n", "\n", "    - G labels the goal cell; reaching it gives a reward of 1000 and the episode ends\n", "\n", "- There are four possible actions: Left (0), Down (1), Right (2), and Up (3). \n", "\n", "- The transition function moves the agent in the intended direction with probability 0.7 and in each of the other three directions with probability 0.1 (0.3 in total). \n", "\n", "- There is a reward of -1 for each action taken by the agent, which is intended to encourage the agent to reach the goal as fast as possible. \n", "\n", "- Episodes end whenever the agent falls in a hole or reaches the goal. 
An end-of-episode is modeled by transitioning to a zero-reward terminal state (every action taken in a hole or goal cell leads to that state). \n", " \n", "Below is the code for the FrozenLakeEnv class, which provides the following methods used in this assignment: \n", "\n", "- FrozenLakeEnv.GetSuccessors(): Takes a state and an action as input and returns a list of pairs, where each pair $(s',p)$ is a successor state $s'$ with non-zero probability and $p$ is the probability of transitioning to $s'$. \n", "\n", "- FrozenLakeEnv.GetTransitionProb(): Takes a state, an action, and a next state as input and returns the probability of that transition. \n", "\n", "- FrozenLakeEnv.GetReward(): Takes a state and an action as input and returns the corresponding reward.\n", "\n", "The version used in this assignment is a modified version of the environment at the following location: \n", " \n", "Source: https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py \n", "\n", "Execute the following cell to initialize the MDP environments. 
(You do not need to change the code in this part.)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "from contextlib import closing\n", "\n", "import numpy as np\n", "from six import StringIO, b\n", "\n", "from gym import utils\n", "from gym.envs.toy_text import discrete\n", "\n", "LEFT = 0\n", "DOWN = 1\n", "RIGHT = 2\n", "UP = 3\n", "np.set_printoptions(threshold=sys.maxsize, linewidth=sys.maxsize, precision = 2)\n", "TransitionProb = [0.7, 0.1, 0.1, 0.1]\n", "def generate_row(length, h_prob):\n", " row = np.random.choice(2, length, p=[1.0 - h_prob, h_prob])\n", " row = ''.join(list(map(lambda z: 'F' if z == 0 else 'H', row)))\n", " return row\n", "\n", "\n", "def generate_map(shape):\n", " \"\"\"\n", "\n", " :param shape: Width x Height\n", " :return: List of text based map\n", " \"\"\"\n", " h_prob = 0.1\n", " grid_map = []\n", "\n", " for h in range(shape[1]):\n", "\n", " if h == 0:\n", " row = 'SF'\n", " row += generate_row(shape[0] - 2, h_prob)\n", " elif h == 1:\n", " row = 'FF'\n", " row += generate_row(shape[0] - 2, h_prob)\n", "\n", " elif h == shape[1] - 1:\n", " row = generate_row(shape[0] - 2, h_prob)\n", " row += 'FG'\n", " elif h == shape[1] - 2:\n", " row = generate_row(shape[0] - 2, h_prob)\n", " row += 'FF'\n", " else:\n", " row = generate_row(shape[0], h_prob)\n", "\n", " grid_map.append(row)\n", " del row\n", "\n", " return grid_map\n", "\n", "\n", "\n", "MAPS = {\n", " \n", " \"4x4\": [\n", " \"SFFF\",\n", " \"FHFH\",\n", " \"FFFH\",\n", " \"HFFG\"\n", " ],\n", " \"8x8\": [\n", " \"SFFFFFFF\",\n", " \"FFFFFFFF\",\n", " \"FFFHFFFF\",\n", " \"FFFFFHFF\",\n", " \"FFFHFFFF\",\n", " \"FHHFFFHF\",\n", " \"FHFFHFHF\",\n", " \"FFFHFFFG\"\n", " ],\n", " \"16x16\": [\n", " \"SFFFFFFFFHFFFFHF\",\n", " \"FFFFFFFFFFFFFHFF\",\n", " \"FFFHFFFFHFFFFFFF\",\n", " \"FFFFFFFFHFFFFFFF\",\n", " \"FFFFFFFFFFFFFFFF\",\n", " \"FFHHFFFFFFFHFFFH\",\n", " \"FFFFFFFFFFFFFFFF\",\n", " \"FFFFFHFFFFFFHFFF\",\n", 
" \"FFFFFHFFFFFFFFFH\",\n", " \"FFFFFFFHFFFFFFFF\",\n", " \"FFFFFFFFFFFFHFFF\",\n", " \"FFFFFFHFFFFFFFFF\",\n", " \"FFFFFFFFHFFFFFFF\",\n", " \"FFFFFFFFFHFFFFHF\",\n", " \"FFFFFFFFFFHFFFFF\",\n", " \"FFFHFFFFFFFFFFFG\",\n", " ],\n", " \n", " \"32x32\": [\n", " 'SFFHFFFFFFFFFFFFFFFFFFFFFFHFFFFF',\n", " 'FFHFHHFFHFFFFFFFFFFFFFFFFFHFFFFF',\n", " 'FFFHFFFFFFFFHFFHFFFFFFFFFFFFFFFF',\n", " 'FFFFFFFFFFFFFFHFHHFHFHFFFFFHFFFH',\n", " 'FFFFHFFFFFFFFFFFFFFFHFHFFFFFFFHF',\n", " 'FFFFFHFFFFFFFFFFHFFFFFFFFFFHFFFF',\n", " 'FFHHFFFFHFFFFFFFFFFFFFFFFFFFFFFF',\n", " 'FFFHFFFFFFFFFFHFFFHFHFFFFFFFFHFF',\n", " 'FFFFHFFFFFFHFFFFHFHFFFFFFFFFFFFH',\n", " 'FFFFHHFHFFFFHFFFFFFFFFFFFFFFFFFF',\n", " 'FHFFFFFFFFFFHFFFFFFFFFFFHHFFFHFH',\n", " 'FFFHFFFHFFFFFFFFFFFFFFFFFFFFHFFF',\n", " 'FFFHFHFFFFFFFFHFFFFFFFFFFFFHFFHF',\n", " 'FFFFFFFFFFFFFFFFHFFFFFFFHFFFFFFF',\n", " 'FFFFFFHFFFFFFFFHHFFFFFFFHFFFFFFF',\n", " 'FFHFFFFFFFFFHFFFFFFFFFFHFFFFFFFF',\n", " 'FFFHFFFFFFFFFHFFFFHFFFFFFHFFFFFF',\n", " 'FFFFFFFFFFFFFFFFFFFFFFFFFFHFFFFF',\n", " 'FFFFFFFFHFFFFFFFHFFFFFFFFFFFFFFH',\n", " 'FFHFFFFFFFFFFFFFFFHFFFFFFFFFFFFF',\n", " 'FFFFFFFHFFFFFFFFFFFFFFFFFFFFFFFF',\n", " 'FFFFFFFFFFFFFFFHFFFFHFFFFFFFHFFF',\n", " 'FFHFFFFHFFFFFFFFFHFFFFFFFFFFFHFH',\n", " 'FFFFFFFFFFHFFFFHFFFFFFFFFFFFFFFF',\n", " 'FFFFFFFFFFFFFFFFFHHFFHHHFFFHFFFF',\n", " 'FFFFFFFFFFFFFFHFFFFHFFFFFFFHFFFF',\n", " 'FFFFFFFHFFFFFFFFFFFFFFFFFFFFFFFF',\n", " 'FFFFFHFFFFFFFFFFFFFFFFHFFHFFFFFF',\n", " 'FFFFFFFHFFFFFFFFFHFFFFFFFFFFFFFF',\n", " 'FFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFF',\n", " 'FFFFFFFFFFFFFFFFFFFFFFFFHFFFFFFF',\n", " 'FFFFFFFFFFFFFFFHFFFFFFFFHFFFFFFG',\n", " ]\n", "}\n", "\n", "\n", "def generate_random_map(size=8, p=0.8):\n", " \"\"\"Generates a random valid map (one that has a path from start to goal)\n", " :param size: size of each side of the grid\n", " :param p: probability that a tile is frozen\n", " \"\"\"\n", " valid = False\n", "\n", " # BFS to check that it's a valid path.\n", " def is_valid(arr, r=0, c=0):\n", " if arr[r][c] == 
'G':\n",
"            return True\n",
"\n",
"        tmp = arr[r][c]\n",
"        arr[r][c] = \"#\"\n",
"\n",
"        # Recursively check in all four directions.\n",
"        directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]\n",
"        for x, y in directions:\n",
"            r_new = r + x\n",
"            c_new = c + y\n",
"            if r_new < 0 or r_new >= size or c_new < 0 or c_new >= size:\n",
"                continue\n",
"\n",
"            if arr[r_new][c_new] not in '#H':\n",
"                if is_valid(arr, r_new, c_new):\n",
"                    arr[r][c] = tmp\n",
"                    return True\n",
"\n",
"        arr[r][c] = tmp\n",
"        return False\n",
"\n",
"    while not valid:\n",
"        p = min(1, p)\n",
"        res = np.random.choice(['F', 'H'], (size, size), p=[p, 1-p])\n",
"        res[0][0] = 'S'\n",
"        res[-1][-1] = 'G'\n",
"        valid = is_valid(res)\n",
"    return [\"\".join(x) for x in res]\n",
"\n",
"\n",
"class FrozenLakeEnv(discrete.DiscreteEnv):\n",
"    \"\"\"\n",
"    Winter is here. You and your friends were tossing around a frisbee at the park\n",
"    when you made a wild throw that left the frisbee out in the middle of the lake.\n",
"    The water is mostly frozen, but there are a few holes where the ice has melted.\n",
"    If you step into one of those holes, you'll fall into the freezing water.\n",
"    At this time, there's an international frisbee shortage, so it's absolutely imperative that\n",
"    you navigate across the lake and retrieve the disc.\n",
"    However, the ice is slippery, so you won't always move in the direction you intend.\n",
"    The surface is described using a grid like the following\n",
"\n",
"        SFFF\n",
"        FHFH\n",
"        FFFH\n",
"        HFFG\n",
"\n",
"    S : starting point, safe\n",
"    F : frozen surface, safe\n",
"    H : hole, fall to your doom\n",
"    G : goal, where the frisbee is located\n",
"\n",
"    The episode ends when you reach the goal or fall in a hole.\n",
"    In this modified version, reaching the goal gives a reward of +1000, falling into\n",
"    a hole gives -1000, and every other step costs -1.\n",
"\n",
"    \"\"\"\n",
"\n",
"    metadata = {'render.modes': ['human', 'ansi']}\n",
"\n",
"    def __init__(self, desc=None, map_name=\"4x4\",is_slippery=True):\n",
"        if desc is None and map_name is 
None:\n", " desc = generate_random_map()\n", " elif desc is None:\n", " desc = MAPS[map_name]\n", " self.desc = desc = np.asarray(desc,dtype='c')\n", " self.nrow, self.ncol = nrow, ncol = desc.shape\n", " self.reward_range = (0, 1)\n", "\n", " nA = 4\n", " nS = nrow * ncol\n", "\n", " isd = np.array(desc == b'S').astype('float64').ravel()\n", " isd /= isd.sum()\n", "\n", " rew_hole = -1000\n", " rew_goal = 1000\n", " rew_step = -1\n", " \n", " P = {s : {a : [] for a in range(nA)} for s in range(nS)}\n", " self.TransitProb = np.zeros((nA, nS + 1, nS + 1))\n", " self.TransitReward = np.zeros((nS + 1, nA))\n", " \n", " def to_s(row, col):\n", " return row*ncol + col\n", " \n", " def inc(row, col, a):\n", " if a == LEFT:\n", " col = max(col-1,0)\n", " elif a == DOWN:\n", " row = min(row+1,nrow-1)\n", " elif a == RIGHT:\n", " col = min(col+1,ncol-1)\n", " elif a == UP:\n", " row = max(row-1,0)\n", " return (row, col)\n", "\n", " for row in range(nrow):\n", " for col in range(ncol):\n", " s = to_s(row, col)\n", " for a in range(4):\n", " li = P[s][a]\n", " letter = desc[row, col]\n", " if letter in b'H':\n", " li.append((1.0, s, 0, True))\n", " self.TransitProb[a, s, nS] = 1.0\n", " self.TransitReward[s, a] = rew_hole\n", " elif letter in b'G':\n", " li.append((1.0, s, 0, True))\n", " self.TransitProb[a, s, nS] = 1.0\n", " self.TransitReward[s, a] = rew_goal\n", " else:\n", " if is_slippery:\n", " #for b in [(a-1)%4, a, (a+1)%4]:\n", " for b, p in zip([a, (a+1)%4, (a+2)%4, (a+3)%4], TransitionProb):\n", " newrow, newcol = inc(row, col, b)\n", " newstate = to_s(newrow, newcol)\n", " newletter = desc[newrow, newcol]\n", " done = bytes(newletter) in b'GH'\n", " #rew = float(newletter == b'G')\n", " #li.append((1.0/10.0, newstate, rew, done))\n", " if newletter == b'G':\n", " rew = rew_goal\n", " elif newletter == b'H':\n", " rew = rew_hole\n", " else:\n", " rew = rew_step\n", " li.append((p, newstate, rew, done))\n", " self.TransitProb[a, s, newstate] += p\n", " 
self.TransitReward[s, a] = rew_step\n", " else:\n", " newrow, newcol = inc(row, col, a)\n", " newstate = to_s(newrow, newcol)\n", " newletter = desc[newrow, newcol]\n", " done = bytes(newletter) in b'GH'\n", " rew = float(newletter == b'G')\n", " li.append((1.0, newstate, rew, done))\n", "\n", " super(FrozenLakeEnv, self).__init__(nS, nA, P, isd)\n", "\n", " def render(self, mode='human'):\n", " outfile = StringIO() if mode == 'ansi' else sys.stdout\n", "\n", " row, col = self.s // self.ncol, self.s % self.ncol\n", " desc = self.desc.tolist()\n", " desc = [[c.decode('utf-8') for c in line] for line in desc]\n", " desc[row][col] = utils.colorize(desc[row][col], \"red\", highlight=True)\n", " if self.lastaction is not None:\n", " outfile.write(\" ({})\\n\".format([\"Left\",\"Down\",\"Right\",\"Up\"][self.lastaction]))\n", " else:\n", " outfile.write(\"\\n\")\n", " outfile.write(\"\\n\".join(''.join(line) for line in desc)+\"\\n\")\n", "\n", " if mode != 'human':\n", " with closing(outfile):\n", " return outfile.getvalue()\n", " \n", " def GetSuccessors(self, s, a):\n", " next_states = np.nonzero(self.TransitProb[a, s, :])\n", " probs = self.TransitProb[a, s, next_states]\n", " return [(s,p) for s,p in zip(next_states[0], probs[0])]\n", " \n", " def GetTransitionProb(self, s, a, ns):\n", " return self.TransitProb[a, s, ns]\n", " \n", " def GetReward(self, s, a):\n", " return self.TransitReward[s, a]\n", " \n", " def GetStateSpace(self):\n", " return self.TransitProb.shape[1]\n", " \n", " def GetActionSpace(self):\n", " return self.TransitProb.shape[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Play Game\n", "Have Fun! 
\n", "(You don't have to do this part, but if you do, make sure to quit using \"q\" so that you can continue.)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"print(\"---------actions--------\")\n",
"print(\"a: Left\\ns: Down\\nd: Right\\nw: Up\\n(q: quit)\")\n",
"env = FrozenLakeEnv(map_name=\"16x16\")\n",
"env.render()\n",
"rew = 0\n",
"for _ in range(1000):\n",
"    a = input(\"input action: \")\n",
"    if a == 'a':\n",
"        a = 0\n",
"    elif a == 's':\n",
"        a = 1\n",
"    elif a == 'd':\n",
"        a = 2\n",
"    elif a == 'w':\n",
"        a = 3\n",
"    elif a == 'q':\n",
"        break\n",
"    else:\n",
"        print('illegal input')\n",
"        continue\n",
"    observation, reward, done, info = env.step(a)\n",
"    rew += reward\n",
"    print(chr(27) + \"[2J\")\n",
"    print(\"---------actions--------\")\n",
"    print(\"a: Left\\ns: Down\\nd: Right\\nw: Up\\n(q: quit)\")\n",
"    print()\n",
"    print(\"current state:\" + str(observation))\n",
"    if info['prob'] == TransitionProb[0] or info['prob'] == 1:\n",
"        print('move to expected direction')\n",
"    else:\n",
"        print('move to unexpected direction')\n",
"    print(\"probability: \" + str(info['prob']))\n",
"    print(\"current reward:\" + str(rew))\n",
"    print()\n",
"    env.render()\n",
"    print()\n",
"    if done:\n",
"        print('end')\n",
"        break\n",
"    " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "***\n", "## Initializations\n", "\n", "Run the following cell to initialize maps of different sizes." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "map_8 = (MAPS[\"8x8\"], 8)\n", "map_16 = (MAPS[\"16x16\"], 16)\n", "map_32 = (MAPS[\"32x32\"], 32)\n", "#map_50 = (generate_map((50,50)), 50)\n", "#map_110 = (generate_map((110,110)), 110)\n", "\n", "MAP = map_8\n", "map_size = MAP[1]\n", "run_time = {}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Empirical Policy Evaluation \n", "\n", "As a warm-up, we will gain experience running a policy in an MDP to empirically evaluate the performance of the policy. \n", "\n", "Run the following cell to define the policy evaluation function, which allows us to run a specified policy in a specified environment for a specified number of trials. The function assumes that the trials will terminate for any policy, which is indicated by the \"done\" variable returned by the environment. This version of the function measures performance by total cumulative reward. Since the environment is stochastic, each trial may return a different total reward. This function returns the average cumulative reward across the trials. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"def evaluate_policy(env, policy, trials = 1000):\n",
"    total_reward = 0\n",
"    for _ in range(trials):\n",
"        env.reset()\n",
"        done = False\n",
"        observation, reward, done, info = env.step(policy[0])\n",
"        total_reward += reward\n",
"        while not done:\n",
"            observation, reward, done, info = env.step(policy[observation])\n",
"            total_reward += reward\n",
"    return total_reward / trials" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Discounted Policy Evaluation \n", "Create a modified version of the above evaluation function that measures the discounted total reward rather than just the total reward. The discount factor is specified via a parameter to the function. 
Specifically, if a trial results in a sequence of rewards $r_0, r_1, r_2, r_3$, the discounted total reward would be $r_0 + \\beta r_1 + \\beta^2 r_2 + \\beta^3 r_3$, where $\\beta$ is the discount factor. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"def evaluate_policy_discounted(env, policy, discount_factor, trials = 1000):\n",
"    total_reward = 0\n",
"    #INSERT YOUR CODE HERE\n",
"    return total_reward / trials" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Helper Function\n", "Execute the following cell to define the print function. This function prints the policy and state values and saves them to disk. We will use it later in the assignment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"def print_results(v, pi, map_size, env, beta, name):\n",
"    v_np, pi_np = np.array(v), np.array(pi)\n",
"    print(\"\\nState Value:\\n\")\n",
"    print(np.array(v_np[:-1]).reshape((map_size,map_size)))\n",
"    print(\"\\nPolicy:\\n\")\n",
"    print(np.array(pi_np[:-1]).reshape((map_size,map_size)))\n",
"    print(\"\\nAverage reward: {}\\n\".format(evaluate_policy(env, pi)))\n",
"    print(\"Average discounted reward: {}\\n\".format(evaluate_policy_discounted(env, pi, discount_factor = beta)))\n",
"    print(\"State Value image view:\\n\")\n",
"    plt.imshow(np.array(v_np[:-1]).reshape((map_size,map_size)))\n",
"\n",
"    pickle.dump(v, open(name + \"_\" + str(map_size) + \"_v.pkl\", \"wb\"))\n",
"    pickle.dump(pi, open(name + \"_\" + str(map_size) + \"_pi.pkl\", \"wb\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Random policy \n", "To provide a reference point for policy performance, the following cell defines a random policy (each state is assigned an action drawn uniformly at random) and evaluates it. Execute the cell and observe the results. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n",
"env.render()\n",
"pi = [0] * map_size * map_size\n",
"for i in range(map_size * map_size):\n",
"    pi[i] = randint(0, 3)\n",
"print(\"Average reward:\", evaluate_policy(env, pi))\n",
"print(\"Average discounted reward:\", \n",
"      evaluate_policy_discounted(env, pi, discount_factor = 0.999))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Synchronous Value Iteration with full transition function\n", "\n", "In this section, you should implement the synchronous value iteration algorithm. A code skeleton is provided below. Complete the given code by implementing the Bellman backup operator. Recall that the Bellman backup for a state, assuming the current value function is $V$, is given by:\n", "\\begin{equation}\n", "V_{new}(s) = \\max_{a\\in A} R(s,a) + \\beta \\sum_{s'\\in S} T(s,a,s') V(s')\n", "\\end{equation}\n", "\n", "For this part of the assignment, you should implement this Bellman backup operator in a way that performs the sum over all possible next states $s' \\in S$. Use env.GetTransitionProb() to get the transition probabilities and env.GetReward() to get the rewards. In each iteration, you need to do the following:\n", "\n", "1. Apply the Bellman backup to all states. \n", "2. Compute and update the Bellman error (see the first part of this notebook).\n", "3. Update the value and policy accordingly." 
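, "\n",
"\n",
"For comparison, the sketch below expresses the same synchronous update with whole-array NumPy operations instead of per-state loops. It is illustrative only and rests on assumptions: the function name `sync_value_iteration_dense` and the two-state toy MDP are hypothetical, and the arrays are assumed to follow the layouts of `env.TransitProb` (shape `(A, S, S)`) and `env.TransitReward` (shape `(S, A)`). It is not a substitute for the loop-based implementation requested in the next cell.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"def sync_value_iteration_dense(T, R, beta=0.999, epsilon=1e-4):\n",
"    # Hypothetical helper (not part of the assignment code).\n",
"    # Assumed layouts: T[a, s, s2] = P(s2 | s, a) as in env.TransitProb,\n",
"    # R[s, a] = reward as in env.TransitReward.\n",
"    A, S, _ = T.shape\n",
"    v = np.zeros(S)\n",
"    pi = np.zeros(S, dtype=int)\n",
"    error = float('inf')\n",
"    while error > epsilon:\n",
"        # (T @ v)[a, s] = sum over s2 of T[a, s, s2] * v[s2]; transpose to (S, A)\n",
"        q = R + beta * (T @ v).T\n",
"        v_new = q.max(axis=1)   # Bellman backup of every state from the same v\n",
"        pi = q.argmax(axis=1)   # greedy policy with respect to the current v\n",
"        error = np.max(np.abs(v_new - v))   # Bellman error of this iteration\n",
"        v = v_new               # synchronous swap, after all states are backed up\n",
"    return v, pi\n",
"\n",
"# Hypothetical toy MDP: in state 0, action 0 costs -1 and stays in state 0,\n",
"# action 1 earns 10 and moves to state 1, which is absorbing with reward 0.\n",
"T = np.zeros((2, 2, 2))\n",
"T[0, 0, 0] = 1.0\n",
"T[1, 0, 1] = 1.0\n",
"T[:, 1, 1] = 1.0\n",
"R = np.array([[-1.0, 10.0], [0.0, 0.0]])\n",
"v, pi = sync_value_iteration_dense(T, R, beta=0.9)\n",
"print(v, pi)  # values 10 and 0, policy [1 0]\n",
"```\n",
"\n",
"Because every entry of `q` is computed from the old `v` before `v` is replaced, each loop iteration is exactly one synchronous sweep. The real FrozenLake arrays also include the extra terminal state `nS`, so there `S` is the number of grid cells plus one."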
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"def sync_value_iteration_v1(env, beta = 0.999, epsilon = 0.0001):\n",
"\n",
"    A = env.GetActionSpace()\n",
"    S = env.GetStateSpace()\n",
"\n",
"    pi = [0] * S\n",
"    v = [0] * S\n",
"\n",
"    pi_new = [0] * S\n",
"    v_new = [0] * S\n",
"\n",
"    bellman_error = float('inf')\n",
"    while(bellman_error > epsilon):\n",
"        bellman_error = 0\n",
"        for state in range(S):\n",
"            max_v = float('-inf')\n",
"            max_a = 0\n",
"            for action in range(A):\n",
"                #INSERT YOUR CODE HERE\n",
"\n",
"\n",
"        v = deepcopy(v_new)\n",
"        pi = deepcopy(pi_new)\n",
"\n",
"    return v, pi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the following cell to see the output of your function and store the value and policy matrices to file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "beta = 0.999\n", "env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n", "print(\"Game Map:\")\n", "env.render()\n", "\n", "start_time = time.time()\n", "v, pi = sync_value_iteration_v1(env, beta = beta)\n", "v_np, pi_np = np.array(v), np.array(pi)\n", "end_time = time.time()\n", "run_time['Sync Value Iteration v1'] = end_time - start_time\n", "print(\"time:\", run_time['Sync Value Iteration v1'])\n", "\n", "print_results(v, pi, map_size, env, beta, 'sync_vi')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Synchronous Value Iteration Using GetSuccessors()\n", "The above version of value iteration can be very inefficient when the number of states is large, because it iterates over all next states. In practice, for any state $s$ and action $a$, most states usually have zero probability of being successors. We can exploit that fact to make value iteration more efficient. \n", "\n", "The goal of this part is to use the GetSuccessors() function to decrease the running time. 
This function takes a state and an action as input and returns the possible next states (those with non-zero transition probability) together with their transition probabilities. This allows us to ignore all states with zero transition probability. Implement value iteration in the following cell, following the previous implementation, but use the env.GetSuccessors() function to limit the summation over next states in the Bellman backup to states with non-zero probability. With this function, you will not need the GetTransitionProb function.\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [
"def sync_value_iteration_v2(env, beta = 0.999, epsilon = 0.0001):\n",
"\n",
"    A = env.GetActionSpace()\n",
"    S = env.GetStateSpace()\n",
"\n",
"    pi = [0] * S\n",
"    v = [0] * S\n",
"\n",
"    pi_new = [0] * S\n",
"    v_new = [0] * S\n",
"\n",
"    error = float('inf')\n",
"\n",
"    #INSERT YOUR CODE HERE\n",
"\n",
"    return v, pi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the following cell to see the output of your function and store the value and policy matrices to file. Note the time taken by this version versus the previous version of value iteration. The computation time should be significantly smaller for this latter version, which uses GetSuccessors. Because of these time savings, for the remainder of this assignment you should implement Bellman backups using GetSuccessors. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "beta = 0.999\n", "env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n", "print(\"Game Map:\")\n", "env.render()\n", "\n", "start_time = time.time()\n", "v, pi = sync_value_iteration_v2(env, beta = beta)\n", "v_np, pi_np = np.array(v), np.array(pi)\n", "end_time = time.time()\n", "run_time['Sync Value Iteration v2'] = end_time - start_time\n", "print(\"time:\", run_time['Sync Value Iteration v2'])\n", "\n", "print_results(v, pi, map_size, env, beta, 'sync_vi_gs')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Initialize Ray\n", "\n", "Now we will use Ray to develop distributed versions of the above value iteration algorithm. The first step, of course, is to initialize Ray." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ray.shutdown()\n", "ray.init(include_webui=False, ignore_reinit_error=True, redis_max_memory=500000000, object_store_memory=5000000000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed Synchronous Value Iteration -- Version 1\n", "A simple way to distribute value iteration is to have each state in an iteration updated by a distinct worker. That is, each state is updated by creating a worker that performs the Bellman backup for that state and then records the result. To avoid creating an enormous number of workers, the first implementation only allows a specified number of workers to be active at any time. After each iteration, the main process checks the Bellman error and terminates if it is less than the specified epsilon. 
The following diagram demonstrates the architecture of such a system.\n", "\n", "\n", "```python\n", "\n", "\"\"\"\n", "\n",
"                          +---------------+\n",
"                          |               |\n",
"                          | Main Process  |-----------------------------------+\n",
"                          |               |                                   |\n",
"                          |               |                                   |\n",
"                          +---------------+                                   |\n",
"                                  |                                           |\n",
"                                  |                                           |\n",
"                                  |                                           |\n",
"                                  |                                           |\n",
"                                  |                                           |\n",
"      +--Re-init Worker--+--------+---------+--Re-init Worker--+            Check\n",
"      |                  |                  |                  |          Coverage\n",
"+-----------+      +-----------+      +-----------+      +-----------+   Iteratively\n",
"|           |      |           |      |           |      |           |        |\n",
"|  Worker   |      |  Worker   |      |  Worker   |      |  Worker   |        |\n",
"|   (env)   |      |   (env)   |      |   (env)   |      |   (env)   |        |\n",
"|           |      |           |      |           |      |           |        |\n",
"+-----------+      +-----------+      +-----------+      +-----------+        |\n",
"      ^                  ^                  ^                  ^              |\n",
"      |                  |                  |                  |              |\n",
"      +-- One-Value -----+--------+---------+----- One-Value --+              |\n",
"                                  |                                           |\n",
"                                  |                                           |\n",
"                          +----------------+                                  |\n",
"                          |                |                                  |\n",
"                          |  Value Server  |----------------------------------+\n",
"                          |                |\n",
"                          +----------------+\n",
"\n", "\"\"\"\n", "\n", "```\n", "\n", "A key part of this implementation is the Value Server, a Ray actor that workers interface with to update the value function at each iteration.\n", "\n", "Complete the following code by adding the Bellman backup operator to it. Once you have implemented the function, run the following cell to test it and to store the value and policy matrices to file. Note that this implementation should replicate the results of the non-distributed version of synchronous value iteration. \n", "\n", "Importantly, you should see that this version is significantly slower than the non-distributed version above. Think about why this might be the case. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"@ray.remote\n",
"class VI_server_v1(object):\n",
"    def __init__(self,size):\n",
"        self.v_current = [0] * size\n",
"        self.pi = [0] * size\n",
"        self.v_new = [0] * size\n",
"\n",
"    def get_value_and_policy(self):\n",
"        return self.v_current, self.pi\n",
"\n",
"    def update(self, update_index, update_v, update_pi):\n",
"        self.v_new[update_index] = update_v\n",
"        self.pi[update_index] = update_pi\n",
"\n",
"    def get_error_and_update(self):\n",
"        max_error = 0\n",
"        for i in range(len(self.v_current)):\n",
"            error = abs(self.v_new[i] - self.v_current[i])\n",
"            if error > max_error:\n",
"                max_error = error\n",
"            self.v_current[i] = self.v_new[i]\n",
"\n",
"        return max_error\n",
"\n",
"@ray.remote\n",
"def VI_worker_v1(VI_server, data, worker_id, update_state):\n",
"    env, workers_num, beta, epsilon = data\n",
"    A = env.GetActionSpace()\n",
"    S = env.GetStateSpace()\n",
"\n",
"    # get shared variable\n",
"    V, _ = ray.get(VI_server.get_value_and_policy.remote())\n",
"\n",
"    # bellman backup\n",
"\n",
"    #INSERT YOUR CODE HERE\n",
"\n",
"    # Block until the server has applied this update, so that the task only\n",
"    # finishes after its value has been recorded.\n",
"    ray.get(VI_server.update.remote(update_state, max_v, max_a))\n",
"\n",
"    # return ith worker\n",
"    return worker_id\n",
"\n",
"def sync_value_iteration_distributed_v1(env, beta = 0.999, epsilon = 0.01, workers_num = 4, stop_steps = 2000):\n",
"    S = env.GetStateSpace()\n",
"    VI_server = VI_server_v1.remote(S)\n",
"    workers_list = []\n",
"    data_id = ray.put((env, workers_num, beta, epsilon))\n",
"\n",
"    start = 0\n",
"    # start all workers and store their ids in a list\n",
"    for i in range(workers_num):\n",
"        w_id = VI_worker_v1.remote(VI_server, data_id, i, start)\n",
"        workers_list.append(w_id)\n",
"        start += 1\n",
"\n",
"    error = float('inf')\n",
"    while error > epsilon:\n",
"        for update_state in range(start, S):\n",
"            # Wait for one worker to finish, get its result, and remove it from the list\n",
"            finished_worker_id = ray.wait(workers_list, num_returns = 1, timeout = None)[0][0]\n",
"            finish_worker = ray.get(finished_worker_id)\n",
"            workers_list.remove(finished_worker_id)\n",
"\n",
"            # start a new worker, and add it to the list\n",
"            w_id = VI_worker_v1.remote(VI_server, data_id, finish_worker, update_state)\n",
"            workers_list.append(w_id)\n",
"        # Wait for the workers still running in this sweep, so that the error\n",
"        # is computed only after every state has been backed up.\n",
"        ray.wait(workers_list, num_returns = len(workers_list), timeout = None)\n",
"        start = 0\n",
"        error = ray.get(VI_server.get_error_and_update.remote())\n",
"\n",
"    v, pi = ray.get(VI_server.get_value_and_policy.remote())\n",
"    return v, pi" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "beta = 0.999\n", "env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n", "print(\"Game Map:\")\n", "env.render()\n", "\n", "start_time = time.time()\n", "v, pi = sync_value_iteration_distributed_v1(env, beta = beta, workers_num = 4)\n", "v_np, pi_np = np.array(v), np.array(pi)\n", "end_time = time.time()\n", "run_time['Sync distributed v1'] = end_time - start_time\n", "print(\"time:\", run_time['Sync distributed v1'])\n", "\n", "print_results(v, pi, map_size, env, beta, 'dist_vi_v1')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed Synchronous Value Iteration -- Version 2\n", "\n", "One way to improve the above approach is to create a limited number of workers and have each worker perform backups on a batch of states. Effectively, this approach partitions the state space and uses one worker to handle each subset of the partition. 
The following diagram demonstrates the architecture of such a system.\n", "\n", "\n", "```python\n", "\n", "\"\"\"\n", "\n", " +---------------+\n", " | |\n", " | Main Process |------------------------------------\n", " | | |\n", " | | |\n", " +---------------+ |\n", " | |\n", " | |\n", " | |\n", " | |\n", " | |\n", " +---Re-init Worker-----+-------------------+-----Re-init Worker---+ Check\n", " | | | | Coverage\n", "+-----------+ +-----------+ +-----------+ +-----------+ Iteratively\n", "| | | | | | | | |\n", "| Worker | | Worker | | Worker | | Worker | |\n", "| (env) | | (env) | | (env) | | (env) | | \n", "| | | | | | | | |\n", "+-----------+ +-----------+ +-----------+ +-----------+ |\n", " ^ ^ ^ ^ |\n", " | | | | |\n", " +---- Batch-Value ----+---------+---------+---- Batch-Value ----+ |\n", " | |\n", " | |\n", " +----------------+ | \n", " | | | \n", " | Value Server |------------------------------------- \n", " | | \n", " +----------------+\n", "\n", "\"\"\"\n", "\n", "```\n", "In this section, you should implement the idea described above.\n", "- Partition the states into batches. The number of batches should be equal to the number of the workers.\n", "- Create workers to handle each batch and run them\n", "- Terminate the workers once the error is less than the given epsilon\n", "\n", "Again, this implementation should exactly emulate the result of each iteration of non-distributed value iteration. 
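\n", "\n", "The partitioning step in the first bullet could look like the following minimal sketch. It assumes states are indexed by the integers `0` to `S-1` (with `S = env.GetStateSpace()`, as in the version 1 server) and that each batch is a half-open range `[start, end)`; `partition_states` is a hypothetical helper, not part of the starter code.\n", "\n", "```python\n", "def partition_states(S, workers_num):\n", "    # Hypothetical helper: split states 0..S-1 into workers_num contiguous,\n", "    # half-open (start, end) ranges whose sizes differ by at most one.\n", "    base, extra = divmod(S, workers_num)\n", "    batches = []\n", "    start = 0\n", "    for i in range(workers_num):\n", "        # the first `extra` batches take one additional state each\n", "        end = start + base + (0 if i >= extra else 1)\n", "        batches.append((start, end))\n", "        start = end\n", "    return batches\n", "\n", "print(partition_states(64, 4))  # [(0, 16), (16, 32), (32, 48), (48, 64)]\n", "print(partition_states(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]\n", "```\n", "\n", "Each `(start, end)` pair could then be passed as the `start_state` and `end_state` arguments of `VI_worker_v2`, assuming `end_state` is meant to be exclusive (the starter code does not say). Contiguous batches keep the partition simple and balanced to within one state; if `workers_num` exceeds `S`, the extra batches are empty. 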
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@ray.remote\n", "class VI_server_v2(object):\n", "    #INSERT YOUR CODE HERE\n", "    \n", "@ray.remote\n", "def VI_worker_v2(VI_server, data, start_state, end_state):\n", "    env, workers_num, beta, epsilon = data\n", "    A = env.GetActionSpace()\n", "    S = env.GetStateSpace()\n", "    \n", "    #INSERT YOUR CODE HERE\n", "    \n", "def sync_value_iteration_distributed_v2(env, beta = 0.999, epsilon = 0.01, workers_num = 4, stop_steps = 2000):\n", "    S = env.GetStateSpace()\n", "    VI_server = VI_server_v2.remote(S)\n", "    workers_list = []\n", "    data_id = ray.put((env, workers_num, beta, epsilon))\n", "    #INSERT YOUR CODE HERE\n", "\n", "    error = float('inf')\n", "    while error > epsilon:\n", "        #INSERT YOUR CODE HERE\n", "        \n", "    return v, pi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the following cell to measure the running time of your implementation. It also saves the resulting policy and state values to disk." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "beta = 0.999\n", "env = FrozenLakeEnv(desc = MAP[0], is_slippery = True)\n", "print(\"Game Map:\")\n", "env.render()\n", "\n", "start_time = time.time()\n", "v, pi = sync_value_iteration_distributed_v2(env, beta = beta, workers_num = 4)\n", "v_np, pi_np = np.array(v), np.array(pi)\n", "end_time = time.time()\n", "run_time['Sync distributed v2'] = end_time - start_time\n", "print(\"time:\", run_time['Sync distributed v2'])\n", "print_results(v, pi, map_size, env, beta, 'dist_vi_v2')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Comparison of different approaches\n", "\n", "Run the following cell to compare the running time of different approaches. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Print the recorded running times from fastest to slowest, skipping\n", "# approaches that have not been run yet (time is None). A separate loop\n", "# variable (t) is used so the value function v is not overwritten.\n", "print(\"All:\")\n", "finished = {k: t for k, t in run_time.items() if t is not None}\n", "for name, t in sorted(finished.items(), key=lambda item: item[1]):\n", "    print(name + \": \" + str(t))\n", "    print()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Report\n", "Write a report that includes the following:\n", "- A plot of the running time of the above 4 approaches against map sizes of 8, 16, and 32.\n", "- A plot of the running time of both distributed approaches against the number of workers (2, 4, and 8).\n", "- A brief explanation of why the second distributed method is faster than the first one.\n", "- A comparison of the best distributed method with the best non-distributed approach. Which one is better? Briefly explain why." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed Synchronous VI Competition\n", "In this part, you should design and implement your own distributed synchronous VI method based on what you have learned in the previous parts. Your implementation has the following constraints:\n", "- It must terminate and return a value function (and corresponding greedy policy) that satisfies the specified Bellman error threshold.\n", "- It must be iterative in the sense that it produces the same sequence of value functions as non-distributed synchronous value iteration.\n", "\n", "For this part, you should create a standalone Python file named `competition.py`. You can copy the needed functions from this notebook to your file. 
Your code should contain a main function called `fast_value_iteration` with the following exact signature: \n", "\n", "`def fast_value_iteration(env, beta = 0.999, epsilon = 0.01, workers_num = 4)`\n", "\n", "Here `epsilon` is the Bellman error threshold and `workers_num` is the maximum number of workers. This function should return policy and value vectors that satisfy the Bellman error constraint. \n", "\n", "To test your code, you should use an exclusive computation node on DevCloud. You can use the `qsub -I -lselect=1` command to connect to a computation node and run your code on it. We may test your programs on FrozenLake environments as large as 100x100. \n", "\n", "Some possible ideas to consider:\n", "\n", "- How should the number of workers be selected, and how should states be partitioned across workers?\n", "- Are there alternative protocols between the server and workers?\n", "- Where are the communication bottlenecks in the system, and how might they be reduced? " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Deliverables\n", "\n", "Submit a zip file to Canvas that contains:\n", "- a completed version of this notebook\n", "- the .pkl files generated by the `print_results` function for your runs on the 8x8 map\n", "- a Python file for the distributed VI competition\n", "- your report as a PDF file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.1" } }, "nbformat": 4, "nbformat_minor": 2 }