:orphan:

Scheduler Overview
==================

After we create a dask graph, we use a scheduler to run it. Dask currently
implements a few different schedulers:

- ``dask.threaded.get``: a scheduler backed by a thread pool
- ``dask.multiprocessing.get``: a scheduler backed by a process pool
- ``dask.get``: a synchronous scheduler, good for debugging
- ``distributed.Client.get``: a distributed scheduler for executing graphs
  on multiple machines. This lives in the external distributed_ project.

.. _distributed: https://distributed.dask.org/en/latest/


The ``get`` function
--------------------

The entry point for all schedulers is a ``get`` function. This takes a dask
graph, and a key or list of keys to compute:

.. code-block:: python

   >>> from operator import add
   >>> from dask.threaded import get  # any scheduler's ``get`` works here

   >>> dsk = {'a': 1,
   ...        'b': 2,
   ...        'c': (add, 'a', 'b'),
   ...        'd': (sum, ['a', 'b', 'c'])}

   >>> get(dsk, 'c')
   3

   >>> get(dsk, 'd')
   6

   >>> get(dsk, ['a', 'b', 'c'])
   (1, 2, 3)


Using ``compute`` methods
-------------------------

When working with dask collections, you will rarely need to
interact with scheduler ``get`` functions directly. Each collection has a
default scheduler, and a built-in ``compute`` method that calculates the output
of the collection:

.. code-block:: python

   >>> import dask.array as da
   >>> x = da.arange(100, chunks=10)
   >>> x.sum().compute()
   4950

The ``compute`` method takes a number of keywords:

- ``scheduler``: the name of the desired scheduler as a string (``"threads"``,
  ``"processes"``, ``"single-threaded"``, etc.), a ``get`` function, or a
  ``dask.distributed.Client`` object. Overrides the default for the collection.
- ``**kwargs``: extra keywords to pass on to the scheduler ``get`` function.

See also: :ref:`configuring-schedulers`.
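
As a rough illustration of the ``get`` contract described earlier (a graph plus
a key or list of keys), the sketch below evaluates a graph serially in pure
Python. ``toy_get`` is a hypothetical helper written for this page and is not
part of dask. It assumes the graph has no cycles and ignores everything a real
scheduler does about parallelism, task ordering, and memory management:

.. code-block:: python

   from operator import add


   def toy_get(dsk, keys):
       """Serially evaluate ``keys`` from the graph ``dsk`` (hypothetical helper)."""
       cache = {}  # results of keys that have already been computed

       def evaluate(arg):
           # A list is evaluated element by element.
           if isinstance(arg, list):
               return [evaluate(item) for item in arg]
           # A tuple whose first element is callable is a task: (func, *args).
           if isinstance(arg, tuple) and arg and callable(arg[0]):
               func, *args = arg
               return func(*(evaluate(a) for a in args))
           # A hashable value that names a graph entry is a key.
           try:
               is_key = arg in dsk
           except TypeError:  # unhashable values cannot be keys
               is_key = False
           if is_key:
               if arg not in cache:
                   cache[arg] = evaluate(dsk[arg])
               return cache[arg]
           # Anything else is a literal value.
           return arg

       # A list of keys yields a tuple of results in the same order.
       if isinstance(keys, list):
           return tuple(evaluate(key) for key in keys)
       return evaluate(keys)


   dsk = {'a': 1,
          'b': 2,
          'c': (add, 'a', 'b'),
          'd': (sum, ['a', 'b', 'c'])}

   print(toy_get(dsk, 'c'))              # 3
   print(toy_get(dsk, 'd'))              # 6
   print(toy_get(dsk, ['a', 'b', 'c']))  # (1, 2, 3)

The cache means each key is computed at most once per call, which is the same
idea that lets a scheduler share intermediate results between outputs.
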


The ``compute`` function
------------------------

You may wish to compute results from multiple dask collections at once.
Similar to the ``compute`` method on each collection, there is a general
``compute`` function that takes multiple collections and returns multiple
results. This merges the graphs from each collection, so intermediate results
are shared:

.. code-block:: python

   >>> y = (x + 1).sum()
   >>> z = (x + 1).mean()
   >>> da.compute(y, z)    # Compute y and z, sharing intermediate results
   (5050, 50.5)

Here the ``x + 1`` intermediate was only computed once, while calling
``y.compute()`` and ``z.compute()`` would compute it twice. For large graphs
that share many intermediates, this can be a big performance gain.

The ``compute`` function works with any dask collection, and is found in
``dask.base``. For convenience it has also been imported into the top level
namespace of each collection.

.. code-block:: python

   >>> from dask.base import compute
   >>> compute is da.compute
   True


.. _configuring-schedulers:

Configuring the schedulers
--------------------------

The dask collections each have a default scheduler:

- ``dask.array`` and ``dask.dataframe`` use the threaded scheduler by default
- ``dask.bag`` uses the multiprocessing scheduler by default.

For most cases, the default settings are good choices. However, sometimes you
may want to use a different scheduler. There are two ways to do this.

1. Using the ``scheduler`` keyword in the ``compute`` method:

   .. code-block:: python

      >>> x.sum().compute(scheduler='processes')

2. Using ``dask.config.set``. This can be used either as a context manager, or to
   set the scheduler globally:

   .. code-block:: python

      >>> import dask

      # As a context manager
      >>> with dask.config.set(scheduler='processes'):
      ...     x.sum().compute()

      # Set globally
      >>> dask.config.set(scheduler='processes')
      >>> x.sum().compute()


Additionally, each scheduler may take a few extra keywords specific to that
scheduler. For example, the multiprocessing and threaded schedulers each take a
``num_workers`` keyword, which sets the number of processes or threads to use
(defaults to number of cores). This can be set by passing the keyword when
calling ``compute``:

.. code-block:: python

   # Compute with 4 threads
   >>> x.compute(num_workers=4)

Alternatively, the multiprocessing and threaded schedulers will check for a
global pool set with ``dask.config.set``:

.. code-block:: python

   >>> from concurrent.futures import ThreadPoolExecutor
   >>> with dask.config.set(pool=ThreadPoolExecutor(4)):
   ...     x.compute()

The multiprocessing scheduler also supports `different contexts`_ ("spawn",
"forkserver", "fork") which you can set with ``dask.config.set``. The default
context is "spawn", but you can set a different one:

.. code-block:: python

   >>> with dask.config.set({"multiprocessing.context": "forkserver"}):
   ...     x.compute()

.. _different contexts: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

For more information on the individual options for each scheduler, see the
docstrings for each scheduler ``get`` function.


Debugging the schedulers
------------------------

Debugging parallel code can be difficult, as conventional tools such as ``pdb``
don't work well with multiple threads or processes. To get around this when
debugging, we recommend using the synchronous scheduler found at
``dask.get``. This runs everything serially, allowing it to work
well with ``pdb``:

.. code-block:: python

   >>> dask.config.set(scheduler='single-threaded')
   >>> x.sum().compute()    # This computation runs serially instead of in parallel


The shared memory schedulers also provide a set of callbacks that can be used
for diagnosing and profiling. You can learn more about scheduler callbacks and
diagnostics :doc:`here <diagnostics-local>`.


More Information
----------------

- See :doc:`shared` for information on the design of the shared memory
  (threaded or multiprocessing) schedulers
- See distributed_ for information on the distributed memory scheduler