:orphan:

Scheduler Overview
==================

After we create a dask graph, we use a scheduler to run it. Dask currently
implements a few different schedulers:

-  ``dask.threaded.get``: a scheduler backed by a thread pool
-  ``dask.multiprocessing.get``: a scheduler backed by a process pool
-  ``dask.get``: a synchronous scheduler, good for debugging
-  ``distributed.Client.get``: a distributed scheduler for executing graphs
   on multiple machines.  This lives in the external distributed_ project.

.. _distributed: https://distributed.dask.org/en/latest/


The ``get`` function
--------------------

The entry point for all schedulers is a ``get`` function. This takes a dask
graph, and a key or list of keys to compute:
.. code-block:: python

   >>> from operator import add
   >>> from dask.threaded import get  # the threaded scheduler; any scheduler's get works

   >>> dsk = {'a': 1,
   ...        'b': 2,
   ...        'c': (add, 'a', 'b'),
   ...        'd': (sum, ['a', 'b', 'c'])}

   >>> get(dsk, 'c')
   3

   >>> get(dsk, 'd')
   6

   >>> get(dsk, ['a', 'b', 'c'])
   [1, 2, 3]
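
Because every scheduler shares this same interface, the graph above runs
unchanged on any of them. A minimal sketch, reusing the ``dsk`` graph from the
previous example:

.. code-block:: python

   >>> import dask
   >>> import dask.multiprocessing

   >>> dask.get(dsk, 'd')                  # synchronous scheduler
   6
   >>> dask.multiprocessing.get(dsk, 'd')  # process-pool scheduler
   6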


Using ``compute`` methods
-------------------------

When working with dask collections, you will rarely need to
interact with scheduler ``get`` functions directly. Each collection has a
default scheduler, and a built-in ``compute`` method that calculates the output
of the collection:

.. code-block:: python

    >>> import dask.array as da
    >>> x = da.arange(100, chunks=10)
    >>> x.sum().compute()
    4950

The ``compute`` method takes a number of keywords:

- ``scheduler``: the name of the desired scheduler as a string (``"threads"``, ``"processes"``, ``"single-threaded"``, etc.), a ``get`` function, or a ``dask.distributed.Client`` object.  Overrides the default for the collection.
- ``**kwargs``: extra keywords to pass on to the scheduler ``get`` function.
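
A quick sketch combining both keywords on the array ``x`` from above
(``num_workers`` is one such extra keyword, forwarded to the scheduler and
described further below):

.. code-block:: python

    >>> x.sum().compute(scheduler='processes', num_workers=2)
    4950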

See also: :ref:`configuring-schedulers`.


The ``compute`` function
------------------------

You may wish to compute results from multiple dask collections at once.
Similar to the ``compute`` method on each collection, there is a general
``compute`` function that takes multiple collections and returns multiple
results. This merges the graphs from each collection, so intermediate results
are shared:

.. code-block:: python

    >>> y = (x + 1).sum()
    >>> z = (x + 1).mean()
    >>> da.compute(y, z)    # Compute y and z, sharing intermediate results
    (5050, 50.5)

Here the ``x + 1`` intermediate was only computed once, while calling
``y.compute()`` and ``z.compute()`` would compute it twice. For large graphs
that share many intermediates, this can be a big performance gain.

The ``compute`` function works with any dask collection, and is found in
``dask.base``. For convenience it has also been imported into the top level
namespace of each collection.

.. code-block:: python

    >>> from dask.base import compute
    >>> compute is da.compute
    True
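
Because ``compute`` accepts any mix of collections, you can evaluate, say, a
dask array and a dask bag in one call. A small sketch (the bag here is made up
for illustration):

.. code-block:: python

    >>> import dask.bag as db
    >>> b = db.from_sequence(range(5))
    >>> compute(x.sum(), b.sum())
    (4950, 10)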


.. _configuring-schedulers:

Configuring the schedulers
--------------------------

The dask collections each have a default scheduler:

- ``dask.array`` and ``dask.dataframe`` use the threaded scheduler by default
- ``dask.bag`` uses the multiprocessing scheduler by default

For most cases, the default settings are good choices. However, sometimes you
may want to use a different scheduler. There are two ways to do this.

1. Using the ``scheduler`` keyword in the ``compute`` method:

    .. code-block:: python

        >>> x.sum().compute(scheduler='processes')

2. Using ``dask.config.set``. This can be used either as a context manager, or to
   set the scheduler globally:

    .. code-block:: python

        # As a context manager
        >>> with dask.config.set(scheduler='processes'):
        ...     x.sum().compute()

        # Set globally
        >>> dask.config.set(scheduler='processes')
        >>> x.sum().compute()
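
To check which scheduler is currently configured, you can inspect the global
configuration. A small sketch (``dask.config.get`` raises ``KeyError`` if no
scheduler has been set):

.. code-block:: python

    >>> dask.config.get('scheduler')
    'processes'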


Additionally, each scheduler may take a few extra keywords specific to that
scheduler. For example, the multiprocessing and threaded schedulers each take a
``num_workers`` keyword, which sets the number of processes or threads to use
(defaulting to the number of cores). This can be set by passing the keyword when
calling ``compute``:

.. code-block:: python

    # Compute with 4 threads
    >>> x.compute(num_workers=4)

Alternatively, the multiprocessing and threaded schedulers will check for a
global pool set with ``dask.config.set``:

.. code-block:: python

    >>> from concurrent.futures import ThreadPoolExecutor
    >>> with dask.config.set(pool=ThreadPoolExecutor(4)):
    ...     x.compute()

The multiprocessing scheduler also supports `different contexts`_ ("spawn",
"forkserver", "fork"), which you can set with ``dask.config.set``. The default
context is "spawn", but you can select a different one:

.. code-block:: python

   >>> with dask.config.set({"multiprocessing.context": "forkserver"}):
   ...     x.compute()

.. _different contexts: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

For more information on the individual options for each scheduler, see the
docstrings for each scheduler ``get`` function.


Debugging the schedulers
------------------------

Debugging parallel code can be difficult, as conventional tools such as ``pdb``
don't work well with multiple threads or processes. To get around this when
debugging, we recommend using the synchronous scheduler found at
``dask.get``. This runs everything serially, allowing it to work
well with ``pdb``:

.. code-block:: python

    >>> dask.config.set(scheduler='single-threaded')
    >>> x.sum().compute()    # This computation runs serially instead of in parallel
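
With the synchronous scheduler, an exception raised inside a task surfaces as
an ordinary Python traceback, so you can drop straight into the debugger. A
sketch, where ``buggy`` is a made-up function standing in for a real bug:

.. code-block:: python

    >>> import pdb

    >>> def buggy(block):
    ...     raise ValueError("boom")   # placeholder for a real bug

    >>> try:
    ...     x.map_blocks(buggy, dtype=x.dtype).compute(scheduler='single-threaded')
    ... except ValueError:
    ...     pdb.post_mortem()   # inspect the frame where the task failed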


The shared memory schedulers also provide a set of callbacks that can be used
for diagnosing and profiling. You can learn more about scheduler callbacks and
diagnostics :doc:`here <diagnostics-local>`.
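
For example, ``dask.diagnostics.ProgressBar`` is one such callback, reporting
progress as a computation runs (the printed bar below is illustrative):

.. code-block:: python

    >>> from dask.diagnostics import ProgressBar
    >>> with ProgressBar():
    ...     x.sum().compute()
    [########################################] | 100% Completed |  0.1s
    4950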


More Information
----------------

- See :doc:`shared` for information on the design of the shared memory
  (threaded or multiprocessing) schedulers
- See distributed_ for information on the distributed memory scheduler