{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Working with IPython and dask.distributed\n",
    "\n",
    "[dask.distributed](https://distributed.readthedocs.io) is a cool library for doing distributed execution. You should check it out if you haven't already.\n",
    "\n",
    "Assuming you already have an IPython cluster running:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3, 4, 5, 6, 7]"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import ipyparallel as ipp\n",
    "rc = ipp.Client()\n",
    "rc.ids"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can turn your IPython cluster into a distributed cluster by calling `Client.become_dask()`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<Executor: scheduler=\"172.16.3.46:52245\" processes=9 cores=9>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "executor = rc.become_dask(ncores=1)\n",
    "executor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This will:\n",
    "\n",
    "1. start a Scheduler on the Hub\n",
    "2. start a Worker on each engine\n",
    "3. return an Executor, the distributed client API\n",
    "\n",
    "By default, distributed Workers use threads to run on all cores of a machine.\n",
    "In this case, since I already have one *engine* per core,\n",
    "I tell distributed to use one core per Worker with `ncores=1`.\n",
    "\n",
    "We can now use our IPython cluster with distributed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from distributed import progress\n",
    "\n",
    "def square(x):\n",
    "    return x ** 2\n",
    "\n",
    "def neg(x):\n",
    "    return -x\n",
    "\n",
    "A = executor.map(square, range(1000))\n",
    "B = executor.map(neg, A)\n",
    "total = executor.submit(sum, B)\n",
    "progress(total)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "-332833500"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "total.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I could also let distributed do its multithreading thing and run one multi-threaded Worker per host.\n",
    "\n",
    "First, I need a mapping from each engine to its host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{0: 'k5.simula.no',\n",
       " 1: 'k5.simula.no',\n",
       " 2: 'k5.simula.no',\n",
       " 3: 'k5.simula.no',\n",
       " 4: 'k5.simula.no',\n",
       " 5: 'k5.simula.no',\n",
       " 6: 'k5.simula.no',\n",
       " 7: 'k5.simula.no'}"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import socket\n",
    "\n",
    "engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()\n",
    "engine_hosts"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I can reverse this mapping to get a list of engines on each host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'k5.simula.no': [0, 1, 2, 3, 4, 5, 6, 7]}"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "host_engines = {}\n",
    "for engine_id, host in engine_hosts.items():\n",
    "    if host not in host_engines:\n",
    "        host_engines[host] = []\n",
    "    host_engines[host].append(engine_id)\n",
    "\n",
    "host_engines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now I can get one engine per host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "one_engine_per_host = [engines[0] for engines in host_engines.values()]\n",
    "one_engine_per_host"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*Here's a more concise, but more opaque, version that does nearly the same thing (it keeps the last engine on each host instead of the first):*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[7]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())\n",
    "one_engine_per_host"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I can now stop the first distributed cluster and start a new one on just these engines, letting distributed allocate the threads:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "distributed.executor - INFO - Reconnecting...\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<Executor: scheduler=\"172.16.3.46:59120\" processes=1 cores=1>"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rc.stop_distributed()\n",
    "\n",
    "executor = rc.become_dask(one_engine_per_host)\n",
    "executor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And submit the same tasks again:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "A = executor.map(square, range(1000))\n",
    "B = executor.map(neg, A)\n",
    "total = executor.submit(sum, B)\n",
    "progress(total)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Debugging distributed with IPython"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "distributed.executor - INFO - Reconnecting...\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<Executor: scheduler=\"172.16.3.46:59142\" processes=1 cores=1>"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rc.stop_distributed()\n",
    "\n",
    "executor = rc.become_dask(one_engine_per_host)\n",
    "executor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's make the `%px` magics run only on our one engine per host:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "view = rc[one_engine_per_host]\n",
    "view.block = True\n",
    "view.activate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's submit some work that's going to fail somewhere in the middle:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "ename": "ZeroDivisionError",
     "evalue": "division by zero",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mZeroDivisionError\u001b[0m Traceback (most recent call last)",
      "\u001b[1;32m<ipython-input-13-183a85878b6a>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m 13\u001b[0m \u001b[0mtotal\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msubmit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0msum\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0minverted\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 14\u001b[0m \u001b[0mdisplay\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mprogress\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtotal\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 15\u001b[1;33m \u001b[0mtotal\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[1;32m/Users/benjaminrk/dev/py/distributed/distributed/executor.py\u001b[0m in \u001b[0;36mresult\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 100\u001b[0m \u001b[0mresult\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0msync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mloop\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_result\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mraiseit\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;32mFalse\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 101\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'error'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 102\u001b[1;33m \u001b[0msix\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0mresult\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 103\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'cancelled'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 104\u001b[0m \u001b[1;32mraise\u001b[0m \u001b[0mresult\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32m/Users/benjaminrk/conda/lib/python3.5/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[1;34m(tp, value, tb)\u001b[0m\n\u001b[0;32m 683\u001b[0m \u001b[0mvalue\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mtp\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 684\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[1;32mis\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 685\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 686\u001b[0m \u001b[1;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 687\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;32m<ipython-input-13-183a85878b6a>\u001b[0m in \u001b[0;36minverse\u001b[1;34m()\u001b[0m\n\u001b[0;32m 6\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 7\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0minverse\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mx\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 8\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[1;36m1\u001b[0m \u001b[1;33m/\u001b[0m \u001b[0mx\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 9\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 10\u001b[0m \u001b[0mshifted\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mshift5\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mrange\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m10\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
      "\u001b[1;31mZeroDivisionError\u001b[0m: division by zero"
     ]
    }
   ],
   "source": [
    "from IPython.display import display\n",
    "from distributed import progress\n",
    "\n",
    "def shift5(x):\n",
    "    return x - 5\n",
    "\n",
    "def inverse(x):\n",
    "    return 1 / x\n",
    "\n",
    "shifted = executor.map(shift5, range(1, 10))\n",
    "inverted = executor.map(inverse, shifted)\n",
    "\n",
    "total = executor.submit(sum, inverted)\n",
    "display(progress(total))\n",
    "total.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see which task failed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[<Future: status: error, key: inverse-f8907aa30adc310cc8168553500ca8bb>]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "[f for f in inverted if f.status == 'error']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When IPython starts a worker on each engine,\n",
    "it stores it in the `dask_worker` variable in the engine's namespace.\n",
    "This lets us query the worker interactively.\n",
    "\n",
    "We can check out the data currently resident on each worker:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[0;31mOut[7:2]: \u001b[0m\n",
       "{'inverse-07072811957c38188d819607f8020bed': 0.3333333333333333,\n",
       " 'inverse-0994af96c984b7254e2437daa46df6c8': 1.0,\n",
       " 'inverse-1934b1ad8662540a6b1a321502d3d81e': 0.25,\n",
       " 'inverse-2e0af360f3e400c0360eaa3351e80a4d': -1.0,\n",
       " 'inverse-8ef20ef722160668e84ab435b8293751': -0.5,\n",
       " 'inverse-bee9906329afc3cb86cc241209453f56': -0.3333333333333333,\n",
       " 'inverse-cfd3e5b72a33fd2fa85c683107287cf9': -0.25,\n",
       " 'inverse-d9ed866e67ebc068f6561f9263c4cf73': 0.5,\n",
       " 'shift5-17c829bc866d38df11bb25ffc7ea887f': -3,\n",
       " 'shift5-3035396f215ce921eda38f8f36ca3e90': 4,\n",
       " 'shift5-4951afd99368d41997f42a2f823f566f': 2,\n",
       " 'shift5-5c9f9254c4a34e7571d53ee4839ea6f2': 1,\n",
       " 'shift5-8458d8715078405cb9dfed60d1c3d26a': -2,\n",
       " 'shift5-899e24c059f86698e06254cfd5f3f4ea': -1,\n",
       " 'shift5-9326e9993993cb1c08355c6e5b8e5970': -4,\n",
       " 'shift5-cabacfd5aaf525d183d932617b8eac5a': 0,\n",
       " 'shift5-e233bf13876414d6a0a4817695ac7ca1': 3}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%px\n",
    "dask_worker.data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "Now that we can poke around with each Worker,\n",
    "we have a slightly easier time figuring out what went wrong."
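   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For example, the worker data above holds nine `shift5` results but only eight `inverse` results, and one of the `shift5` values is `0`.\n",
    "As a minimal sketch, assuming nothing beyond the two functions defined earlier, we can replay the same two steps locally in plain Python (no cluster involved) and collect every input whose `inverse` raises.\n",
    "It should report a single failure, for input `5`, because `shift5(5)` is `0`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Local replay of the shift5 -> inverse pipeline in plain Python (no cluster needed).\n",
    "# shift5 and inverse are redefined here so that this cell is self-contained.\n",
    "def shift5(x):\n",
    "    return x - 5\n",
    "\n",
    "def inverse(x):\n",
    "    return 1 / x\n",
    "\n",
    "# Collect (input, shifted value, exception) for every input whose inverse fails.\n",
    "failures = []\n",
    "for x in range(1, 10):\n",
    "    shifted_value = shift5(x)\n",
    "    try:\n",
    "        inverse(shifted_value)\n",
    "    except ZeroDivisionError as exc:\n",
    "        failures.append((x, shifted_value, exc))\n",
    "\n",
    "failures"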
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  },
  "widgets": {
   "state": {
    "0db2b058b609455fa55654e5e3565453": {
     "views": [
      {
       "cell_index": 24
      }
     ]
    },
    "58612800fe1d42058d0cbc83f0534655": {
     "views": [
      {
       "cell_index": 18
      }
     ]
    },
    "f4647eb4300e4eb6bcdae457df082cc7": {
     "views": [
      {
       "cell_index": 5
      }
     ]
    }
   },
   "version": "1.2.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}