Best Practices
==============

It is easy to get started with Dask delayed, but using it *well* does require
some experience.  This page contains suggestions for best practices, and
includes solutions to common problems.


Call delayed on the function, not the result
--------------------------------------------

Dask delayed operates on functions like ``dask.delayed(f)(x, y)``, not on their results like ``dask.delayed(f(x, y))``.  When you do the latter, Python evaluates ``f(x, y)`` immediately, before Dask has a chance to step in.

+------------------------------------------------+--------------------------------------------------------------+
| **Don't**                                      | **Do**                                                       |
+------------------------------------------------+--------------------------------------------------------------+
| .. code-block:: python                         | .. code-block:: python                                       |
|                                                |                                                              |
|    # This executes immediately                 |    # This makes a delayed function, acting lazily            |
|                                                |                                                              |
|    dask.delayed(f(x, y))                       |    dask.delayed(f)(x, y)                                     |
|                                                |                                                              |
+------------------------------------------------+--------------------------------------------------------------+
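
A small sketch makes the difference visible. The function ``add`` and the ``calls`` list are hypothetical and exist only to record when ``add`` actually runs; the list is shared module state, which is acceptable for this single-process illustration but not in real tasks. The expected outputs assume the default local scheduler.

.. code-block:: python

   import dask

   calls = []  # records each real execution of add(); for illustration only

   def add(x, y):
       calls.append((x, y))
       return x + y

   # Don't: add(1, 2) runs right away and Dask only wraps the finished value 3
   eager = dask.delayed(add(1, 2))
   print(len(calls))  # 1

   # Do: nothing runs yet; Dask records a task that will call add(1, 2) later
   lazy = dask.delayed(add)(1, 2)
   print(len(calls))  # 1

   print(lazy.compute())  # 3
   print(len(calls))  # 2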


Compute on lots of computation at once
--------------------------------------

To improve parallelism, you want to include lots of computation in each compute call.
Ideally, you want to make many ``dask.delayed`` calls to define your computation and
then call ``dask.compute`` only at the end.  It is OK to call ``dask.compute``
in the middle of your computation as well, but everything will stop there as
Dask computes those results before moving forward with your code.

+--------------------------------------------------------+-------------------------------------------+
| **Don't**                                              | **Do**                                    |
+--------------------------------------------------------+-------------------------------------------+
| .. code-block:: python                                 | .. code-block:: python                    |
|                                                        |                                           |
|    # Avoid calling compute repeatedly                  |    # Collect many calls for one compute   |
|                                                        |                                           |
|    results = []                                        |    results = []                           |
|    for x in L:                                         |    for x in L:                            |
|        y = dask.delayed(f)(x)                          |        y = dask.delayed(f)(x)             |
|        results.append(y.compute())                     |        results.append(y)                  |
|                                                        |                                           |
|    results                                             |    results = dask.compute(*results)       |
+--------------------------------------------------------+-------------------------------------------+

Calling ``y.compute()`` within the loop waits for each result before the next
task is even created, so the tasks run one at a time and nothing runs in parallel.
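
A slightly larger sketch of the same pattern, using two hypothetical functions ``inc`` and ``double``, defines all the work first and then computes it in one call. Passing several outputs to the same ``dask.compute`` call lets them share intermediate tasks: here the ``doubled`` values feed both the returned list and the total.

.. code-block:: python

   import dask

   @dask.delayed
   def inc(x):
       return x + 1

   @dask.delayed
   def double(x):
       return 2 * x

   # Define all the work first; nothing has executed yet
   doubled = [double(inc(x)) for x in range(5)]
   total = dask.delayed(sum)(doubled)

   # One compute call for everything: Dask schedules the shared
   # inc/double tasks together and can run independent ones in parallel
   values, summed = dask.compute(doubled, total)
   print(values)  # [2, 4, 6, 8, 10]
   print(summed)  # 30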

Don't mutate inputs
-------------------

Your functions should not change their inputs in place.  Dask may hand the same
object to several tasks, or hand your own object directly to a task, so an
in-place change can leak into the rest of the computation.

+-----------------------------------------+--------------------------------------+
| **Don't**                               | **Do**                               |
+-----------------------------------------+--------------------------------------+
| .. code-block:: python                  | .. code-block:: python               |
|                                         |                                      |
|    # Mutate inputs in functions         |    # Return new values or copies     |
|                                         |                                      |
|    @dask.delayed                        |    @dask.delayed                     |
|    def f(x):                            |    def f(x):                         |
|        x += 1                           |        x = x + 1                     |
|        return x                         |        return x                      |
+-----------------------------------------+--------------------------------------+

If you need to use a mutable operation, then make a copy within your function first:

.. code-block:: python

   from copy import copy

   import dask

   @dask.delayed
   def f(x):
       x = copy(x)  # work on a copy so the caller's object is left untouched
       x += 1
       return x
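
The sketch below checks that the copy really protects the caller, using a NumPy array as the mutable input (an assumption for illustration). ``copy`` makes a shallow copy, which is enough for a flat NumPy array; nested containers would need ``copy.deepcopy`` instead.

.. code-block:: python

   from copy import copy

   import dask
   import numpy as np

   @dask.delayed
   def inc(x):
       x = copy(x)  # ndarray copy: new buffer with the same values
       x += 1       # the in-place update touches only the copy
       return x

   data = np.zeros(3)
   result = inc(data).compute()
   print(result)  # [1. 1. 1.]
   print(data)    # [0. 0. 0.]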


Avoid global state
------------------

Ideally, your operations shouldn't rely on global state.  Using global state
*might* work if you only use threads, but when you move to multiprocessing or
distributed computing then you will likely encounter confusing errors, because
each process or worker has its own separate copy of that state.

+-------------------------------------------+
| **Don't**                                 |
+-------------------------------------------+
| .. code-block:: python                    |
|                                           |
|   L = []                                  |
|                                           |
|   # This references global variable L     |
|                                           |
|   @dask.delayed                           |
|   def f(x):                               |
|       L.append(x)                         |
|                                           |
+-------------------------------------------+
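
One alternative, sketched under the assumption that the global list ``L`` was only there to collect results, is to have each task return its value and let the caller gather them. The ``x * 10`` body is a hypothetical placeholder for real work.

.. code-block:: python

   import dask

   @dask.delayed
   def f(x):
       # Return the value instead of appending it to a global list;
       # x * 10 is a placeholder for real work
       return x * 10

   # The caller gathers the results, so no task touches shared state
   L = list(dask.compute(*[f(x) for x in range(4)]))
   print(L)  # [0, 10, 20, 30]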


Don't rely on side effects
--------------------------

Delayed functions only do something if they are computed.  You will always need
to pass the output to something that eventually calls compute.

+--------------------------------+-----------------------------------------+
| **Don't**                      | **Do**                                  |
+--------------------------------+-----------------------------------------+
| .. code-block:: python         | .. code-block:: python                  |
|                                |                                         |
|    # Forget to call compute    |    # Ensure delayed tasks are computed  |
|                                |                                         |
|    dask.delayed(f)(1, 2, 3)    |    x = dask.delayed(f)(1, 2, 3)         |
|                                |    ...                                  |
|    ...                         |    dask.compute(x, ...)                 |
+--------------------------------+-----------------------------------------+

In the first case, nothing happens, because ``compute()`` is never called.
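
This matters most for tasks that exist only for their side effects, such as writing files. In this minimal sketch, ``save`` is a hypothetical task that writes into a temporary directory and returns the path, so that there is an output to pass to ``dask.compute``:

.. code-block:: python

   import os
   import tempfile

   import dask

   @dask.delayed
   def save(text, path):
       # Hypothetical task whose real purpose is the side effect of writing a file
       with open(path, "w") as fh:
           fh.write(text)
       return path  # return a value so the caller has something to compute

   outdir = tempfile.mkdtemp()
   tasks = [save(str(i), os.path.join(outdir, f"{i}.txt")) for i in range(3)]
   print(os.listdir(outdir))  # []  (no task has run yet)

   paths = dask.compute(*tasks)  # the files are written here
   print(sorted(os.listdir(outdir)))  # ['0.txt', '1.txt', '2.txt']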

Break up computations into many pieces
--------------------------------------

Every ``dask.delayed`` function call is a single operation from Dask's perspective.
You achieve parallelism by having many delayed calls, not by using only a
single one: Dask will not look inside a function decorated with ``@dask.delayed``
and parallelize that code internally.  To accomplish that, it needs your help to
find good places to break up a computation.

+------------------------------------+--------------------------------------+
| **Don't**                          | **Do**                               |
+------------------------------------+--------------------------------------+
| .. code-block:: python             | .. code-block:: python               |
|                                    |                                      |
|    # One giant task                |    # Break up into many tasks        |
|                                    |                                      |
|                                    |    @dask.delayed                     |
|    def load(filename):             |    def load(filename):               |
|        ...                         |        ...                           |
|                                    |                                      |
|                                    |    @dask.delayed                     |
|    def process(data):              |    def process(data):                |
|        ...                         |        ...                           |
|                                    |                                      |
|                                    |    @dask.delayed                     |
|    def save(data):                 |    def save(data):                   |
|        ...                         |        ...                           |
|                                    |                                      |
|    @dask.delayed                   |                                      |
|    def f(filenames):               |    def f(filenames):                 |
|        results = []                |        results = []                  |
|        for filename in filenames:  |        for filename in filenames:    |
|            data = load(filename)   |            data = load(filename)     |
|            data = process(data)    |            data = process(data)      |
|            result = save(data)     |            result = save(data)       |
|            results.append(result)  |            results.append(result)    |
|                                    |                                      |
|        return results              |        return results                |
|                                    |                                      |
|    dask.compute(f(filenames))      |    dask.compute(f(filenames))        |
+------------------------------------+--------------------------------------+

The first version has only one delayed task, so Dask cannot run any of it in parallel.
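
A runnable version of the *Do* column, with hypothetical bodies filled in for ``load``, ``process``, and ``save`` (they only manipulate strings), shows that ``f`` returns one independent chain of delayed tasks per file:

.. code-block:: python

   import dask

   # Hypothetical stand-ins for the load/process/save steps above
   @dask.delayed
   def load(filename):
       return f"contents of {filename}"

   @dask.delayed
   def process(data):
       return data.upper()

   @dask.delayed
   def save(data):
       return len(data)

   def f(filenames):
       # Plain function: builds one load -> process -> save chain per file
       results = []
       for filename in filenames:
           data = load(filename)
           data = process(data)
           result = save(data)
           results.append(result)
       return results

   results = f(["a.csv", "b.csv", "c.csv"])
   print(len(results))  # 3
   print(dask.compute(*results))  # (17, 17, 17)

Because the three chains do not depend on each other, the scheduler is free to run them at the same time.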

Avoid too many tasks
--------------------

Every delayed task has an overhead of a few hundred microseconds.  Usually this
is OK, but it can become a problem if you apply ``dask.delayed`` too finely.  In
this case, it's often best to break up your many tasks into batches or use one
of the Dask collections to help you.

+------------------------------------+-------------------------------------------------------------+
| **Don't**                          | **Do**                                                      |
+------------------------------------+-------------------------------------------------------------+
| .. code-block:: python             | .. code-block:: python                                      |
|                                    |                                                             |
|    # Too many tasks                |    # Use collections                                        |
|                                    |                                                             |
|    results = []                    |    import dask.bag as db                                    |
|    for x in range(10000000):       |    b = db.from_sequence(range(10000000), npartitions=1000)  |
|        y = dask.delayed(f)(x)      |    b = b.map(f)                                             |
|        results.append(y)           |    ...                                                      |
|                                    |                                                             |
+------------------------------------+-------------------------------------------------------------+

Here we use ``dask.bag`` to batch the calls to our function automatically.  We could also have constructed
our own batching as follows:

.. code-block:: python

   import dask

   def batch(seq):
       sub_results = []
       for x in seq:
           sub_results.append(f(x))
       return sub_results

   batches = []
   for i in range(0, 10000000, 10000):
       result_batch = dask.delayed(batch)(range(i, i + 10000))
       batches.append(result_batch)

Here each delayed call processes 10,000 data points from the original input,
so only 1,000 tasks are created instead of 10,000,000.
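
A scaled-down, runnable version of the same idea, where ``f`` is a hypothetical placeholder, also shows one way to flatten the per-batch lists back into a single list after one ``dask.compute`` call:

.. code-block:: python

   import itertools

   import dask

   def f(x):
       return x + 1  # hypothetical placeholder for real per-item work

   def batch(seq):
       # Plain function: runs f on every element of one batch inside one task
       return [f(x) for x in seq]

   n, batch_size = 10_000, 1_000
   batches = [dask.delayed(batch)(range(i, min(i + batch_size, n)))
              for i in range(0, n, batch_size)]
   print(len(batches))  # 10

   # One compute call for all batches, then flatten the per-batch lists
   per_batch = dask.compute(*batches)
   results = list(itertools.chain.from_iterable(per_batch))
   print(len(results), results[:3])  # 10000 [1, 2, 3]

The ``min`` guard keeps the last batch in range when ``n`` is not a multiple of ``batch_size``.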

Avoid calling delayed within delayed functions
----------------------------------------------

Often, if you are new to using Dask delayed, you place ``dask.delayed`` calls
everywhere and hope for the best.  While this may actually work, it's usually
slow and results in hard-to-understand solutions.

Usually you never need to call ``dask.delayed`` within ``dask.delayed`` functions.

+----------------------------------------+--------------------------------------+
| **Don't**                              | **Do**                               |
+----------------------------------------+--------------------------------------+
| .. code-block:: python                 | .. code-block:: python               |
|                                        |                                      |
|    # Delayed function calls delayed    |    # Normal function calls delayed   |
|                                        |                                      |
|    @dask.delayed                       |                                      |
|    def process_all(L):                 |    def process_all(L):               |
|        result = []                     |        result = []                   |
|        for x in L:                     |        for x in L:                   |
|            y = dask.delayed(f)(x)      |            y = dask.delayed(f)(x)    |
|            result.append(y)            |            result.append(y)          |
|        return result                   |        return result                 |
+----------------------------------------+--------------------------------------+

Because the normal function only creates delayed work, it is very fast, so
there is no reason to delay it.
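
The sketch below, with a placeholder ``f``, contrasts the two versions. The plain ``process_all`` returns delayed objects that a single ``dask.compute`` call evaluates. Wrapping ``process_all`` itself in ``dask.delayed`` only hides those tasks inside one outer task: computing it returns the inner delayed objects unevaluated, so a second round of computation would still be needed.

.. code-block:: python

   import dask

   def f(x):
       return x * 3  # placeholder for real work

   def process_all(L):
       # Plain function: it only builds delayed objects, so it returns at once
       result = []
       for x in L:
           y = dask.delayed(f)(x)
           result.append(y)
       return result

   # Do: one compute call evaluates every inner task
   print(dask.compute(*process_all(range(4))))  # (0, 3, 6, 9)

   # Don't: the outer task runs, but its output is still a list of delayed objects
   nested = dask.delayed(process_all)(range(4)).compute()
   print(dask.is_dask_collection(nested[0]))  # True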

Don't call dask.delayed on other Dask collections
-------------------------------------------------

When you place a Dask array or Dask DataFrame into a delayed call, that function
will receive the equivalent NumPy array or pandas DataFrame, with the whole
collection materialized in memory as a single object.  Beware that if your array
is large, then this might crash your workers.

Instead, it's more common to use methods like ``da.map_blocks`` or
``df.map_partitions``:

+--------------------------------------------------+---------------------------------------------+
| **Don't**                                        | **Do**                                      |
+--------------------------------------------------+---------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                      |
|                                                  |                                             |
|    # Call delayed functions on Dask collections  |    # Use mapping methods if applicable      |
|                                                  |                                             |
|    import dask.dataframe as dd                   |    import dask.dataframe as dd              |
|    df = dd.read_csv('/path/to/*.csv')            |    df = dd.read_csv('/path/to/*.csv')       |
|                                                  |                                             |
|    dask.delayed(train)(df)                       |    df.map_partitions(train)                 |
+--------------------------------------------------+---------------------------------------------+

Alternatively, if the procedure doesn't fit into a mapping, you can always
turn your arrays or dataframes into *many* delayed
objects, for example:

.. code-block:: python

   partitions = df.to_delayed()
   delayed_values = [dask.delayed(train)(part)
                     for part in partitions]

However, if you don't mind turning your Dask array/DataFrame into a single
chunk (for example, because it is the small result of a reduction such as
``df.sum()``), then this is OK:

.. code-block:: python

   dask.delayed(train)(..., y=df.sum())
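
The same two approaches apply to Dask arrays. In this sketch ``train`` is a hypothetical per-chunk function and the array is tiny so the results are easy to check: ``map_blocks`` keeps elementwise work inside the Dask array, while ``to_delayed`` turns each block into its own delayed object so that every ``train`` call sees only one chunk.

.. code-block:: python

   import dask
   import dask.array as da

   def train(block):
       # Hypothetical per-chunk work; receives one NumPy block
       return int(block.sum())

   x = da.arange(12, chunks=4)  # three NumPy blocks of four elements each

   # Mapping method: elementwise work applied block by block, stays lazy
   doubled = x.map_blocks(lambda block: block * 2)
   print(int(doubled.sum().compute()))  # 132

   # Many delayed objects: one train() call per block, never the whole array
   blocks = x.to_delayed().ravel()
   per_block = dask.compute(*[dask.delayed(train)(b) for b in blocks])
   print(per_block)  # (6, 22, 38)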


Avoid repeatedly putting large inputs into delayed calls
--------------------------------------------------------

Every time you pass a concrete result (anything that isn't delayed), Dask will
hash it by default to give it a name.  This is fairly fast (around 500 MB/s)
but can be slow if you do it over and over again.  Instead, it is better to
delay your data as well.

This is especially important on a distributed cluster, where delaying the data
once avoids sending it separately with each function call.

+------------------------------------------+---------------------------------------------------------+
| **Don't**                                | **Do**                                                  |
+------------------------------------------+---------------------------------------------------------+
| .. code-block:: python                   | .. code-block:: python                                  |
|                                          |                                                         |
|    x = np.array(...)  # some large array |    x = np.array(...)    # some large array              |
|                                          |    x = dask.delayed(x)  # delay the data once           |
|    results = [dask.delayed(train)(x, i)  |    results = [dask.delayed(train)(x, i)                 |
|               for i in range(1000)]      |               for i in range(1000)]                     |
+------------------------------------------+---------------------------------------------------------+


In the first version, every call to ``dask.delayed(train)(x, ...)`` has to hash the NumPy array ``x`` again, which slows things down.
In the second version, ``x`` is hashed once and every task refers to the same delayed object.
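
A small sketch of the *Do* version, with a hypothetical ``train`` and a moderately sized array standing in for real data. All ten tasks depend on the single key of ``x_delayed``, so the array is hashed once rather than once per call.

.. code-block:: python

   import dask
   import numpy as np

   def train(x, i):
       # Hypothetical stand-in for real training on the shared data
       return float(x[i] * 2)

   x = np.arange(1_000_000, dtype="float64")  # stands in for a large array
   x_delayed = dask.delayed(x)  # hash the array once and give it a single key

   results = [dask.delayed(train)(x_delayed, i) for i in range(10)]
   values = dask.compute(*results)
   print(values[:3])  # (0.0, 2.0, 4.0)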