1from __future__ import annotations
2
3import asyncio
4import atexit
5import copy
6import errno
7import inspect
8import json
9import logging
10import os
11import re
12import sys
13import threading
14import traceback
15import uuid
16import warnings
17import weakref
18from collections import defaultdict
19from collections.abc import Iterator
20from concurrent.futures import ThreadPoolExecutor
21from concurrent.futures._base import DoneAndNotDoneFutures
22from contextlib import contextmanager, suppress
23from contextvars import ContextVar
24from functools import partial
25from numbers import Number
26from queue import Queue as pyQueue
27from typing import TYPE_CHECKING, Awaitable, ClassVar, Sequence
28
29from tlz import first, groupby, keymap, merge, partition_all, valmap
30
31if TYPE_CHECKING:
32    from typing_extensions import Literal
33
34import dask
35from dask.base import collections_to_dsk, normalize_token, tokenize
36from dask.core import flatten
37from dask.highlevelgraph import HighLevelGraph
38from dask.optimization import SubgraphCallable
39from dask.utils import (
40    _deprecated,
41    apply,
42    ensure_dict,
43    format_bytes,
44    funcname,
45    parse_timedelta,
46    stringify,
47    typename,
48)
49from dask.widgets import get_template
50
51try:
52    from dask.delayed import single_key
53except ImportError:
54    single_key = first
55from tornado import gen
56from tornado.ioloop import IOLoop, PeriodicCallback
57
58from . import versions as version_module  # type: ignore
59from .batched import BatchedSend
60from .cfexecutor import ClientExecutor
61from .core import (
62    CommClosedError,
63    ConnectionPool,
64    PooledRPCCall,
65    clean_exception,
66    connect,
67    rpc,
68)
69from .diagnostics.plugin import NannyPlugin, UploadFile, WorkerPlugin, _get_plugin_name
70from .metrics import time
71from .objects import HasWhat, SchedulerInfo, WhoHas
72from .protocol import to_serialize
73from .protocol.pickle import dumps, loads
74from .publish import Datasets
75from .pubsub import PubSubClientExtension
76from .security import Security
77from .sizeof import sizeof
78from .threadpoolexecutor import rejoin
79from .utils import (
80    All,
81    Any,
82    CancelledError,
83    LoopRunner,
84    TimeoutError,
85    format_dashboard_link,
86    has_keyword,
87    log_errors,
88    no_default,
89    sync,
90    thread_state,
91)
92from .utils_comm import (
93    WrappedKey,
94    gather_from_workers,
95    pack_data,
96    retry_operation,
97    scatter_to_workers,
98    unpack_remotedata,
99)
100from .worker import get_client, get_worker, secede
101
102logger = logging.getLogger(__name__)
103
104_global_clients: weakref.WeakValueDictionary[
105    int, Client
106] = weakref.WeakValueDictionary()
107_global_client_index = [0]
108
109_current_client = ContextVar("_current_client", default=None)
110
111DEFAULT_EXTENSIONS = [PubSubClientExtension]
112# Placeholder used in the get_dataset function(s)
113NO_DEFAULT_PLACEHOLDER = "_no_default_"
114
115
116def _get_global_client() -> Client | None:
117    L = sorted(list(_global_clients), reverse=True)
118    for k in L:
119        c = _global_clients[k]
120        if c.status != "closed":
121            return c
122        else:
123            del _global_clients[k]
124    return None
125
126
127def _set_global_client(c: Client | None) -> None:
128    if c is not None:
129        _global_clients[_global_client_index[0]] = c
130        _global_client_index[0] += 1
131
132
133def _del_global_client(c: Client) -> None:
134    for k in list(_global_clients):
135        try:
136            if _global_clients[k] is c:
137                del _global_clients[k]
138        except KeyError:
139            pass
140
141
142class Future(WrappedKey):
143    """A remotely running computation
144
145    A Future is a local proxy to a result running on a remote worker.  A user
146    manages future objects in the local Python process to determine what
147    happens in the larger cluster.
148
149    Parameters
150    ----------
151    key: str, or tuple
152        Key of remote data to which this future refers
153    client: Client
154        Client that should own this future.  Defaults to _get_global_client()
155    inform: bool
156        Do we inform the scheduler that we need an update on this future
157
158    Examples
159    --------
160    Futures typically emerge from Client computations
161
162    >>> my_future = client.submit(add, 1, 2)  # doctest: +SKIP
163
164    We can track the progress and results of a future
165
166    >>> my_future  # doctest: +SKIP
167    <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
168
169    We can get the result or the exception and traceback from the future
170
171    >>> my_future.result()  # doctest: +SKIP
172
173    See Also
174    --------
175    Client:  Creates futures
176    """
177
178    _cb_executor = None
179    _cb_executor_pid = None
180
181    def __init__(self, key, client=None, inform=True, state=None):
182        self.key = key
183        self._cleared = False
184        tkey = stringify(key)
185        self.client = client or Client.current()
186        self.client._inc_ref(tkey)
187        self._generation = self.client.generation
188
189        if tkey in self.client.futures:
190            self._state = self.client.futures[tkey]
191        else:
192            self._state = self.client.futures[tkey] = FutureState()
193
194        if inform:
195            self.client._send_to_scheduler(
196                {
197                    "op": "client-desires-keys",
198                    "keys": [stringify(key)],
199                    "client": self.client.id,
200                }
201            )
202
203        if state is not None:
204            try:
205                handler = self.client._state_handlers[state]
206            except KeyError:
207                pass
208            else:
209                handler(key=key)
210
211    @property
212    def executor(self):
213        return self.client
214
215    @property
216    def status(self):
217        return self._state.status
218
219    def done(self):
220        """Is the computation complete?"""
221        return self._state.done()
222
223    def result(self, timeout=None):
224        """Wait until computation completes, gather result to local process.
225
226        If *timeout* seconds are elapsed before returning, a
227        ``dask.distributed.TimeoutError`` is raised.
228        """
229        if self.client.asynchronous:
230            return self.client.sync(self._result, callback_timeout=timeout)
231
232        # shorten error traceback
233        result = self.client.sync(self._result, callback_timeout=timeout, raiseit=False)
234        if self.status == "error":
235            typ, exc, tb = result
236            raise exc.with_traceback(tb)
237        elif self.status == "cancelled":
238            raise result
239        else:
240            return result
241
242    async def _result(self, raiseit=True):
243        await self._state.wait()
244        if self.status == "error":
245            exc = clean_exception(self._state.exception, self._state.traceback)
246            if raiseit:
247                typ, exc, tb = exc
248                raise exc.with_traceback(tb)
249            else:
250                return exc
251        elif self.status == "cancelled":
252            exception = CancelledError(self.key)
253            if raiseit:
254                raise exception
255            else:
256                return exception
257        else:
258            result = await self.client._gather([self])
259            return result[0]
260
261    async def _exception(self):
262        await self._state.wait()
263        if self.status == "error":
264            return self._state.exception
265        else:
266            return None
267
268    def exception(self, timeout=None, **kwargs):
269        """Return the exception of a failed task
270
271        If *timeout* seconds are elapsed before returning, a
272        ``dask.distributed.TimeoutError`` is raised.
273
274        See Also
275        --------
276        Future.traceback
277        """
278        return self.client.sync(self._exception, callback_timeout=timeout, **kwargs)
279
280    def add_done_callback(self, fn):
281        """Call callback on future when callback has finished
282
283        The callback ``fn`` should take the future as its only argument.  This
284        will be called regardless of if the future completes successfully,
285        errs, or is cancelled
286
287        The callback is executed in a separate thread.
288        """
289        cls = Future
290        if cls._cb_executor is None or cls._cb_executor_pid != os.getpid():
291            try:
292                cls._cb_executor = ThreadPoolExecutor(
293                    1, thread_name_prefix="Dask-Callback-Thread"
294                )
295            except TypeError:
296                cls._cb_executor = ThreadPoolExecutor(1)
297            cls._cb_executor_pid = os.getpid()
298
299        def execute_callback(fut):
300            try:
301                fn(fut)
302            except BaseException:
303                logger.exception("Error in callback %s of %s:", fn, fut)
304
305        self.client.loop.add_callback(
306            done_callback, self, partial(cls._cb_executor.submit, execute_callback)
307        )
308
309    def cancel(self, **kwargs):
310        """Cancel request to run this future
311
312        See Also
313        --------
314        Client.cancel
315        """
316        return self.client.cancel([self], **kwargs)
317
318    def retry(self, **kwargs):
319        """Retry this future if it has failed
320
321        See Also
322        --------
323        Client.retry
324        """
325        return self.client.retry([self], **kwargs)
326
327    def cancelled(self):
328        """Returns True if the future has been cancelled"""
329        return self._state.status == "cancelled"
330
331    async def _traceback(self):
332        await self._state.wait()
333        if self.status == "error":
334            return self._state.traceback
335        else:
336            return None
337
338    def traceback(self, timeout=None, **kwargs):
339        """Return the traceback of a failed task
340
341        This returns a traceback object.  You can inspect this object using the
342        ``traceback`` module.  Alternatively if you call ``future.result()``
343        this traceback will accompany the raised exception.
344
345        If *timeout* seconds are elapsed before returning, a
346        ``dask.distributed.TimeoutError`` is raised.
347
348        Examples
349        --------
350        >>> import traceback  # doctest: +SKIP
351        >>> tb = future.traceback()  # doctest: +SKIP
352        >>> traceback.format_tb(tb)  # doctest: +SKIP
353        [...]
354
355        See Also
356        --------
357        Future.exception
358        """
359        return self.client.sync(self._traceback, callback_timeout=timeout, **kwargs)
360
361    @property
362    def type(self):
363        return self._state.type
364
365    def release(self, _in_destructor=False):
366        # NOTE: this method can be called from different threads
367        # (see e.g. Client.get() or Future.__del__())
368        if not self._cleared and self.client.generation == self._generation:
369            self._cleared = True
370            try:
371                self.client.loop.add_callback(self.client._dec_ref, stringify(self.key))
372            except TypeError:
373                pass  # Shutting down, add_callback may be None
374
375    def __getstate__(self):
376        return self.key, self.client.scheduler.address
377
378    def __setstate__(self, state):
379        key, address = state
380        try:
381            c = Client.current(allow_global=False)
382        except ValueError:
383            c = get_client(address)
384        self.__init__(key, c)
385        c._send_to_scheduler(
386            {
387                "op": "update-graph",
388                "tasks": {},
389                "keys": [stringify(self.key)],
390                "client": c.id,
391            }
392        )
393
394    def __del__(self):
395        try:
396            self.release()
397        except AttributeError:
398            # Ocassionally we see this error when shutting down the client
399            # https://github.com/dask/distributed/issues/4305
400            if not sys.is_finalizing():
401                raise
402        except RuntimeError:  # closed event loop
403            pass
404
405    def __repr__(self):
406        if self.type:
407            return (
408                f"<Future: {self.status}, type: {typename(self.type)}, key: {self.key}>"
409            )
410        else:
411            return f"<Future: {self.status}, key: {self.key}>"
412
413    def _repr_html_(self):
414        return get_template("future.html.j2").render(
415            key=str(self.key),
416            type=typename(self.type),
417            status=self.status,
418        )
419
420    def __await__(self):
421        return self.result().__await__()
422
423
424class FutureState:
425    """A Future's internal state.
426
427    This is shared between all Futures with the same key and client.
428    """
429
430    __slots__ = ("_event", "status", "type", "exception", "traceback")
431
432    def __init__(self):
433        self._event = None
434        self.status = "pending"
435        self.type = None
436
437    def _get_event(self):
438        # Can't create Event eagerly in constructor as it can fetch
439        # its IOLoop from the wrong thread
440        # (https://github.com/tornadoweb/tornado/issues/2189)
441        event = self._event
442        if event is None:
443            event = self._event = asyncio.Event()
444        return event
445
446    def cancel(self):
447        self.status = "cancelled"
448        self.exception = CancelledError()
449        self._get_event().set()
450
451    def finish(self, type=None):
452        self.status = "finished"
453        self._get_event().set()
454        if type is not None:
455            self.type = type
456
457    def lose(self):
458        self.status = "lost"
459        self._get_event().clear()
460
461    def retry(self):
462        self.status = "pending"
463        self._get_event().clear()
464
465    def set_error(self, exception, traceback):
466        _, exception, traceback = clean_exception(exception, traceback)
467
468        self.status = "error"
469        self.exception = exception
470        self.traceback = traceback
471        self._get_event().set()
472
473    def done(self):
474        return self._event is not None and self._event.is_set()
475
476    def reset(self):
477        self.status = "pending"
478        if self._event is not None:
479            self._event.clear()
480
481    async def wait(self, timeout=None):
482        await asyncio.wait_for(self._get_event().wait(), timeout)
483
484    def __repr__(self):
485        return f"<{self.__class__.__name__}: {self.status}>"
486
487
488async def done_callback(future, callback):
489    """Coroutine that waits on future, then calls callback"""
490    while future.status == "pending":
491        await future._state.wait()
492    callback(future)
493
494
495@partial(normalize_token.register, Future)
496def normalize_future(f):
497    return [f.key, type(f)]
498
499
500class AllExit(Exception):
501    """Custom exception class to exit All(...) early."""
502
503
504def _handle_print(event):
505    _, msg = event
506    if isinstance(msg, dict) and "args" in msg and "kwargs" in msg:
507        print(*msg["args"], **msg["kwargs"])
508    else:
509        print(msg)
510
511
512def _handle_warn(event):
513    _, msg = event
514    if isinstance(msg, dict) and "args" in msg and "kwargs" in msg:
515        warnings.warn(*msg["args"], **msg["kwargs"])
516    else:
517        warnings.warn(msg)
518
519
520class Client:
521    """Connect to and submit computation to a Dask cluster
522
523    The Client connects users to a Dask cluster.  It provides an asynchronous
524    user interface around functions and futures.  This class resembles
525    executors in ``concurrent.futures`` but also allows ``Future`` objects
526    within ``submit/map`` calls.  When a Client is instantiated it takes over
527    all ``dask.compute`` and ``dask.persist`` calls by default.
528
529    It is also common to create a Client without specifying the scheduler
530    address , like ``Client()``.  In this case the Client creates a
531    :class:`LocalCluster` in the background and connects to that.  Any extra
532    keywords are passed from Client to LocalCluster in this case.  See the
533    LocalCluster documentation for more information.
534
535    Parameters
536    ----------
537    address: string, or Cluster
538        This can be the address of a ``Scheduler`` server like a string
539        ``'127.0.0.1:8786'`` or a cluster object like ``LocalCluster()``
540    timeout: int
541        Timeout duration for initial connection to the scheduler
542    set_as_default: bool (True)
543        Use this Client as the global dask scheduler
544    scheduler_file: string (optional)
545        Path to a file with scheduler information if available
546    security: Security or bool, optional
547        Optional security information. If creating a local cluster can also
548        pass in ``True``, in which case temporary self-signed credentials will
549        be created automatically.
550    asynchronous: bool (False by default)
551        Set to True if using this client within async/await functions or within
552        Tornado gen.coroutines.  Otherwise this should remain False for normal
553        use.
554    name: string (optional)
555        Gives the client a name that will be included in logs generated on
556        the scheduler for matters relating to this client
557    direct_to_workers: bool (optional)
558        Whether or not to connect directly to the workers, or to ask
559        the scheduler to serve as intermediary.
560    heartbeat_interval: int
561        Time in milliseconds between heartbeats to scheduler
562    **kwargs:
563        If you do not pass a scheduler address, Client will create a
564        ``LocalCluster`` object, passing any extra keyword arguments.
565
566    Examples
567    --------
568    Provide cluster's scheduler node address on initialization:
569
570    >>> client = Client('127.0.0.1:8786')  # doctest: +SKIP
571
572    Use ``submit`` method to send individual computations to the cluster
573
574    >>> a = client.submit(add, 1, 2)  # doctest: +SKIP
575    >>> b = client.submit(add, 10, 20)  # doctest: +SKIP
576
577    Continue using submit or map on results to build up larger computations
578
579    >>> c = client.submit(add, a, b)  # doctest: +SKIP
580
581    Gather results with the ``gather`` method.
582
583    >>> client.gather(c)  # doctest: +SKIP
584    33
585
586    You can also call Client with no arguments in order to create your own
587    local cluster.
588
589    >>> client = Client()  # makes your own local "cluster" # doctest: +SKIP
590
591    Extra keywords will be passed directly to LocalCluster
592
593    >>> client = Client(n_workers=2, threads_per_worker=4)  # doctest: +SKIP
594
595    See Also
596    --------
597    distributed.scheduler.Scheduler: Internal scheduler
598    distributed.LocalCluster:
599    """
600
601    _instances: ClassVar[weakref.WeakSet[Client]] = weakref.WeakSet()
602
603    _default_event_handlers = {"print": _handle_print, "warn": _handle_warn}
604
605    def __init__(
606        self,
607        address=None,
608        loop=None,
609        timeout=no_default,
610        set_as_default=True,
611        scheduler_file=None,
612        security=None,
613        asynchronous=False,
614        name=None,
615        heartbeat_interval=None,
616        serializers=None,
617        deserializers=None,
618        extensions=DEFAULT_EXTENSIONS,
619        direct_to_workers=None,
620        connection_limit=512,
621        **kwargs,
622    ):
623        if timeout == no_default:
624            timeout = dask.config.get("distributed.comm.timeouts.connect")
625        if timeout is not None:
626            timeout = parse_timedelta(timeout, "s")
627        self._timeout = timeout
628
629        self.futures = dict()
630        self.refcount = defaultdict(lambda: 0)
631        self.coroutines = []
632        if name is None:
633            name = dask.config.get("client-name", None)
634        self.id = (
635            type(self).__name__
636            + ("-" + name + "-" if name else "-")
637            + str(uuid.uuid1(clock_seq=os.getpid()))
638        )
639        self.generation = 0
640        self.status = "newly-created"
641        self._pending_msg_buffer = []
642        self.extensions = {}
643        self.scheduler_file = scheduler_file
644        self._startup_kwargs = kwargs
645        self.cluster = None
646        self.scheduler = None
647        self._scheduler_identity = {}
648        # A reentrant-lock on the refcounts for futures associated with this
649        # client. Should be held by individual operations modifying refcounts,
650        # or any bulk operation that needs to ensure the set of futures doesn't
651        # change during operation.
652        self._refcount_lock = threading.RLock()
653        self.datasets = Datasets(self)
654        self._serializers = serializers
655        if deserializers is None:
656            deserializers = serializers
657        self._deserializers = deserializers
658        self.direct_to_workers = direct_to_workers
659
660        # Communication
661        self.scheduler_comm = None
662
663        if address is None:
664            address = dask.config.get("scheduler-address", None)
665            if address:
666                logger.info("Config value `scheduler-address` found: %s", address)
667
668        if address is not None and kwargs:
669            raise ValueError(f"Unexpected keyword arguments: {sorted(kwargs)}")
670
671        if isinstance(address, (rpc, PooledRPCCall)):
672            self.scheduler = address
673        elif isinstance(getattr(address, "scheduler_address", None), str):
674            # It's a LocalCluster or LocalCluster-compatible object
675            self.cluster = address
676            with suppress(AttributeError):
677                loop = address.loop
678            if security is None:
679                security = getattr(self.cluster, "security", None)
680        elif address is not None and not isinstance(address, str):
681            raise TypeError(
682                "Scheduler address must be a string or a Cluster instance, got {}".format(
683                    type(address)
684                )
685            )
686
687        if security is None:
688            security = Security()
689        elif isinstance(security, dict):
690            security = Security(**security)
691        elif security is True:
692            security = Security.temporary()
693            self._startup_kwargs["security"] = security
694        elif not isinstance(security, Security):
695            raise TypeError("security must be a Security object")
696
697        self.security = security
698
699        if name == "worker":
700            self.connection_args = self.security.get_connection_args("worker")
701        else:
702            self.connection_args = self.security.get_connection_args("client")
703
704        self._connecting_to_scheduler = False
705        self._asynchronous = asynchronous
706        self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
707        self.io_loop = self.loop = self._loop_runner.loop
708
709        self._gather_keys = None
710        self._gather_future = None
711
712        if heartbeat_interval is None:
713            heartbeat_interval = dask.config.get("distributed.client.heartbeat")
714        heartbeat_interval = parse_timedelta(heartbeat_interval, default="ms")
715
716        scheduler_info_interval = parse_timedelta(
717            dask.config.get("distributed.client.scheduler-info-interval", default="ms")
718        )
719
720        self._periodic_callbacks = dict()
721        self._periodic_callbacks["scheduler-info"] = PeriodicCallback(
722            self._update_scheduler_info, scheduler_info_interval * 1000
723        )
724        self._periodic_callbacks["heartbeat"] = PeriodicCallback(
725            self._heartbeat, heartbeat_interval * 1000
726        )
727
728        self._start_arg = address
729        self._set_as_default = set_as_default
730        if set_as_default:
731            self._set_config = dask.config.set(
732                scheduler="dask.distributed", shuffle="tasks"
733            )
734        self._event_handlers = {}
735
736        self._stream_handlers = {
737            "key-in-memory": self._handle_key_in_memory,
738            "lost-data": self._handle_lost_data,
739            "cancelled-key": self._handle_cancelled_key,
740            "task-retried": self._handle_retried_key,
741            "task-erred": self._handle_task_erred,
742            "restart": self._handle_restart,
743            "error": self._handle_error,
744            "event": self._handle_event,
745        }
746
747        self._state_handlers = {
748            "memory": self._handle_key_in_memory,
749            "lost": self._handle_lost_data,
750            "erred": self._handle_task_erred,
751        }
752
753        self.rpc = ConnectionPool(
754            limit=connection_limit,
755            serializers=serializers,
756            deserializers=deserializers,
757            deserialize=True,
758            connection_args=self.connection_args,
759            timeout=timeout,
760            server=self,
761        )
762
763        for ext in extensions:
764            ext(self)
765
766        self.start(timeout=timeout)
767        Client._instances.add(self)
768
769        from distributed.recreate_tasks import ReplayTaskClient
770
771        ReplayTaskClient(self)
772
773    @contextmanager
774    def as_current(self):
775        """Thread-local, Task-local context manager that causes the Client.current class
776        method to return self. Any Future objects deserialized inside this context
777        manager will be automatically attached to this Client.
778        """
779        tok = _current_client.set(self)
780        try:
781            yield
782        finally:
783            _current_client.reset(tok)
784
785    @classmethod
786    def current(cls, allow_global=True):
787        """When running within the context of `as_client`, return the context-local
788        current client. Otherwise, return the latest initialised Client.
789        If no Client instances exist, raise ValueError.
790        If allow_global is set to False, raise ValueError if running outside of the
791        `as_client` context manager.
792        """
793        out = _current_client.get()
794        if out:
795            return out
796        if allow_global:
797            return default_client()
798        raise ValueError("Not running inside the `as_current` context manager")
799
800    @property
801    def asynchronous(self):
802        """Are we running in the event loop?
803
804        This is true if the user signaled that we might be when creating the
805        client as in the following::
806
807            client = Client(asynchronous=True)
808
809        However, we override this expectation if we can definitively tell that
810        we are running from a thread that is not the event loop.  This is
811        common when calling get_client() from within a worker task.  Even
812        though the client was originally created in asynchronous mode we may
813        find ourselves in contexts when it is better to operate synchronously.
814        """
815        try:
816            return self._asynchronous and self.loop is IOLoop.current()
817        except RuntimeError:
818            return False
819
820    @property
821    def dashboard_link(self):
822        """Link to the scheduler's dashboard.
823
824        Returns
825        -------
826        str
827            Dashboard URL.
828
829        Examples
830        --------
831        Opening the dashboard in your default web browser:
832
833        >>> import webbrowser
834        >>> from distributed import Client
835        >>> client = Client()
836        >>> webbrowser.open(client.dashboard_link)
837
838        """
839        try:
840            return self.cluster.dashboard_link
841        except AttributeError:
842            scheduler, info = self._get_scheduler_info()
843            if scheduler is None:
844                return None
845            else:
846                protocol, rest = scheduler.address.split("://")
847
848            port = info["services"]["dashboard"]
849            if protocol == "inproc":
850                host = "localhost"
851            else:
852                host = rest.split(":")[0]
853
854            return format_dashboard_link(host, port)
855
856    def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
857        callback_timeout = parse_timedelta(callback_timeout)
858        if (
859            asynchronous
860            or self.asynchronous
861            or getattr(thread_state, "asynchronous", False)
862        ):
863            future = func(*args, **kwargs)
864            if callback_timeout is not None:
865                future = asyncio.wait_for(future, callback_timeout)
866            return future
867        else:
868            return sync(
869                self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
870            )
871
872    def _get_scheduler_info(self):
873        from .scheduler import Scheduler
874
875        if (
876            self.cluster
877            and hasattr(self.cluster, "scheduler")
878            and isinstance(self.cluster.scheduler, Scheduler)
879        ):
880            info = self.cluster.scheduler.identity()
881            scheduler = self.cluster.scheduler
882        elif (
883            self._loop_runner.is_started()
884            and self.scheduler
885            and not (self.asynchronous and self.loop is IOLoop.current())
886        ):
887            info = sync(self.loop, self.scheduler.identity)
888            scheduler = self.scheduler
889        else:
890            info = self._scheduler_identity
891            scheduler = self.scheduler
892
893        return scheduler, SchedulerInfo(info)
894
895    def __repr__(self):
896        # Note: avoid doing I/O here...
897        info = self._scheduler_identity
898        addr = info.get("address")
899        if addr:
900            workers = info.get("workers", {})
901            nworkers = len(workers)
902            nthreads = sum(w["nthreads"] for w in workers.values())
903            text = "<%s: %r processes=%d threads=%d" % (
904                self.__class__.__name__,
905                addr,
906                nworkers,
907                nthreads,
908            )
909            memory = [w["memory_limit"] for w in workers.values()]
910            if all(memory):
911                text += ", memory=" + format_bytes(sum(memory))
912            text += ">"
913            return text
914
915        elif self.scheduler is not None:
916            return "<{}: scheduler={!r}>".format(
917                self.__class__.__name__,
918                self.scheduler.address,
919            )
920        else:
921            return f"<{self.__class__.__name__}: No scheduler connected>"
922
923    def _repr_html_(self):
924        scheduler, info = self._get_scheduler_info()
925
926        return get_template("client.html.j2").render(
927            id=self.id,
928            scheduler=scheduler,
929            info=info,
930            cluster=self.cluster,
931            scheduler_file=self.scheduler_file,
932            dashboard_link=self.dashboard_link,
933        )
934
935    def start(self, **kwargs):
936        """Start scheduler running in separate thread"""
937        if self.status != "newly-created":
938            return
939
940        self._loop_runner.start()
941        if self._set_as_default:
942            _set_global_client(self)
943        self.status = "connecting"
944
945        if self.asynchronous:
946            self._started = asyncio.ensure_future(self._start(**kwargs))
947        else:
948            sync(self.loop, self._start, **kwargs)
949
950    def __await__(self):
951        if hasattr(self, "_started"):
952            return self._started.__await__()
953        else:
954
955            async def _():
956                return self
957
958            return _().__await__()
959
960    def _send_to_scheduler_safe(self, msg):
961        if self.status in ("running", "closing"):
962            try:
963                self.scheduler_comm.send(msg)
964            except (CommClosedError, AttributeError):
965                if self.status == "running":
966                    raise
967        elif self.status in ("connecting", "newly-created"):
968            self._pending_msg_buffer.append(msg)
969
970    def _send_to_scheduler(self, msg):
971        if self.status in ("running", "closing", "connecting", "newly-created"):
972            self.loop.add_callback(self._send_to_scheduler_safe, msg)
973        else:
974            raise Exception(
975                "Tried sending message after closing.  Status: %s\n"
976                "Message: %s" % (self.status, msg)
977            )
978
979    async def _start(self, timeout=no_default, **kwargs):
980        await self.rpc.start()
981
982        if timeout == no_default:
983            timeout = self._timeout
984        if timeout is not None:
985            timeout = parse_timedelta(timeout, "s")
986
987        address = self._start_arg
988        if self.cluster is not None:
989            # Ensure the cluster is started (no-op if already running)
990            try:
991                await self.cluster
992            except Exception:
993                logger.info(
994                    "Tried to start cluster and received an error. Proceeding.",
995                    exc_info=True,
996                )
997            address = self.cluster.scheduler_address
998        elif self.scheduler_file is not None:
999            while not os.path.exists(self.scheduler_file):
1000                await asyncio.sleep(0.01)
1001            for i in range(10):
1002                try:
1003                    with open(self.scheduler_file) as f:
1004                        cfg = json.load(f)
1005                    address = cfg["address"]
1006                    break
1007                except (ValueError, KeyError):  # JSON file not yet flushed
1008                    await asyncio.sleep(0.01)
1009        elif self._start_arg is None:
1010            from .deploy import LocalCluster
1011
1012            try:
1013                self.cluster = await LocalCluster(
1014                    loop=self.loop,
1015                    asynchronous=self._asynchronous,
1016                    **self._startup_kwargs,
1017                )
1018            except OSError as e:
1019                if e.errno != errno.EADDRINUSE:
1020                    raise
1021                # The default port was taken, use a random one
1022                self.cluster = await LocalCluster(
1023                    scheduler_port=0,
1024                    loop=self.loop,
1025                    asynchronous=True,
1026                    **self._startup_kwargs,
1027                )
1028
1029            address = self.cluster.scheduler_address
1030
1031        self._gather_semaphore = asyncio.Semaphore(5)
1032
1033        if self.scheduler is None:
1034            self.scheduler = self.rpc(address)
1035        self.scheduler_comm = None
1036
1037        try:
1038            await self._ensure_connected(timeout=timeout)
1039        except (OSError, ImportError):
1040            await self._close()
1041            raise
1042
1043        for pc in self._periodic_callbacks.values():
1044            pc.start()
1045
1046        for topic, handler in Client._default_event_handlers.items():
1047            self.subscribe_topic(topic, handler)
1048
1049        self._handle_scheduler_coroutine = asyncio.ensure_future(self._handle_report())
1050        self.coroutines.append(self._handle_scheduler_coroutine)
1051
1052        return self
1053
1054    async def _reconnect(self):
1055        with log_errors():
1056            assert self.scheduler_comm.comm.closed()
1057
1058            self.status = "connecting"
1059            self.scheduler_comm = None
1060
1061            for st in self.futures.values():
1062                st.cancel()
1063            self.futures.clear()
1064
1065            timeout = self._timeout
1066            deadline = self.loop.time() + timeout
1067            while timeout > 0 and self.status == "connecting":
1068                try:
1069                    await self._ensure_connected(timeout=timeout)
1070                    break
1071                except OSError:
1072                    # Wait a bit before retrying
1073                    await asyncio.sleep(0.1)
1074                    timeout = deadline - self.loop.time()
1075                except ImportError:
1076                    await self._close()
1077                    break
1078            else:
1079                logger.error(
1080                    "Failed to reconnect to scheduler after %.2f "
1081                    "seconds, closing client",
1082                    self._timeout,
1083                )
1084                await self._close()
1085
1086    async def _ensure_connected(self, timeout=None):
1087        if (
1088            self.scheduler_comm
1089            and not self.scheduler_comm.closed()
1090            or self._connecting_to_scheduler
1091            or self.scheduler is None
1092        ):
1093            return
1094
1095        self._connecting_to_scheduler = True
1096
1097        try:
1098            comm = await connect(
1099                self.scheduler.address, timeout=timeout, **self.connection_args
1100            )
1101            comm.name = "Client->Scheduler"
1102            if timeout is not None:
1103                await asyncio.wait_for(self._update_scheduler_info(), timeout)
1104            else:
1105                await self._update_scheduler_info()
1106            await comm.write(
1107                {
1108                    "op": "register-client",
1109                    "client": self.id,
1110                    "reply": False,
1111                    "versions": version_module.get_versions(),
1112                }
1113            )
1114        except Exception:
1115            if self.status == "closed":
1116                return
1117            else:
1118                raise
1119        finally:
1120            self._connecting_to_scheduler = False
1121        if timeout is not None:
1122            msg = await asyncio.wait_for(comm.read(), timeout)
1123        else:
1124            msg = await comm.read()
1125        assert len(msg) == 1
1126        assert msg[0]["op"] == "stream-start"
1127
1128        if msg[0].get("error"):
1129            raise ImportError(msg[0]["error"])
1130        if msg[0].get("warning"):
1131            warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
1132
1133        bcomm = BatchedSend(interval="10ms", loop=self.loop)
1134        bcomm.start(comm)
1135        self.scheduler_comm = bcomm
1136        if self._set_as_default:
1137            _set_global_client(self)
1138        self.status = "running"
1139
1140        for msg in self._pending_msg_buffer:
1141            self._send_to_scheduler(msg)
1142        del self._pending_msg_buffer[:]
1143
1144        logger.debug("Started scheduling coroutines. Synchronized")
1145
1146    async def _update_scheduler_info(self):
1147        if self.status not in ("running", "connecting"):
1148            return
1149        try:
1150            self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
1151        except OSError:
1152            logger.debug("Not able to query scheduler for identity")
1153
1154    async def _wait_for_workers(self, n_workers=0, timeout=None):
1155        info = await self.scheduler.identity()
1156        if timeout:
1157            deadline = time() + parse_timedelta(timeout)
1158        else:
1159            deadline = None
1160        while n_workers and len(info["workers"]) < n_workers:
1161            if deadline and time() > deadline:
1162                raise TimeoutError(
1163                    "Only %d/%d workers arrived after %s"
1164                    % (len(info["workers"]), n_workers, timeout)
1165                )
1166            await asyncio.sleep(0.1)
1167            info = await self.scheduler.identity()
1168
1169    def wait_for_workers(self, n_workers=0, timeout=None):
1170        """Blocking call to wait for n workers before continuing"""
1171        return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
1172
1173    def _heartbeat(self):
1174        if self.scheduler_comm:
1175            self.scheduler_comm.send({"op": "heartbeat-client"})
1176
1177    def __enter__(self):
1178        if not self._loop_runner.is_started():
1179            self.start()
1180        return self
1181
1182    async def __aenter__(self):
1183        await self
1184        return self
1185
1186    async def __aexit__(self, typ, value, traceback):
1187        await self._close()
1188
1189    def __exit__(self, type, value, traceback):
1190        self.close()
1191
1192    def __del__(self):
1193        self.close()
1194
1195    def _inc_ref(self, key):
1196        with self._refcount_lock:
1197            self.refcount[key] += 1
1198
1199    def _dec_ref(self, key):
1200        with self._refcount_lock:
1201            self.refcount[key] -= 1
1202            if self.refcount[key] == 0:
1203                del self.refcount[key]
1204                self._release_key(key)
1205
1206    def _release_key(self, key):
1207        """Release key from distributed memory"""
1208        logger.debug("Release key %s", key)
1209        st = self.futures.pop(key, None)
1210        if st is not None:
1211            st.cancel()
1212        if self.status != "closed":
1213            self._send_to_scheduler(
1214                {"op": "client-releases-keys", "keys": [key], "client": self.id}
1215            )
1216
1217    async def _handle_report(self):
1218        """Listen to scheduler"""
1219        with log_errors():
1220            try:
1221                while True:
1222                    if self.scheduler_comm is None:
1223                        break
1224                    try:
1225                        msgs = await self.scheduler_comm.comm.read()
1226                    except CommClosedError:
1227                        if self.status == "running":
1228                            logger.info("Client report stream closed to scheduler")
1229                            logger.info("Reconnecting...")
1230                            self.status = "connecting"
1231                            await self._reconnect()
1232                            continue
1233                        else:
1234                            break
1235                    if not isinstance(msgs, (list, tuple)):
1236                        msgs = (msgs,)
1237
1238                    breakout = False
1239                    for msg in msgs:
1240                        logger.debug("Client receives message %s", msg)
1241
1242                        if "status" in msg and "error" in msg["status"]:
1243                            typ, exc, tb = clean_exception(**msg)
1244                            raise exc.with_traceback(tb)
1245
1246                        op = msg.pop("op")
1247
1248                        if op == "close" or op == "stream-closed":
1249                            breakout = True
1250                            break
1251
1252                        try:
1253                            handler = self._stream_handlers[op]
1254                            result = handler(**msg)
1255                            if inspect.isawaitable(result):
1256                                await result
1257                        except Exception as e:
1258                            logger.exception(e)
1259                    if breakout:
1260                        break
1261            except CancelledError:
1262                pass
1263
1264    def _handle_key_in_memory(self, key=None, type=None, workers=None):
1265        state = self.futures.get(key)
1266        if state is not None:
1267            if type and not state.type:  # Type exists and not yet set
1268                try:
1269                    type = loads(type)
1270                except Exception:
1271                    type = None
1272                # Here, `type` may be a str if actual type failed
1273                # serializing in Worker
1274            else:
1275                type = None
1276            state.finish(type)
1277
1278    def _handle_lost_data(self, key=None):
1279        state = self.futures.get(key)
1280        if state is not None:
1281            state.lose()
1282
1283    def _handle_cancelled_key(self, key=None):
1284        state = self.futures.get(key)
1285        if state is not None:
1286            state.cancel()
1287
1288    def _handle_retried_key(self, key=None):
1289        state = self.futures.get(key)
1290        if state is not None:
1291            state.retry()
1292
1293    def _handle_task_erred(self, key=None, exception=None, traceback=None):
1294        state = self.futures.get(key)
1295        if state is not None:
1296            state.set_error(exception, traceback)
1297
1298    def _handle_restart(self):
1299        logger.info("Receive restart signal from scheduler")
1300        for state in self.futures.values():
1301            state.cancel()
1302        self.futures.clear()
1303        with suppress(AttributeError):
1304            self._restart_event.set()
1305
1306    def _handle_error(self, exception=None):
1307        logger.warning("Scheduler exception:")
1308        logger.exception(exception)
1309
1310    async def _close(self, fast=False):
1311        """Send close signal and wait until scheduler completes"""
1312        if self.status == "closed":
1313            return
1314
1315        self.status = "closing"
1316
1317        with suppress(AttributeError):
1318            for pc in self._periodic_callbacks.values():
1319                pc.stop()
1320
1321        with log_errors():
1322            _del_global_client(self)
1323            self._scheduler_identity = {}
1324            with suppress(AttributeError):
1325                # clear the dask.config set keys
1326                with self._set_config:
1327                    pass
1328            if self.get == dask.config.get("get", None):
1329                del dask.config.config["get"]
1330
1331            if (
1332                self.scheduler_comm
1333                and self.scheduler_comm.comm
1334                and not self.scheduler_comm.comm.closed()
1335            ):
1336                self._send_to_scheduler({"op": "close-client"})
1337                self._send_to_scheduler({"op": "close-stream"})
1338
1339            # Give the scheduler 'stream-closed' message 100ms to come through
1340            # This makes the shutdown slightly smoother and quieter
1341            with suppress(AttributeError, asyncio.CancelledError, TimeoutError):
1342                await asyncio.wait_for(
1343                    asyncio.shield(self._handle_scheduler_coroutine), 0.1
1344                )
1345
1346            if (
1347                self.scheduler_comm
1348                and self.scheduler_comm.comm
1349                and not self.scheduler_comm.comm.closed()
1350            ):
1351                await self.scheduler_comm.close()
1352
1353            for key in list(self.futures):
1354                self._release_key(key=key)
1355
1356            if self._start_arg is None:
1357                with suppress(AttributeError):
1358                    await self.cluster.close()
1359
1360            await self.rpc.close()
1361
1362            self.status = "closed"
1363
1364            if _get_global_client() is self:
1365                _set_global_client(None)
1366
1367            coroutines = set(self.coroutines)
1368            for f in self.coroutines:
1369                # cancel() works on asyncio futures (Tornado 5)
1370                # but is a no-op on Tornado futures
1371                with suppress(RuntimeError):
1372                    f.cancel()
1373                if f.cancelled():
1374                    coroutines.remove(f)
1375            del self.coroutines[:]
1376
1377            if not fast:
1378                with suppress(TimeoutError, asyncio.CancelledError):
1379                    await asyncio.wait_for(asyncio.gather(*coroutines), 2)
1380
1381            with suppress(AttributeError):
1382                await self.scheduler.close_rpc()
1383
1384            self.scheduler = None
1385
1386        self.status = "closed"
1387
1388    def close(self, timeout=no_default):
1389        """Close this client
1390
1391        Clients will also close automatically when your Python session ends
1392
1393        If you started a client without arguments like ``Client()`` then this
1394        will also close the local cluster that was started at the same time.
1395
1396        See Also
1397        --------
1398        Client.restart
1399        """
1400        if timeout == no_default:
1401            timeout = self._timeout * 2
1402        # XXX handling of self.status here is not thread-safe
1403        if self.status in ["closed", "newly-created"]:
1404            if self.asynchronous:
1405                future = asyncio.Future()
1406                future.set_result(None)
1407                return future
1408            return
1409        self.status = "closing"
1410
1411        with suppress(AttributeError):
1412            for pc in self._periodic_callbacks.values():
1413                pc.stop()
1414
1415        if self.asynchronous:
1416            coro = self._close()
1417            if timeout:
1418                coro = asyncio.wait_for(coro, timeout)
1419            return coro
1420
1421        if self._start_arg is None:
1422            with suppress(AttributeError):
1423                f = self.cluster.close()
1424                if asyncio.iscoroutine(f):
1425
1426                    async def _():
1427                        await f
1428
1429                    self.sync(_)
1430
1431        sync(self.loop, self._close, fast=True, callback_timeout=timeout)
1432
1433        assert self.status == "closed"
1434
1435        if not sys.is_finalizing():
1436            self._loop_runner.stop()
1437
1438    async def _shutdown(self):
1439        logger.info("Shutting down scheduler from Client")
1440        if self.cluster:
1441            await self.cluster.close()
1442        else:
1443            with suppress(CommClosedError):
1444                self.status = "closing"
1445                await self.scheduler.terminate(close_workers=True)
1446
1447    def shutdown(self):
1448        """Shut down the connected scheduler and workers
1449
1450        Note, this may disrupt other clients that may be using the same
1451        scheduler and workers.
1452
1453        See Also
1454        --------
1455        Client.close : close only this client
1456        """
1457        return self.sync(self._shutdown)
1458
1459    def get_executor(self, **kwargs):
1460        """
1461        Return a concurrent.futures Executor for submitting tasks on this Client
1462
1463        Parameters
1464        ----------
1465        **kwargs
1466            Any submit()- or map()- compatible arguments, such as
1467            `workers` or `resources`.
1468
1469        Returns
1470        -------
1471        An Executor object that's fully compatible with the concurrent.futures
1472        API.
1473        """
1474        return ClientExecutor(self, **kwargs)
1475
1476    def submit(
1477        self,
1478        func,
1479        *args,
1480        key=None,
1481        workers=None,
1482        resources=None,
1483        retries=None,
1484        priority=0,
1485        fifo_timeout="100 ms",
1486        allow_other_workers=False,
1487        actor=False,
1488        actors=False,
1489        pure=None,
1490        **kwargs,
1491    ):
1492
1493        """Submit a function application to the scheduler
1494
1495        Parameters
1496        ----------
1497        func : callable
1498            Callable to be scheduled as ``func(*args **kwargs)``. If ``func`` returns a
1499            coroutine, it will be run on the main event loop of a worker. Otherwise
1500            ``func`` will be run in a worker's task executor pool (see
1501            ``Worker.executors`` for more information.)
1502        *args
1503        **kwargs
1504        pure : bool (defaults to True)
1505            Whether or not the function is pure.  Set ``pure=False`` for
1506            impure functions like ``np.random.random``.
1507            See :ref:`pure functions` for more details.
1508        workers : string or iterable of strings
1509            A set of worker addresses or hostnames on which computations may be
1510            performed. Leave empty to default to all workers (common case)
1511        key : str
1512            Unique identifier for the task.  Defaults to function-name and hash
1513        allow_other_workers : bool (defaults to False)
1514            Used with ``workers``. Indicates whether or not the computations
1515            may be performed on workers that are not in the `workers` set(s).
1516        retries : int (default to 0)
1517            Number of allowed automatic retries if the task fails
1518        priority : Number
1519            Optional prioritization of task.  Zero is default.
1520            Higher priorities take precedence
1521        fifo_timeout : str timedelta (default '100ms')
1522            Allowed amount of time between calls to consider the same priority
1523        resources : dict (defaults to {})
1524            Defines the ``resources`` each instance of this mapped task requires
1525            on the worker; e.g. ``{'GPU': 2}``.
1526            See :doc:`worker resources <resources>` for details on defining
1527            resources.
1528        actor : bool (default False)
1529            Whether this task should exist on the worker as a stateful actor.
1530            See :doc:`actors` for additional details.
1531        actors : bool (default False)
1532            Alias for `actor`
1533
1534        Examples
1535        --------
1536        >>> c = client.submit(add, a, b)  # doctest: +SKIP
1537
1538        Returns
1539        -------
1540        Future
1541
1542        See Also
1543        --------
1544        Client.map : Submit on many arguments at once
1545        """
1546        if not callable(func):
1547            raise TypeError("First input to submit must be a callable function")
1548
1549        actor = actor or actors
1550        if pure is None:
1551            pure = not actor
1552
1553        if allow_other_workers not in (True, False, None):
1554            raise TypeError("allow_other_workers= must be True or False")
1555
1556        if key is None:
1557            if pure:
1558                key = funcname(func) + "-" + tokenize(func, kwargs, *args)
1559            else:
1560                key = funcname(func) + "-" + str(uuid.uuid4())
1561
1562        skey = stringify(key)
1563
1564        with self._refcount_lock:
1565            if skey in self.futures:
1566                return Future(key, self, inform=False)
1567
1568        if allow_other_workers and workers is None:
1569            raise ValueError("Only use allow_other_workers= if using workers=")
1570
1571        if isinstance(workers, (str, Number)):
1572            workers = [workers]
1573
1574        if kwargs:
1575            dsk = {skey: (apply, func, list(args), kwargs)}
1576        else:
1577            dsk = {skey: (func,) + tuple(args)}
1578
1579        futures = self._graph_to_futures(
1580            dsk,
1581            [skey],
1582            workers=workers,
1583            allow_other_workers=allow_other_workers,
1584            priority={skey: 0},
1585            user_priority=priority,
1586            resources=resources,
1587            retries=retries,
1588            fifo_timeout=fifo_timeout,
1589            actors=actor,
1590        )
1591
1592        logger.debug("Submit %s(...), %s", funcname(func), key)
1593
1594        return futures[skey]
1595
1596    def map(
1597        self,
1598        func,
1599        *iterables,
1600        key=None,
1601        workers=None,
1602        retries=None,
1603        resources=None,
1604        priority=0,
1605        allow_other_workers=False,
1606        fifo_timeout="100 ms",
1607        actor=False,
1608        actors=False,
1609        pure=None,
1610        batch_size=None,
1611        **kwargs,
1612    ):
1613        """Map a function on a sequence of arguments
1614
1615        Arguments can be normal objects or Futures
1616
1617        Parameters
1618        ----------
1619        func : callable
1620            Callable to be scheduled for execution. If ``func`` returns a coroutine, it
1621            will be run on the main event loop of a worker. Otherwise ``func`` will be
1622            run in a worker's task executor pool (see ``Worker.executors`` for more
1623            information.)
1624        iterables : Iterables
1625            List-like objects to map over.  They should have the same length.
1626        key : str, list
1627            Prefix for task names if string.  Explicit names if list.
1628        pure : bool (defaults to True)
1629            Whether or not the function is pure.  Set ``pure=False`` for
1630            impure functions like ``np.random.random``.
1631            See :ref:`pure functions` for more details.
1632        workers : string or iterable of strings
1633            A set of worker hostnames on which computations may be performed.
1634            Leave empty to default to all workers (common case)
1635        allow_other_workers : bool (defaults to False)
1636            Used with `workers`. Indicates whether or not the computations
1637            may be performed on workers that are not in the `workers` set(s).
1638        retries : int (default to 0)
1639            Number of allowed automatic retries if a task fails
1640        priority : Number
1641            Optional prioritization of task.  Zero is default.
1642            Higher priorities take precedence
1643        fifo_timeout : str timedelta (default '100ms')
1644            Allowed amount of time between calls to consider the same priority
1645        resources : dict (defaults to {})
1646            Defines the `resources` each instance of this mapped task requires
1647            on the worker; e.g. ``{'GPU': 2}``.
1648            See :doc:`worker resources <resources>` for details on defining
1649            resources.
1650        actor : bool (default False)
1651            Whether these tasks should exist on the worker as stateful actors.
1652            See :doc:`actors` for additional details.
1653        actors : bool (default False)
1654            Alias for `actor`
1655        batch_size : int, optional
1656            Submit tasks to the scheduler in batches of (at most) ``batch_size``.
1657            Larger batch sizes can be useful for very large ``iterables``,
1658            as the cluster can start processing tasks while later ones are
1659            submitted asynchronously.
1660        **kwargs : dict
1661            Extra keywords to send to the function.
1662            Large values will be included explicitly in the task graph.
1663
1664        Examples
1665        --------
1666        >>> L = client.map(func, sequence)  # doctest: +SKIP
1667
1668        Returns
1669        -------
1670        List, iterator, or Queue of futures, depending on the type of the
1671        inputs.
1672
1673        See Also
1674        --------
1675        Client.submit : Submit a single function
1676        """
1677        if not callable(func):
1678            raise TypeError("First input to map must be a callable function")
1679
1680        if all(isinstance(it, pyQueue) for it in iterables) or all(
1681            isinstance(i, Iterator) for i in iterables
1682        ):
1683            raise TypeError(
1684                "Dask no longer supports mapping over Iterators or Queues."
1685                "Consider using a normal for loop and Client.submit"
1686            )
1687        total_length = sum(len(x) for x in iterables)
1688
1689        if batch_size and batch_size > 1 and total_length > batch_size:
1690            batches = list(
1691                zip(*(partition_all(batch_size, iterable) for iterable in iterables))
1692            )
1693            if isinstance(key, list):
1694                keys = [list(element) for element in partition_all(batch_size, key)]
1695            else:
1696                keys = [key for _ in range(len(batches))]
1697            return sum(
1698                (
1699                    self.map(
1700                        func,
1701                        *batch,
1702                        key=key,
1703                        workers=workers,
1704                        retries=retries,
1705                        priority=priority,
1706                        allow_other_workers=allow_other_workers,
1707                        fifo_timeout=fifo_timeout,
1708                        resources=resources,
1709                        actor=actor,
1710                        actors=actors,
1711                        pure=pure,
1712                        **kwargs,
1713                    )
1714                    for key, batch in zip(keys, batches)
1715                ),
1716                [],
1717            )
1718
1719        key = key or funcname(func)
1720        actor = actor or actors
1721        if pure is None:
1722            pure = not actor
1723
1724        if allow_other_workers and workers is None:
1725            raise ValueError("Only use allow_other_workers= if using workers=")
1726
1727        iterables = list(zip(*zip(*iterables)))
1728        if isinstance(key, list):
1729            keys = key
1730        else:
1731            if pure:
1732                keys = [
1733                    key + "-" + tokenize(func, kwargs, *args)
1734                    for args in zip(*iterables)
1735                ]
1736            else:
1737                uid = str(uuid.uuid4())
1738                keys = (
1739                    [
1740                        key + "-" + uid + "-" + str(i)
1741                        for i in range(min(map(len, iterables)))
1742                    ]
1743                    if iterables
1744                    else []
1745                )
1746
1747        if not kwargs:
1748            dsk = {key: (func,) + args for key, args in zip(keys, zip(*iterables))}
1749        else:
1750            kwargs2 = {}
1751            dsk = {}
1752            for k, v in kwargs.items():
1753                if sizeof(v) > 1e5:
1754                    vv = dask.delayed(v)
1755                    kwargs2[k] = vv._key
1756                    dsk.update(vv.dask)
1757                else:
1758                    kwargs2[k] = v
1759            dsk.update(
1760                {
1761                    key: (apply, func, (tuple, list(args)), kwargs2)
1762                    for key, args in zip(keys, zip(*iterables))
1763                }
1764            )
1765
1766        if isinstance(workers, (str, Number)):
1767            workers = [workers]
1768        if workers is not None and not isinstance(workers, (list, set)):
1769            raise TypeError("Workers must be a list or set of workers or None")
1770
1771        internal_priority = dict(zip(keys, range(len(keys))))
1772
1773        futures = self._graph_to_futures(
1774            dsk,
1775            keys,
1776            workers=workers,
1777            allow_other_workers=allow_other_workers,
1778            priority=internal_priority,
1779            resources=resources,
1780            retries=retries,
1781            user_priority=priority,
1782            fifo_timeout=fifo_timeout,
1783            actors=actor,
1784        )
1785        logger.debug("map(%s, ...)", funcname(func))
1786
1787        return [futures[stringify(k)] for k in keys]
1788
1789    async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
1790        unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
1791        mismatched_futures = [f for f in future_set if f.client is not self]
1792        if mismatched_futures:
1793            raise ValueError(
1794                "Cannot gather Futures created by another client. "
1795                f"These are the {len(mismatched_futures)} (out of {len(futures)}) mismatched Futures and their client IDs "
1796                f"(this client is {self.id}): "
1797                f"{ {f: f.client.id for f in mismatched_futures} }"
1798            )
1799        keys = [stringify(future.key) for future in future_set]
1800        bad_data = dict()
1801        data = {}
1802
1803        if direct is None:
1804            direct = self.direct_to_workers
1805        if direct is None:
1806            try:
1807                w = get_worker()
1808            except Exception:
1809                direct = False
1810            else:
1811                if w.scheduler.address == self.scheduler.address:
1812                    direct = True
1813
1814        async def wait(k):
1815            """Want to stop the All(...) early if we find an error"""
1816            st = self.futures[k]
1817            await st.wait()
1818            if st.status != "finished" and errors == "raise":
1819                raise AllExit()
1820
1821        while True:
1822            logger.debug("Waiting on futures to clear before gather")
1823
1824            with suppress(AllExit):
1825                await All(
1826                    [wait(key) for key in keys if key in self.futures],
1827                    quiet_exceptions=AllExit,
1828                )
1829
1830            failed = ("error", "cancelled")
1831
1832            exceptions = set()
1833            bad_keys = set()
1834            for key in keys:
1835                if key not in self.futures or self.futures[key].status in failed:
1836                    exceptions.add(key)
1837                    if errors == "raise":
1838                        try:
1839                            st = self.futures[key]
1840                            exception = st.exception
1841                            traceback = st.traceback
1842                        except (KeyError, AttributeError):
1843                            exc = CancelledError(key)
1844                        else:
1845                            raise exception.with_traceback(traceback)
1846                        raise exc
1847                    if errors == "skip":
1848                        bad_keys.add(key)
1849                        bad_data[key] = None
1850                    else:
1851                        raise ValueError("Bad value, `errors=%s`" % errors)
1852
1853            keys = [k for k in keys if k not in bad_keys and k not in data]
1854
1855            if local_worker:  # look inside local worker
1856                data.update(
1857                    {k: local_worker.data[k] for k in keys if k in local_worker.data}
1858                )
1859                keys = [k for k in keys if k not in data]
1860
1861            # We now do an actual remote communication with workers or scheduler
1862            if self._gather_future:  # attach onto another pending gather request
1863                self._gather_keys |= set(keys)
1864                response = await self._gather_future
1865            else:  # no one waiting, go ahead
1866                self._gather_keys = set(keys)
1867                future = asyncio.ensure_future(
1868                    self._gather_remote(direct, local_worker)
1869                )
1870                if self._gather_keys is None:
1871                    self._gather_future = None
1872                else:
1873                    self._gather_future = future
1874                response = await future
1875
1876            if response["status"] == "error":
1877                log = logger.warning if errors == "raise" else logger.debug
1878                log(
1879                    "Couldn't gather %s keys, rescheduling %s",
1880                    len(response["keys"]),
1881                    response["keys"],
1882                )
1883                for key in response["keys"]:
1884                    self._send_to_scheduler({"op": "report-key", "key": key})
1885                for key in response["keys"]:
1886                    try:
1887                        self.futures[key].reset()
1888                    except KeyError:  # TODO: verify that this is safe
1889                        pass
1890            else:
1891                break
1892
1893        if bad_data and errors == "skip" and isinstance(unpacked, list):
1894            unpacked = [f for f in unpacked if f not in bad_data]
1895
1896        data.update(response["data"])
1897        result = pack_data(unpacked, merge(data, bad_data))
1898        return result
1899
1900    async def _gather_remote(self, direct, local_worker):
1901        """Perform gather with workers or scheduler
1902
1903        This method exists to limit and batch many concurrent gathers into a
1904        few.  In controls access using a Tornado semaphore, and picks up keys
1905        from other requests made recently.
1906        """
1907        async with self._gather_semaphore:
1908            keys = list(self._gather_keys)
1909            self._gather_keys = None  # clear state, these keys are being sent off
1910            self._gather_future = None
1911
1912            if direct or local_worker:  # gather directly from workers
1913                who_has = await retry_operation(self.scheduler.who_has, keys=keys)
1914                data2, missing_keys, missing_workers = await gather_from_workers(
1915                    who_has, rpc=self.rpc, close=False
1916                )
1917                response = {"status": "OK", "data": data2}
1918                if missing_keys:
1919                    keys2 = [key for key in keys if key not in data2]
1920                    response = await retry_operation(self.scheduler.gather, keys=keys2)
1921                    if response["status"] == "OK":
1922                        response["data"].update(data2)
1923
1924            else:  # ask scheduler to gather data for us
1925                response = await retry_operation(self.scheduler.gather, keys=keys)
1926
1927        return response
1928
1929    def gather(self, futures, errors="raise", direct=None, asynchronous=None):
1930        """Gather futures from distributed memory
1931
1932        Accepts a future, nested container of futures, iterator, or queue.
1933        The return type will match the input type.
1934
1935        Parameters
1936        ----------
1937        futures : Collection of futures
1938            This can be a possibly nested collection of Future objects.
1939            Collections can be lists, sets, or dictionaries
1940        errors : string
1941            Either 'raise' or 'skip' if we should raise if a future has erred
1942            or skip its inclusion in the output collection
1943        direct : boolean
1944            Whether or not to connect directly to the workers, or to ask
1945            the scheduler to serve as intermediary.  This can also be set when
1946            creating the Client.
1947
1948        Returns
1949        -------
1950        results: a collection of the same type as the input, but now with
1951        gathered results rather than futures
1952
1953        Examples
1954        --------
1955        >>> from operator import add  # doctest: +SKIP
1956        >>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
1957        >>> x = c.submit(add, 1, 2)  # doctest: +SKIP
1958        >>> c.gather(x)  # doctest: +SKIP
1959        3
1960        >>> c.gather([x, [x], x])  # support lists and dicts # doctest: +SKIP
1961        [3, [3], 3]
1962
1963        See Also
1964        --------
1965        Client.scatter : Send data out to cluster
1966        """
1967        if isinstance(futures, pyQueue):
1968            raise TypeError(
1969                "Dask no longer supports gathering over Iterators and Queues. "
1970                "Consider using a normal for loop and Client.submit/gather"
1971            )
1972
1973        elif isinstance(futures, Iterator):
1974            return (self.gather(f, errors=errors, direct=direct) for f in futures)
1975        else:
1976            if hasattr(thread_state, "execution_state"):  # within worker task
1977                local_worker = thread_state.execution_state["worker"]
1978            else:
1979                local_worker = None
1980            return self.sync(
1981                self._gather,
1982                futures,
1983                errors=errors,
1984                direct=direct,
1985                local_worker=local_worker,
1986                asynchronous=asynchronous,
1987            )
1988
1989    async def _scatter(
1990        self,
1991        data,
1992        workers=None,
1993        broadcast=False,
1994        direct=None,
1995        local_worker=None,
1996        timeout=no_default,
1997        hash=True,
1998    ):
1999        if timeout == no_default:
2000            timeout = self._timeout
2001        if isinstance(workers, (str, Number)):
2002            workers = [workers]
2003        if isinstance(data, dict) and not all(
2004            isinstance(k, (bytes, str)) for k in data
2005        ):
2006            d = await self._scatter(keymap(stringify, data), workers, broadcast)
2007            return {k: d[stringify(k)] for k in data}
2008
2009        if isinstance(data, type(range(0))):
2010            data = list(data)
2011        input_type = type(data)
2012        names = False
2013        unpack = False
2014        if isinstance(data, Iterator):
2015            data = list(data)
2016        if isinstance(data, (set, frozenset)):
2017            data = list(data)
2018        if not isinstance(data, (dict, list, tuple, set, frozenset)):
2019            unpack = True
2020            data = [data]
2021        if isinstance(data, (list, tuple)):
2022            if hash:
2023                names = [type(x).__name__ + "-" + tokenize(x) for x in data]
2024            else:
2025                names = [type(x).__name__ + "-" + uuid.uuid4().hex for x in data]
2026            data = dict(zip(names, data))
2027
2028        assert isinstance(data, dict)
2029
2030        types = valmap(type, data)
2031
2032        if direct is None:
2033            direct = self.direct_to_workers
2034        if direct is None:
2035            try:
2036                w = get_worker()
2037            except Exception:
2038                direct = False
2039            else:
2040                if w.scheduler.address == self.scheduler.address:
2041                    direct = True
2042
2043        if local_worker:  # running within task
2044            local_worker.update_data(data=data, report=False)
2045
2046            await self.scheduler.update_data(
2047                who_has={key: [local_worker.address] for key in data},
2048                nbytes=valmap(sizeof, data),
2049                client=self.id,
2050            )
2051
2052        else:
2053            data2 = valmap(to_serialize, data)
2054            if direct:
2055                nthreads = None
2056                start = time()
2057                while not nthreads:
2058                    if nthreads is not None:
2059                        await asyncio.sleep(0.1)
2060                    if time() > start + timeout:
2061                        raise TimeoutError("No valid workers found")
2062                    # Exclude paused and closing_gracefully workers
2063                    nthreads = await self.scheduler.ncores_running(workers=workers)
2064                if not nthreads:
2065                    raise ValueError("No valid workers found")
2066
2067                _, who_has, nbytes = await scatter_to_workers(
2068                    nthreads, data2, report=False, rpc=self.rpc
2069                )
2070
2071                await self.scheduler.update_data(
2072                    who_has=who_has, nbytes=nbytes, client=self.id
2073                )
2074            else:
2075                await self.scheduler.scatter(
2076                    data=data2,
2077                    workers=workers,
2078                    client=self.id,
2079                    broadcast=broadcast,
2080                    timeout=timeout,
2081                )
2082
2083        out = {k: Future(k, self, inform=False) for k in data}
2084        for key, typ in types.items():
2085            self.futures[key].finish(type=typ)
2086
2087        if direct and broadcast:
2088            n = None if broadcast is True else broadcast
2089            await self._replicate(list(out.values()), workers=workers, n=n)
2090
2091        if issubclass(input_type, (list, tuple, set, frozenset)):
2092            out = input_type(out[k] for k in names)
2093
2094        if unpack:
2095            assert len(out) == 1
2096            out = list(out.values())[0]
2097        return out
2098
2099    def scatter(
2100        self,
2101        data,
2102        workers=None,
2103        broadcast=False,
2104        direct=None,
2105        hash=True,
2106        timeout=no_default,
2107        asynchronous=None,
2108    ):
2109        """Scatter data into distributed memory
2110
2111        This moves data from the local client process into the workers of the
2112        distributed scheduler.  Note that it is often better to submit jobs to
2113        your workers to have them load the data rather than loading data
2114        locally and then scattering it out to them.
2115
2116        Parameters
2117        ----------
2118        data : list, dict, or object
2119            Data to scatter out to workers.  Output type matches input type.
2120        workers : list of tuples (optional)
2121            Optionally constrain locations of data.
2122            Specify workers as hostname/port pairs, e.g. ``('127.0.0.1', 8787)``.
2123        broadcast : bool (defaults to False)
2124            Whether to send each data element to all workers.
2125            By default we round-robin based on number of cores.
2126        direct : bool (defaults to automatically check)
2127            Whether or not to connect directly to the workers, or to ask
2128            the scheduler to serve as intermediary.  This can also be set when
2129            creating the Client.
2130        hash : bool (optional)
2131            Whether or not to hash data to determine key.
2132            If False then this uses a random key
2133
2134        Returns
2135        -------
2136        List, dict, iterator, or queue of futures matching the type of input.
2137
2138        Examples
2139        --------
2140        >>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
2141        >>> c.scatter(1) # doctest: +SKIP
2142        <Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
2143
2144        >>> c.scatter([1, 2, 3])  # doctest: +SKIP
2145        [<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>,
2146         <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>,
2147         <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
2148
2149        >>> c.scatter({'x': 1, 'y': 2, 'z': 3})  # doctest: +SKIP
2150        {'x': <Future: status: finished, key: x>,
2151         'y': <Future: status: finished, key: y>,
2152         'z': <Future: status: finished, key: z>}
2153
2154        Constrain location of data to subset of workers
2155
2156        >>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])   # doctest: +SKIP
2157
2158        Broadcast data to all workers
2159
2160        >>> [future] = c.scatter([element], broadcast=True)  # doctest: +SKIP
2161
2162        Send scattered data to parallelized function using client futures
2163        interface
2164
2165        >>> data = c.scatter(data, broadcast=True)  # doctest: +SKIP
2166        >>> res = [c.submit(func, data, i) for i in range(100)]
2167
2168        See Also
2169        --------
2170        Client.gather : Gather data back to local process
2171        """
2172        if timeout == no_default:
2173            timeout = self._timeout
2174        if isinstance(data, pyQueue) or isinstance(data, Iterator):
2175            raise TypeError(
2176                "Dask no longer supports mapping over Iterators or Queues."
2177                "Consider using a normal for loop and Client.submit"
2178            )
2179
2180        if hasattr(thread_state, "execution_state"):  # within worker task
2181            local_worker = thread_state.execution_state["worker"]
2182        else:
2183            local_worker = None
2184        return self.sync(
2185            self._scatter,
2186            data,
2187            workers=workers,
2188            broadcast=broadcast,
2189            direct=direct,
2190            local_worker=local_worker,
2191            timeout=timeout,
2192            asynchronous=asynchronous,
2193            hash=hash,
2194        )
2195
2196    async def _cancel(self, futures, force=False):
2197        keys = list({stringify(f.key) for f in futures_of(futures)})
2198        await self.scheduler.cancel(keys=keys, client=self.id, force=force)
2199        for k in keys:
2200            st = self.futures.pop(k, None)
2201            if st is not None:
2202                st.cancel()
2203
2204    def cancel(self, futures, asynchronous=None, force=False):
2205        """
2206        Cancel running futures
2207
2208        This stops future tasks from being scheduled if they have not yet run
2209        and deletes them if they have already run.  After calling, this result
2210        and all dependent results will no longer be accessible
2211
2212        Parameters
2213        ----------
2214        futures : list of Futures
2215        force : boolean (False)
2216            Cancel this future even if other clients desire it
2217        """
2218        return self.sync(self._cancel, futures, asynchronous=asynchronous, force=force)
2219
2220    async def _retry(self, futures):
2221        keys = list({stringify(f.key) for f in futures_of(futures)})
2222        response = await self.scheduler.retry(keys=keys, client=self.id)
2223        for key in response:
2224            st = self.futures[key]
2225            st.retry()
2226
2227    def retry(self, futures, asynchronous=None):
2228        """
2229        Retry failed futures
2230
2231        Parameters
2232        ----------
2233        futures : list of Futures
2234        """
2235        return self.sync(self._retry, futures, asynchronous=asynchronous)
2236
2237    async def _publish_dataset(self, *args, name=None, override=False, **kwargs):
2238        with log_errors():
2239            coroutines = []
2240
2241            def add_coro(name, data):
2242                keys = [stringify(f.key) for f in futures_of(data)]
2243                coroutines.append(
2244                    self.scheduler.publish_put(
2245                        keys=keys,
2246                        name=name,
2247                        data=to_serialize(data),
2248                        override=override,
2249                        client=self.id,
2250                    )
2251                )
2252
2253            if name:
2254                if len(args) == 0:
2255                    raise ValueError(
2256                        "If name is provided, expecting call signature like"
2257                        " publish_dataset(df, name='ds')"
2258                    )
2259                # in case this is a singleton, collapse it
2260                elif len(args) == 1:
2261                    args = args[0]
2262                add_coro(name, args)
2263
2264            for name, data in kwargs.items():
2265                add_coro(name, data)
2266
2267            await asyncio.gather(*coroutines)
2268
2269    def publish_dataset(self, *args, **kwargs):
2270        """
2271        Publish named datasets to scheduler
2272
2273        This stores a named reference to a dask collection or list of futures
2274        on the scheduler.  These references are available to other Clients
2275        which can download the collection or futures with ``get_dataset``.
2276
2277        Datasets are not immediately computed.  You may wish to call
2278        ``Client.persist`` prior to publishing a dataset.
2279
2280        Parameters
2281        ----------
2282        args : list of objects to publish as name
2283        name : optional name of the dataset to publish
2284        override : bool (optional, default False)
2285            if true, override any already present dataset with the same name
2286        kwargs : dict
2287            named collections to publish on the scheduler
2288
2289        Examples
2290        --------
2291        Publishing client:
2292
2293        >>> df = dd.read_csv('s3://...')  # doctest: +SKIP
2294        >>> df = c.persist(df) # doctest: +SKIP
2295        >>> c.publish_dataset(my_dataset=df)  # doctest: +SKIP
2296
2297        Alternative invocation
2298        >>> c.publish_dataset(df, name='my_dataset')
2299
2300        Receiving client:
2301
2302        >>> c.list_datasets()  # doctest: +SKIP
2303        ['my_dataset']
2304        >>> df2 = c.get_dataset('my_dataset')  # doctest: +SKIP
2305
2306        Returns
2307        -------
2308        None
2309
2310        See Also
2311        --------
2312        Client.list_datasets
2313        Client.get_dataset
2314        Client.unpublish_dataset
2315        Client.persist
2316        """
2317        return self.sync(self._publish_dataset, *args, **kwargs)
2318
2319    def unpublish_dataset(self, name, **kwargs):
2320        """
2321        Remove named datasets from scheduler
2322
2323        Examples
2324        --------
2325        >>> c.list_datasets()  # doctest: +SKIP
2326        ['my_dataset']
2327        >>> c.unpublish_datasets('my_dataset')  # doctest: +SKIP
2328        >>> c.list_datasets()  # doctest: +SKIP
2329        []
2330
2331        See Also
2332        --------
2333        Client.publish_dataset
2334        """
2335        return self.sync(self.scheduler.publish_delete, name=name, **kwargs)
2336
2337    def list_datasets(self, **kwargs):
2338        """
2339        List named datasets available on the scheduler
2340
2341        See Also
2342        --------
2343        Client.publish_dataset
2344        Client.get_dataset
2345        """
2346        return self.sync(self.scheduler.publish_list, **kwargs)
2347
2348    async def _get_dataset(self, name, default=NO_DEFAULT_PLACEHOLDER):
2349        with self.as_current():
2350            out = await self.scheduler.publish_get(name=name, client=self.id)
2351
2352        if out is None:
2353            if default is NO_DEFAULT_PLACEHOLDER:
2354                raise KeyError(f"Dataset '{name}' not found")
2355            else:
2356                return default
2357        return out["data"]
2358
2359    def get_dataset(self, name, default=NO_DEFAULT_PLACEHOLDER, **kwargs):
2360        """
2361        Get named dataset from the scheduler if present.
2362        Return the default or raise a KeyError if not present.
2363
2364        Parameters
2365        ----------
2366        name : name of the dataset to retrieve
2367        default : optional, not set by default
2368            If set, do not raise a KeyError if the name is not present but return this default
2369        kwargs : dict
2370            additional arguments to _get_dataset
2371
2372        See Also
2373        --------
2374        Client.publish_dataset
2375        Client.list_datasets
2376        """
2377        return self.sync(self._get_dataset, name, default=default, **kwargs)
2378
2379    async def _run_on_scheduler(self, function, *args, wait=True, **kwargs):
2380        response = await self.scheduler.run_function(
2381            function=dumps(function, protocol=4),
2382            args=dumps(args, protocol=4),
2383            kwargs=dumps(kwargs, protocol=4),
2384            wait=wait,
2385        )
2386        if response["status"] == "error":
2387            typ, exc, tb = clean_exception(**response)
2388            raise exc.with_traceback(tb)
2389        else:
2390            return response["result"]
2391
2392    def run_on_scheduler(self, function, *args, **kwargs):
2393        """Run a function on the scheduler process
2394
2395        This is typically used for live debugging.  The function should take a
2396        keyword argument ``dask_scheduler=``, which will be given the scheduler
2397        object itself.
2398
2399        Examples
2400        --------
2401        >>> def get_number_of_tasks(dask_scheduler=None):
2402        ...     return len(dask_scheduler.tasks)
2403
2404        >>> client.run_on_scheduler(get_number_of_tasks)  # doctest: +SKIP
2405        100
2406
2407        Run asynchronous functions in the background:
2408
2409        >>> async def print_state(dask_scheduler):  # doctest: +SKIP
2410        ...    while True:
2411        ...        print(dask_scheduler.status)
2412        ...        await asyncio.sleep(1)
2413
2414        >>> c.run(print_state, wait=False)  # doctest: +SKIP
2415
2416        See Also
2417        --------
2418        Client.run : Run a function on all workers
2419        Client.start_ipython_scheduler : Start an IPython session on scheduler
2420        """
2421        return self.sync(self._run_on_scheduler, function, *args, **kwargs)
2422
2423    async def _run(
2424        self, function, *args, nanny=False, workers=None, wait=True, **kwargs
2425    ):
2426        responses = await self.scheduler.broadcast(
2427            msg=dict(
2428                op="run",
2429                function=dumps(function, protocol=4),
2430                args=dumps(args, protocol=4),
2431                wait=wait,
2432                kwargs=dumps(kwargs, protocol=4),
2433            ),
2434            workers=workers,
2435            nanny=nanny,
2436        )
2437        results = {}
2438        for key, resp in responses.items():
2439            if resp["status"] == "OK":
2440                results[key] = resp["result"]
2441            elif resp["status"] == "error":
2442                typ, exc, tb = clean_exception(**resp)
2443                raise exc.with_traceback(tb)
2444        if wait:
2445            return results
2446
2447    def run(self, function, *args, **kwargs):
2448        """
2449        Run a function on all workers outside of task scheduling system
2450
2451        This calls a function on all currently known workers immediately,
2452        blocks until those results come back, and returns the results
2453        asynchronously as a dictionary keyed by worker address.  This method
2454        if generally used for side effects, such and collecting diagnostic
2455        information or installing libraries.
2456
2457        If your function takes an input argument named ``dask_worker`` then
2458        that variable will be populated with the worker itself.
2459
2460        Parameters
2461        ----------
2462        function : callable
2463        *args : arguments for remote function
2464        **kwargs : keyword arguments for remote function
2465        workers : list
2466            Workers on which to run the function. Defaults to all known workers.
2467        wait : boolean (optional)
2468            If the function is asynchronous whether or not to wait until that
2469            function finishes.
2470        nanny : bool, defualt False
2471            Whether to run ``function`` on the nanny. By default, the function
2472            is run on the worker process.  If specified, the addresses in
2473            ``workers`` should still be the worker addresses, not the nanny addresses.
2474
2475        Examples
2476        --------
2477        >>> c.run(os.getpid)  # doctest: +SKIP
2478        {'192.168.0.100:9000': 1234,
2479         '192.168.0.101:9000': 4321,
2480         '192.168.0.102:9000': 5555}
2481
2482        Restrict computation to particular workers with the ``workers=``
2483        keyword argument.
2484
2485        >>> c.run(os.getpid, workers=['192.168.0.100:9000',
2486        ...                           '192.168.0.101:9000'])  # doctest: +SKIP
2487        {'192.168.0.100:9000': 1234,
2488         '192.168.0.101:9000': 4321}
2489
2490        >>> def get_status(dask_worker):
2491        ...     return dask_worker.status
2492
2493        >>> c.run(get_hostname)  # doctest: +SKIP
2494        {'192.168.0.100:9000': 'running',
2495         '192.168.0.101:9000': 'running}
2496
2497        Run asynchronous functions in the background:
2498
2499        >>> async def print_state(dask_worker):  # doctest: +SKIP
2500        ...    while True:
2501        ...        print(dask_worker.status)
2502        ...        await asyncio.sleep(1)
2503
2504        >>> c.run(print_state, wait=False)  # doctest: +SKIP
2505        """
2506        return self.sync(self._run, function, *args, **kwargs)
2507
2508    @_deprecated(use_instead="Client.run which detects async functions automatically")
2509    def run_coroutine(self, function, *args, **kwargs):
2510        """
2511        Spawn a coroutine on all workers.
2512
2513        This spawns a coroutine on all currently known workers and then waits
2514        for the coroutine on each worker.  The coroutines' results are returned
2515        as a dictionary keyed by worker address.
2516
2517        Parameters
2518        ----------
2519        function : a coroutine function
2520            (typically a function wrapped in gen.coroutine or
2521             a Python 3.5+ async function)
2522        *args : arguments for remote function
2523        **kwargs : keyword arguments for remote function
2524        wait : boolean (default True)
2525            Whether to wait for coroutines to end.
2526        workers : list
2527            Workers on which to run the function. Defaults to all known workers.
2528
2529        """
2530        return self.run(function, *args, **kwargs)
2531
2532    @staticmethod
2533    def _get_computation_code(stacklevel=None) -> str:
2534        """Walk up the stack to the user code and extract the code surrounding
2535        the compute/submit/persist call. All modules encountered which are
2536        blacklisted by the option
2537        `distributed.diagnostics.computations.ignore-modules` will be ignored.
2538        This can be used to blacklist commonly used libraries which wrap
2539        dask/distributed compute calls.
2540
2541        ``stacklevel`` may be used to explicitly indicate from which frame on
2542        the stack to get the source code.
2543        """
2544
2545        ignore_modules = dask.config.get(
2546            "distributed.diagnostics.computations.ignore-modules"
2547        )
2548        if not isinstance(ignore_modules, list):
2549            raise TypeError(
2550                "Ignored modules must be a list. Instead got "
2551                f"({type(ignore_modules)}, {ignore_modules})"
2552            )
2553        if stacklevel is None:
2554            pattern: re.Pattern | None
2555            if ignore_modules:
2556                pattern = re.compile("|".join([f"(?:{mod})" for mod in ignore_modules]))
2557            else:
2558                pattern = None
2559        else:
2560            # stacklevel 0 or less - shows dask internals which likely isn't helpful
2561            stacklevel = stacklevel if stacklevel > 0 else 1
2562        for i, (fr, _) in enumerate(traceback.walk_stack(None), 1):
2563            if stacklevel is not None:
2564                if i != stacklevel:
2565                    continue
2566            elif pattern is not None and (
2567                pattern.match(fr.f_globals.get("__name__", ""))
2568                or fr.f_code.co_name in ("<listcomp>", "<dictcomp>")
2569            ):
2570                continue
2571            try:
2572                return inspect.getsource(fr)
2573            except OSError:
2574                # Try to fine the source if we are in %%time or %%timeit magic.
2575                if (
2576                    fr.f_code.co_filename in {"<timed exec>", "<magic-timeit>"}
2577                    and "IPython" in sys.modules
2578                ):
2579                    from IPython import get_ipython
2580
2581                    ip = get_ipython()
2582                    if ip is not None:
2583                        # The current cell
2584                        return ip.history_manager._i00
2585                break
2586        return "<Code not available>"
2587
2588    def _graph_to_futures(
2589        self,
2590        dsk,
2591        keys,
2592        workers=None,
2593        allow_other_workers=None,
2594        priority=None,
2595        user_priority=0,
2596        resources=None,
2597        retries=None,
2598        fifo_timeout=0,
2599        actors=None,
2600    ):
2601        with self._refcount_lock:
2602            if actors is not None and actors is not True and actors is not False:
2603                actors = list(self._expand_key(actors))
2604
2605            # Make sure `dsk` is a high level graph
2606            if not isinstance(dsk, HighLevelGraph):
2607                dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
2608
2609            annotations = {}
2610            if user_priority:
2611                annotations["priority"] = user_priority
2612            if workers:
2613                if not isinstance(workers, (list, tuple, set)):
2614                    workers = [workers]
2615                annotations["workers"] = workers
2616            if retries:
2617                annotations["retries"] = retries
2618            if allow_other_workers not in (True, False, None):
2619                raise TypeError("allow_other_workers= must be True, False, or None")
2620            if allow_other_workers:
2621                annotations["allow_other_workers"] = allow_other_workers
2622            if resources:
2623                annotations["resources"] = resources
2624
2625            # Merge global and local annotations
2626            annotations = merge(dask.config.get("annotations", {}), annotations)
2627
2628            # Pack the high level graph before sending it to the scheduler
2629            keyset = set(keys)
2630            dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
2631
2632            # Create futures before sending graph (helps avoid contention)
2633            futures = {key: Future(key, self, inform=False) for key in keyset}
2634
2635            self._send_to_scheduler(
2636                {
2637                    "op": "update-graph-hlg",
2638                    "hlg": dsk,
2639                    "keys": list(map(stringify, keys)),
2640                    "priority": priority,
2641                    "submitting_task": getattr(thread_state, "key", None),
2642                    "fifo_timeout": fifo_timeout,
2643                    "actors": actors,
2644                    "code": self._get_computation_code(),
2645                }
2646            )
2647            return futures
2648
2649    def get(
2650        self,
2651        dsk,
2652        keys,
2653        workers=None,
2654        allow_other_workers=None,
2655        resources=None,
2656        sync=True,
2657        asynchronous=None,
2658        direct=None,
2659        retries=None,
2660        priority=0,
2661        fifo_timeout="60s",
2662        actors=None,
2663        **kwargs,
2664    ):
2665        """Compute dask graph
2666
2667        Parameters
2668        ----------
2669        dsk : dict
2670        keys : object, or nested lists of objects
2671        workers : string or iterable of strings
2672            A set of worker addresses or hostnames on which computations may be
2673            performed. Leave empty to default to all workers (common case)
2674        allow_other_workers : bool (defaults to False)
2675            Used with ``workers``. Indicates whether or not the computations
2676            may be performed on workers that are not in the `workers` set(s).
2677        retries : int (default to 0)
2678            Number of allowed automatic retries if computing a result fails
2679        priority : Number
2680            Optional prioritization of task.  Zero is default.
2681            Higher priorities take precedence
2682        resources : dict (defaults to {})
2683            Defines the ``resources`` each instance of this mapped task requires
2684            on the worker; e.g. ``{'GPU': 2}``.
2685            See :doc:`worker resources <resources>` for details on defining
2686            resources.
2687        sync : bool (optional)
2688            Returns Futures if False or concrete values if True (default).
2689        direct : bool
2690            Whether or not to connect directly to the workers, or to ask
2691            the scheduler to serve as intermediary.  This can also be set when
2692            creating the Client.
2693
2694        Examples
2695        --------
2696        >>> from operator import add  # doctest: +SKIP
2697        >>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
2698        >>> c.get({'x': (add, 1, 2)}, 'x')  # doctest: +SKIP
2699        3
2700
2701        See Also
2702        --------
2703        Client.compute : Compute asynchronous collections
2704        """
2705        futures = self._graph_to_futures(
2706            dsk,
2707            keys=set(flatten([keys])),
2708            workers=workers,
2709            allow_other_workers=allow_other_workers,
2710            resources=resources,
2711            fifo_timeout=fifo_timeout,
2712            retries=retries,
2713            user_priority=priority,
2714            actors=actors,
2715        )
2716        packed = pack_data(keys, futures)
2717        if sync:
2718            if getattr(thread_state, "key", False):
2719                try:
2720                    secede()
2721                    should_rejoin = True
2722                except Exception:
2723                    should_rejoin = False
2724            try:
2725                results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2726            finally:
2727                for f in futures.values():
2728                    f.release()
2729                if getattr(thread_state, "key", False) and should_rejoin:
2730                    rejoin()
2731            return results
2732        return packed
2733
2734    def _optimize_insert_futures(self, dsk, keys):
2735        """Replace known keys in dask graph with Futures
2736
2737        When given a Dask graph that might have overlapping keys with our known
2738        results we replace the values of that graph with futures.  This can be
2739        used as an optimization to avoid recomputation.
2740
2741        This returns the same graph if unchanged but a new graph if any changes
2742        were necessary.
2743        """
2744        with self._refcount_lock:
2745            changed = False
2746            for key in list(dsk):
2747                if stringify(key) in self.futures:
2748                    if not changed:
2749                        changed = True
2750                        dsk = ensure_dict(dsk)
2751                    dsk[key] = Future(key, self, inform=False)
2752
2753        if changed:
2754            dsk, _ = dask.optimization.cull(dsk, keys)
2755
2756        return dsk
2757
2758    def normalize_collection(self, collection):
2759        """
2760        Replace collection's tasks by already existing futures if they exist
2761
2762        This normalizes the tasks within a collections task graph against the
2763        known futures within the scheduler.  It returns a copy of the
2764        collection with a task graph that includes the overlapping futures.
2765
2766        Examples
2767        --------
2768        >>> len(x.__dask_graph__())  # x is a dask collection with 100 tasks  # doctest: +SKIP
2769        100
2770        >>> set(client.futures).intersection(x.__dask_graph__())  # some overlap exists  # doctest: +SKIP
2771        10
2772
2773        >>> x = client.normalize_collection(x)  # doctest: +SKIP
2774        >>> len(x.__dask_graph__())  # smaller computational graph  # doctest: +SKIP
2775        20
2776
2777        See Also
2778        --------
2779        Client.persist : trigger computation of collection's tasks
2780        """
2781        dsk_orig = collection.__dask_graph__()
2782        dsk = self._optimize_insert_futures(dsk_orig, collection.__dask_keys__())
2783
2784        if dsk is dsk_orig:
2785            return collection
2786        else:
2787            return redict_collection(collection, dsk)
2788
2789    def compute(
2790        self,
2791        collections,
2792        sync=False,
2793        optimize_graph=True,
2794        workers=None,
2795        allow_other_workers=False,
2796        resources=None,
2797        retries=0,
2798        priority=0,
2799        fifo_timeout="60s",
2800        actors=None,
2801        traverse=True,
2802        **kwargs,
2803    ):
2804        """Compute dask collections on cluster
2805
2806        Parameters
2807        ----------
2808        collections : iterable of dask objects or single dask object
2809            Collections like dask.array or dataframe or dask.value objects
2810        sync : bool (optional)
2811            Returns Futures if False (default) or concrete values if True
2812        optimize_graph : bool
2813            Whether or not to optimize the underlying graphs
2814        workers : string or iterable of strings
2815            A set of worker hostnames on which computations may be performed.
2816            Leave empty to default to all workers (common case)
2817        allow_other_workers : bool (defaults to False)
2818            Used with `workers`. Indicates whether or not the computations
2819            may be performed on workers that are not in the `workers` set(s).
2820        retries : int (default to 0)
2821            Number of allowed automatic retries if computing a result fails
2822        priority : Number
2823            Optional prioritization of task.  Zero is default.
2824            Higher priorities take precedence
2825        fifo_timeout : timedelta str (defaults to '60s')
2826            Allowed amount of time between calls to consider the same priority
2827        traverse : bool (defaults to True)
2828            By default dask traverses builtin python collections looking for
2829            dask objects passed to ``compute``. For large collections this can
2830            be expensive. If none of the arguments contain any dask objects,
2831            set ``traverse=False`` to avoid doing this traversal.
2832        resources : dict (defaults to {})
2833            Defines the `resources` each instance of this mapped task requires
2834            on the worker; e.g. ``{'GPU': 2}``.
2835            See :doc:`worker resources <resources>` for details on defining
2836            resources.
2837        actors : bool or dict (default None)
2838            Whether these tasks should exist on the worker as stateful actors.
2839            Specified on a global (True/False) or per-task (``{'x': True,
2840            'y': False}``) basis. See :doc:`actors` for additional details.
2841        **kwargs
2842            Options to pass to the graph optimize calls
2843
2844        Returns
2845        -------
2846        List of Futures if input is a sequence, or a single future otherwise
2847
2848        Examples
2849        --------
2850        >>> from dask import delayed
2851        >>> from operator import add
2852        >>> x = delayed(add)(1, 2)
2853        >>> y = delayed(add)(x, x)
2854        >>> xx, yy = client.compute([x, y])  # doctest: +SKIP
2855        >>> xx  # doctest: +SKIP
2856        <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
2857        >>> xx.result()  # doctest: +SKIP
2858        3
2859        >>> yy.result()  # doctest: +SKIP
2860        6
2861
2862        Also support single arguments
2863
2864        >>> xx = client.compute(x)  # doctest: +SKIP
2865
2866        See Also
2867        --------
2868        Client.get : Normal synchronous dask.get function
2869        """
2870        if isinstance(collections, (list, tuple, set, frozenset)):
2871            singleton = False
2872        else:
2873            collections = [collections]
2874            singleton = True
2875
2876        if traverse:
2877            collections = tuple(
2878                dask.delayed(a)
2879                if isinstance(a, (list, set, tuple, dict, Iterator))
2880                else a
2881                for a in collections
2882            )
2883
2884        variables = [a for a in collections if dask.is_dask_collection(a)]
2885
2886        dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs)
2887        names = ["finalize-%s" % tokenize(v) for v in variables]
2888        dsk2 = {}
2889        for i, (name, v) in enumerate(zip(names, variables)):
2890            func, extra_args = v.__dask_postcompute__()
2891            keys = v.__dask_keys__()
2892            if func is single_key and len(keys) == 1 and not extra_args:
2893                names[i] = keys[0]
2894            else:
2895                dsk2[name] = (func, keys) + extra_args
2896
2897        if not isinstance(dsk, HighLevelGraph):
2898            dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
2899
2900        # Let's append the finalize graph to dsk
2901        finalize_name = tokenize(names)
2902        layers = {finalize_name: dsk2}
2903        layers.update(dsk.layers)
2904        dependencies = {finalize_name: set(dsk.layers.keys())}
2905        dependencies.update(dsk.dependencies)
2906        dsk = HighLevelGraph(layers, dependencies)
2907
2908        futures_dict = self._graph_to_futures(
2909            dsk,
2910            names,
2911            workers=workers,
2912            allow_other_workers=allow_other_workers,
2913            resources=resources,
2914            retries=retries,
2915            user_priority=priority,
2916            fifo_timeout=fifo_timeout,
2917            actors=actors,
2918        )
2919
2920        i = 0
2921        futures = []
2922        for arg in collections:
2923            if dask.is_dask_collection(arg):
2924                futures.append(futures_dict[names[i]])
2925                i += 1
2926            else:
2927                futures.append(arg)
2928
2929        if sync:
2930            result = self.gather(futures)
2931        else:
2932            result = futures
2933
2934        if singleton:
2935            return first(result)
2936        else:
2937            return result
2938
2939    def persist(
2940        self,
2941        collections,
2942        optimize_graph=True,
2943        workers=None,
2944        allow_other_workers=None,
2945        resources=None,
2946        retries=None,
2947        priority=0,
2948        fifo_timeout="60s",
2949        actors=None,
2950        **kwargs,
2951    ):
2952        """Persist dask collections on cluster
2953
2954        Starts computation of the collection on the cluster in the background.
2955        Provides a new dask collection that is semantically identical to the
2956        previous one, but now based off of futures currently in execution.
2957
2958        Parameters
2959        ----------
2960        collections : sequence or single dask object
2961            Collections like dask.array or dataframe or dask.value objects
2962        optimize_graph : bool
2963            Whether or not to optimize the underlying graphs
2964        workers : string or iterable of strings
2965            A set of worker hostnames on which computations may be performed.
2966            Leave empty to default to all workers (common case)
2967        allow_other_workers : bool (defaults to False)
2968            Used with `workers`. Indicates whether or not the computations
2969            may be performed on workers that are not in the `workers` set(s).
2970        retries : int (default to 0)
2971            Number of allowed automatic retries if computing a result fails
2972        priority : Number
2973            Optional prioritization of task.  Zero is default.
2974            Higher priorities take precedence
2975        fifo_timeout : timedelta str (defaults to '60s')
2976            Allowed amount of time between calls to consider the same priority
2977        resources : dict (defaults to {})
2978            Defines the `resources` each instance of this mapped task requires
2979            on the worker; e.g. ``{'GPU': 2}``.
2980            See :doc:`worker resources <resources>` for details on defining
2981            resources.
2982        actors : bool or dict (default None)
2983            Whether these tasks should exist on the worker as stateful actors.
2984            Specified on a global (True/False) or per-task (``{'x': True,
2985            'y': False}``) basis. See :doc:`actors` for additional details.
2986        **kwargs
2987            Options to pass to the graph optimize calls
2988
2989        Returns
2990        -------
2991        List of collections, or single collection, depending on type of input.
2992
2993        Examples
2994        --------
2995        >>> xx = client.persist(x)  # doctest: +SKIP
2996        >>> xx, yy = client.persist([x, y])  # doctest: +SKIP
2997
2998        See Also
2999        --------
3000        Client.compute
3001        """
3002        if isinstance(collections, (tuple, list, set, frozenset)):
3003            singleton = False
3004        else:
3005            singleton = True
3006            collections = [collections]
3007
3008        assert all(map(dask.is_dask_collection, collections))
3009
3010        dsk = self.collections_to_dsk(collections, optimize_graph, **kwargs)
3011
3012        names = {k for c in collections for k in flatten(c.__dask_keys__())}
3013
3014        futures = self._graph_to_futures(
3015            dsk,
3016            names,
3017            workers=workers,
3018            allow_other_workers=allow_other_workers,
3019            resources=resources,
3020            retries=retries,
3021            user_priority=priority,
3022            fifo_timeout=fifo_timeout,
3023            actors=actors,
3024        )
3025
3026        postpersists = [c.__dask_postpersist__() for c in collections]
3027        result = [
3028            func({k: futures[k] for k in flatten(c.__dask_keys__())}, *args)
3029            for (func, args), c in zip(postpersists, collections)
3030        ]
3031
3032        if singleton:
3033            return first(result)
3034        else:
3035            return result
3036
3037    async def _restart(self, timeout=no_default):
3038        if timeout == no_default:
3039            timeout = self._timeout * 2
3040        if timeout is not None:
3041            timeout = parse_timedelta(timeout, "s")
3042
3043        self._send_to_scheduler({"op": "restart", "timeout": timeout})
3044        self._restart_event = asyncio.Event()
3045        try:
3046            await asyncio.wait_for(self._restart_event.wait(), timeout)
3047        except TimeoutError:
3048            logger.error("Restart timed out after %.2f seconds", timeout)
3049
3050        self.generation += 1
3051        with self._refcount_lock:
3052            self.refcount.clear()
3053
3054        return self
3055
3056    def restart(self, **kwargs):
3057        """Restart the distributed network
3058
3059        This kills all active work, deletes all data on the network, and
3060        restarts the worker processes.
3061        """
3062        return self.sync(self._restart, **kwargs)
3063
3064    async def _upload_large_file(self, local_filename, remote_filename=None):
3065        if remote_filename is None:
3066            remote_filename = os.path.split(local_filename)[1]
3067
3068        with open(local_filename, "rb") as f:
3069            data = f.read()
3070
3071        [future] = await self._scatter([data])
3072        key = future.key
3073        await self._replicate(future)
3074
3075        def dump_to_file(dask_worker=None):
3076            if not os.path.isabs(remote_filename):
3077                fn = os.path.join(dask_worker.local_directory, remote_filename)
3078            else:
3079                fn = remote_filename
3080            with open(fn, "wb") as f:
3081                f.write(dask_worker.data[key])
3082
3083            return len(dask_worker.data[key])
3084
3085        response = await self._run(dump_to_file)
3086
3087        assert all(len(data) == v for v in response.values())
3088
3089    def upload_file(self, filename, **kwargs):
3090        """Upload local package to workers
3091
3092        This sends a local file up to all worker nodes.  This file is placed
3093        into a temporary directory on Python's system path so any .py,  .egg
3094        or .zip  files will be importable.
3095
3096        Parameters
3097        ----------
3098        filename : string
3099            Filename of .py, .egg or .zip file to send to workers
3100
3101        Examples
3102        --------
3103        >>> client.upload_file('mylibrary.egg')  # doctest: +SKIP
3104        >>> from mylibrary import myfunc  # doctest: +SKIP
3105        >>> L = client.map(myfunc, seq)  # doctest: +SKIP
3106        """
3107        return self.register_worker_plugin(
3108            UploadFile(filename),
3109            name=filename + str(uuid.uuid4()),
3110        )
3111
3112    async def _rebalance(self, futures=None, workers=None):
3113        if futures is not None:
3114            await _wait(futures)
3115            keys = list({stringify(f.key) for f in self.futures_of(futures)})
3116        else:
3117            keys = None
3118        result = await self.scheduler.rebalance(keys=keys, workers=workers)
3119        if result["status"] == "partial-fail":
3120            raise KeyError(f"Could not rebalance keys: {result['keys']}")
3121        assert result["status"] == "OK", result
3122
3123    def rebalance(self, futures=None, workers=None, **kwargs):
3124        """Rebalance data within network
3125
3126        Move data between workers to roughly balance memory burden.  This
3127        either affects a subset of the keys/workers or the entire network,
3128        depending on keyword arguments.
3129
3130        For details on the algorithm and configuration options, refer to the matching
3131        scheduler-side method :meth:`~distributed.scheduler.Scheduler.rebalance`.
3132
3133        .. warning::
3134           This operation is generally not well tested against normal operation of the
3135           scheduler. It is not recommended to use it while waiting on computations.
3136
3137        Parameters
3138        ----------
3139        futures : list, optional
3140            A list of futures to balance, defaults all data
3141        workers : list, optional
3142            A list of workers on which to balance, defaults to all workers
3143        """
3144        return self.sync(self._rebalance, futures, workers, **kwargs)
3145
3146    async def _replicate(self, futures, n=None, workers=None, branching_factor=2):
3147        futures = self.futures_of(futures)
3148        await _wait(futures)
3149        keys = {stringify(f.key) for f in futures}
3150        await self.scheduler.replicate(
3151            keys=list(keys), n=n, workers=workers, branching_factor=branching_factor
3152        )
3153
3154    def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs):
3155        """Set replication of futures within network
3156
3157        Copy data onto many workers.  This helps to broadcast frequently
3158        accessed data and it helps to improve resilience.
3159
3160        This performs a tree copy of the data throughout the network
3161        individually on each piece of data.  This operation blocks until
3162        complete.  It does not guarantee replication of data to future workers.
3163
3164        Parameters
3165        ----------
3166        futures : list of futures
3167            Futures we wish to replicate
3168        n : int, optional
3169            Number of processes on the cluster on which to replicate the data.
3170            Defaults to all.
3171        workers : list of worker addresses
3172            Workers on which we want to restrict the replication.
3173            Defaults to all.
3174        branching_factor : int, optional
3175            The number of workers that can copy data in each generation
3176
3177        Examples
3178        --------
3179        >>> x = c.submit(func, *args)  # doctest: +SKIP
3180        >>> c.replicate([x])  # send to all workers  # doctest: +SKIP
3181        >>> c.replicate([x], n=3)  # send to three workers  # doctest: +SKIP
3182        >>> c.replicate([x], workers=['alice', 'bob'])  # send to specific  # doctest: +SKIP
3183        >>> c.replicate([x], n=1, workers=['alice', 'bob'])  # send to one of specific workers  # doctest: +SKIP
3184        >>> c.replicate([x], n=1)  # reduce replications # doctest: +SKIP
3185
3186        See Also
3187        --------
3188        Client.rebalance
3189        """
3190        return self.sync(
3191            self._replicate,
3192            futures,
3193            n=n,
3194            workers=workers,
3195            branching_factor=branching_factor,
3196            **kwargs,
3197        )
3198
3199    def nthreads(self, workers=None, **kwargs):
3200        """The number of threads/cores available on each worker node
3201
3202        Parameters
3203        ----------
3204        workers : list (optional)
3205            A list of workers that we care about specifically.
3206            Leave empty to receive information about all workers.
3207
3208        Examples
3209        --------
3210        >>> c.threads()  # doctest: +SKIP
3211        {'192.168.1.141:46784': 8,
3212         '192.167.1.142:47548': 8,
3213         '192.167.1.143:47329': 8,
3214         '192.167.1.144:37297': 8}
3215
3216        See Also
3217        --------
3218        Client.who_has
3219        Client.has_what
3220        """
3221        if isinstance(workers, tuple) and all(
3222            isinstance(i, (str, tuple)) for i in workers
3223        ):
3224            workers = list(workers)
3225        if workers is not None and not isinstance(workers, (tuple, list, set)):
3226            workers = [workers]
3227        return self.sync(self.scheduler.ncores, workers=workers, **kwargs)
3228
3229    ncores = nthreads
3230
3231    def who_has(self, futures=None, **kwargs):
3232        """The workers storing each future's data
3233
3234        Parameters
3235        ----------
3236        futures : list (optional)
3237            A list of futures, defaults to all data
3238
3239        Examples
3240        --------
3241        >>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
3242        >>> wait([x, y, z])  # doctest: +SKIP
3243        >>> c.who_has()  # doctest: +SKIP
3244        {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
3245         'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'],
3246         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
3247
3248        >>> c.who_has([x, y])  # doctest: +SKIP
3249        {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
3250         'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
3251
3252        See Also
3253        --------
3254        Client.has_what
3255        Client.nthreads
3256        """
3257        if futures is not None:
3258            futures = self.futures_of(futures)
3259            keys = list(map(stringify, {f.key for f in futures}))
3260        else:
3261            keys = None
3262
3263        async def _():
3264            return WhoHas(await self.scheduler.who_has(keys=keys, **kwargs))
3265
3266        return self.sync(_)
3267
3268    def has_what(self, workers=None, **kwargs):
3269        """Which keys are held by which workers
3270
3271        This returns the keys of the data that are held in each worker's
3272        memory.
3273
3274        Parameters
3275        ----------
3276        workers : list (optional)
3277            A list of worker addresses, defaults to all
3278
3279        Examples
3280        --------
3281        >>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
3282        >>> wait([x, y, z])  # doctest: +SKIP
3283        >>> c.has_what()  # doctest: +SKIP
3284        {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
3285                                 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
3286                                 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
3287
3288        See Also
3289        --------
3290        Client.who_has
3291        Client.nthreads
3292        Client.processing
3293        """
3294        if isinstance(workers, tuple) and all(
3295            isinstance(i, (str, tuple)) for i in workers
3296        ):
3297            workers = list(workers)
3298        if workers is not None and not isinstance(workers, (tuple, list, set)):
3299            workers = [workers]
3300
3301        async def _():
3302            return HasWhat(await self.scheduler.has_what(workers=workers, **kwargs))
3303
3304        return self.sync(_)
3305
3306    def processing(self, workers=None):
3307        """The tasks currently running on each worker
3308
3309        Parameters
3310        ----------
3311        workers : list (optional)
3312            A list of worker addresses, defaults to all
3313
3314        Examples
3315        --------
3316        >>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
3317        >>> c.processing()  # doctest: +SKIP
3318        {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
3319                                 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
3320                                 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
3321
3322        See Also
3323        --------
3324        Client.who_has
3325        Client.has_what
3326        Client.nthreads
3327        """
3328        if isinstance(workers, tuple) and all(
3329            isinstance(i, (str, tuple)) for i in workers
3330        ):
3331            workers = list(workers)
3332        if workers is not None and not isinstance(workers, (tuple, list, set)):
3333            workers = [workers]
3334        return self.sync(self.scheduler.processing, workers=workers)
3335
3336    def nbytes(self, keys=None, summary=True, **kwargs):
3337        """The bytes taken up by each key on the cluster
3338
3339        This is as measured by ``sys.getsizeof`` which may not accurately
3340        reflect the true cost.
3341
3342        Parameters
3343        ----------
3344        keys : list (optional)
3345            A list of keys, defaults to all keys
3346        summary : boolean, (optional)
3347            Summarize keys into key types
3348
3349        Examples
3350        --------
3351        >>> x, y, z = c.map(inc, [1, 2, 3])  # doctest: +SKIP
3352        >>> c.nbytes(summary=False)  # doctest: +SKIP
3353        {'inc-1c8dd6be1c21646c71f76c16d09304ea': 28,
3354         'inc-1e297fc27658d7b67b3a758f16bcf47a': 28,
3355         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
3356
3357        >>> c.nbytes(summary=True)  # doctest: +SKIP
3358        {'inc': 84}
3359
3360        See Also
3361        --------
3362        Client.who_has
3363        """
3364        return self.sync(self.scheduler.nbytes, keys=keys, summary=summary, **kwargs)
3365
3366    def call_stack(self, futures=None, keys=None):
3367        """The actively running call stack of all relevant keys
3368
3369        You can specify data of interest either by providing futures or
3370        collections in the ``futures=`` keyword or a list of explicit keys in
3371        the ``keys=`` keyword.  If neither are provided then all call stacks
3372        will be returned.
3373
3374        Parameters
3375        ----------
3376        futures : list (optional)
3377            List of futures, defaults to all data
3378        keys : list (optional)
3379            List of key names, defaults to all data
3380
3381        Examples
3382        --------
3383        >>> df = dd.read_parquet(...).persist()  # doctest: +SKIP
3384        >>> client.call_stack(df)  # call on collections
3385
3386        >>> client.call_stack()  # Or call with no arguments for all activity  # doctest: +SKIP
3387        """
3388        keys = keys or []
3389        if futures is not None:
3390            futures = self.futures_of(futures)
3391            keys += list(map(stringify, {f.key for f in futures}))
3392        return self.sync(self.scheduler.call_stack, keys=keys or None)
3393
3394    def profile(
3395        self,
3396        key=None,
3397        start=None,
3398        stop=None,
3399        workers=None,
3400        merge_workers=True,
3401        plot=False,
3402        filename=None,
3403        server=False,
3404        scheduler=False,
3405    ):
3406        """Collect statistical profiling information about recent work
3407
3408        Parameters
3409        ----------
3410        key : str
3411            Key prefix to select, this is typically a function name like 'inc'
3412            Leave as None to collect all data
3413        start : time
3414        stop : time
3415        workers : list
3416            List of workers to restrict profile information
3417        server : bool
3418            If true, return the profile of the worker's administrative thread
3419            rather than the worker threads.
3420            This is useful when profiling Dask itself, rather than user code.
3421        scheduler : bool
3422            If true, return the profile information from the scheduler's
3423            administrative thread rather than the workers.
3424            This is useful when profiling Dask's scheduling itself.
3425        plot : boolean or string
3426            Whether or not to return a plot object
3427        filename : str
3428            Filename to save the plot
3429
3430        Examples
3431        --------
3432        >>> client.profile()  # call on collections
3433        >>> client.profile(filename='dask-profile.html')  # save to html file
3434        """
3435        return self.sync(
3436            self._profile,
3437            key=key,
3438            workers=workers,
3439            merge_workers=merge_workers,
3440            start=start,
3441            stop=stop,
3442            plot=plot,
3443            filename=filename,
3444            server=server,
3445            scheduler=scheduler,
3446        )
3447
3448    async def _profile(
3449        self,
3450        key=None,
3451        start=None,
3452        stop=None,
3453        workers=None,
3454        merge_workers=True,
3455        plot=False,
3456        filename=None,
3457        server=False,
3458        scheduler=False,
3459    ):
3460        if isinstance(workers, (str, Number)):
3461            workers = [workers]
3462
3463        state = await self.scheduler.profile(
3464            key=key,
3465            workers=workers,
3466            merge_workers=merge_workers,
3467            start=start,
3468            stop=stop,
3469            server=server,
3470            scheduler=scheduler,
3471        )
3472
3473        if filename:
3474            plot = True
3475
3476        if plot:
3477            from . import profile
3478
3479            data = profile.plot_data(state)
3480            figure, source = profile.plot_figure(data, sizing_mode="stretch_both")
3481
3482            if plot == "save" and not filename:
3483                filename = "dask-profile.html"
3484
3485            if filename:
3486                from bokeh.plotting import output_file, save
3487
3488                output_file(filename=filename, title="Dask Profile")
3489                save(figure, filename=filename)
3490            return (state, figure)
3491
3492        else:
3493            return state
3494
3495    def scheduler_info(self, **kwargs):
3496        """Basic information about the workers in the cluster
3497
3498        Examples
3499        --------
3500        >>> c.scheduler_info()  # doctest: +SKIP
3501        {'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996',
3502         'services': {},
3503         'type': 'Scheduler',
3504         'workers': {'127.0.0.1:40575': {'active': 0,
3505                                         'last-seen': 1472038237.4845693,
3506                                         'name': '127.0.0.1:40575',
3507                                         'services': {},
3508                                         'stored': 0,
3509                                         'time-delay': 0.0061032772064208984}}}
3510        """
3511        if not self.asynchronous:
3512            self.sync(self._update_scheduler_info)
3513        return self._scheduler_identity
3514
3515    async def _dump_cluster_state(
3516        self,
3517        filename: str,
3518        exclude: Sequence[str] = None,
3519        format: Literal["msgpack"] | Literal["yaml"] = "msgpack",
3520    ) -> None:
3521
3522        scheduler_info = self.scheduler.dump_state()
3523
3524        worker_info = self.scheduler.broadcast(
3525            msg=dict(
3526                op="dump_state",
3527                exclude=exclude,
3528            ),
3529        )
3530        versions = self._get_versions()
3531        scheduler_info, worker_info, versions_info = await asyncio.gather(
3532            scheduler_info, worker_info, versions
3533        )
3534
3535        state = {
3536            "scheduler": scheduler_info,
3537            "workers": worker_info,
3538            "versions": versions_info,
3539        }
3540        filename = str(filename)
3541        if format == "msgpack":
3542            suffix = ".msgpack.gz"
3543            if not filename.endswith(suffix):
3544                filename += suffix
3545            import gzip
3546
3547            import msgpack
3548            import yaml
3549
3550            with gzip.open(filename, "wb") as fdg:
3551                msgpack.pack(state, fdg)
3552        elif format == "yaml":
3553            suffix = ".yaml"
3554            if not filename.endswith(suffix):
3555                filename += suffix
3556            import yaml
3557
3558            with open(filename, "w") as fd:
3559                yaml.dump(state, fd)
3560        else:
3561            raise ValueError(
3562                f"Unsupported format {format}. Possible values are `msgpack` or `yaml`"
3563            )
3564
3565    def dump_cluster_state(
3566        self,
3567        filename: str = "dask-cluster-dump",
3568        exclude: Sequence[str] = None,
3569        format: Literal["msgpack"] | Literal["yaml"] = "msgpack",
3570    ) -> Awaitable | None:
3571        """Extract a dump of the entire cluster state and persist to disk.
3572        This is intended for debugging purposes only.
3573
3574        Warning: Memory usage on client side can be large.
3575
3576        Results will be stored in a dict::
3577
3578            {
3579                "scheduler_info": {...},
3580                "worker_info": {
3581                    worker_addr: {...},  # worker attributes
3582                    ...
3583                }
3584            }
3585
3586        Paramters
3587        ---------
3588        filename:
3589            The output filename. The appropriate file suffix (`.msgpack.gz` or
3590            `.yaml`) will be appended automatically.
3591        exclude:
3592            A sequence of attribute names which are supposed to be blacklisted
3593            from the dump, e.g. to exclude code, tracebacks, logs, etc.
3594        format:
3595            Either msgpack or yaml. If msgpack is used (default), the output
3596            will be stored in a gzipped file as msgpack.
3597
3598            To read::
3599
3600                import gzip, msgpack
3601                with gzip.open("filename") as fd:
3602                    state = msgpack.unpack(fd)
3603
3604            or::
3605
3606                import yaml
3607                try:
3608                    from yaml import CLoader as Loader
3609                except ImportError:
3610                    from yaml import Loader
3611                with open("filename") as fd:
3612                    state = yaml.load(fd, Loader=Loader)
3613
3614        """
3615        return self.sync(
3616            self._dump_cluster_state,
3617            filename=filename,
3618            format=format,
3619            exclude=exclude,
3620        )
3621
3622    def write_scheduler_file(self, scheduler_file):
3623        """Write the scheduler information to a json file.
3624
3625        This facilitates easy sharing of scheduler information using a file
3626        system. The scheduler file can be used to instantiate a second Client
3627        using the same scheduler.
3628
3629        Parameters
3630        ----------
3631        scheduler_file : str
3632            Path to a write the scheduler file.
3633
3634        Examples
3635        --------
3636        >>> client = Client()  # doctest: +SKIP
3637        >>> client.write_scheduler_file('scheduler.json')  # doctest: +SKIP
3638        # connect to previous client's scheduler
3639        >>> client2 = Client(scheduler_file='scheduler.json')  # doctest: +SKIP
3640        """
3641        if self.scheduler_file:
3642            raise ValueError("Scheduler file already set")
3643        else:
3644            self.scheduler_file = scheduler_file
3645
3646        with open(self.scheduler_file, "w") as f:
3647            json.dump(self.scheduler_info(), f, indent=2)
3648
3649    def get_metadata(self, keys, default=no_default):
3650        """Get arbitrary metadata from scheduler
3651
3652        See set_metadata for the full docstring with examples
3653
3654        Parameters
3655        ----------
3656        keys : key or list
3657            Key to access.  If a list then gets within a nested collection
3658        default : optional
3659            If the key does not exist then return this value instead.
3660            If not provided then this raises a KeyError if the key is not
3661            present
3662
3663        See Also
3664        --------
3665        Client.set_metadata
3666        """
3667        if not isinstance(keys, (list, tuple)):
3668            keys = (keys,)
3669        return self.sync(self.scheduler.get_metadata, keys=keys, default=default)
3670
3671    def get_scheduler_logs(self, n=None):
3672        """Get logs from scheduler
3673
3674        Parameters
3675        ----------
3676        n : int
3677            Number of logs to retrive.  Maxes out at 10000 by default,
3678            configurable via the ``distributed.admin.log-length``
3679            configuration value.
3680
3681        Returns
3682        -------
3683        Logs in reversed order (newest first)
3684        """
3685        return self.sync(self.scheduler.logs, n=n)
3686
3687    def get_worker_logs(self, n=None, workers=None, nanny=False):
3688        """Get logs from workers
3689
3690        Parameters
3691        ----------
3692        n : int
3693            Number of logs to retrive.  Maxes out at 10000 by default,
3694            configurable via the ``distributed.admin.log-length``
3695            configuration value.
3696        workers : iterable
3697            List of worker addresses to retrieve.  Gets all workers by default.
3698        nanny : bool, default False
3699            Whether to get the logs from the workers (False) or the nannies (True). If
3700            specified, the addresses in `workers` should still be the worker addresses,
3701            not the nanny addresses.
3702
3703        Returns
3704        -------
3705        Dictionary mapping worker address to logs.
3706        Logs are returned in reversed order (newest first)
3707        """
3708        return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)
3709
3710    def log_event(self, topic, msg):
3711        """Log an event under a given topic
3712
3713        Parameters
3714        ----------
3715        topic : str, list
3716            Name of the topic under which to log an event. To log the same
3717            event under multiple topics, pass a list of topic names.
3718        msg
3719            Event message to log. Note this must be msgpack serializable.
3720
3721        Examples
3722        --------
3723        >>> from time import time
3724        >>> client.log_event("current-time", time())
3725        """
3726        return self.sync(self.scheduler.log_event, topic=topic, msg=msg)
3727
3728    def get_events(self, topic: str = None):
3729        """Retrieve structured topic logs
3730
3731        Parameters
3732        ----------
3733        topic : str, optional
3734            Name of topic log to retrieve events for. If no ``topic`` is
3735            provided, then logs for all topics will be returned.
3736        """
3737        return self.sync(self.scheduler.events, topic=topic)
3738
3739    async def _handle_event(self, topic, event):
3740        if topic not in self._event_handlers:
3741            self.unsubscribe_topic(topic)
3742            return
3743        handler = self._event_handlers[topic]
3744        ret = handler(event)
3745        if inspect.isawaitable(ret):
3746            await ret
3747
3748    def subscribe_topic(self, topic, handler):
3749        """Subscribe to a topic and execute a handler for every received event
3750
3751        Parameters
3752        ----------
3753        topic: str
3754            The topic name
3755        handler: callable or coroutine function
3756            A handler called for every received event. The handler must accept a
3757            single argument `event` which is a tuple `(timestamp, msg)` where
3758            timestamp refers to the clock on the scheduler.
3759
3760        Example
3761        -------
3762
3763        >>> import logging
3764        >>> logger = logging.getLogger("myLogger")  # Log config not shown
3765        >>> client.subscribe_topic("topic-name", lambda: logger.info)
3766
3767        See Also
3768        --------
3769        dask.distributed.Client.unsubscribe_topic
3770        dask.distributed.Client.get_events
3771        dask.distributed.Client.log_event
3772        """
3773        if topic in self._event_handlers:
3774            logger.info("Handler for %s already set. Overwriting.", topic)
3775        self._event_handlers[topic] = handler
3776        msg = {"op": "subscribe-topic", "topic": topic, "client": self.id}
3777        self._send_to_scheduler(msg)
3778
3779    def unsubscribe_topic(self, topic):
3780        """Unsubscribe from a topic and remove event handler
3781
3782        See Also
3783        --------
3784        dask.distributed.Client.subscribe_topic
3785        dask.distributed.Client.get_events
3786        dask.distributed.Client.log_event
3787        """
3788        if topic in self._event_handlers:
3789            msg = {"op": "unsubscribe-topic", "topic": topic, "client": self.id}
3790            self._send_to_scheduler(msg)
3791        else:
3792            raise ValueError(f"No event handler known for topic {topic}.")
3793
3794    def retire_workers(self, workers=None, close_workers=True, **kwargs):
3795        """Retire certain workers on the scheduler
3796
3797        See dask.distributed.Scheduler.retire_workers for the full docstring.
3798
3799        Examples
3800        --------
3801        You can get information about active workers using the following:
3802
3803        >>> workers = client.scheduler_info()['workers']
3804
3805        From that list you may want to select some workers to close
3806
3807        >>> client.retire_workers(workers=['tcp://address:port', ...])
3808
3809        See Also
3810        --------
3811        dask.distributed.Scheduler.retire_workers
3812        """
3813        return self.sync(
3814            self.scheduler.retire_workers,
3815            workers=workers,
3816            close_workers=close_workers,
3817            **kwargs,
3818        )
3819
3820    def set_metadata(self, key, value):
3821        """Set arbitrary metadata in the scheduler
3822
3823        This allows you to store small amounts of data on the central scheduler
3824        process for administrative purposes.  Data should be msgpack
3825        serializable (ints, strings, lists, dicts)
3826
3827        If the key corresponds to a task then that key will be cleaned up when
3828        the task is forgotten by the scheduler.
3829
3830        If the key is a list then it will be assumed that you want to index
3831        into a nested dictionary structure using those keys.  For example if
3832        you call the following::
3833
3834            >>> client.set_metadata(['a', 'b', 'c'], 123)
3835
3836        Then this is the same as setting
3837
3838            >>> scheduler.task_metadata['a']['b']['c'] = 123
3839
3840        The lower level dictionaries will be created on demand.
3841
3842        Examples
3843        --------
3844        >>> client.set_metadata('x', 123)  # doctest: +SKIP
3845        >>> client.get_metadata('x')  # doctest: +SKIP
3846        123
3847
3848        >>> client.set_metadata(['x', 'y'], 123)  # doctest: +SKIP
3849        >>> client.get_metadata('x')  # doctest: +SKIP
3850        {'y': 123}
3851
3852        >>> client.set_metadata(['x', 'w', 'z'], 456)  # doctest: +SKIP
3853        >>> client.get_metadata('x')  # doctest: +SKIP
3854        {'y': 123, 'w': {'z': 456}}
3855
3856        >>> client.get_metadata(['x', 'w'])  # doctest: +SKIP
3857        {'z': 456}
3858
3859        See Also
3860        --------
3861        get_metadata
3862        """
3863        if not isinstance(key, list):
3864            key = (key,)
3865        return self.sync(self.scheduler.set_metadata, keys=key, value=value)
3866
3867    def get_versions(self, check=False, packages=[]):
3868        """Return version info for the scheduler, all workers and myself
3869
3870        Parameters
3871        ----------
3872        check : boolean, default False
3873            raise ValueError if all required & optional packages
3874            do not match
3875        packages : List[str]
3876            Extra package names to check
3877
3878        Examples
3879        --------
3880        >>> c.get_versions()  # doctest: +SKIP
3881
3882        >>> c.get_versions(packages=['sklearn', 'geopandas'])  # doctest: +SKIP
3883        """
3884        return self.sync(self._get_versions, check=check, packages=packages)
3885
3886    async def _get_versions(self, check=False, packages=[]):
3887        client = version_module.get_versions(packages=packages)
3888        try:
3889            scheduler = await self.scheduler.versions(packages=packages)
3890        except KeyError:
3891            scheduler = None
3892        except TypeError:  # packages keyword not supported
3893            scheduler = await self.scheduler.versions()  # this raises
3894
3895        workers = await self.scheduler.broadcast(
3896            msg={"op": "versions", "packages": packages}
3897        )
3898        result = {"scheduler": scheduler, "workers": workers, "client": client}
3899
3900        if check:
3901            msg = version_module.error_message(scheduler, workers, client)
3902            if msg["warning"]:
3903                warnings.warn(msg["warning"])
3904            if msg["error"]:
3905                raise ValueError(msg["error"])
3906
3907        return result
3908
3909    def futures_of(self, futures):
3910        return futures_of(futures, client=self)
3911
3912    def start_ipython(self, *args, **kwargs):
3913        """Deprecated - Method moved to start_ipython_workers"""
3914        raise Exception("Method moved to start_ipython_workers")
3915
3916    async def _start_ipython_workers(self, workers):
3917        if workers is None:
3918            workers = await self.scheduler.ncores()
3919
3920        responses = await self.scheduler.broadcast(
3921            msg=dict(op="start_ipython"), workers=workers
3922        )
3923        return workers, responses
3924
3925    def start_ipython_workers(
3926        self, workers=None, magic_names=False, qtconsole=False, qtconsole_args=None
3927    ):
3928        """Start IPython kernels on workers
3929
3930        Parameters
3931        ----------
3932        workers : list (optional)
3933            A list of worker addresses, defaults to all
3934        magic_names : str or list(str) (optional)
3935            If defined, register IPython magics with these names for
3936            executing code on the workers.  If string has asterix then expand
3937            asterix into 0, 1, ..., n for n workers
3938        qtconsole : bool (optional)
3939            If True, launch a Jupyter QtConsole connected to the worker(s).
3940        qtconsole_args : list(str) (optional)
3941            Additional arguments to pass to the qtconsole on startup.
3942
3943        Examples
3944        --------
3945        >>> info = c.start_ipython_workers() # doctest: +SKIP
3946        >>> %remote info['192.168.1.101:5752'] worker.data  # doctest: +SKIP
3947        {'x': 1, 'y': 100}
3948
3949        >>> c.start_ipython_workers('192.168.1.101:5752', magic_names='w') # doctest: +SKIP
3950        >>> %w worker.data  # doctest: +SKIP
3951        {'x': 1, 'y': 100}
3952
3953        >>> c.start_ipython_workers('192.168.1.101:5752', qtconsole=True) # doctest: +SKIP
3954
3955        Add asterix * in magic names to add one magic per worker
3956
3957        >>> c.start_ipython_workers(magic_names='w_*') # doctest: +SKIP
3958        >>> %w_0 worker.data  # doctest: +SKIP
3959        {'x': 1, 'y': 100}
3960        >>> %w_1 worker.data  # doctest: +SKIP
3961        {'z': 5}
3962
3963        Returns
3964        -------
3965        iter_connection_info: list
3966            List of connection_info dicts containing info necessary
3967            to connect Jupyter clients to the workers.
3968
3969        See Also
3970        --------
3971        Client.start_ipython_scheduler : start ipython on the scheduler
3972        """
3973        if isinstance(workers, (str, Number)):
3974            workers = [workers]
3975
3976        (workers, info_dict) = sync(self.loop, self._start_ipython_workers, workers)
3977
3978        if magic_names and isinstance(magic_names, str):
3979            if "*" in magic_names:
3980                magic_names = [
3981                    magic_names.replace("*", str(i)) for i in range(len(workers))
3982                ]
3983            else:
3984                magic_names = [magic_names]
3985
3986        if "IPython" in sys.modules:
3987            from ._ipython_utils import register_remote_magic
3988
3989            register_remote_magic()
3990        if magic_names:
3991            from ._ipython_utils import register_worker_magic
3992
3993            for worker, magic_name in zip(workers, magic_names):
3994                connection_info = info_dict[worker]
3995                register_worker_magic(connection_info, magic_name)
3996        if qtconsole:
3997            from ._ipython_utils import connect_qtconsole
3998
3999            for worker, connection_info in info_dict.items():
4000                name = "dask-" + worker.replace(":", "-").replace("/", "-")
4001                connect_qtconsole(connection_info, name=name, extra_args=qtconsole_args)
4002        return info_dict
4003
4004    def start_ipython_scheduler(
4005        self, magic_name="scheduler_if_ipython", qtconsole=False, qtconsole_args=None
4006    ):
4007        """Start IPython kernel on the scheduler
4008
4009        Parameters
4010        ----------
4011        magic_name : str or None (optional)
4012            If defined, register IPython magic with this name for
4013            executing code on the scheduler.
4014            If not defined, register %scheduler magic if IPython is running.
4015        qtconsole : bool (optional)
4016            If True, launch a Jupyter QtConsole connected to the worker(s).
4017        qtconsole_args : list(str) (optional)
4018            Additional arguments to pass to the qtconsole on startup.
4019
4020        Examples
4021        --------
4022        >>> c.start_ipython_scheduler() # doctest: +SKIP
4023        >>> %scheduler scheduler.processing  # doctest: +SKIP
4024        {'127.0.0.1:3595': {'inc-1', 'inc-2'},
4025         '127.0.0.1:53589': {'inc-2', 'add-5'}}
4026
4027        >>> c.start_ipython_scheduler(qtconsole=True) # doctest: +SKIP
4028
4029        Returns
4030        -------
4031        connection_info: dict
4032            connection_info dict containing info necessary
4033            to connect Jupyter clients to the scheduler.
4034
4035        See Also
4036        --------
4037        Client.start_ipython_workers : Start IPython on the workers
4038        """
4039        info = sync(self.loop, self.scheduler.start_ipython)
4040        if magic_name == "scheduler_if_ipython":
4041            # default to %scheduler if in IPython, no magic otherwise
4042            in_ipython = False
4043            if "IPython" in sys.modules:
4044                from IPython import get_ipython
4045
4046                in_ipython = bool(get_ipython())
4047            if in_ipython:
4048                magic_name = "scheduler"
4049            else:
4050                magic_name = None
4051        if magic_name:
4052            from ._ipython_utils import register_worker_magic
4053
4054            register_worker_magic(info, magic_name)
4055        if qtconsole:
4056            from ._ipython_utils import connect_qtconsole
4057
4058            connect_qtconsole(info, name="dask-scheduler", extra_args=qtconsole_args)
4059        return info
4060
4061    @classmethod
4062    def _expand_key(cls, k):
4063        """
4064        Expand a user-provided task key specification, e.g. in a resources
4065        or retries dictionary.
4066        """
4067        if not isinstance(k, tuple):
4068            k = (k,)
4069        for kk in k:
4070            if dask.is_dask_collection(kk):
4071                for kkk in kk.__dask_keys__():
4072                    yield stringify(kkk)
4073            else:
4074                yield stringify(kk)
4075
4076    @staticmethod
4077    def collections_to_dsk(collections, *args, **kwargs):
4078        """Convert many collections into a single dask graph, after optimization"""
4079        return collections_to_dsk(collections, *args, **kwargs)
4080
4081    def get_task_stream(
4082        self,
4083        start=None,
4084        stop=None,
4085        count=None,
4086        plot=False,
4087        filename="task-stream.html",
4088        bokeh_resources=None,
4089    ):
4090        """Get task stream data from scheduler
4091
4092        This collects the data present in the diagnostic "Task Stream" plot on
4093        the dashboard.  It includes the start, stop, transfer, and
4094        deserialization time of every task for a particular duration.
4095
4096        Note that the task stream diagnostic does not run by default.  You may
4097        wish to call this function once before you start work to ensure that
4098        things start recording, and then again after you have completed.
4099
4100        Parameters
4101        ----------
4102        start : Number or string
4103            When you want to start recording
4104            If a number it should be the result of calling time()
4105            If a string then it should be a time difference before now,
4106            like '60s' or '500 ms'
4107        stop : Number or string
4108            When you want to stop recording
4109        count : int
4110            The number of desired records, ignored if both start and stop are
4111            specified
4112        plot : boolean, str
4113            If true then also return a Bokeh figure
4114            If plot == 'save' then save the figure to a file
4115        filename : str (optional)
4116            The filename to save to if you set ``plot='save'``
4117        bokeh_resources : bokeh.resources.Resources (optional)
4118            Specifies if the resource component is INLINE or CDN
4119
4120        Examples
4121        --------
4122        >>> client.get_task_stream()  # prime plugin if not already connected
4123        >>> x.compute()  # do some work
4124        >>> client.get_task_stream()
4125        [{'task': ...,
4126          'type': ...,
4127          'thread': ...,
4128          ...}]
4129
4130        Pass the ``plot=True`` or ``plot='save'`` keywords to get back a Bokeh
4131        figure
4132
4133        >>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')
4134
4135        Alternatively consider the context manager
4136
4137        >>> from dask.distributed import get_task_stream
4138        >>> with get_task_stream() as ts:
4139        ...     x.compute()
4140        >>> ts.data
4141        [...]
4142
4143        Returns
4144        -------
4145        L: List[Dict]
4146
4147        See Also
4148        --------
4149        get_task_stream : a context manager version of this method
4150        """
4151        return self.sync(
4152            self._get_task_stream,
4153            start=start,
4154            stop=stop,
4155            count=count,
4156            plot=plot,
4157            filename=filename,
4158            bokeh_resources=bokeh_resources,
4159        )
4160
4161    async def _get_task_stream(
4162        self,
4163        start=None,
4164        stop=None,
4165        count=None,
4166        plot=False,
4167        filename="task-stream.html",
4168        bokeh_resources=None,
4169    ):
4170        msgs = await self.scheduler.get_task_stream(start=start, stop=stop, count=count)
4171        if plot:
4172            from .diagnostics.task_stream import rectangles
4173
4174            rects = rectangles(msgs)
4175            from .dashboard.components.scheduler import task_stream_figure
4176
4177            source, figure = task_stream_figure(sizing_mode="stretch_both")
4178            source.data.update(rects)
4179            if plot == "save":
4180                from bokeh.plotting import output_file, save
4181
4182                output_file(filename=filename, title="Dask Task Stream")
4183                save(figure, filename=filename, resources=bokeh_resources)
4184            return (msgs, figure)
4185        else:
4186            return msgs
4187
4188    async def _register_scheduler_plugin(self, plugin, name, **kwargs):
4189        if isinstance(plugin, type):
4190            plugin = plugin(**kwargs)
4191
4192        return await self.scheduler.register_scheduler_plugin(
4193            plugin=dumps(plugin, protocol=4),
4194            name=name,
4195        )
4196
4197    def register_scheduler_plugin(self, plugin, name=None, **kwargs):
4198        """Register a scheduler plugin.
4199
4200        See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins
4201
4202        Parameters
4203        ----------
4204        plugin : SchedulerPlugin
4205            Plugin class or object to pass to the scheduler.
4206        name : str
4207            Name for the plugin; if None, a name is taken from the
4208            plugin instance or automatically generated if not present.
4209        **kwargs : Any
4210            Arguments passed to the Plugin class (if Plugin is an
4211            instance kwargs are unused).
4212
4213        """
4214        if name is None:
4215            name = _get_plugin_name(plugin)
4216
4217        return self.sync(
4218            self._register_scheduler_plugin,
4219            plugin=plugin,
4220            name=name,
4221            **kwargs,
4222        )
4223
4224    def register_worker_callbacks(self, setup=None):
4225        """
4226        Registers a setup callback function for all current and future workers.
4227
4228        This registers a new setup function for workers in this cluster. The
4229        function will run immediately on all currently connected workers. It
4230        will also be run upon connection by any workers that are added in the
4231        future. Multiple setup functions can be registered - these will be
4232        called in the order they were added.
4233
4234        If the function takes an input argument named ``dask_worker`` then
4235        that variable will be populated with the worker itself.
4236
4237        Parameters
4238        ----------
4239        setup : callable(dask_worker: Worker) -> None
4240            Function to register and run on all workers
4241        """
4242        return self.register_worker_plugin(_WorkerSetupPlugin(setup))
4243
4244    async def _register_worker_plugin(self, plugin=None, name=None, nanny=None):
4245        if nanny or nanny is None and isinstance(plugin, NannyPlugin):
4246            method = self.scheduler.register_nanny_plugin
4247        else:
4248            method = self.scheduler.register_worker_plugin
4249
4250        responses = await method(plugin=dumps(plugin, protocol=4), name=name)
4251        for response in responses.values():
4252            if response["status"] == "error":
4253                _, exc, tb = clean_exception(
4254                    response["exception"], response["traceback"]
4255                )
4256                raise exc.with_traceback(tb)
4257        return responses
4258
4259    def register_worker_plugin(self, plugin=None, name=None, nanny=None, **kwargs):
4260        """
4261        Registers a lifecycle worker plugin for all current and future workers.
4262
4263        This registers a new object to handle setup, task state transitions and
4264        teardown for workers in this cluster. The plugin will instantiate itself
4265        on all currently connected workers. It will also be run on any worker
4266        that connects in the future.
4267
4268        The plugin may include methods ``setup``, ``teardown``, ``transition``,
4269        and ``release_key``.  See the
4270        ``dask.distributed.WorkerPlugin`` class or the examples below for the
4271        interface and docstrings.  It must be serializable with the pickle or
4272        cloudpickle modules.
4273
4274        If the plugin has a ``name`` attribute, or if the ``name=`` keyword is
4275        used then that will control idempotency.  If a plugin with that name has
4276        already been registered then any future plugins will not run.
4277
4278        For alternatives to plugins, you may also wish to look into preload
4279        scripts.
4280
4281        Parameters
4282        ----------
4283        plugin : WorkerPlugin or NannyPlugin
4284            The plugin object to register.
4285        name : str, optional
4286            A name for the plugin.
4287            Registering a plugin with the same name will have no effect.
4288            If plugin has no name attribute a random name is used.
4289        nanny : bool, optional
4290            Whether to register the plugin with workers or nannies.
4291        **kwargs : optional
4292            If you pass a class as the plugin, instead of a class instance, then the
4293            class will be instantiated with any extra keyword arguments.
4294
4295        Examples
4296        --------
4297        >>> class MyPlugin(WorkerPlugin):
4298        ...     def __init__(self, *args, **kwargs):
4299        ...         pass  # the constructor is up to you
4300        ...     def setup(self, worker: dask.distributed.Worker):
4301        ...         pass
4302        ...     def teardown(self, worker: dask.distributed.Worker):
4303        ...         pass
4304        ...     def transition(self, key: str, start: str, finish: str, **kwargs):
4305        ...         pass
4306        ...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
4307        ...         pass
4308
4309        >>> plugin = MyPlugin(1, 2, 3)
4310        >>> client.register_worker_plugin(plugin)
4311
4312        You can get access to the plugin with the ``get_worker`` function
4313
4314        >>> client.register_worker_plugin(other_plugin, name='my-plugin')
4315        >>> def f():
4316        ...    worker = get_worker()
4317        ...    plugin = worker.plugins['my-plugin']
4318        ...    return plugin.my_state
4319
4320        >>> future = client.run(f)
4321
4322        See Also
4323        --------
4324        distributed.WorkerPlugin
4325        unregister_worker_plugin
4326        """
4327        if isinstance(plugin, type):
4328            plugin = plugin(**kwargs)
4329
4330        if name is None:
4331            name = _get_plugin_name(plugin)
4332
4333        assert name
4334
4335        return self.sync(
4336            self._register_worker_plugin, plugin=plugin, name=name, nanny=nanny
4337        )
4338
4339    async def _unregister_worker_plugin(self, name, nanny=None):
4340        if nanny:
4341            responses = await self.scheduler.unregister_nanny_plugin(name=name)
4342        else:
4343            responses = await self.scheduler.unregister_worker_plugin(name=name)
4344
4345        for response in responses.values():
4346            if response["status"] == "error":
4347                exc = response["exception"]
4348                tb = response["traceback"]
4349                raise exc.with_traceback(tb)
4350        return responses
4351
4352    def unregister_worker_plugin(self, name, nanny=None):
4353        """Unregisters a lifecycle worker plugin
4354
4355        This unregisters an existing worker plugin. As part of the unregistration process
4356        the plugin's ``teardown`` method will be called.
4357
4358        Parameters
4359        ----------
4360        name : str
4361            Name of the plugin to unregister. See the :meth:`Client.register_worker_plugin`
4362            docstring for more information.
4363
4364        Examples
4365        --------
4366        >>> class MyPlugin(WorkerPlugin):
4367        ...     def __init__(self, *args, **kwargs):
4368        ...         pass  # the constructor is up to you
4369        ...     def setup(self, worker: dask.distributed.Worker):
4370        ...         pass
4371        ...     def teardown(self, worker: dask.distributed.Worker):
4372        ...         pass
4373        ...     def transition(self, key: str, start: str, finish: str, **kwargs):
4374        ...         pass
4375        ...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
4376        ...         pass
4377
4378        >>> plugin = MyPlugin(1, 2, 3)
4379        >>> client.register_worker_plugin(plugin, name='foo')
4380        >>> client.unregister_worker_plugin(name='foo')
4381
4382        See Also
4383        --------
4384        register_worker_plugin
4385        """
4386        return self.sync(self._unregister_worker_plugin, name=name, nanny=nanny)
4387
4388    @property
4389    def amm(self):
4390        """Convenience accessors for the :doc:`active_memory_manager`"""
4391        from .active_memory_manager import AMMClientProxy
4392
4393        return AMMClientProxy(self)
4394
4395
4396class _WorkerSetupPlugin(WorkerPlugin):
4397    """This is used to support older setup functions as callbacks"""
4398
4399    def __init__(self, setup):
4400        self._setup = setup
4401
4402    def setup(self, worker):
4403        if has_keyword(self._setup, "dask_worker"):
4404            return self._setup(dask_worker=worker)
4405        else:
4406            return self._setup()
4407
4408
4409class Executor(Client):
4410    """Deprecated: see Client"""
4411
4412    def __init__(self, *args, **kwargs):
4413        warnings.warn("Executor has been renamed to Client")
4414        super().__init__(*args, **kwargs)
4415
4416
4417def CompatibleExecutor(*args, **kwargs):
4418    raise Exception("This has been moved to the Client.get_executor() method")
4419
4420
4421ALL_COMPLETED = "ALL_COMPLETED"
4422FIRST_COMPLETED = "FIRST_COMPLETED"
4423
4424
4425async def _wait(fs, timeout=None, return_when=ALL_COMPLETED):
4426    if timeout is not None and not isinstance(timeout, Number):
4427        raise TypeError(
4428            "timeout= keyword received a non-numeric value.\n"
4429            "Beware that wait expects a list of values\n"
4430            "  Bad:  wait(x, y, z)\n"
4431            "  Good: wait([x, y, z])"
4432        )
4433    fs = futures_of(fs)
4434    if return_when == ALL_COMPLETED:
4435        wait_for = All
4436    elif return_when == FIRST_COMPLETED:
4437        wait_for = Any
4438    else:
4439        raise NotImplementedError(
4440            "Only return_when='ALL_COMPLETED' and 'FIRST_COMPLETED' are supported"
4441        )
4442
4443    future = wait_for({f._state.wait() for f in fs})
4444    if timeout is not None:
4445        future = asyncio.wait_for(future, timeout)
4446    await future
4447
4448    done, not_done = (
4449        {fu for fu in fs if fu.status != "pending"},
4450        {fu for fu in fs if fu.status == "pending"},
4451    )
4452    cancelled = [f.key for f in done if f.status == "cancelled"]
4453    if cancelled:
4454        raise CancelledError(cancelled)
4455
4456    return DoneAndNotDoneFutures(done, not_done)
4457
4458
4459def wait(fs, timeout=None, return_when=ALL_COMPLETED):
4460    """Wait until all/any futures are finished
4461
4462    Parameters
4463    ----------
4464    fs : list of futures
4465    timeout : number, optional
4466        Time in seconds after which to raise a ``dask.distributed.TimeoutError``
4467    return_when : str, optional
4468        One of `ALL_COMPLETED` or `FIRST_COMPLETED`
4469
4470    Returns
4471    -------
4472    Named tuple of completed, not completed
4473    """
4474    client = default_client()
4475    result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
4476    return result
4477
4478
4479async def _as_completed(fs, queue):
4480    fs = futures_of(fs)
4481    groups = groupby(lambda f: f.key, fs)
4482    firsts = [v[0] for v in groups.values()]
4483    wait_iterator = gen.WaitIterator(
4484        *map(asyncio.ensure_future, [f._state.wait() for f in firsts])
4485    )
4486
4487    while not wait_iterator.done():
4488        await wait_iterator.next()
4489        # TODO: handle case of restarted futures
4490        future = firsts[wait_iterator.current_index]
4491        for f in groups[future.key]:
4492            queue.put_nowait(f)
4493
4494
4495async def _first_completed(futures):
4496    """Return a single completed future
4497
4498    See Also:
4499        _as_completed
4500    """
4501    q = asyncio.Queue()
4502    await _as_completed(futures, q)
4503    result = await q.get()
4504    return result
4505
4506
4507class as_completed:
4508    """
4509    Return futures in the order in which they complete
4510
4511    This returns an iterator that yields the input future objects in the order
4512    in which they complete.  Calling ``next`` on the iterator will block until
4513    the next future completes, irrespective of order.
4514
4515    Additionally, you can also add more futures to this object during
4516    computation with the ``.add`` method
4517
4518    Parameters
4519    ----------
4520    futures: Collection of futures
4521        A list of Future objects to be iterated over in the order in which they
4522        complete
4523    with_results: bool (False)
4524        Whether to wait and include results of futures as well;
4525        in this case `as_completed` yields a tuple of (future, result)
4526    raise_errors: bool (True)
4527        Whether we should raise when the result of a future raises an exception;
4528        only affects behavior when `with_results=True`.
4529
4530    Examples
4531    --------
4532    >>> x, y, z = client.map(inc, [1, 2, 3])  # doctest: +SKIP
4533    >>> for future in as_completed([x, y, z]):  # doctest: +SKIP
4534    ...     print(future.result())  # doctest: +SKIP
4535    3
4536    2
4537    4
4538
4539    Add more futures during computation
4540
4541    >>> x, y, z = client.map(inc, [1, 2, 3])  # doctest: +SKIP
4542    >>> ac = as_completed([x, y, z])  # doctest: +SKIP
4543    >>> for future in ac:  # doctest: +SKIP
4544    ...     print(future.result())  # doctest: +SKIP
4545    ...     if random.random() < 0.5:  # doctest: +SKIP
4546    ...         ac.add(c.submit(double, future))  # doctest: +SKIP
4547    4
4548    2
4549    8
4550    3
4551    6
4552    12
4553    24
4554
4555    Optionally wait until the result has been gathered as well
4556
4557    >>> ac = as_completed([x, y, z], with_results=True)  # doctest: +SKIP
4558    >>> for future, result in ac:  # doctest: +SKIP
4559    ...     print(result)  # doctest: +SKIP
4560    2
4561    4
4562    3
4563    """
4564
4565    def __init__(self, futures=None, loop=None, with_results=False, raise_errors=True):
4566        if futures is None:
4567            futures = []
4568        self.futures = defaultdict(lambda: 0)
4569        self.queue = pyQueue()
4570        self.lock = threading.Lock()
4571        self.loop = loop or default_client().loop
4572        self.thread_condition = threading.Condition()
4573        self.with_results = with_results
4574        self.raise_errors = raise_errors
4575
4576        if futures:
4577            self.update(futures)
4578
4579    @property
4580    def condition(self):
4581        try:
4582            return self._condition
4583        except AttributeError:
4584            self._condition = asyncio.Condition()
4585            return self._condition
4586
4587    async def _track_future(self, future):
4588        try:
4589            await _wait(future)
4590        except CancelledError:
4591            pass
4592        if self.with_results:
4593            try:
4594                result = await future._result(raiseit=False)
4595            except CancelledError as exc:
4596                result = exc
4597        with self.lock:
4598            if future in self.futures:
4599                self.futures[future] -= 1
4600                if not self.futures[future]:
4601                    del self.futures[future]
4602                if self.with_results:
4603                    self.queue.put_nowait((future, result))
4604                else:
4605                    self.queue.put_nowait(future)
4606                async with self.condition:
4607                    self.condition.notify()
4608                with self.thread_condition:
4609                    self.thread_condition.notify()
4610
4611    def update(self, futures):
4612        """Add multiple futures to the collection.
4613
4614        The added futures will emit from the iterator once they finish"""
4615        from .actor import ActorFuture
4616
4617        with self.lock:
4618            for f in futures:
4619                if not isinstance(f, (Future, ActorFuture)):
4620                    raise TypeError("Input must be a future, got %s" % f)
4621                self.futures[f] += 1
4622                self.loop.add_callback(self._track_future, f)
4623
4624    def add(self, future):
4625        """Add a future to the collection
4626
4627        This future will emit from the iterator once it finishes
4628        """
4629        self.update((future,))
4630
4631    def is_empty(self):
4632        """Returns True if there no completed or computing futures"""
4633        return not self.count()
4634
4635    def has_ready(self):
4636        """Returns True if there are completed futures available."""
4637        return not self.queue.empty()
4638
4639    def count(self):
4640        """Return the number of futures yet to be returned
4641
4642        This includes both the number of futures still computing, as well as
4643        those that are finished, but have not yet been returned from this
4644        iterator.
4645        """
4646        with self.lock:
4647            return len(self.futures) + len(self.queue.queue)
4648
4649    def __repr__(self):
4650        return "<as_completed: waiting={} done={}>".format(
4651            len(self.futures), len(self.queue.queue)
4652        )
4653
4654    def __iter__(self):
4655        return self
4656
4657    def __aiter__(self):
4658        return self
4659
4660    def _get_and_raise(self):
4661        res = self.queue.get()
4662        if self.with_results:
4663            future, result = res
4664            if self.raise_errors and future.status == "error":
4665                typ, exc, tb = result
4666                raise exc.with_traceback(tb)
4667        return res
4668
4669    def __next__(self):
4670        while self.queue.empty():
4671            if self.is_empty():
4672                raise StopIteration()
4673            with self.thread_condition:
4674                self.thread_condition.wait(timeout=0.100)
4675        return self._get_and_raise()
4676
4677    async def __anext__(self):
4678        if not self.futures and self.queue.empty():
4679            raise StopAsyncIteration
4680        while self.queue.empty():
4681            if not self.futures:
4682                raise StopAsyncIteration
4683            async with self.condition:
4684                await self.condition.wait()
4685
4686        return self._get_and_raise()
4687
4688    next = __next__
4689
4690    def next_batch(self, block=True):
4691        """Get the next batch of completed futures.
4692
4693        Parameters
4694        ----------
4695        block : bool, optional
4696            If True then wait until we have some result, otherwise return
4697            immediately, even with an empty list.  Defaults to True.
4698
4699        Examples
4700        --------
4701        >>> ac = as_completed(futures)  # doctest: +SKIP
4702        >>> client.gather(ac.next_batch())  # doctest: +SKIP
4703        [4, 1, 3]
4704
4705        >>> client.gather(ac.next_batch(block=False))  # doctest: +SKIP
4706        []
4707
4708        Returns
4709        -------
4710        List of futures or (future, result) tuples
4711        """
4712        if block:
4713            batch = [next(self)]
4714        else:
4715            batch = []
4716        while not self.queue.empty():
4717            batch.append(self.queue.get())
4718        return batch
4719
4720    def batches(self):
4721        """
4722        Yield all finished futures at once rather than one-by-one
4723
4724        This returns an iterator of lists of futures or lists of
4725        (future, result) tuples rather than individual futures or individual
4726        (future, result) tuples.  It will yield these as soon as possible
4727        without waiting.
4728
4729        Examples
4730        --------
4731        >>> for batch in as_completed(futures).batches():  # doctest: +SKIP
4732        ...     results = client.gather(batch)
4733        ...     print(results)
4734        [4, 2]
4735        [1, 3, 7]
4736        [5]
4737        [6]
4738        """
4739        while True:
4740            try:
4741                yield self.next_batch(block=True)
4742            except StopIteration:
4743                return
4744
4745    def clear(self):
4746        """Clear out all submitted futures"""
4747        with self.lock:
4748            self.futures.clear()
4749            while not self.queue.empty():
4750                self.queue.get()
4751
4752
4753def AsCompleted(*args, **kwargs):
4754    raise Exception("This has moved to as_completed")
4755
4756
4757def default_client(c=None):
4758    """Return a client if one has started"""
4759    c = c or _get_global_client()
4760    if c:
4761        return c
4762    else:
4763        raise ValueError(
4764            "No clients found\n"
4765            "Start a client and point it to the scheduler address\n"
4766            "  from distributed import Client\n"
4767            "  client = Client('ip-addr-of-scheduler:8786')\n"
4768        )
4769
4770
4771def ensure_default_get(client):
4772    dask.config.set(scheduler="dask.distributed")
4773    _set_global_client(client)
4774
4775
4776def redict_collection(c, dsk):
4777    from dask.delayed import Delayed
4778
4779    if isinstance(c, Delayed):
4780        return Delayed(c.key, dsk)
4781    else:
4782        cc = copy.copy(c)
4783        cc.dask = dsk
4784        return cc
4785
4786
4787def futures_of(o, client=None):
4788    """Future objects in a collection
4789
4790    Parameters
4791    ----------
4792    o : collection
4793        A possibly nested collection of Dask objects
4794
4795    Examples
4796    --------
4797    >>> futures_of(my_dask_dataframe)
4798    [<Future: finished key: ...>,
4799     <Future: pending  key: ...>]
4800
4801    Returns
4802    -------
4803    futures : List[Future]
4804        A list of futures held by those collections
4805    """
4806    stack = [o]
4807    seen = set()
4808    futures = list()
4809    while stack:
4810        x = stack.pop()
4811        if type(x) in (tuple, set, list):
4812            stack.extend(x)
4813        elif type(x) is dict:
4814            stack.extend(x.values())
4815        elif type(x) is SubgraphCallable:
4816            stack.extend(x.dsk.values())
4817        elif isinstance(x, Future):
4818            if x not in seen:
4819                seen.add(x)
4820                futures.append(x)
4821        elif dask.is_dask_collection(x):
4822            stack.extend(x.__dask_graph__().values())
4823
4824    if client is not None:
4825        bad = {f for f in futures if f.cancelled()}
4826        if bad:
4827            raise CancelledError(bad)
4828
4829    return futures[::-1]
4830
4831
4832def fire_and_forget(obj):
4833    """Run tasks at least once, even if we release the futures
4834
4835    Under normal operation Dask will not run any tasks for which there is not
4836    an active future (this avoids unnecessary work in many situations).
4837    However sometimes you want to just fire off a task, not track its future,
4838    and expect it to finish eventually.  You can use this function on a future
4839    or collection of futures to ask Dask to complete the task even if no active
4840    client is tracking it.
4841
4842    The results will not be kept in memory after the task completes (unless
4843    there is an active future) so this is only useful for tasks that depend on
4844    side effects.
4845
4846    Parameters
4847    ----------
4848    obj : Future, list, dict, dask collection
4849        The futures that you want to run at least once
4850
4851    Examples
4852    --------
4853    >>> fire_and_forget(client.submit(func, *args))  # doctest: +SKIP
4854    """
4855    futures = futures_of(obj)
4856    for future in futures:
4857        future.client._send_to_scheduler(
4858            {
4859                "op": "client-desires-keys",
4860                "keys": [stringify(future.key)],
4861                "client": "fire-and-forget",
4862            }
4863        )
4864
4865
4866class get_task_stream:
4867    """
4868    Collect task stream within a context block
4869
4870    This provides diagnostic information about every task that was run during
4871    the time when this block was active.
4872
4873    This must be used as a context manager.
4874
4875    Parameters
4876    ----------
4877    plot: boolean, str
4878        If true then also return a Bokeh figure
4879        If plot == 'save' then save the figure to a file
4880    filename: str (optional)
4881        The filename to save to if you set ``plot='save'``
4882
4883    Examples
4884    --------
4885    >>> with get_task_stream() as ts:
4886    ...     x.compute()
4887    >>> ts.data
4888    [...]
4889
4890    Get back a Bokeh figure and optionally save to a file
4891
4892    >>> with get_task_stream(plot='save', filename='task-stream.html') as ts:
4893    ...    x.compute()
4894    >>> ts.figure
4895    <Bokeh Figure>
4896
4897    To share this file with others you may wish to upload and serve it online.
4898    A common way to do this is to upload the file as a gist, and then serve it
4899    on https://raw.githack.com ::
4900
4901       $ python -m pip install gist
4902       $ gist task-stream.html
4903       https://gist.github.com/8a5b3c74b10b413f612bb5e250856ceb
4904
4905    You can then navigate to that site, click the "Raw" button to the right of
4906    the ``task-stream.html`` file, and then provide that URL to
4907    https://raw.githack.com .  This process should provide a sharable link that
4908    others can use to see your task stream plot.
4909
4910    See Also
4911    --------
4912    Client.get_task_stream: Function version of this context manager
4913    """
4914
4915    def __init__(self, client=None, plot=False, filename="task-stream.html"):
4916        self.data = []
4917        self._plot = plot
4918        self._filename = filename
4919        self.figure = None
4920        self.client = client or default_client()
4921        self.client.get_task_stream(start=0, stop=0)  # ensure plugin
4922
4923    def __enter__(self):
4924        self.start = time()
4925        return self
4926
4927    def __exit__(self, typ, value, traceback):
4928        L = self.client.get_task_stream(
4929            start=self.start, plot=self._plot, filename=self._filename
4930        )
4931        if self._plot:
4932            L, self.figure = L
4933        self.data.extend(L)
4934
4935    async def __aenter__(self):
4936        return self
4937
4938    async def __aexit__(self, typ, value, traceback):
4939        L = await self.client.get_task_stream(
4940            start=self.start, plot=self._plot, filename=self._filename
4941        )
4942        if self._plot:
4943            L, self.figure = L
4944        self.data.extend(L)
4945
4946
4947class performance_report:
4948    """Gather performance report
4949
4950    This creates a static HTML file that includes many of the same plots of the
4951    dashboard for later viewing.
4952
4953    The resulting file uses JavaScript, and so must be viewed with a web
4954    browser.  Locally we recommend using ``python -m http.server`` or hosting
4955    the file live online.
4956
4957    Parameters
4958    ----------
4959    filename: str, optional
4960        The filename to save the performance report locally
4961
4962    stacklevel: int, optional
4963        The code execution frame utilized for populating the Calling Code section
4964        of the report. Defaults to `1` which is the frame calling ``performance_report``
4965
4966    mode: str, optional
4967        Mode parameter to pass to :func:`bokeh.io.output.output_file`. Defaults to ``None``.
4968
4969    Examples
4970    --------
4971    >>> with performance_report(filename="myfile.html", stacklevel=1):
4972    ...     x.compute()
4973
4974    $ python -m http.server
4975    $ open myfile.html
4976    """
4977
4978    def __init__(self, filename="dask-report.html", stacklevel=1, mode=None):
4979        self.filename = filename
4980        # stacklevel 0 or less - shows dask internals which likely isn't helpful
4981        self._stacklevel = stacklevel if stacklevel > 0 else 1
4982        self.mode = mode
4983
4984    async def __aenter__(self):
4985        self.start = time()
4986        self.last_count = await get_client().run_on_scheduler(
4987            lambda dask_scheduler: dask_scheduler.monitor.count
4988        )
4989        await get_client().get_task_stream(start=0, stop=0)  # ensure plugin
4990
4991    async def __aexit__(self, typ, value, traceback, code=None):
4992        client = get_client()
4993        if code is None:
4994            code = client._get_computation_code(self._stacklevel + 1)
4995        data = await client.scheduler.performance_report(
4996            start=self.start, last_count=self.last_count, code=code, mode=self.mode
4997        )
4998        with open(self.filename, "w") as f:
4999            f.write(data)
5000
5001    def __enter__(self):
5002        get_client().sync(self.__aenter__)
5003
5004    def __exit__(self, typ, value, traceback):
5005        client = get_client()
5006        code = client._get_computation_code(self._stacklevel + 1)
5007        client.sync(self.__aexit__, type, value, traceback, code=code)
5008
5009
5010class get_task_metadata:
5011    """Collect task metadata within a context block
5012
5013    This gathers ``TaskState`` metadata and final state from the scheduler
5014    for tasks which are submitted and finished within the scope of this
5015    context manager.
5016
5017    Examples
5018    --------
5019    >>> with get_task_metadata() as tasks:
5020    ...     x.compute()
5021    >>> tasks.metadata
5022    {...}
5023    >>> tasks.state
5024    {...}
5025    """
5026
5027    def __init__(self):
5028        self.name = f"task-metadata-{uuid.uuid4().hex}"
5029        self.keys = set()
5030        self.metadata = None
5031        self.state = None
5032
5033    async def __aenter__(self):
5034        await get_client().scheduler.start_task_metadata(name=self.name)
5035        return self
5036
5037    async def __aexit__(self, typ, value, traceback):
5038        response = await get_client().scheduler.stop_task_metadata(name=self.name)
5039        self.metadata = response["metadata"]
5040        self.state = response["state"]
5041
5042    def __enter__(self):
5043        return get_client().sync(self.__aenter__)
5044
5045    def __exit__(self, typ, value, traceback):
5046        return get_client().sync(self.__aexit__, type, value, traceback)
5047
5048
5049@contextmanager
5050def temp_default_client(c):
5051    """Set the default client for the duration of the context
5052
5053    .. note::
5054       This function should be used exclusively for unit testing the default client
5055       functionality. In all other cases, please use ``Client.as_current`` instead.
5056
5057    .. note::
5058       Unlike ``Client.as_current``, this context manager is neither thread-local nor
5059       task-local.
5060
5061    Parameters
5062    ----------
5063    c : Client
5064        This is what default_client() will return within the with-block.
5065    """
5066    old_exec = default_client()
5067    _set_global_client(c)
5068    try:
5069        yield
5070    finally:
5071        _set_global_client(old_exec)
5072
5073
5074def _close_global_client():
5075    """
5076    Force close of global client.  This cleans up when a client
5077    wasn't close explicitly, e.g. interactive sessions.
5078    """
5079    c = _get_global_client()
5080    if c is not None:
5081        c._should_close_loop = False
5082        with suppress(TimeoutError, RuntimeError):
5083            if c.asynchronous:
5084                c.loop.add_callback(c.close, timeout=3)
5085            else:
5086                c.close(timeout=3)
5087
5088
5089atexit.register(_close_global_client)
5090