Best Practices
==============

It is easy to get started with Dask delayed, but using it *well* does require
some experience. This page contains suggestions for best practices, and
includes solutions to common problems.


Call delayed on the function, not the result
--------------------------------------------

Dask delayed operates on functions like ``dask.delayed(f)(x, y)``, not on their
results like ``dask.delayed(f(x, y))``. When you do the latter, Python first
calculates ``f(x, y)`` before Dask has a chance to step in.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # This executes immediately                   |    # This makes a delayed function, acting lazily          |
|                                                  |                                                            |
|    dask.delayed(f(x, y))                         |    dask.delayed(f)(x, y)                                   |
|                                                  |                                                            |
+--------------------------------------------------+------------------------------------------------------------+


Compute on lots of computation at once
--------------------------------------

To improve parallelism, you want to include lots of computation in each compute call.
Ideally, you want to make many ``dask.delayed`` calls to define your computation and
then call ``dask.compute`` only at the end. It is ok to call ``dask.compute``
in the middle of your computation as well, but everything will stop there as
Dask computes those results before moving forward with your code.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # Avoid calling compute repeatedly            |    # Collect many calls for one compute                    |
|                                                  |                                                            |
|    results = []                                  |    results = []                                            |
|    for x in L:                                   |    for x in L:                                             |
|        y = dask.delayed(f)(x)                    |        y = dask.delayed(f)(x)                              |
|        results.append(y.compute())               |        results.append(y)                                   |
|                                                  |                                                            |
|    results                                       |    results = dask.compute(*results)                        |
+--------------------------------------------------+------------------------------------------------------------+

Calling ``y.compute()`` within the loop waits for the result of each computation
before the next one is defined, and so inhibits parallelism.

Don't mutate inputs
-------------------

Your functions should not change the inputs directly.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # Mutate inputs in functions                  |    # Return new values or copies                           |
|                                                  |                                                            |
|    @dask.delayed                                 |    @dask.delayed                                           |
|    def f(x):                                     |    def f(x):                                               |
|        x += 1                                    |        x = x + 1                                           |
|        return x                                  |        return x                                            |
+--------------------------------------------------+------------------------------------------------------------+

If you need to use a mutable operation, then make a copy within your function first:

.. code-block:: python

   from copy import copy

   @dask.delayed
   def f(x):
       x = copy(x)  # work on a copy so the caller's object is untouched
       x += 1
       return x


Avoid global state
------------------

Ideally, your operations shouldn't rely on global state. Using global state
*might* work if you only use threads, but when you move to multiprocessing or
distributed computing then you will likely encounter confusing errors.

+--------------------------------------------------+
| **Don't**                                        |
+--------------------------------------------------+
| .. code-block:: python                           |
|                                                  |
|    L = []                                        |
|                                                  |
|    # This references global variable L           |
|                                                  |
|    @dask.delayed                                 |
|    def f(x):                                     |
|        L.append(x)                               |
|                                                  |
+--------------------------------------------------+



Don't rely on side effects
--------------------------

Delayed functions only do something if they are computed. You will always need
to pass the output to something that eventually calls compute.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # Forget to call compute                      |    # Ensure delayed tasks are computed                     |
|                                                  |                                                            |
|    dask.delayed(f)(1, 2, 3)                      |    x = dask.delayed(f)(1, 2, 3)                            |
|                                                  |    ...                                                     |
|    ...                                           |    dask.compute(x, ...)                                    |
+--------------------------------------------------+------------------------------------------------------------+

In the first case here, nothing happens, because ``compute()`` is never called.

Break up computations into many pieces
--------------------------------------

Every ``dask.delayed`` function call is a single operation from Dask's perspective.
You achieve parallelism by having many delayed calls, not by using only a
single one: Dask will not look inside a function decorated with ``@dask.delayed``
and parallelize that code internally. To accomplish that, it needs your help to
find good places to break up a computation.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # One giant task                              |    # Break up into many tasks                              |
|                                                  |                                                            |
|                                                  |    @dask.delayed                                           |
|    def load(filename):                           |    def load(filename):                                     |
|        ...                                       |        ...                                                 |
|                                                  |                                                            |
|                                                  |    @dask.delayed                                           |
|    def process(data):                            |    def process(data):                                      |
|        ...                                       |        ...                                                 |
|                                                  |                                                            |
|                                                  |    @dask.delayed                                           |
|    def save(data):                               |    def save(data):                                         |
|        ...                                       |        ...                                                 |
|                                                  |                                                            |
|    @dask.delayed                                 |                                                            |
|    def f(filenames):                             |    def f(filenames):                                       |
|        results = []                              |        results = []                                        |
|        for filename in filenames:                |        for filename in filenames:                          |
|            data = load(filename)                 |            data = load(filename)                           |
|            data = process(data)                  |            data = process(data)                            |
|            result = save(data)                   |            result = save(data)                             |
|            results.append(result)                |            results.append(result)                          |
|                                                  |                                                            |
|        return results                            |        return results                                      |
|                                                  |                                                            |
|    dask.compute(f(filenames))                    |    dask.compute(f(filenames))                              |
+--------------------------------------------------+------------------------------------------------------------+

The first version only has one delayed task, and so cannot parallelize.

Avoid too many tasks
--------------------

Every delayed task has an overhead of a few hundred microseconds. Usually this
is ok, but it can become a problem if you apply ``dask.delayed`` too finely. In
this case, it's often best to break up your many tasks into batches or use one
of the Dask collections to help you.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # Too many tasks                              |    # Use collections                                       |
|                                                  |                                                            |
|    results = []                                  |    import dask.bag as db                                   |
|    for x in range(10000000):                     |    b = db.from_sequence(range(10000000), npartitions=1000) |
|        y = dask.delayed(f)(x)                    |    b = b.map(f)                                            |
|        results.append(y)                         |    ...                                                     |
|                                                  |                                                            |
+--------------------------------------------------+------------------------------------------------------------+

Here we use ``dask.bag`` to automatically batch applying our function. We could also have constructed
our own batching as follows:

.. code-block:: python

   def batch(seq):
       sub_results = []
       for x in seq:
           sub_results.append(f(x))
       return sub_results

   batches = []
   for i in range(0, 10000000, 10000):
       result_batch = dask.delayed(batch)(range(i, i + 10000))
       batches.append(result_batch)


Here we construct batches where each delayed function call processes many data points from
the original input.

Avoid calling delayed within delayed functions
----------------------------------------------

Often, if you are new to using Dask delayed, you place ``dask.delayed`` calls
everywhere and hope for the best. While this may actually work, it's usually
slow and results in hard-to-understand solutions.

You should almost never call ``dask.delayed`` within ``dask.delayed`` functions.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # Delayed function calls delayed              |    # Normal function calls delayed                         |
|                                                  |                                                            |
|    @dask.delayed                                 |                                                            |
|    def process_all(L):                           |    def process_all(L):                                     |
|        result = []                               |        result = []                                         |
|        for x in L:                               |        for x in L:                                         |
|            y = dask.delayed(f)(x)                |            y = dask.delayed(f)(x)                          |
|            result.append(y)                      |            result.append(y)                                |
|        return result                             |        return result                                       |
+--------------------------------------------------+------------------------------------------------------------+

Because the normal function only does delayed work, it is very fast, and so
there is no reason to delay it.

Don't call dask.delayed on other Dask collections
-------------------------------------------------

When you place a Dask array or Dask DataFrame into a delayed call, that function
will receive the NumPy or Pandas equivalent. Beware that if your array is
large, then this might crash your workers.


Instead, it's more common to use methods like ``da.map_blocks``.

+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    # Call delayed functions on Dask collections  |    # Use mapping methods if applicable                     |
|                                                  |                                                            |
|    import dask.dataframe as dd                   |    import dask.dataframe as dd                             |
|    df = dd.read_csv('/path/to/*.csv')            |    df = dd.read_csv('/path/to/*.csv')                      |
|                                                  |                                                            |
|    dask.delayed(train)(df)                       |    df.map_partitions(train)                                |
+--------------------------------------------------+------------------------------------------------------------+

Alternatively, if the procedure doesn't fit into a mapping, you can always
turn your arrays or dataframes into *many* delayed
objects, for example

.. code-block:: python

   partitions = df.to_delayed()
   delayed_values = [dask.delayed(train)(part)
                     for part in partitions]

However, if you don't mind turning your Dask array/DataFrame into a single
chunk, then this is ok.

.. code-block:: python

   dask.delayed(train)(..., y=df.sum())


Avoid repeatedly putting large inputs into delayed calls
--------------------------------------------------------

Every time you pass a concrete result (anything that isn't delayed) Dask will
hash it by default to give it a name. This is fairly fast (around 500 MB/s)
but can be slow if you do it over and over again. Instead, it is better to
delay your data as well.

This is especially important when using a distributed cluster to avoid sending
your data separately for each function call.


+--------------------------------------------------+------------------------------------------------------------+
| **Don't**                                        | **Do**                                                     |
+--------------------------------------------------+------------------------------------------------------------+
| .. code-block:: python                           | .. code-block:: python                                     |
|                                                  |                                                            |
|    x = np.array(...)  # some large array         |    x = np.array(...)  # some large array                   |
|                                                  |    x = dask.delayed(x)  # delay the data once              |
|    results = [dask.delayed(train)(x, i)          |    results = [dask.delayed(train)(x, i)                    |
|               for i in range(1000)]              |               for i in range(1000)]                        |
+--------------------------------------------------+------------------------------------------------------------+


In the first version, every call to ``dask.delayed(train)(x, ...)`` has to hash the
NumPy array ``x``, which slows things down.
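
The pattern of delaying the data once can be sketched as a small runnable script.
This is only an illustration under stated assumptions: ``train`` is a hypothetical
stand-in that sums its input, and the array is a modest ``np.arange`` rather than
real data.

```python
import dask
import numpy as np


def train(x, i):
    # Hypothetical stand-in for a real training function:
    # sums the array and adds the task index.
    return float(x.sum()) + i


# Stand-in for "some large array"; kept small so the sketch runs quickly.
x = np.arange(1_000_000, dtype="float64")

# Wrap the data once, so every task below refers to the same delayed object
# instead of receiving the raw array as a fresh argument.
x_delayed = dask.delayed(x)

# Build all tasks lazily; nothing is computed yet.
results = [dask.delayed(train)(x_delayed, i) for i in range(10)]

# A single compute call evaluates every task.
totals = dask.compute(*results)
```

``dask.compute`` returns a tuple with one entry per task, so ``totals`` holds ten
values here.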