{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Getting Started with the Ray Framework\n", "\n", "## Introduction\n", "\n", "Ray is a general-purpose framework for programming a cluster, developed at UC Berkeley's RISELab. It enables developers to easily parallelize their Python applications or build new ones, and run them at any scale, from a laptop to a large cluster. It also provides a highly flexible, yet minimalist and easy-to-use API.\n", "\n", "#### Documentation Reference Links:\n", "\n", "Ray official website: https://rise.cs.berkeley.edu/projects/ray/\n", "\n", "Ray documentation website: http://ray.readthedocs.io/en/latest/\n", "\n", "GitHub repository: https://github.com/ray-project/ray\n", "\n", "### Installation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install --user --ignore-installed funcsigs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Part 1 - Remote Functions\n", "\n", "The code in this part is too slow, and the computation is embarrassingly parallel. In this exercise, you will use Ray to execute the functions in parallel to speed it up.\n", "\n", "The standard way to turn a Python function into a remote function is to add the `@ray.remote` decorator. Here is an example.\n", "\n", "```python\n", "# A regular Python function.\n", "def regular_function(x):\n", "    return x + 1\n", "\n", "# A Ray remote function.\n", "@ray.remote\n", "def remote_function(x):\n", "    return x + 1\n", "```\n", "\n", "The differences are the following:\n", "\n", "1. **Invocation:** The regular version is called with `regular_function(1)`, whereas the remote version is called with `remote_function.remote(1)`.\n", "2. **Return values:** `regular_function(0)` executes immediately and returns `1`, whereas `remote_function.remote(0)` immediately returns an object ID (a future) and then creates a task that will be executed on a worker process. 
The result can be obtained with `ray.get`.\n", "   ```python\n", "   >>> regular_function(0)\n", "   1\n", "\n", "   >>> remote_function.remote(0)\n", "   ObjectID(1c80d6937802cd7786ad25e50caf2f023c95e350)\n", "\n", "   >>> ray.get(remote_function.remote(0))\n", "   1\n", "   ```\n", "3. **Parallelism:** Invocations of `regular_function` happen **serially**, for example\n", "   ```python\n", "   # These happen serially.\n", "   for _ in range(4):\n", "       regular_function(0)\n", "   ```\n", "   whereas invocations of `remote_function` happen in **parallel**, for example\n", "   ```python\n", "   # These happen in parallel.\n", "   for _ in range(4):\n", "       remote_function.remote(0)\n", "   ```" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "WARNING: Not monitoring node memory since `psutil` is not installed. Install this with `pip install psutil` (or ray[debug]) to enable debugging of memory-related crashes.\n" ] } ], "source": [ "from __future__ import absolute_import\n", "from __future__ import division\n", "from __future__ import print_function\n", "\n", "import ray\n", "import time\n", "import numpy as np\n", "import pickle" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start Ray. By default, Ray does not schedule more tasks concurrently than there are CPUs. This example requires four tasks to run concurrently, so we tell Ray that there are four CPUs. Usually this is not necessary, because Ray detects the number of CPUs automatically. The argument `ignore_reinit_error=True` suppresses the error that would otherwise be raised if this cell is run more than once.\n", "\n", "The call to `ray.init` starts a number of processes." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2019-04-06 10:19:44,049\tWARNING worker.py:1406 -- WARNING: Not updating worker name since `setproctitle` is not installed. 
Install this with `pip install setproctitle` (or ray[debug]) to enable monitoring of worker processes.\n", "2019-04-06 10:19:44,052\tINFO node.py:423 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-04-06_10-19-44_21887/logs.\n", "2019-04-06 10:19:44,262\tINFO services.py:363 -- Waiting for redis server at 127.0.0.1:26837 to respond...\n", "2019-04-06 10:19:44,431\tINFO services.py:363 -- Waiting for redis server at 127.0.0.1:44319 to respond...\n", "2019-04-06 10:19:44,436\tINFO services.py:760 -- Starting Redis shard with 1.0 GB max memory.\n", "2019-04-06 10:19:44,535\tWARNING services.py:1236 -- Warning: Capping object memory store to 20.0GB. To increase this further, specify `object_store_memory` when calling ray.init() or ray start.\n", "2019-04-06 10:19:44,538\tINFO services.py:1384 -- Starting the Plasma object store with 20.0 GB memory using /dev/shm.\n", "2019-04-06 10:19:44,628\tWARNING services.py:863 -- Failed to start the reporter. The reporter requires 'pip install psutil'.\n" ] }, { "data": { "text/plain": [ "{'node_ip_address': None,\n", " 'redis_address': '10.9.1.17:26837',\n", " 'object_store_address': '/tmp/ray/session_2019-04-06_10-19-44_21887/sockets/plasma_store',\n", " 'webui_url': None,\n", " 'raylet_socket_name': '/tmp/ray/session_2019-04-06_10-19-44_21887/sockets/raylet'}" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True, redis_max_memory=1000000000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** The function below is slow. Turn it into a remote function using the `@ray.remote` decorator." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# This function is a proxy for a more interesting and computationally\n", "# intensive function.\n", "def slow_function(i):\n", " time.sleep(1)\n", " return i" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** The loop below takes too long. The four function calls could be executed in parallel. Instead of four seconds, it should only take one second. Once `slow_function` has been made a remote function, execute these four tasks in parallel by calling `slow_function.remote()`. Then obtain the results by calling `ray.get` on a list of the resulting object IDs." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The results are [0, 1, 2, 3]. This took 4.0047767162323 seconds. Run the next cell to see if the exercise was done correctly.\n" ] } ], "source": [ "# Sleep a little to improve the accuracy of the timing measurements below.\n", "# We do this because workers may still be starting up in the background.\n", "time.sleep(2.0)\n", "start_time = time.time()\n", "\n", "results = [slow_function(i) for i in range(4)]\n", "\n", "end_time = time.time()\n", "duration = end_time - start_time\n", "\n", "print('The results are {}. This took {} seconds. Run the next cell to see '\n", " 'if the exercise was done correctly.'.format(results, duration))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "ename": "AssertionError", "evalue": "The loop took 4.0047767162323 seconds. 
This is too slow.", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mAssertionError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32massert\u001b[0m \u001b[0mresults\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m3\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'Did you remember to call ray.get?'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m assert duration < 1.1, ('The loop took {} seconds. This is too slow.'\n\u001b[0;32m----> 3\u001b[0;31m .format(duration))\n\u001b[0m\u001b[1;32m 4\u001b[0m assert duration > 1, ('The loop took {} seconds. This is too fast.'\n\u001b[1;32m 5\u001b[0m .format(duration))\n", "\u001b[0;31mAssertionError\u001b[0m: The loop took 4.0047767162323 seconds. This is too slow." ] } ], "source": [ "assert results == [0, 1, 2, 3], 'Did you remember to call ray.get?'\n", "assert duration < 1.1, ('The loop took {} seconds. This is too slow.'\n", " .format(duration))\n", "assert duration > 1, ('The loop took {} seconds. This is too fast.'\n", " .format(duration))\n", "\n", "print('Success! The example took {} seconds.'.format(duration))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Part 2 - Parallel Data Processing with Task Dependencies\n", "\n", "**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.\n", "\n", "In this exercise, we construct a sequence of tasks, each of which depends on the previous one, mimicking a data-parallel application. 
Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.\n", "\n", "In this exercise, you will use Ray to parallelize the computation below and speed it up.\n", "\n", "### Concepts for this Exercise - Task Dependencies\n", "\n", "Suppose we have a remote function defined as follows.\n", "\n", "```python\n", "@ray.remote\n", "def f(x):\n", "    return x\n", "```\n", "\n", "Arguments can be passed into remote functions as usual.\n", "\n", "```python\n", ">>> x1_id = f.remote(1)\n", ">>> ray.get(x1_id)\n", "1\n", "\n", ">>> x2_id = f.remote([1, 2, 3])\n", ">>> ray.get(x2_id)\n", "[1, 2, 3]\n", "```\n", "\n", "**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be retrieved as a regular Python object**.\n", "\n", "```python\n", ">>> y1_id = f.remote(x1_id)\n", ">>> ray.get(y1_id)\n", "1\n", "\n", ">>> y2_id = f.remote(x2_id)\n", ">>> ray.get(y2_id)\n", "[1, 2, 3]\n", "```\n", "\n", "So a remote function should always expect a regular Python object as its argument, regardless of whether the caller passes in a regular Python object or an object ID.\n", "\n", "**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.\n", "\n", "- The second task will not be executed until the first task has finished executing.\n", "- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The helper functions below mimic the pattern of a data-parallel application.\n", "\n", "**EXERCISE:** You will need to turn all of these functions into remote functions. 
When you turn these functions into remote functions, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def load_data(filename):\n", "    time.sleep(0.1)\n", "    return np.ones((1000, 100))\n", "\n", "def normalize_data(data):\n", "    time.sleep(0.1)\n", "    return data - np.mean(data, axis=0)\n", "\n", "def extract_features(normalized_data):\n", "    time.sleep(0.1)\n", "    return np.hstack([normalized_data, normalized_data ** 2])\n", "\n", "def compute_loss(features):\n", "    num_data, dim = features.shape\n", "    time.sleep(0.1)\n", "    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)\n", "\n", "assert hasattr(load_data, 'remote'), 'load_data must be a remote function'\n", "assert hasattr(normalize_data, 'remote'), 'normalize_data must be a remote function'\n", "assert hasattr(extract_features, 'remote'), 'extract_features must be a remote function'\n", "assert hasattr(compute_loss, 'remote'), 'compute_loss must be a remote function'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.\n", "\n", "**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver."
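, "\n",
 "\n",
 "A minimal sketch of the dependency pattern, assuming Ray is installed; `add_one` and `double` are hypothetical remote functions unrelated to the helpers above, and `ray.init(ignore_reinit_error=True)` does not raise an error if Ray is already running in this notebook. Each object ID from the first task is passed straight into the second task, and the driver calls `ray.get` only once:\n",
 "\n",
 "```python\n",
 "import ray\n",
 "\n",
 "ray.init(ignore_reinit_error=True)\n",
 "\n",
 "@ray.remote\n",
 "def add_one(x):\n",
 "    # x is a plain Python object here, even if the caller passed an object ID.\n",
 "    return x + 1\n",
 "\n",
 "@ray.remote\n",
 "def double(x):\n",
 "    return 2 * x\n",
 "\n",
 "result_ids = []\n",
 "for i in range(4):\n",
 "    step1_id = add_one.remote(i)        # returns an object ID immediately\n",
 "    step2_id = double.remote(step1_id)  # runs only after step1_id is ready\n",
 "    result_ids.append(step2_id)\n",
 "\n",
 "# One ray.get at the end waits for all four chains.\n",
 "results = ray.get(result_ids)\n",
 "print(results)  # [2, 4, 6, 8]\n",
 "```\n",
 "\n",
 "Because nothing inside the loop blocks, the four chains can run at the same time; only the two steps within each chain are ordered by their dependency."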
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sleep a little to improve the accuracy of the timing measurements below.\n", "time.sleep(2.0)\n", "start_time = time.time()\n", "\n", "losses = []\n", "for filename in ['file1', 'file2', 'file3', 'file4']:\n", "    inner_start = time.time()\n", "\n", "    data = load_data(filename)\n", "    normalized_data = normalize_data(data)\n", "    features = extract_features(normalized_data)\n", "    loss = compute_loss(features)\n", "    losses.append(loss)\n", "\n", "    inner_end = time.time()\n", "\n", "    if inner_end - inner_start >= 0.1:\n", "        raise Exception('You may be calling ray.get inside of the for loop! '\n", "                        'Doing this will prevent parallelism from being exposed. '\n", "                        'Make sure to only call ray.get once outside of the for loop.')\n", "\n", "print('The losses are {}.'.format(losses) + '\\n')\n", "loss = sum(losses)\n", "\n", "end_time = time.time()\n", "duration = end_time - start_time\n", "\n", "print('The loss is {}. This took {} seconds. Run the next cell to see '\n", "      'if the exercise was done correctly.'.format(loss, duration))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assert loss == 4000\n", "assert duration < 0.8, ('The loop took {} seconds. This is too slow.'\n", "                        .format(duration))\n", "assert duration > 0.4, ('The loop took {} seconds. This is too fast.'\n", "                        .format(duration))\n", "\n", "print('Success! 
The example took {} seconds.'.format(duration))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Part 3 - Introducing Actors\n", "\n", "**GOAL:** The goal of this exercise is to show how to create an actor and how to call actor methods.\n", "\n", "See the documentation on actors at http://ray.readthedocs.io/en/latest/actors.html.\n", "\n", "Sometimes you need a \"worker\" process to have \"state\". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are meant to be side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.\n", "\n", "Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.\n", "\n", "This means that a single actor provides no parallelism, because calls to its methods are executed one at a time. However, multiple actors can be created, and methods can be executed on them in parallel.\n", "\n", "### Concepts for this Exercise - Actors\n", "\n", "To create an actor, decorate a Python class with the `@ray.remote` decorator.\n", "\n", "```python\n", "@ray.remote\n", "class Example(object):\n", "    def __init__(self, x):\n", "        self.x = x\n", "\n", "    def set(self, x):\n", "        self.x = x\n", "\n", "    def get(self):\n", "        return self.x\n", "```\n", "\n", "Like regular Python classes, **actors encapsulate state that is shared across actor method invocations**.\n", "\n", "Actor classes differ from regular Python classes in the following ways.\n", "\n", "1. **Instantiation:** A regular class would be instantiated via `e = Example(1)`. Actors are instantiated via\n", "   ```python\n", "   e = Example.remote(1)\n", "   ```\n", "   When an actor is instantiated, a **new worker process** is created by a local scheduler somewhere in the cluster.\n", "2. 
**Method Invocation:** Methods of a regular class would be invoked via `e.set(2)` or `e.get()`. Actor methods are invoked differently.\n", "   ```python\n", "   >>> e.set.remote(2)\n", "   ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)\n", "\n", "   >>> e.get.remote()\n", "   ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)\n", "   ```\n", "3. **Return Values:** Actor methods are non-blocking. They immediately return an object ID and **they create a task which is scheduled on the actor worker**. The result can be retrieved with `ray.get`.\n", "   ```python\n", "   >>> print(ray.get(e.set.remote(2)))\n", "   None\n", "\n", "   >>> ray.get(e.get.remote())\n", "   2\n", "   ```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** Change the `Foo` class to be an actor class by using the `@ray.remote` decorator." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Foo(object):\n", "    def __init__(self):\n", "        self.counter = 0\n", "\n", "    def reset(self):\n", "        self.counter = 0\n", "\n", "    def increment(self):\n", "        time.sleep(0.5)\n", "        self.counter += 1\n", "        return self.counter\n", "\n", "assert hasattr(Foo, 'remote'), 'You need to turn \"Foo\" into an actor with @ray.remote.'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** Change the instantiations below to create two actors by calling `Foo.remote()`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create two Foo objects.\n", "f1 = Foo()\n", "f2 = Foo()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** Parallelize the code below. The two actors can execute methods in parallel (though each actor can only execute one method at a time)."
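, "\n",
 "\n",
 "A minimal sketch of running two actors side by side, assuming Ray is installed; the `Accumulator` class and its `add` method are hypothetical and not part of the exercise. Each method call returns an object ID immediately, so the driver can keep both actors busy and call `ray.get` once at the end. Calls to the same actor run one at a time in the order they were submitted, which is why each actor's running totals come out in order:\n",
 "\n",
 "```python\n",
 "import ray\n",
 "\n",
 "ray.init(ignore_reinit_error=True)\n",
 "\n",
 "@ray.remote\n",
 "class Accumulator(object):\n",
 "    def __init__(self):\n",
 "        self.total = 0\n",
 "\n",
 "    def add(self, x):\n",
 "        # State persists across method calls on the same actor.\n",
 "        self.total += x\n",
 "        return self.total\n",
 "\n",
 "# Two actors, each with its own worker process and its own state.\n",
 "a1 = Accumulator.remote()\n",
 "a2 = Accumulator.remote()\n",
 "\n",
 "# Interleave calls; nothing here blocks, so both actors can work at once.\n",
 "result_ids = []\n",
 "for x in [1, 2, 3]:\n",
 "    result_ids.append(a1.add.remote(x))\n",
 "    result_ids.append(a2.add.remote(10 * x))\n",
 "\n",
 "results = ray.get(result_ids)\n",
 "print(results)  # [1, 10, 3, 30, 6, 60]\n",
 "```"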
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sleep a little to improve the accuracy of the timing measurements below.\n", "time.sleep(2.0)\n", "start_time = time.time()\n", "\n", "# Reset the actor state so that we can run this cell multiple times without\n", "# changing the results.\n", "f1.reset()\n", "f2.reset()\n", "\n", "# We want to parallelize this code. However, it is not straightforward to\n", "# make \"increment\" a remote function, because state is shared (the value of\n", "# \"self.counter\") between subsequent calls to \"increment\". In this case, it\n", "# makes sense to use actors.\n", "results = []\n", "for _ in range(5):\n", " results.append(f1.increment())\n", " results.append(f2.increment())\n", "\n", "end_time = time.time()\n", "duration = end_time - start_time\n", "\n", "assert not any([isinstance(result, ray.ObjectID) for result in results]), 'Looks like \"results\" is {}. You may have forgotten to call ray.get.'.format(results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]\n", "\n", "assert duration < 3, ('The experiments ran in {} seconds. This is too '\n", " 'slow.'.format(duration))\n", "assert duration > 2.5, ('The experiments ran in {} seconds. This is too '\n", " 'fast.'.format(duration))\n", "\n", "print('Success! 
The example took {} seconds.'.format(duration))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Part 4 - Handling Slow Tasks\n", "\n", "**GOAL:** The goal of this exercise is to show how to use `ray.wait` to avoid waiting for slow tasks.\n", "\n", "See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/api.html#ray.wait.\n", "\n", "The script below starts 6 tasks, each of which takes a random amount of time to complete. We'd like to process the results in two batches (each of size 3). Change the code so that instead of waiting for a fixed set of 3 tasks to finish, the first batch consists of the first 3 tasks that complete. The second batch should consist of the 3 remaining tasks. Do this exercise by using `ray.wait`.\n", "\n", "### Concepts for this Exercise - ray.wait\n", "\n", "After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`. The function works as follows.\n", "\n", "```python\n", "ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)\n", "```\n", "\n", "**Arguments:**\n", "- `object_ids`: This is a list of object IDs.\n", "- `num_returns`: This is the maximum number of object IDs to wait for. The default value is `1`.\n", "- `timeout`: This is the maximum amount of time in milliseconds to wait for. So `ray.wait` will block until either `num_returns` objects are ready or until `timeout` milliseconds have passed.\n", "\n", "**Return values:**\n", "- `ready_ids`: This is a list of object IDs that are available in the object store.\n", "- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`, so the IDs in `ready_ids` and `remaining_ids` together make up all the IDs in `object_ids`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define a remote function that takes a variable amount of time to run."
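, "\n",
 "\n",
 "A minimal sketch of handling results in completion order with `ray.wait`, assuming Ray is installed; the remote function `sleepy` is hypothetical and independent of the exercise. Its sleep times are chosen so that later-submitted tasks tend to finish first, but the actual order depends on how many CPUs are available:\n",
 "\n",
 "```python\n",
 "import time\n",
 "import ray\n",
 "\n",
 "ray.init(ignore_reinit_error=True)\n",
 "\n",
 "@ray.remote\n",
 "def sleepy(i):\n",
 "    # Task i sleeps for 0.1 * (4 - i) seconds, so higher i tends to finish sooner.\n",
 "    time.sleep(0.1 * (4 - i))\n",
 "    return i\n",
 "\n",
 "remaining_ids = [sleepy.remote(i) for i in range(4)]\n",
 "finished = []\n",
 "while remaining_ids:\n",
 "    # Block until one more task is done, then handle it right away.\n",
 "    ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)\n",
 "    finished.extend(ray.get(ready_ids))\n",
 "\n",
 "print(finished)  # completion order, which depends on scheduling\n",
 "```\n",
 "\n",
 "Calling `ray.wait` with `num_returns=1` in a loop lets each result be processed as soon as it is ready instead of waiting for the slowest task."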
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@ray.remote\n", "def f(i):\n", " np.random.seed(5 + i)\n", " x = np.random.uniform(0, 4)\n", " time.sleep(x)\n", " return i, time.time()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sleep a little to improve the accuracy of the timing measurements below.\n", "time.sleep(2.0)\n", "start_time = time.time()\n", "\n", "# This launches 6 tasks, each of which takes a random amount of time to\n", "# complete.\n", "result_ids = [f.remote(i) for i in range(6)]\n", "# Get one batch of tasks. Instead of waiting for a fixed subset of tasks, we\n", "# should instead use the first 3 tasks that finish.\n", "initial_results = ray.get(result_ids[:3])\n", "\n", "end_time = time.time()\n", "duration = end_time - start_time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for the remaining tasks to complete.\n", "remaining_results = ray.get(result_ids[3:])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assert len(initial_results) == 3\n", "assert len(remaining_results) == 3\n", "\n", "initial_indices = [result[0] for result in initial_results]\n", "initial_times = [result[1] for result in initial_results]\n", "remaining_indices = [result[0] for result in remaining_results]\n", "remaining_times = [result[1] for result in remaining_results]\n", "\n", "assert set(initial_indices + remaining_indices) == set(range(6))\n", "\n", "assert duration < 1.5, ('The initial batch of three tasks was retrieved in '\n", "                        '{} seconds. This is too slow.'.format(duration))\n", "\n", "assert duration > 0.8, ('The initial batch of three tasks was retrieved in '\n", "                        '{} seconds. This is too fast.'.format(duration))\n", "\n", "# Make sure the initial results actually completed first.\n", "assert max(initial_times) < min(remaining_times)\n", "\n", "print('Success! The example took {} seconds.'.format(duration))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Part 5 - Speed up Serialization\n", "\n", "**GOAL:** The goal of this exercise is to illustrate how to speed up serialization by using `ray.put`.\n", "\n", "### Concepts for this Exercise - ray.put\n", "\n", "Object IDs can be created in multiple ways.\n", "- They are returned by remote function calls.\n", "- They are returned by actor method calls.\n", "- They are returned by `ray.put`.\n", "\n", "When an object is passed to `ray.put`, the object is serialized using the Apache Arrow format (see https://arrow.apache.org/ for more information about Arrow) and copied into a shared-memory object store. This object will then be available to other workers on the same machine via shared memory. 
If it is needed by workers on another machine, it will be copied to that machine automatically.\n", "\n", "**When objects are passed into a remote function, Ray puts them in the object store under the hood.** That is, if `f` is a remote function, the code\n", "\n", "```python\n", "x = np.zeros(1000)\n", "f.remote(x)\n", "```\n", "\n", "is essentially transformed under the hood to\n", "\n", "```python\n", "x = np.zeros(1000)\n", "x_id = ray.put(x)\n", "f.remote(x_id)\n", "```\n", "\n", "The call to `ray.put` copies the NumPy array into the shared-memory object store, from where it can be read by all of the worker processes (without additional copying). However, if you do something like\n", "\n", "```python\n", "for i in range(10):\n", "    f.remote(x)\n", "```\n", "\n", "then 10 copies of the array will be placed into the object store. This takes up more memory in the object store than is necessary, and it also takes time to copy the array into the object store over and over. This can be made more efficient by placing the array in the object store only once as follows.\n", "\n", "```python\n", "x_id = ray.put(x)\n", "for i in range(10):\n", "    f.remote(x_id)\n", "```\n", "\n", "In this exercise, you will speed up the code below and reduce the memory footprint by calling `ray.put` on the neural net weights before passing them into the remote functions.\n", "\n", "**WARNING:** This exercise requires a lot of memory to run. If this notebook is running within a Docker container, the container must be started with a large shared-memory file system, which can be done with the `--shm-size` flag."
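, "\n",
 "\n",
 "A runnable version of the put-once pattern, assuming Ray and NumPy are installed; `array_sum` is a hypothetical remote function standing in for `f` above:\n",
 "\n",
 "```python\n",
 "import numpy as np\n",
 "import ray\n",
 "\n",
 "ray.init(ignore_reinit_error=True)\n",
 "\n",
 "@ray.remote\n",
 "def array_sum(arr, i):\n",
 "    # arr is a NumPy array read from the object store.\n",
 "    return float(arr.sum()) + i\n",
 "\n",
 "x = np.ones(1000)\n",
 "\n",
 "# Copy the array into the object store once and reuse its ID for every task.\n",
 "x_id = ray.put(x)\n",
 "results = ray.get([array_sum.remote(x_id, i) for i in range(10)])\n",
 "print(results[:3])  # [1000.0, 1001.0, 1002.0]\n",
 "```"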
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "neural_net_weights = {'variable{}'.format(i): np.random.normal(size=1000000)\n", "                      for i in range(50)}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** Compare the time required to serialize the neural net weights and copy them into the object store using Ray versus the time required to pickle and unpickle the weights. The big win should be with the time required for *deserialization*.\n", "\n", "Note that when you call `ray.put`, in addition to serializing the object, Ray copies it into shared memory, where it can be efficiently accessed by other workers on the same machine.\n", "\n", "**NOTE:** You don't actually have to do anything here other than run the cell below and read the output.\n", "\n", "**NOTE:** Sometimes `ray.put` can be faster than `pickle.dumps`. This is because `ray.put` can use multiple threads when serializing large objects, whereas `pickle.dumps` serializes on a single thread." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('Ray - serializing')\n", "%time x_id = ray.put(neural_net_weights)\n", "print('\\nRay - deserializing')\n", "%time x_val = ray.get(x_id)\n", "\n", "print('\\npickle - serializing')\n", "%time serialized = pickle.dumps(neural_net_weights)\n", "print('\\npickle - deserializing')\n", "%time deserialized = pickle.loads(serialized)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define a remote function that uses the neural net weights." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@ray.remote\n", "def use_weights(weights, i):\n", "    return i" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE:** In the code below, use `ray.put` to avoid copying the neural net weights to the object store multiple times."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sleep a little to improve the accuracy of the timing measurements below.\n", "time.sleep(2.0)\n", "start_time = time.time()\n", "\n", "results = ray.get([use_weights.remote(neural_net_weights, i)\n", " for i in range(20)])\n", "\n", "end_time = time.time()\n", "duration = end_time - start_time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assert results == list(range(20))\n", "assert duration < 1, ('The experiments ran in {} seconds. This is too '\n", " 'slow.'.format(duration))\n", "\n", "print('Success! The example took {} seconds.'.format(duration))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (Intel, 2019 update 2)", "language": "python", "name": "c009-intel_distribution_of_python_3_2019u2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }