1{
2 "cells": [
3  {
4   "cell_type": "markdown",
5   "metadata": {},
6   "source": [
7    "# Working with IPython and dask.distributed\n",
8    "\n",
9    "[dask.distributed](https://distributed.readthedocs.io) is a library for distributed execution in Python. It's worth checking out if you haven't already.\n",
10    "\n",
11    "Assuming you already have an IPython cluster running:"
12   ]
13  },
14  {
15   "cell_type": "code",
16   "execution_count": 1,
17   "metadata": {
18    "collapsed": false
19   },
20   "outputs": [
21    {
22     "data": {
23      "text/plain": [
24       "[0, 1, 2, 3, 4, 5, 6, 7]"
25      ]
26     },
27     "execution_count": 1,
28     "metadata": {},
29     "output_type": "execute_result"
30    }
31   ],
32   "source": [
33    "import ipyparallel as ipp\n",
34    "rc = ipp.Client()\n",
35    "rc.ids"
36   ]
37  },
38  {
39   "cell_type": "markdown",
40   "metadata": {},
41   "source": [
42    "You can turn your IPython cluster into a dask.distributed cluster by calling `Client.become_dask()`:"
43   ]
44  },
45  {
46   "cell_type": "code",
47   "execution_count": 2,
48   "metadata": {
49    "collapsed": false
50   },
51   "outputs": [
52    {
53     "data": {
54      "text/plain": [
55       "<Executor: scheduler=\"172.16.3.46:52245\" processes=9 cores=9>"
56      ]
57     },
58     "execution_count": 2,
59     "metadata": {},
60     "output_type": "execute_result"
61    }
62   ],
63   "source": [
64    "executor = rc.become_dask(ncores=1)\n",
65    "executor"
66   ]
67  },
68  {
69   "cell_type": "markdown",
70   "metadata": {},
71   "source": [
72    "This will:\n",
73    "\n",
74    "1. start a Scheduler on the Hub\n",
75    "2. start a Worker on each engine\n",
76    "3. return an Executor, the distributed client API\n",
77    "\n",
78    "By default, distributed Workers will use threads to run on all cores of a machine.\n",
79    "In this case, since I already have one *engine* per core,\n",
80    "I tell distributed to run one core per Worker with `ncores=1`.\n",
81    "\n",
82    "We can now use our IPython cluster with distributed:"
83   ]
84  },
85  {
86   "cell_type": "code",
87   "execution_count": 3,
88   "metadata": {
89    "collapsed": false
90   },
91   "outputs": [],
92   "source": [
93    "from distributed import progress\n",
94    "\n",
95    "def square(x):\n",
96    "    return x ** 2\n",
97    "\n",
98    "def neg(x):\n",
99    "    return -x\n",
100    "\n",
101    "A = executor.map(square, range(1000))\n",
102    "B = executor.map(neg, A)\n",
103    "total = executor.submit(sum, B)\n",
104    "progress(total)"
105   ]
106  },
107  {
108   "cell_type": "code",
109   "execution_count": 4,
110   "metadata": {
111    "collapsed": false
112   },
113   "outputs": [
114    {
115     "data": {
116      "text/plain": [
117       "-332833500"
118      ]
119     },
120     "execution_count": 4,
121     "metadata": {},
122     "output_type": "execute_result"
123    }
124   ],
125   "source": [
126    "total.result()"
127   ]
128  },
129  {
130   "cell_type": "markdown",
131   "metadata": {},
132   "source": [
133    "I could also let distributed handle the multithreading, and run a single multi-threaded Worker on each host.\n",
134    "\n",
135    "First, I need a mapping from each engine to its host:"
136   ]
137  },
138  {
139   "cell_type": "code",
140   "execution_count": 5,
141   "metadata": {
142    "collapsed": false
143   },
144   "outputs": [
145    {
146     "data": {
147      "text/plain": [
148       "{0: 'k5.simula.no',\n",
149       " 1: 'k5.simula.no',\n",
150       " 2: 'k5.simula.no',\n",
151       " 3: 'k5.simula.no',\n",
152       " 4: 'k5.simula.no',\n",
153       " 5: 'k5.simula.no',\n",
154       " 6: 'k5.simula.no',\n",
155       " 7: 'k5.simula.no'}"
156      ]
157     },
158     "execution_count": 5,
159     "metadata": {},
160     "output_type": "execute_result"
161    }
162   ],
163   "source": [
164    "import socket\n",
165    "\n",
166    "engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()\n",
167    "engine_hosts"
168   ]
169  },
170  {
171   "cell_type": "markdown",
172   "metadata": {},
173   "source": [
174    "I can reverse this mapping, to get a list of engines on each host:"
175   ]
176  },
177  {
178   "cell_type": "code",
179   "execution_count": 6,
180   "metadata": {
181    "collapsed": false
182   },
183   "outputs": [
184    {
185     "data": {
186      "text/plain": [
187       "{'k5.simula.no': [0, 1, 2, 3, 4, 5, 6, 7]}"
188      ]
189     },
190     "execution_count": 6,
191     "metadata": {},
192     "output_type": "execute_result"
193    }
194   ],
195   "source": [
196    "host_engines = {}\n",
197    "for engine_id, host in engine_hosts.items():\n",
198    "    if host not in host_engines:\n",
199    "        host_engines[host] = []\n",
200    "    host_engines[host].append(engine_id)\n",
201    "\n",
202    "host_engines"
203   ]
204  },
205  {
206   "cell_type": "markdown",
207   "metadata": {},
208   "source": [
209    "Now I can get one engine per host:"
210   ]
211  },
212  {
213   "cell_type": "code",
214   "execution_count": 7,
215   "metadata": {
216    "collapsed": false
217   },
218   "outputs": [
219    {
220     "data": {
221      "text/plain": [
222       "[0]"
223      ]
224     },
225     "execution_count": 7,
226     "metadata": {},
227     "output_type": "execute_result"
228    }
229   ],
230   "source": [
231    "one_engine_per_host = [engines[0] for engines in host_engines.values()]\n",
232    "one_engine_per_host"
233   ]
234  },
235  {
236   "cell_type": "markdown",
237   "metadata": {},
238   "source": [
239    "*Here's a more concise, but more opaque, version. It keeps the last engine seen on each host instead of the first, which serves equally well here:*"
240   ]
241  },
242  {
243   "cell_type": "code",
244   "execution_count": 8,
245   "metadata": {
246    "collapsed": false
247   },
248   "outputs": [
249    {
250     "data": {
251      "text/plain": [
252       "[7]"
253      ]
254     },
255     "execution_count": 8,
256     "metadata": {},
257     "output_type": "execute_result"
258    }
259   ],
260   "source": [
261    "one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())\n",
262    "one_engine_per_host"
263   ]
264  },
265  {
266   "cell_type": "markdown",
267   "metadata": {},
268   "source": [
269    "I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:"
270   ]
271  },
272  {
273   "cell_type": "code",
274   "execution_count": 9,
275   "metadata": {
276    "collapsed": false
277   },
278   "outputs": [
279    {
280     "name": "stderr",
281     "output_type": "stream",
282     "text": [
283      "distributed.executor - INFO - Reconnecting...\n"
284     ]
285    },
286    {
287     "data": {
288      "text/plain": [
289       "<Executor: scheduler=\"172.16.3.46:59120\" processes=1 cores=1>"
290      ]
291     },
292     "execution_count": 9,
293     "metadata": {},
294     "output_type": "execute_result"
295    }
296   ],
297   "source": [
298    "rc.stop_dask()\n",
299    "\n",
300    "executor = rc.become_dask(one_engine_per_host)\n",
301    "executor"
302   ]
303  },
304  {
305   "cell_type": "markdown",
306   "metadata": {},
307   "source": [
308    "And submit the same pipeline again, this time over a smaller range:"
309   ]
310  },
311  {
312   "cell_type": "code",
313   "execution_count": 10,
314   "metadata": {
315    "collapsed": false
316   },
317   "outputs": [
318    {
319     "name": "stderr",
320     "output_type": "stream",
321     "text": [
322      "Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run \"jupyter nbextension enable --py --sys-prefix widgetsnbextension\"\n"
323     ]
324    }
325   ],
326   "source": [
327    "A = executor.map(square, range(100))\n",
328    "B = executor.map(neg, A)\n",
329    "total = executor.submit(sum, B)\n",
330    "progress(total)"
331   ]
332  },
333  {
334   "cell_type": "markdown",
335   "metadata": {},
336   "source": [
337    "## Debugging distributed with IPython"
338   ]
339  },
340  {
341   "cell_type": "code",
342   "execution_count": 11,
343   "metadata": {
344    "collapsed": false
345   },
346   "outputs": [
347    {
348     "name": "stderr",
349     "output_type": "stream",
350     "text": [
351      "distributed.executor - INFO - Reconnecting...\n"
352     ]
353    },
354    {
355     "data": {
356      "text/plain": [
357       "<Executor: scheduler=\"172.16.3.46:59142\" processes=1 cores=1>"
358      ]
359     },
360     "execution_count": 11,
361     "metadata": {},
362     "output_type": "execute_result"
363    }
364   ],
365   "source": [
366    "rc.stop_dask()\n",
367    "\n",
368    "executor = rc.become_dask(one_engine_per_host)\n",
369    "executor"
370   ]
371  },
372  {
373   "cell_type": "markdown",
374   "metadata": {},
375   "source": [
376    "Let's set the `%px` magics to run only on the one engine we picked per host:"
377   ]
378  },
379  {
380   "cell_type": "code",
381   "execution_count": 12,
382   "metadata": {
383    "collapsed": false
384   },
385   "outputs": [],
386   "source": [
387    "view = rc[one_engine_per_host]\n",
388    "view.block = True\n",
389    "view.activate()"
390   ]
391  },
392  {
393   "cell_type": "markdown",
394   "metadata": {},
395   "source": [
396    "Let's submit some work that's going to fail somewhere in the middle:"
397   ]
398  },
399  {
400   "cell_type": "code",
401   "execution_count": 13,
402   "metadata": {
403    "collapsed": false
404   },
405   "outputs": [
406    {
407     "name": "stderr",
408     "output_type": "stream",
409     "text": [
410      "Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run \"jupyter nbextension enable --py --sys-prefix widgetsnbextension\"\n"
411     ]
412    },
413    {
414     "ename": "ZeroDivisionError",
415     "evalue": "division by zero",
416     "output_type": "error",
417     "traceback": [
418      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
419      "\u001b[1;31mZeroDivisionError\u001b[0m                         Traceback (most recent call last)",
420      "\u001b[1;32m<ipython-input-13-183a85878b6a>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m     13\u001b[0m \u001b[0mtotal\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msubmit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0msum\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0minverted\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     14\u001b[0m \u001b[0mdisplay\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mprogress\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtotal\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 15\u001b[1;33m \u001b[0mtotal\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
421      "\u001b[1;32m/Users/benjaminrk/dev/py/distributed/distributed/executor.py\u001b[0m in \u001b[0;36mresult\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m    100\u001b[0m         \u001b[0mresult\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0msync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mloop\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_result\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mraiseit\u001b[0m\u001b[1;33m=\u001b[0m\u001b[1;32mFalse\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    101\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'error'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 102\u001b[1;33m             \u001b[0msix\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0mresult\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    103\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;34m'cancelled'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    104\u001b[0m             \u001b[1;32mraise\u001b[0m \u001b[0mresult\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
422      "\u001b[1;32m/Users/benjaminrk/conda/lib/python3.5/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[1;34m(tp, value, tb)\u001b[0m\n\u001b[0;32m    683\u001b[0m             \u001b[0mvalue\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mtp\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    684\u001b[0m         \u001b[1;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[1;32mis\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 685\u001b[1;33m             \u001b[1;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m    686\u001b[0m         \u001b[1;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m    687\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
423      "\u001b[1;32m<ipython-input-13-183a85878b6a>\u001b[0m in \u001b[0;36minverse\u001b[1;34m()\u001b[0m\n\u001b[0;32m      6\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m      7\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0minverse\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mx\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 8\u001b[1;33m     \u001b[1;32mreturn\u001b[0m \u001b[1;36m1\u001b[0m \u001b[1;33m/\u001b[0m \u001b[0mx\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m      9\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m     10\u001b[0m \u001b[0mshifted\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mexecutor\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mshift5\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mrange\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m10\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
424      "\u001b[1;31mZeroDivisionError\u001b[0m: division by zero"
425     ]
426    }
427   ],
428   "source": [
429    "from IPython.display import display\n",
430    "from distributed import progress\n",
431    "\n",
432    "def shift5(x):\n",
433    "    return x - 5\n",
434    "\n",
435    "def inverse(x):\n",
436    "    return 1 / x\n",
437    "\n",
438    "shifted = executor.map(shift5, range(1, 10))\n",
439    "inverted = executor.map(inverse, shifted)\n",
440    "\n",
441    "total = executor.submit(sum, inverted)\n",
442    "display(progress(total))\n",
443    "total.result()"
444   ]
445  },
446  {
447   "cell_type": "markdown",
448   "metadata": {},
449   "source": [
450    "We can see which task failed:"
451   ]
452  },
453  {
454   "cell_type": "code",
455   "execution_count": 14,
456   "metadata": {
457    "collapsed": false
458   },
459   "outputs": [
460    {
461     "data": {
462      "text/plain": [
463       "[<Future: status: error, key: inverse-f8907aa30adc310cc8168553500ca8bb>]"
464      ]
465     },
466     "execution_count": 14,
467     "metadata": {},
468     "output_type": "execute_result"
469    }
470   ],
471   "source": [
472    "[f for f in inverted if f.status == 'error']"
473   ]
474  },
475  {
476   "cell_type": "markdown",
477   "metadata": {},
478   "source": [
479    "When IPython starts a worker on each engine,\n",
480    "it stores it in the `dask_worker` variable in the engine's namespace.\n",
481    "This lets us query the worker interactively.\n",
482    "\n",
483    "We can check out the current data resident on each worker:"
484   ]
485  },
486  {
487   "cell_type": "code",
488   "execution_count": 15,
489   "metadata": {
490    "collapsed": false
491   },
492   "outputs": [
493    {
494     "data": {
495      "text/plain": [
496       "\u001b[0;31mOut[7:2]: \u001b[0m\n",
497       "{'inverse-07072811957c38188d819607f8020bed': 0.3333333333333333,\n",
498       " 'inverse-0994af96c984b7254e2437daa46df6c8': 1.0,\n",
499       " 'inverse-1934b1ad8662540a6b1a321502d3d81e': 0.25,\n",
500       " 'inverse-2e0af360f3e400c0360eaa3351e80a4d': -1.0,\n",
501       " 'inverse-8ef20ef722160668e84ab435b8293751': -0.5,\n",
502       " 'inverse-bee9906329afc3cb86cc241209453f56': -0.3333333333333333,\n",
503       " 'inverse-cfd3e5b72a33fd2fa85c683107287cf9': -0.25,\n",
504       " 'inverse-d9ed866e67ebc068f6561f9263c4cf73': 0.5,\n",
505       " 'shift5-17c829bc866d38df11bb25ffc7ea887f': -3,\n",
506       " 'shift5-3035396f215ce921eda38f8f36ca3e90': 4,\n",
507       " 'shift5-4951afd99368d41997f42a2f823f566f': 2,\n",
508       " 'shift5-5c9f9254c4a34e7571d53ee4839ea6f2': 1,\n",
509       " 'shift5-8458d8715078405cb9dfed60d1c3d26a': -2,\n",
510       " 'shift5-899e24c059f86698e06254cfd5f3f4ea': -1,\n",
511       " 'shift5-9326e9993993cb1c08355c6e5b8e5970': -4,\n",
512       " 'shift5-cabacfd5aaf525d183d932617b8eac5a': 0,\n",
513       " 'shift5-e233bf13876414d6a0a4817695ac7ca1': 3}"
514      ]
515     },
516     "metadata": {},
517     "output_type": "display_data"
518    }
519   ],
520   "source": [
521    "%%px\n",
522    "dask_worker.data"
523   ]
524  },
525  {
526   "cell_type": "markdown",
527   "metadata": {
528    "collapsed": true
529   },
530   "source": [
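531    "Now that we can poke around in each Worker, it's easier to figure out what went wrong.\n",
532    "Here, one `shift5` result is `0` and only eight of the nine `inverse` results are present: the missing one is the `1 / 0` that raised the `ZeroDivisionError`."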
533   ]
534  }
535 ],
536 "metadata": {
537  "kernelspec": {
538   "display_name": "Python 3",
539   "language": "python",
540   "name": "python3"
541  },
542  "language_info": {
543   "codemirror_mode": {
544    "name": "ipython",
545    "version": 3
546   },
547   "file_extension": ".py",
548   "mimetype": "text/x-python",
549   "name": "python",
550   "nbconvert_exporter": "python",
551   "pygments_lexer": "ipython3",
552   "version": "3.5.1"
553  },
554  "widgets": {
555   "state": {
556    "0db2b058b609455fa55654e5e3565453": {
557     "views": [
558      {
559       "cell_index": 24
560      }
561     ]
562    },
563    "58612800fe1d42058d0cbc83f0534655": {
564     "views": [
565      {
566       "cell_index": 18
567      }
568     ]
569    },
570    "f4647eb4300e4eb6bcdae457df082cc7": {
571     "views": [
572      {
573       "cell_index": 5
574      }
575     ]
576    }
577   },
578   "version": "1.2.0"
579  }
580 },
581 "nbformat": 4,
582 "nbformat_minor": 0
583}
584