from __future__ import annotations

import asyncio
import atexit
import copy
import errno
import inspect
import json
import logging
import os
import re
import sys
import threading
import traceback
import uuid
import warnings
import weakref
from collections import defaultdict
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import DoneAndNotDoneFutures
from contextlib import contextmanager, suppress
from contextvars import ContextVar
from functools import partial
from numbers import Number
from queue import Queue as pyQueue
from typing import TYPE_CHECKING, Awaitable, ClassVar, Sequence

from tlz import first, groupby, keymap, merge, partition_all, valmap

if TYPE_CHECKING:
    from typing_extensions import Literal

import dask
from dask.base import collections_to_dsk, normalize_token, tokenize
from dask.core import flatten
from dask.highlevelgraph import HighLevelGraph
from dask.optimization import SubgraphCallable
from dask.utils import (
    _deprecated,
    apply,
    ensure_dict,
    format_bytes,
    funcname,
    parse_timedelta,
    stringify,
    typename,
)
from dask.widgets import get_template

try:
    from dask.delayed import single_key
except ImportError:
    single_key = first
from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback

from . import versions as version_module  # type: ignore
from .batched import BatchedSend
from .cfexecutor import ClientExecutor
from .core import (
    CommClosedError,
    ConnectionPool,
    PooledRPCCall,
    clean_exception,
    connect,
    rpc,
)
from .diagnostics.plugin import NannyPlugin, UploadFile, WorkerPlugin, _get_plugin_name
from .metrics import time
from .objects import HasWhat, SchedulerInfo, WhoHas
from .protocol import to_serialize
from .protocol.pickle import dumps, loads
from .publish import Datasets
from .pubsub import PubSubClientExtension
from .security import Security
from .sizeof import sizeof
from .threadpoolexecutor import rejoin
from .utils import (
    All,
    Any,
    CancelledError,
    LoopRunner,
    TimeoutError,
    format_dashboard_link,
    has_keyword,
    log_errors,
    no_default,
    sync,
    thread_state,
)
from .utils_comm import (
    WrappedKey,
    gather_from_workers,
    pack_data,
    retry_operation,
    scatter_to_workers,
    unpack_remotedata,
)
from .worker import get_client, get_worker, secede

logger = logging.getLogger(__name__)

_global_clients: weakref.WeakValueDictionary[
    int, Client
] = weakref.WeakValueDictionary()
_global_client_index = [0]

_current_client = ContextVar("_current_client", default=None)

DEFAULT_EXTENSIONS = [PubSubClientExtension]
# Placeholder used in the get_dataset function(s)
NO_DEFAULT_PLACEHOLDER = "_no_default_"


def _get_global_client() -> Client | None:
    L = sorted(list(_global_clients), reverse=True)
    for k in L:
        c = _global_clients[k]
        if c.status != "closed":
            return c
        else:
            del _global_clients[k]
    return None


def _set_global_client(c: Client | None) -> None:
    if c is not None:
        _global_clients[_global_client_index[0]] = c
        _global_client_index[0] += 1


def _del_global_client(c: Client) -> None:
    for k in list(_global_clients):
        try:
            if _global_clients[k] is c:
                del _global_clients[k]
        except KeyError:
            pass


class Future(WrappedKey):
    """A remotely running computation

    A Future is a local proxy to a result running on a remote worker. A user
    manages future objects in the local Python process to determine what
    happens in the larger cluster.

    Parameters
    ----------
    key: str, or tuple
        Key of remote data to which this future refers
    client: Client
        Client that should own this future. Defaults to _get_global_client()
    inform: bool
        Do we inform the scheduler that we need an update on this future

    Examples
    --------
    Futures typically emerge from Client computations

    >>> my_future = client.submit(add, 1, 2)  # doctest: +SKIP

    We can track the progress and results of a future

    >>> my_future  # doctest: +SKIP
    <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>

    We can get the result or the exception and traceback from the future

    >>> my_future.result()  # doctest: +SKIP

    See Also
    --------
    Client: Creates futures
    """

    _cb_executor = None
    _cb_executor_pid = None

    def __init__(self, key, client=None, inform=True, state=None):
        self.key = key
        self._cleared = False
        tkey = stringify(key)
        self.client = client or Client.current()
        self.client._inc_ref(tkey)
        self._generation = self.client.generation

        if tkey in self.client.futures:
            self._state = self.client.futures[tkey]
        else:
            self._state = self.client.futures[tkey] = FutureState()

        if inform:
            self.client._send_to_scheduler(
                {
                    "op": "client-desires-keys",
                    "keys": [stringify(key)],
                    "client": self.client.id,
                }
            )

        if state is not None:
            try:
                handler = self.client._state_handlers[state]
            except KeyError:
                pass
            else:
                handler(key=key)

    @property
    def executor(self):
        return self.client

    @property
    def status(self):
        return self._state.status

    def done(self):
        """Is the computation complete?"""
        return self._state.done()

    def result(self, timeout=None):
        """Wait until computation completes, gather result to local process.

        If *timeout* seconds are elapsed before returning, a
        ``dask.distributed.TimeoutError`` is raised.
        """
        if self.client.asynchronous:
            return self.client.sync(self._result, callback_timeout=timeout)

        # shorten error traceback
        result = self.client.sync(self._result, callback_timeout=timeout, raiseit=False)
        if self.status == "error":
            typ, exc, tb = result
            raise exc.with_traceback(tb)
        elif self.status == "cancelled":
            raise result
        else:
            return result

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
                raise exception
            else:
                return exception
        else:
            result = await self.client._gather([self])
            return result[0]

    async def _exception(self):
        await self._state.wait()
        if self.status == "error":
            return self._state.exception
        else:
            return None

    def exception(self, timeout=None, **kwargs):
        """Return the exception of a failed task

        If *timeout* seconds are elapsed before returning, a
        ``dask.distributed.TimeoutError`` is raised.
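
        Examples
        --------
        A small sketch of inspecting a failed task; ``div`` stands in for any
        function that raises (here, division by zero):

        >>> future = client.submit(div, 1, 0)  # doctest: +SKIP
        >>> future.exception()  # doctest: +SKIP
        ZeroDivisionError('division by zero')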

        See Also
        --------
        Future.traceback
        """
        return self.client.sync(self._exception, callback_timeout=timeout, **kwargs)

    def add_done_callback(self, fn):
        """Call callback on future when the future has finished

        The callback ``fn`` should take the future as its only argument. This
        will be called regardless of whether the future completes successfully,
        errs, or is cancelled

        The callback is executed in a separate thread.
        """
        cls = Future
        if cls._cb_executor is None or cls._cb_executor_pid != os.getpid():
            try:
                cls._cb_executor = ThreadPoolExecutor(
                    1, thread_name_prefix="Dask-Callback-Thread"
                )
            except TypeError:
                cls._cb_executor = ThreadPoolExecutor(1)
            cls._cb_executor_pid = os.getpid()

        def execute_callback(fut):
            try:
                fn(fut)
            except BaseException:
                logger.exception("Error in callback %s of %s:", fn, fut)

        self.client.loop.add_callback(
            done_callback, self, partial(cls._cb_executor.submit, execute_callback)
        )

    def cancel(self, **kwargs):
        """Cancel the request to run this future

        See Also
        --------
        Client.cancel
        """
        return self.client.cancel([self], **kwargs)

    def retry(self, **kwargs):
        """Retry this future if it has failed

        See Also
        --------
        Client.retry
        """
        return self.client.retry([self], **kwargs)

    def cancelled(self):
        """Returns True if the future has been cancelled"""
        return self._state.status == "cancelled"

    async def _traceback(self):
        await self._state.wait()
        if self.status == "error":
            return self._state.traceback
        else:
            return None

    def traceback(self, timeout=None, **kwargs):
        """Return the traceback of a failed task

        This returns a traceback object. You can inspect this object using the
        ``traceback`` module. Alternatively if you call ``future.result()``
        this traceback will accompany the raised exception.

        If *timeout* seconds are elapsed before returning, a
        ``dask.distributed.TimeoutError`` is raised.

        Examples
        --------
        >>> import traceback  # doctest: +SKIP
        >>> tb = future.traceback()  # doctest: +SKIP
        >>> traceback.format_tb(tb)  # doctest: +SKIP
        [...]

        See Also
        --------
        Future.exception
        """
        return self.client.sync(self._traceback, callback_timeout=timeout, **kwargs)

    @property
    def type(self):
        return self._state.type

    def release(self, _in_destructor=False):
        # NOTE: this method can be called from different threads
        # (see e.g. Client.get() or Future.__del__())
        if not self._cleared and self.client.generation == self._generation:
            self._cleared = True
            try:
                self.client.loop.add_callback(self.client._dec_ref, stringify(self.key))
            except TypeError:
                pass  # Shutting down, add_callback may be None
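
    # NOTE: pickling a Future keeps only ``(key, scheduler address)``.  On
    # unpickling, ``__setstate__`` re-attaches the future to the current client
    # (or one connected to that scheduler) and re-registers interest in the key.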
    def __getstate__(self):
        return self.key, self.client.scheduler.address

    def __setstate__(self, state):
        key, address = state
        try:
            c = Client.current(allow_global=False)
        except ValueError:
            c = get_client(address)
        self.__init__(key, c)
        c._send_to_scheduler(
            {
                "op": "update-graph",
                "tasks": {},
                "keys": [stringify(self.key)],
                "client": c.id,
            }
        )

    def __del__(self):
        try:
            self.release()
        except AttributeError:
            # Occasionally we see this error when shutting down the client
            # https://github.com/dask/distributed/issues/4305
            if not sys.is_finalizing():
                raise
        except RuntimeError:  # closed event loop
            pass

    def __repr__(self):
        if self.type:
            return (
                f"<Future: {self.status}, type: {typename(self.type)}, key: {self.key}>"
            )
        else:
            return f"<Future: {self.status}, key: {self.key}>"

    def _repr_html_(self):
        return get_template("future.html.j2").render(
            key=str(self.key),
            type=typename(self.type),
            status=self.status,
        )

    def __await__(self):
        return self.result().__await__()


class FutureState:
    """A Future's internal state.

    This is shared between all Futures with the same key and client.
    """

    __slots__ = ("_event", "status", "type", "exception", "traceback")

    def __init__(self):
        self._event = None
        self.status = "pending"
        self.type = None

    def _get_event(self):
        # Can't create Event eagerly in constructor as it can fetch
        # its IOLoop from the wrong thread
        # (https://github.com/tornadoweb/tornado/issues/2189)
        event = self._event
        if event is None:
            event = self._event = asyncio.Event()
        return event

    def cancel(self):
        self.status = "cancelled"
        self.exception = CancelledError()
        self._get_event().set()

    def finish(self, type=None):
        self.status = "finished"
        self._get_event().set()
        if type is not None:
            self.type = type

    def lose(self):
        self.status = "lost"
        self._get_event().clear()

    def retry(self):
        self.status = "pending"
        self._get_event().clear()

    def set_error(self, exception, traceback):
        _, exception, traceback = clean_exception(exception, traceback)

        self.status = "error"
        self.exception = exception
        self.traceback = traceback
        self._get_event().set()

    def done(self):
        return self._event is not None and self._event.is_set()

    def reset(self):
        self.status = "pending"
        if self._event is not None:
            self._event.clear()

    async def wait(self, timeout=None):
        await asyncio.wait_for(self._get_event().wait(), timeout)

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.status}>"


async def done_callback(future, callback):
    """Coroutine that waits on a future, then calls the callback"""
    while future.status == "pending":
        await future._state.wait()
    callback(future)


@partial(normalize_token.register, Future)
def normalize_future(f):
    return [f.key, type(f)]


class AllExit(Exception):
    """Custom exception class to exit All(...) early."""


def _handle_print(event):
    _, msg = event
    if isinstance(msg, dict) and "args" in msg and "kwargs" in msg:
        print(*msg["args"], **msg["kwargs"])
    else:
        print(msg)


def _handle_warn(event):
    _, msg = event
    if isinstance(msg, dict) and "args" in msg and "kwargs" in msg:
        warnings.warn(*msg["args"], **msg["kwargs"])
    else:
        warnings.warn(msg)


class Client:
    """Connect to and submit computation to a Dask cluster

    The Client connects users to a Dask cluster. It provides an asynchronous
    user interface around functions and futures. This class resembles
    executors in ``concurrent.futures`` but also allows ``Future`` objects
    within ``submit/map`` calls. When a Client is instantiated it takes over
    all ``dask.compute`` and ``dask.persist`` calls by default.

    It is also common to create a Client without specifying the scheduler
    address, like ``Client()``. In this case the Client creates a
    :class:`LocalCluster` in the background and connects to that. Any extra
    keywords are passed from Client to LocalCluster in this case. See the
    LocalCluster documentation for more information.

    Parameters
    ----------
    address: string, or Cluster
        This can be the address of a ``Scheduler`` server like a string
        ``'127.0.0.1:8786'`` or a cluster object like ``LocalCluster()``
    timeout: int
        Timeout duration for initial connection to the scheduler
    set_as_default: bool (True)
        Use this Client as the global dask scheduler
    scheduler_file: string (optional)
        Path to a file with scheduler information, if available
    security: Security or bool, optional
        Optional security information. If creating a local cluster, can also
        pass in ``True``, in which case temporary self-signed credentials will
        be created automatically.
    asynchronous: bool (False by default)
        Set to True if using this client within async/await functions or within
        Tornado gen.coroutines. Otherwise this should remain False for normal
        use.
    name: string (optional)
        Gives the client a name that will be included in logs generated on
        the scheduler for matters relating to this client
    direct_to_workers: bool (optional)
        Whether or not to connect directly to the workers, or to ask
        the scheduler to serve as intermediary.
    heartbeat_interval: int
        Time in milliseconds between heartbeats to scheduler
    **kwargs:
        If you do not pass a scheduler address, Client will create a
        ``LocalCluster`` object, passing any extra keyword arguments.

    Examples
    --------
    Provide the cluster's scheduler node address on initialization:

    >>> client = Client('127.0.0.1:8786')  # doctest: +SKIP

    Use the ``submit`` method to send individual computations to the cluster

    >>> a = client.submit(add, 1, 2)  # doctest: +SKIP
    >>> b = client.submit(add, 10, 20)  # doctest: +SKIP

    Continue using submit or map on results to build up larger computations

    >>> c = client.submit(add, a, b)  # doctest: +SKIP

    Gather results with the ``gather`` method.

    >>> client.gather(c)  # doctest: +SKIP
    33

    You can also call Client with no arguments in order to create your own
    local cluster.

    >>> client = Client()  # makes your own local "cluster" # doctest: +SKIP

    Extra keywords will be passed directly to LocalCluster

    >>> client = Client(n_workers=2, threads_per_worker=4)  # doctest: +SKIP
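
    Connect in asynchronous mode when running inside an ``async`` function
    (a sketch; it assumes a scheduler is already listening at that address):

    >>> async def f():  # doctest: +SKIP
    ...     client = await Client("127.0.0.1:8786", asynchronous=True)
    ...     future = client.submit(add, 1, 2)
    ...     result = await future
    ...     await client.close()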

    See Also
    --------
    distributed.scheduler.Scheduler: Internal scheduler
    distributed.LocalCluster:
    """

    _instances: ClassVar[weakref.WeakSet[Client]] = weakref.WeakSet()

    _default_event_handlers = {"print": _handle_print, "warn": _handle_warn}

    def __init__(
        self,
        address=None,
        loop=None,
        timeout=no_default,
        set_as_default=True,
        scheduler_file=None,
        security=None,
        asynchronous=False,
        name=None,
        heartbeat_interval=None,
        serializers=None,
        deserializers=None,
        extensions=DEFAULT_EXTENSIONS,
        direct_to_workers=None,
        connection_limit=512,
        **kwargs,
    ):
        if timeout == no_default:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        if timeout is not None:
            timeout = parse_timedelta(timeout, "s")
        self._timeout = timeout

        self.futures = dict()
        self.refcount = defaultdict(lambda: 0)
        self.coroutines = []
        if name is None:
            name = dask.config.get("client-name", None)
        self.id = (
            type(self).__name__
            + ("-" + name + "-" if name else "-")
            + str(uuid.uuid1(clock_seq=os.getpid()))
        )
        self.generation = 0
        self.status = "newly-created"
        self._pending_msg_buffer = []
        self.extensions = {}
        self.scheduler_file = scheduler_file
        self._startup_kwargs = kwargs
        self.cluster = None
        self.scheduler = None
        self._scheduler_identity = {}
        # A reentrant-lock on the refcounts for futures associated with this
        # client. Should be held by individual operations modifying refcounts,
        # or any bulk operation that needs to ensure the set of futures doesn't
        # change during operation.
        self._refcount_lock = threading.RLock()
        self.datasets = Datasets(self)
        self._serializers = serializers
        if deserializers is None:
            deserializers = serializers
        self._deserializers = deserializers
        self.direct_to_workers = direct_to_workers

        # Communication
        self.scheduler_comm = None

        if address is None:
            address = dask.config.get("scheduler-address", None)
            if address:
                logger.info("Config value `scheduler-address` found: %s", address)

        if address is not None and kwargs:
            raise ValueError(f"Unexpected keyword arguments: {sorted(kwargs)}")

        if isinstance(address, (rpc, PooledRPCCall)):
            self.scheduler = address
        elif isinstance(getattr(address, "scheduler_address", None), str):
            # It's a LocalCluster or LocalCluster-compatible object
            self.cluster = address
            with suppress(AttributeError):
                loop = address.loop
            if security is None:
                security = getattr(self.cluster, "security", None)
        elif address is not None and not isinstance(address, str):
            raise TypeError(
                "Scheduler address must be a string or a Cluster instance, got {}".format(
                    type(address)
                )
            )

        if security is None:
            security = Security()
        elif isinstance(security, dict):
            security = Security(**security)
        elif security is True:
            security = Security.temporary()
            self._startup_kwargs["security"] = security
        elif not isinstance(security, Security):
            raise TypeError("security must be a Security object")

        self.security = security

        if name == "worker":
            self.connection_args = self.security.get_connection_args("worker")
        else:
            self.connection_args = self.security.get_connection_args("client")

        self._connecting_to_scheduler = False
        self._asynchronous = asynchronous
        self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
        self.io_loop = self.loop = self._loop_runner.loop

        self._gather_keys = None
        self._gather_future = None

        if heartbeat_interval is None:
            heartbeat_interval = dask.config.get("distributed.client.heartbeat")
        heartbeat_interval = parse_timedelta(heartbeat_interval, default="ms")

        scheduler_info_interval = parse_timedelta(
            dask.config.get("distributed.client.scheduler-info-interval", default="ms")
        )

        self._periodic_callbacks = dict()
        self._periodic_callbacks["scheduler-info"] = PeriodicCallback(
            self._update_scheduler_info, scheduler_info_interval * 1000
        )
        self._periodic_callbacks["heartbeat"] = PeriodicCallback(
            self._heartbeat, heartbeat_interval * 1000
        )

        self._start_arg = address
        self._set_as_default = set_as_default
        if set_as_default:
            self._set_config = dask.config.set(
                scheduler="dask.distributed", shuffle="tasks"
            )
        self._event_handlers = {}

        self._stream_handlers = {
            "key-in-memory": self._handle_key_in_memory,
            "lost-data": self._handle_lost_data,
            "cancelled-key": self._handle_cancelled_key,
            "task-retried": self._handle_retried_key,
            "task-erred": self._handle_task_erred,
            "restart": self._handle_restart,
            "error": self._handle_error,
            "event": self._handle_event,
        }

        self._state_handlers = {
            "memory": self._handle_key_in_memory,
            "lost": self._handle_lost_data,
            "erred": self._handle_task_erred,
        }

        self.rpc = ConnectionPool(
            limit=connection_limit,
            serializers=serializers,
            deserializers=deserializers,
            deserialize=True,
            connection_args=self.connection_args,
            timeout=timeout,
            server=self,
        )

        for ext in extensions:
            ext(self)

        self.start(timeout=timeout)
        Client._instances.add(self)

        from distributed.recreate_tasks import ReplayTaskClient

        ReplayTaskClient(self)

    @contextmanager
    def as_current(self):
        """Thread-local, Task-local context manager that causes the Client.current class
        method to return self. Any Future objects deserialized inside this context
        manager will be automatically attached to this Client.
        """
        tok = _current_client.set(self)
        try:
            yield
        finally:
            _current_client.reset(tok)

    @classmethod
    def current(cls, allow_global=True):
        """When running within the context of `as_current`, return the context-local
        current client. Otherwise, return the latest initialised Client.
        If no Client instances exist, raise ValueError.
        If allow_global is set to False, raise ValueError if running outside of the
        `as_current` context manager.
        """
        out = _current_client.get()
        if out:
            return out
        if allow_global:
            return default_client()
        raise ValueError("Not running inside the `as_current` context manager")

    @property
    def asynchronous(self):
        """Are we running in the event loop?

        This is true if the user signaled that we might be when creating the
        client as in the following::

            client = Client(asynchronous=True)

        However, we override this expectation if we can definitively tell that
        we are running from a thread that is not the event loop. This is
        common when calling get_client() from within a worker task. Even
        though the client was originally created in asynchronous mode we may
        find ourselves in contexts when it is better to operate synchronously.
        """
        try:
            return self._asynchronous and self.loop is IOLoop.current()
        except RuntimeError:
            return False

    @property
    def dashboard_link(self):
        """Link to the scheduler's dashboard.

        Returns
        -------
        str
            Dashboard URL.

        Examples
        --------
        Opening the dashboard in your default web browser:

        >>> import webbrowser
        >>> from distributed import Client
        >>> client = Client()
        >>> webbrowser.open(client.dashboard_link)

        """
        try:
            return self.cluster.dashboard_link
        except AttributeError:
            scheduler, info = self._get_scheduler_info()
            if scheduler is None:
                return None
            else:
                protocol, rest = scheduler.address.split("://")

            port = info["services"]["dashboard"]
            if protocol == "inproc":
                host = "localhost"
            else:
                host = rest.split(":")[0]

            return format_dashboard_link(host, port)
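
    # ``sync`` bridges the two usage modes: when the caller is already on this
    # client's event loop (asynchronous mode) it returns an awaitable, otherwise
    # it blocks the calling thread until the coroutine has run on the loop.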
    def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
        callback_timeout = parse_timedelta(callback_timeout)
        if (
            asynchronous
            or self.asynchronous
            or getattr(thread_state, "asynchronous", False)
        ):
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            return future
        else:
            return sync(
                self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
            )

    def _get_scheduler_info(self):
        from .scheduler import Scheduler

        if (
            self.cluster
            and hasattr(self.cluster, "scheduler")
            and isinstance(self.cluster.scheduler, Scheduler)
        ):
            info = self.cluster.scheduler.identity()
            scheduler = self.cluster.scheduler
        elif (
            self._loop_runner.is_started()
            and self.scheduler
            and not (self.asynchronous and self.loop is IOLoop.current())
        ):
            info = sync(self.loop, self.scheduler.identity)
            scheduler = self.scheduler
        else:
            info = self._scheduler_identity
            scheduler = self.scheduler

        return scheduler, SchedulerInfo(info)

    def __repr__(self):
        # Note: avoid doing I/O here...
        info = self._scheduler_identity
        addr = info.get("address")
        if addr:
            workers = info.get("workers", {})
            nworkers = len(workers)
            nthreads = sum(w["nthreads"] for w in workers.values())
            text = "<%s: %r processes=%d threads=%d" % (
                self.__class__.__name__,
                addr,
                nworkers,
                nthreads,
            )
            memory = [w["memory_limit"] for w in workers.values()]
            if all(memory):
                text += ", memory=" + format_bytes(sum(memory))
            text += ">"
            return text

        elif self.scheduler is not None:
            return "<{}: scheduler={!r}>".format(
                self.__class__.__name__,
                self.scheduler.address,
            )
        else:
            return f"<{self.__class__.__name__}: No scheduler connected>"

    def _repr_html_(self):
        scheduler, info = self._get_scheduler_info()

        return get_template("client.html.j2").render(
            id=self.id,
            scheduler=scheduler,
            info=info,
            cluster=self.cluster,
            scheduler_file=self.scheduler_file,
            dashboard_link=self.dashboard_link,
        )

    def start(self, **kwargs):
        """Start scheduler running in separate thread"""
        if self.status != "newly-created":
            return

        self._loop_runner.start()
        if self._set_as_default:
            _set_global_client(self)
        self.status = "connecting"

        if self.asynchronous:
            self._started = asyncio.ensure_future(self._start(**kwargs))
        else:
            sync(self.loop, self._start, **kwargs)

    def __await__(self):
        if hasattr(self, "_started"):
            return self._started.__await__()
        else:

            async def _():
                return self

            return _().__await__()

    def _send_to_scheduler_safe(self, msg):
        if self.status in ("running", "closing"):
            try:
                self.scheduler_comm.send(msg)
            except (CommClosedError, AttributeError):
                if self.status == "running":
                    raise
        elif self.status in ("connecting", "newly-created"):
            self._pending_msg_buffer.append(msg)

    def _send_to_scheduler(self, msg):
        if self.status in ("running", "closing", "connecting", "newly-created"):
            self.loop.add_callback(self._send_to_scheduler_safe, msg)
        else:
            raise Exception(
                "Tried sending message after closing. Status: %s\n"
                "Message: %s" % (self.status, msg)
            )
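
    # ``_start`` resolves the scheduler address in order: an explicit address
    # or cluster object handed to the constructor, then ``scheduler_file``,
    # and finally a freshly created ``LocalCluster`` when nothing was given.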
    async def _start(self, timeout=no_default, **kwargs):
        await self.rpc.start()

        if timeout == no_default:
            timeout = self._timeout
        if timeout is not None:
            timeout = parse_timedelta(timeout, "s")

        address = self._start_arg
        if self.cluster is not None:
            # Ensure the cluster is started (no-op if already running)
            try:
                await self.cluster
            except Exception:
                logger.info(
                    "Tried to start cluster and received an error. Proceeding.",
                    exc_info=True,
                )
            address = self.cluster.scheduler_address
        elif self.scheduler_file is not None:
            while not os.path.exists(self.scheduler_file):
                await asyncio.sleep(0.01)
            for i in range(10):
                try:
                    with open(self.scheduler_file) as f:
                        cfg = json.load(f)
                    address = cfg["address"]
                    break
                except (ValueError, KeyError):  # JSON file not yet flushed
                    await asyncio.sleep(0.01)
        elif self._start_arg is None:
            from .deploy import LocalCluster

            try:
                self.cluster = await LocalCluster(
                    loop=self.loop,
                    asynchronous=self._asynchronous,
                    **self._startup_kwargs,
                )
            except OSError as e:
                if e.errno != errno.EADDRINUSE:
                    raise
                # The default port was taken, use a random one
                self.cluster = await LocalCluster(
                    scheduler_port=0,
                    loop=self.loop,
                    asynchronous=True,
                    **self._startup_kwargs,
                )

            address = self.cluster.scheduler_address

        self._gather_semaphore = asyncio.Semaphore(5)

        if self.scheduler is None:
            self.scheduler = self.rpc(address)
        self.scheduler_comm = None

        try:
            await self._ensure_connected(timeout=timeout)
        except (OSError, ImportError):
            await self._close()
            raise

        for pc in self._periodic_callbacks.values():
            pc.start()

        for topic, handler in Client._default_event_handlers.items():
            self.subscribe_topic(topic, handler)

        self._handle_scheduler_coroutine = asyncio.ensure_future(self._handle_report())
        self.coroutines.append(self._handle_scheduler_coroutine)

        return self

    async def _reconnect(self):
        with log_errors():
            assert self.scheduler_comm.comm.closed()

            self.status = "connecting"
            self.scheduler_comm = None

            for st in self.futures.values():
                st.cancel()
            self.futures.clear()

            timeout = self._timeout
            deadline = self.loop.time() + timeout
            while timeout > 0 and self.status == "connecting":
                try:
                    await self._ensure_connected(timeout=timeout)
                    break
                except OSError:
                    # Wait a bit before retrying
                    await asyncio.sleep(0.1)
                    timeout = deadline - self.loop.time()
                except ImportError:
                    await self._close()
                    break
            else:
                logger.error(
                    "Failed to reconnect to scheduler after %.2f "
                    "seconds, closing client",
                    self._timeout,
                )
                await self._close()

    async def _ensure_connected(self, timeout=None):
        if (
            self.scheduler_comm
            and not self.scheduler_comm.closed()
            or self._connecting_to_scheduler
            or self.scheduler is None
        ):
            return

        self._connecting_to_scheduler = True

        try:
            comm = await connect(
                self.scheduler.address, timeout=timeout, **self.connection_args
            )
            comm.name = "Client->Scheduler"
            if timeout is not None:
                await asyncio.wait_for(self._update_scheduler_info(), timeout)
            else:
                await self._update_scheduler_info()
            await comm.write(
                {
                    "op": "register-client",
                    "client": self.id,
                    "reply": False,
                    "versions": version_module.get_versions(),
                }
            )
        except Exception:
            if self.status == "closed":
                return
            else:
                raise
        finally:
            self._connecting_to_scheduler = False
        if timeout is not None:
            msg = await asyncio.wait_for(comm.read(), timeout)
        else:
            msg = await comm.read()
        assert len(msg) == 1
        assert msg[0]["op"] == "stream-start"

        if msg[0].get("error"):
            raise ImportError(msg[0]["error"])
        if msg[0].get("warning"):
            warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

        bcomm = BatchedSend(interval="10ms", loop=self.loop)
        bcomm.start(comm)
        self.scheduler_comm = bcomm
        if self._set_as_default:
            _set_global_client(self)
        self.status = "running"

        for msg in self._pending_msg_buffer:
            self._send_to_scheduler(msg)
        del self._pending_msg_buffer[:]

        logger.debug("Started scheduling coroutines. Synchronized")

    async def _update_scheduler_info(self):
        if self.status not in ("running", "connecting"):
            return
        try:
            self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
        except OSError:
            logger.debug("Not able to query scheduler for identity")

    async def _wait_for_workers(self, n_workers=0, timeout=None):
        info = await self.scheduler.identity()
        if timeout:
            deadline = time() + parse_timedelta(timeout)
        else:
            deadline = None
        while n_workers and len(info["workers"]) < n_workers:
            if deadline and time() > deadline:
                raise TimeoutError(
                    "Only %d/%d workers arrived after %s"
                    % (len(info["workers"]), n_workers, timeout)
                )
            await asyncio.sleep(0.1)
            info = await self.scheduler.identity()

    def wait_for_workers(self, n_workers=0, timeout=None):
        """Blocking call to wait for n workers before continuing"""
        return self.sync(self._wait_for_workers, n_workers, timeout=timeout)

    def _heartbeat(self):
        if self.scheduler_comm:
            self.scheduler_comm.send({"op": "heartbeat-client"})

    def __enter__(self):
        if not self._loop_runner.is_started():
            self.start()
        return self

    async def __aenter__(self):
        await self
        return self

    async def __aexit__(self, typ, value, traceback):
        await self._close()

    def __exit__(self, type, value, traceback):
        self.close()

    def __del__(self):
        self.close()

    def _inc_ref(self, key):
        with self._refcount_lock:
            self.refcount[key] += 1

    def _dec_ref(self, key):
        with self._refcount_lock:
            self.refcount[key] -= 1
            if self.refcount[key] == 0:
                del self.refcount[key]
                self._release_key(key)

    def _release_key(self, key):
        """Release key from distributed memory"""
        logger.debug("Release key %s", key)
        st = self.futures.pop(key, None)
        if st is not None:
            st.cancel()
        if self.status != "closed":
            self._send_to_scheduler(
                {"op": "client-releases-keys", "keys": [key], "client": self.id}
            )

    async def _handle_report(self):
        """Listen to scheduler"""
        with log_errors():
            try:
                while True:
                    if self.scheduler_comm is None:
                        break
                    try:
                        msgs = await self.scheduler_comm.comm.read()
                    except CommClosedError:
                        if self.status == "running":
                            logger.info("Client report stream closed to scheduler")
                            logger.info("Reconnecting...")
                            self.status = "connecting"
                            await self._reconnect()
                            continue
                        else:
                            break
                    if not isinstance(msgs, (list, tuple)):
                        msgs = (msgs,)

                    breakout = False
                    for msg in msgs:
                        logger.debug("Client receives message %s", msg)

                        if "status" in msg and "error" in msg["status"]:
                            typ, exc, tb = clean_exception(**msg)
                            raise exc.with_traceback(tb)

                        op = msg.pop("op")

                        if op == "close" or op == "stream-closed":
                            breakout = True
                            break

                        try:
                            handler = self._stream_handlers[op]
                            result = handler(**msg)
                            if inspect.isawaitable(result):
                                await result
                        except Exception as e:
                            logger.exception(e)
                    if breakout:
                        break
            except CancelledError:
                pass

    def _handle_key_in_memory(self, key=None, type=None, workers=None):
        state = self.futures.get(key)
        if state is not None:
            if type and not state.type:  # Type exists and not yet set
                try:
                    type = loads(type)
                except Exception:
                    type = None
                # Here, `type` may be a str if actual type failed
                # serializing in Worker
            else:
                type = None
            state.finish(type)

    def _handle_lost_data(self, key=None):
        state = self.futures.get(key)
        if state is not None:
            state.lose()

    def _handle_cancelled_key(self, key=None):
        state = self.futures.get(key)
        if state is not None:
            state.cancel()

    def _handle_retried_key(self, key=None):
        state = self.futures.get(key)
        if state is not None:
            state.retry()

    def _handle_task_erred(self, key=None, exception=None, traceback=None):
        state = self.futures.get(key)
        if state is not None:
            state.set_error(exception, traceback)

    def _handle_restart(self):
        logger.info("Receive restart signal from scheduler")
        for state in self.futures.values():
            state.cancel()
        self.futures.clear()
        with suppress(AttributeError):
            self._restart_event.set()

    def _handle_error(self, exception=None):
        logger.warning("Scheduler exception:")
        logger.exception(exception)

    async def _close(self, fast=False):
        """Send close signal and wait until scheduler completes"""
        if self.status == "closed":
            return

        self.status = "closing"

        with suppress(AttributeError):
            for pc in self._periodic_callbacks.values():
                pc.stop()

        with log_errors():
            _del_global_client(self)
            self._scheduler_identity = {}
            with suppress(AttributeError):
                # clear the dask.config set keys
                with self._set_config:
                    pass
            if self.get == dask.config.get("get", None):
                del dask.config.config["get"]

            if (
                self.scheduler_comm
                and self.scheduler_comm.comm
                and not self.scheduler_comm.comm.closed()
            ):
                self._send_to_scheduler({"op": "close-client"})
                self._send_to_scheduler({"op": "close-stream"})

            # Give the scheduler 'stream-closed' message 100ms to come through
            # This makes the shutdown slightly smoother and quieter
            with suppress(AttributeError, asyncio.CancelledError, TimeoutError):
                await asyncio.wait_for(
                    asyncio.shield(self._handle_scheduler_coroutine), 0.1
                )

            if (
                self.scheduler_comm
                and self.scheduler_comm.comm
                and not self.scheduler_comm.comm.closed()
            ):
                await self.scheduler_comm.close()

            for key in list(self.futures):
                self._release_key(key=key)

            if self._start_arg is None:
                with suppress(AttributeError):
                    await self.cluster.close()

            await self.rpc.close()

            self.status = "closed"

            if _get_global_client() is self:
                _set_global_client(None)

            coroutines = set(self.coroutines)
            for f in self.coroutines:
                # cancel() works on asyncio futures (Tornado 5)
                # but is a no-op on Tornado futures
                with suppress(RuntimeError):
                    f.cancel()
                if f.cancelled():
                    coroutines.remove(f)
            del self.coroutines[:]

            if not fast:
                with suppress(TimeoutError, asyncio.CancelledError):
                    await asyncio.wait_for(asyncio.gather(*coroutines), 2)

            with suppress(AttributeError):
                await self.scheduler.close_rpc()

            self.scheduler = None

        self.status = "closed"

    def close(self, timeout=no_default):
        """Close this client

        Clients will also close automatically when your Python session ends

        If you started a client without arguments like ``Client()`` then this
        will also close the local cluster that was started at the same time.
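
        Examples
        --------
        >>> client.close()  # doctest: +SKIP

        A client used as a context manager closes itself on exit:

        >>> with Client() as client:  # doctest: +SKIP
        ...     pass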

        See Also
        --------
        Client.restart
        """
        if timeout == no_default:
            timeout = self._timeout * 2
        # XXX handling of self.status here is not thread-safe
        if self.status in ["closed", "newly-created"]:
            if self.asynchronous:
                future = asyncio.Future()
                future.set_result(None)
                return future
            return
        self.status = "closing"

        with suppress(AttributeError):
            for pc in self._periodic_callbacks.values():
                pc.stop()

        if self.asynchronous:
            coro = self._close()
            if timeout:
                coro = asyncio.wait_for(coro, timeout)
            return coro

        if self._start_arg is None:
            with suppress(AttributeError):
                f = self.cluster.close()
                if asyncio.iscoroutine(f):

                    async def _():
                        await f

                    self.sync(_)

        sync(self.loop, self._close, fast=True, callback_timeout=timeout)

        assert self.status == "closed"

        if not sys.is_finalizing():
            self._loop_runner.stop()

    async def _shutdown(self):
        logger.info("Shutting down scheduler from Client")
        if self.cluster:
            await self.cluster.close()
        else:
            with suppress(CommClosedError):
                self.status = "closing"
                await self.scheduler.terminate(close_workers=True)

    def shutdown(self):
        """Shut down the connected scheduler and workers

        Note, this may disrupt other clients that may be using the same
        scheduler and workers.

        See Also
        --------
        Client.close : close only this client
        """
        return self.sync(self._shutdown)

    def get_executor(self, **kwargs):
        """
        Return a concurrent.futures Executor for submitting tasks on this Client

        Parameters
        ----------
        **kwargs
            Any submit()- or map()-compatible arguments, such as
            `workers` or `resources`.

        Returns
        -------
        An Executor object that's fully compatible with the concurrent.futures
        API.
        """
        return ClientExecutor(self, **kwargs)

    def submit(
        self,
        func,
        *args,
        key=None,
        workers=None,
        resources=None,
        retries=None,
        priority=0,
        fifo_timeout="100 ms",
        allow_other_workers=False,
        actor=False,
        actors=False,
        pure=None,
        **kwargs,
    ):

        """Submit a function application to the scheduler

        Parameters
        ----------
        func : callable
            Callable to be scheduled as ``func(*args, **kwargs)``. If ``func`` returns a
            coroutine, it will be run on the main event loop of a worker. Otherwise
            ``func`` will be run in a worker's task executor pool (see
            ``Worker.executors`` for more information.)
        *args
        **kwargs
        pure : bool (defaults to True)
            Whether or not the function is pure. Set ``pure=False`` for
            impure functions like ``np.random.random``.
            See :ref:`pure functions` for more details.
        workers : string or iterable of strings
            A set of worker addresses or hostnames on which computations may be
            performed. Leave empty to default to all workers (common case)
        key : str
            Unique identifier for the task. Defaults to function-name and hash
        allow_other_workers : bool (defaults to False)
            Used with ``workers``. Indicates whether or not the computations
            may be performed on workers that are not in the `workers` set(s).
        retries : int (default to 0)
            Number of allowed automatic retries if the task fails
        priority : Number
            Optional prioritization of task. Zero is default.
            Higher priorities take precedence
        fifo_timeout : str timedelta (default '100ms')
            Allowed amount of time between calls to consider the same priority
        resources : dict (defaults to {})
            Defines the ``resources`` each instance of this mapped task requires
            on the worker; e.g. ``{'GPU': 2}``.
            See :doc:`worker resources <resources>` for details on defining
            resources.
        actor : bool (default False)
            Whether this task should exist on the worker as a stateful actor.
            See :doc:`actors` for additional details.
        actors : bool (default False)
            Alias for `actor`

        Examples
        --------
        >>> c = client.submit(add, a, b)  # doctest: +SKIP
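
        Futures from earlier submissions can be passed back in as arguments,
        so intermediate results never have to leave the cluster:

        >>> total = client.submit(add, c, c)  # doctest: +SKIP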

        Returns
        -------
        Future

        See Also
        --------
        Client.map : Submit on many arguments at once
        """
        if not callable(func):
            raise TypeError("First input to submit must be a callable function")

        actor = actor or actors
        if pure is None:
            pure = not actor

        if allow_other_workers not in (True, False, None):
            raise TypeError("allow_other_workers= must be True or False")

        if key is None:
            if pure:
                key = funcname(func) + "-" + tokenize(func, kwargs, *args)
            else:
                key = funcname(func) + "-" + str(uuid.uuid4())

        skey = stringify(key)

        with self._refcount_lock:
            if skey in self.futures:
                return Future(key, self, inform=False)

        if allow_other_workers and workers is None:
            raise ValueError("Only use allow_other_workers= if using workers=")

        if isinstance(workers, (str, Number)):
            workers = [workers]

        if kwargs:
            dsk = {skey: (apply, func, list(args), kwargs)}
        else:
            dsk = {skey: (func,) + tuple(args)}

        futures = self._graph_to_futures(
            dsk,
            [skey],
            workers=workers,
            allow_other_workers=allow_other_workers,
            priority={skey: 0},
            user_priority=priority,
            resources=resources,
            retries=retries,
            fifo_timeout=fifo_timeout,
            actors=actor,
        )

        logger.debug("Submit %s(...), %s", funcname(func), key)

        return futures[skey]

    def map(
        self,
        func,
        *iterables,
        key=None,
        workers=None,
        retries=None,
        resources=None,
        priority=0,
        allow_other_workers=False,
        fifo_timeout="100 ms",
        actor=False,
        actors=False,
        pure=None,
        batch_size=None,
        **kwargs,
    ):
        """Map a function on a sequence of arguments

        Arguments can be normal objects or Futures

        Parameters
        ----------
        func : callable
            Callable to be scheduled for execution. If ``func`` returns a coroutine, it
            will be run on the main event loop of a worker. Otherwise ``func`` will be
            run in a worker's task executor pool (see ``Worker.executors`` for more
            information.)
        iterables : Iterables
            List-like objects to map over. They should have the same length.
        key : str, list
            Prefix for task names if string. Explicit names if list.
        pure : bool (defaults to True)
            Whether or not the function is pure. Set ``pure=False`` for
            impure functions like ``np.random.random``.
            See :ref:`pure functions` for more details.
        workers : string or iterable of strings
            A set of worker hostnames on which computations may be performed.
            Leave empty to default to all workers (common case)
        allow_other_workers : bool (defaults to False)
            Used with `workers`. Indicates whether or not the computations
            may be performed on workers that are not in the `workers` set(s).
        retries : int (default to 0)
            Number of allowed automatic retries if a task fails
        priority : Number
            Optional prioritization of task. Zero is default.
            Higher priorities take precedence
        fifo_timeout : str timedelta (default '100ms')
            Allowed amount of time between calls to consider the same priority
        resources : dict (defaults to {})
            Defines the `resources` each instance of this mapped task requires
            on the worker; e.g. ``{'GPU': 2}``.
            See :doc:`worker resources <resources>` for details on defining
            resources.
        actor : bool (default False)
            Whether these tasks should exist on the worker as stateful actors.
            See :doc:`actors` for additional details.
        actors : bool (default False)
            Alias for `actor`
        batch_size : int, optional
            Submit tasks to the scheduler in batches of (at most) ``batch_size``.
            Larger batch sizes can be useful for very large ``iterables``,
            as the cluster can start processing tasks while later ones are
            submitted asynchronously.
        **kwargs : dict
            Extra keywords to send to the function.
            Large values will be included explicitly in the task graph.

        Examples
        --------
        >>> L = client.map(func, sequence)  # doctest: +SKIP
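
        Extra keyword arguments are broadcast to every call (a sketch that
        assumes ``func`` accepts a ``factor=`` keyword):

        >>> L = client.map(func, sequence, factor=10)  # doctest: +SKIP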

        Returns
        -------
        List, iterator, or Queue of futures, depending on the type of the
        inputs.

        See Also
        --------
        Client.submit : Submit a single function
        """
        if not callable(func):
            raise TypeError("First input to map must be a callable function")

        if all(isinstance(it, pyQueue) for it in iterables) or all(
            isinstance(i, Iterator) for i in iterables
        ):
            raise TypeError(
                "Dask no longer supports mapping over Iterators or Queues. "
                "Consider using a normal for loop and Client.submit"
            )
        total_length = sum(len(x) for x in iterables)

        if batch_size and batch_size > 1 and total_length > batch_size:
            batches = list(
                zip(*(partition_all(batch_size, iterable) for iterable in iterables))
            )
            if isinstance(key, list):
                keys = [list(element) for element in partition_all(batch_size, key)]
            else:
                keys = [key for _ in range(len(batches))]
            return sum(
                (
                    self.map(
                        func,
                        *batch,
                        key=key,
                        workers=workers,
                        retries=retries,
                        priority=priority,
                        allow_other_workers=allow_other_workers,
                        fifo_timeout=fifo_timeout,
                        resources=resources,
                        actor=actor,
                        actors=actors,
                        pure=pure,
                        **kwargs,
                    )
                    for key, batch in zip(keys, batches)
                ),
                [],
            )

        key = key or funcname(func)
        actor = actor or actors
        if pure is None:
            pure = not actor

        if allow_other_workers and workers is None:
            raise ValueError("Only use allow_other_workers= if using workers=")

        iterables = list(zip(*zip(*iterables)))
        if isinstance(key, list):
            keys = key
        else:
            if pure:
                keys = [
                    key + "-" + tokenize(func, kwargs, *args)
                    for args in zip(*iterables)
                ]
            else:
                uid = str(uuid.uuid4())
                keys = (
                    [
                        key + "-" + uid + "-" + str(i)
                        for i in range(min(map(len, iterables)))
                    ]
                    if iterables
                    else []
                )

        if not kwargs:
            dsk = {key: (func,) + args for key, args in zip(keys, zip(*iterables))}
        else:
            kwargs2 = {}
            dsk = {}
            for k, v in kwargs.items():
                if sizeof(v) > 1e5:
                    vv = dask.delayed(v)
                    kwargs2[k] = vv._key
                    dsk.update(vv.dask)
                else:
                    kwargs2[k] = v
            dsk.update(
                {
                    key: (apply, func, (tuple, list(args)), kwargs2)
                    for key, args in zip(keys, zip(*iterables))
                }
            )

        if isinstance(workers, (str, Number)):
            workers = [workers]
        if workers is not None and not isinstance(workers, (list, set)):
            raise TypeError("Workers must be a list or set of workers or None")

        internal_priority = dict(zip(keys, range(len(keys))))

        futures = self._graph_to_futures(
            dsk,
            keys,
            workers=workers,
            allow_other_workers=allow_other_workers,
            priority=internal_priority,
            resources=resources,
            retries=retries,
            user_priority=priority,
            fifo_timeout=fifo_timeout,
            actors=actor,
        )
        logger.debug("map(%s, ...)", funcname(func))

        return [futures[stringify(k)] for k in keys]

    async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
        unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
        mismatched_futures = [f for f in future_set if f.client is not self]
        if mismatched_futures:
            raise ValueError(
                "Cannot gather Futures created by another client. "
                f"These are the {len(mismatched_futures)} (out of {len(futures)}) mismatched Futures and their client IDs "
                f"(this client is {self.id}): "
                f"{ {f: f.client.id for f in mismatched_futures} }"
            )
        keys = [stringify(future.key) for future in future_set]
        bad_data = dict()
        data = {}

        if direct is None:
            direct = self.direct_to_workers
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True

        async def wait(k):
            """Want to stop the All(...) early if we find an error"""
            st = self.futures[k]
            await st.wait()
            if st.status != "finished" and errors == "raise":
                raise AllExit()

        while True:
            logger.debug("Waiting on futures to clear before gather")

            with suppress(AllExit):
                await All(
                    [wait(key) for key in keys if key in self.futures],
                    quiet_exceptions=AllExit,
                )

            failed = ("error", "cancelled")

            exceptions = set()
            bad_keys = set()
            for key in keys:
                if key not in self.futures or self.futures[key].status in failed:
                    exceptions.add(key)
                    if errors == "raise":
                        try:
                            st = self.futures[key]
                            exception = st.exception
                            traceback = st.traceback
                        except (KeyError, AttributeError):
                            exc = CancelledError(key)
                        else:
                            raise exception.with_traceback(traceback)
                        raise exc
                    if errors == "skip":
                        bad_keys.add(key)
                        bad_data[key] = None
                    else:
                        raise ValueError("Bad value, `errors=%s`" % errors)

            keys = [k for k in keys if k not in bad_keys and k not in data]

            if local_worker:  # look inside local worker
                data.update(
                    {k: local_worker.data[k] for k in keys if k in local_worker.data}
                )
                keys = [k for k in keys if k not in data]

            # We now do an actual remote communication with workers or scheduler
            if self._gather_future:  # attach onto another pending gather request
                self._gather_keys |= set(keys)
                response = await self._gather_future
            else:  # no one waiting, go ahead
                self._gather_keys = set(keys)
                future = asyncio.ensure_future(
                    self._gather_remote(direct, local_worker)
                )
                if self._gather_keys is None:
                    self._gather_future = None
                else:
                    self._gather_future = future
                response = await future

            if response["status"] == "error":
                log = logger.warning if errors == "raise" else logger.debug
                log(
                    "Couldn't gather %s keys, rescheduling %s",
                    len(response["keys"]),
                    response["keys"],
                )
                for key in response["keys"]:
                    self._send_to_scheduler({"op": "report-key", "key": key})
                for key in response["keys"]:
                    try:
                        self.futures[key].reset()
                    except KeyError:  # TODO: verify that this is safe
                        pass
            else:
                break

        if bad_data and errors == "skip" and isinstance(unpacked, list):
            unpacked = [f for f in unpacked if f not in bad_data]

        data.update(response["data"])
        result = pack_data(unpacked, merge(data, bad_data))
        return result

    async def _gather_remote(self, direct, local_worker):
        """Perform gather with workers or scheduler

        This method exists to limit and batch many concurrent gathers into a
        few. It controls access using a Tornado semaphore, and picks up keys
        from other requests made recently.
        """
        async with self._gather_semaphore:
            keys = list(self._gather_keys)
            self._gather_keys = None  # clear state, these keys are being sent off
            self._gather_future = None

            if direct or local_worker:  # gather directly from workers
                who_has = await retry_operation(self.scheduler.who_has, keys=keys)
                data2, missing_keys, missing_workers = await gather_from_workers(
                    who_has, rpc=self.rpc, close=False
                )
                response = {"status": "OK", "data": data2}
                if missing_keys:
                    keys2 = [key for key in keys if key not in data2]
                    response = await retry_operation(self.scheduler.gather, keys=keys2)
                    if response["status"] == "OK":
                        response["data"].update(data2)

            else:  # ask scheduler to gather data for us
                response = await retry_operation(self.scheduler.gather, keys=keys)

        return response

    def gather(self, futures, errors="raise", direct=None, asynchronous=None):
        """Gather futures from distributed memory

        Accepts a future, nested container of futures, iterator, or queue.
        The return type will match the input type.

        Parameters
        ----------
        futures : Collection of futures
            This can be a possibly nested collection of Future objects.
            Collections can be lists, sets, or dictionaries
        errors : string
            Either 'raise' or 'skip' if we should raise if a future has erred
            or skip its inclusion in the output collection
        direct : boolean
            Whether or not to connect directly to the workers, or to ask
            the scheduler to serve as intermediary. This can also be set when
            creating the Client.

        Returns
        -------
        results: a collection of the same type as the input, but now with
        gathered results rather than futures

        Examples
        --------
        >>> from operator import add  # doctest: +SKIP
        >>> c = Client('127.0.0.1:8787')  # doctest: +SKIP
        >>> x = c.submit(add, 1, 2)  # doctest: +SKIP
        >>> c.gather(x)  # doctest: +SKIP
        3
        >>> c.gather([x, [x], x])  # support lists and dicts # doctest: +SKIP
        [3, [3], 3]

        See Also
        --------
        Client.scatter : Send data out to cluster
        """
        if isinstance(futures, pyQueue):
            raise TypeError(
                "Dask no longer supports gathering over Iterators and Queues. "
                "Consider using a normal for loop and Client.submit/gather"
            )

        elif isinstance(futures, Iterator):
            return (self.gather(f, errors=errors, direct=direct) for f in futures)
        else:
            if hasattr(thread_state, "execution_state"):  # within worker task
                local_worker = thread_state.execution_state["worker"]
            else:
                local_worker = None
            return self.sync(
                self._gather,
                futures,
                errors=errors,
                direct=direct,
                local_worker=local_worker,
                asynchronous=asynchronous,
            )

    async def _scatter(
        self,
        data,
        workers=None,
        broadcast=False,
        direct=None,
        local_worker=None,
        timeout=no_default,
        hash=True,
    ):
        if timeout == no_default:
            timeout = self._timeout
        if isinstance(workers, (str, Number)):
            workers = [workers]
        if isinstance(data, dict) and not all(
            isinstance(k, (bytes, str)) for k in data
        ):
            d = await self._scatter(keymap(stringify, data), workers, broadcast)
            return {k: d[stringify(k)] for k in data}

        if isinstance(data, type(range(0))):
            data = list(data)
        input_type = type(data)
        names = False
        unpack = False
        if isinstance(data, Iterator):
            data = list(data)
        if isinstance(data, (set, frozenset)):
            data = list(data)
        if not isinstance(data, (dict, list, tuple, set, frozenset)):
            unpack = True
            data = [data]
        if isinstance(data, (list, tuple)):
            if hash:
                names = [type(x).__name__ + "-" + tokenize(x) for x in data]
            else:
                names = [type(x).__name__ + "-" + uuid.uuid4().hex for x in data]
            data = dict(zip(names, data))

        assert isinstance(data, dict)

        types = valmap(type, data)

        if direct is None:
            direct = self.direct_to_workers
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True

        if local_worker:  # running within task
            local_worker.update_data(data=data, report=False)

            await self.scheduler.update_data(
                who_has={key: [local_worker.address] for key in data},
                nbytes=valmap(sizeof, data),
                client=self.id,
            )

        else:
            data2 = valmap(to_serialize, data)
            if direct:
                nthreads = None
                start = time()
                while not nthreads:
                    if nthreads is not None:
                        await asyncio.sleep(0.1)
                    if time() > start + timeout:
                        raise TimeoutError("No valid workers found")
                    # Exclude paused and closing_gracefully workers
                    nthreads = await self.scheduler.ncores_running(workers=workers)
                if not nthreads:
                    raise ValueError("No valid workers found")

                _, who_has, nbytes = await scatter_to_workers(
                    nthreads, data2, report=False, rpc=self.rpc
                )

                await self.scheduler.update_data(
                    who_has=who_has, nbytes=nbytes, client=self.id
                )
            else:
                await self.scheduler.scatter(
                    data=data2,
                    workers=workers,
                    client=self.id,
                    broadcast=broadcast,
                    timeout=timeout,
                )

        out = {k: Future(k, self, inform=False) for k in data}
        for key, typ in types.items():
            self.futures[key].finish(type=typ)

        if direct and broadcast:
            n = None if broadcast is True else broadcast
            await self._replicate(list(out.values()), workers=workers, n=n)

        if issubclass(input_type, (list, tuple, set, frozenset)):
            out = input_type(out[k] for k in names)

        if unpack:
unpack: 2095 assert len(out) == 1 2096 out = list(out.values())[0] 2097 return out 2098 2099 def scatter( 2100 self, 2101 data, 2102 workers=None, 2103 broadcast=False, 2104 direct=None, 2105 hash=True, 2106 timeout=no_default, 2107 asynchronous=None, 2108 ): 2109 """Scatter data into distributed memory 2110 2111 This moves data from the local client process into the workers of the 2112 distributed scheduler. Note that it is often better to submit jobs to 2113 your workers to have them load the data rather than loading data 2114 locally and then scattering it out to them. 2115 2116 Parameters 2117 ---------- 2118 data : list, dict, or object 2119 Data to scatter out to workers. Output type matches input type. 2120 workers : list of tuples (optional) 2121 Optionally constrain locations of data. 2122 Specify workers as hostname/port pairs, e.g. ``('127.0.0.1', 8787)``. 2123 broadcast : bool (defaults to False) 2124 Whether to send each data element to all workers. 2125 By default we round-robin based on number of cores. 2126 direct : bool (defaults to automatically check) 2127 Whether or not to connect directly to the workers, or to ask 2128 the scheduler to serve as intermediary. This can also be set when 2129 creating the Client. 2130 hash : bool (optional) 2131 Whether or not to hash data to determine key. 2132 If False then this uses a random key 2133 2134 Returns 2135 ------- 2136 List, dict, iterator, or queue of futures matching the type of input. 2137 2138 Examples 2139 -------- 2140 >>> c = Client('127.0.0.1:8787') # doctest: +SKIP 2141 >>> c.scatter(1) # doctest: +SKIP 2142 <Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195> 2143 2144 >>> c.scatter([1, 2, 3]) # doctest: +SKIP 2145 [<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>, 2146 <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>, 2147 <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>] 2148 2149 >>> c.scatter({'x': 1, 'y': 2, 'z': 3}) # doctest: +SKIP 2150 {'x': <Future: status: finished, key: x>, 2151 'y': <Future: status: finished, key: y>, 2152 'z': <Future: status: finished, key: z>} 2153 2154 Constrain location of data to subset of workers 2155 2156 >>> c.scatter([1, 2, 3], workers=[('hostname', 8788)]) # doctest: +SKIP 2157 2158 Broadcast data to all workers 2159 2160 >>> [future] = c.scatter([element], broadcast=True) # doctest: +SKIP 2161 2162 Send scattered data to parallelized function using client futures 2163 interface 2164 2165 >>> data = c.scatter(data, broadcast=True) # doctest: +SKIP 2166 >>> res = [c.submit(func, data, i) for i in range(100)] 2167 2168 See Also 2169 -------- 2170 Client.gather : Gather data back to local process 2171 """ 2172 if timeout == no_default: 2173 timeout = self._timeout 2174 if isinstance(data, pyQueue) or isinstance(data, Iterator): 2175 raise TypeError( 2176 "Dask no longer supports mapping over Iterators or Queues." 
2177 "Consider using a normal for loop and Client.submit" 2178 ) 2179 2180 if hasattr(thread_state, "execution_state"): # within worker task 2181 local_worker = thread_state.execution_state["worker"] 2182 else: 2183 local_worker = None 2184 return self.sync( 2185 self._scatter, 2186 data, 2187 workers=workers, 2188 broadcast=broadcast, 2189 direct=direct, 2190 local_worker=local_worker, 2191 timeout=timeout, 2192 asynchronous=asynchronous, 2193 hash=hash, 2194 ) 2195 2196 async def _cancel(self, futures, force=False): 2197 keys = list({stringify(f.key) for f in futures_of(futures)}) 2198 await self.scheduler.cancel(keys=keys, client=self.id, force=force) 2199 for k in keys: 2200 st = self.futures.pop(k, None) 2201 if st is not None: 2202 st.cancel() 2203 2204 def cancel(self, futures, asynchronous=None, force=False): 2205 """ 2206 Cancel running futures 2207 2208 This stops future tasks from being scheduled if they have not yet run 2209 and deletes them if they have already run. After calling, this result 2210 and all dependent results will no longer be accessible 2211 2212 Parameters 2213 ---------- 2214 futures : list of Futures 2215 force : boolean (False) 2216 Cancel this future even if other clients desire it 2217 """ 2218 return self.sync(self._cancel, futures, asynchronous=asynchronous, force=force) 2219 2220 async def _retry(self, futures): 2221 keys = list({stringify(f.key) for f in futures_of(futures)}) 2222 response = await self.scheduler.retry(keys=keys, client=self.id) 2223 for key in response: 2224 st = self.futures[key] 2225 st.retry() 2226 2227 def retry(self, futures, asynchronous=None): 2228 """ 2229 Retry failed futures 2230 2231 Parameters 2232 ---------- 2233 futures : list of Futures 2234 """ 2235 return self.sync(self._retry, futures, asynchronous=asynchronous) 2236 2237 async def _publish_dataset(self, *args, name=None, override=False, **kwargs): 2238 with log_errors(): 2239 coroutines = [] 2240 2241 def add_coro(name, data): 2242 keys = [stringify(f.key) for f in futures_of(data)] 2243 coroutines.append( 2244 self.scheduler.publish_put( 2245 keys=keys, 2246 name=name, 2247 data=to_serialize(data), 2248 override=override, 2249 client=self.id, 2250 ) 2251 ) 2252 2253 if name: 2254 if len(args) == 0: 2255 raise ValueError( 2256 "If name is provided, expecting call signature like" 2257 " publish_dataset(df, name='ds')" 2258 ) 2259 # in case this is a singleton, collapse it 2260 elif len(args) == 1: 2261 args = args[0] 2262 add_coro(name, args) 2263 2264 for name, data in kwargs.items(): 2265 add_coro(name, data) 2266 2267 await asyncio.gather(*coroutines) 2268 2269 def publish_dataset(self, *args, **kwargs): 2270 """ 2271 Publish named datasets to scheduler 2272 2273 This stores a named reference to a dask collection or list of futures 2274 on the scheduler. These references are available to other Clients 2275 which can download the collection or futures with ``get_dataset``. 2276 2277 Datasets are not immediately computed. You may wish to call 2278 ``Client.persist`` prior to publishing a dataset. 
2279 2280 Parameters 2281 ---------- 2282 args : list of objects to publish as name 2283 name : optional name of the dataset to publish 2284 override : bool (optional, default False) 2285 If True, override any already present dataset with the same name 2286 kwargs : dict 2287 named collections to publish on the scheduler 2288
2289 Examples 2290 -------- 2291 Publishing client: 2292
2293 >>> df = dd.read_csv('s3://...') # doctest: +SKIP 2294 >>> df = c.persist(df) # doctest: +SKIP 2295 >>> c.publish_dataset(my_dataset=df) # doctest: +SKIP 2296
2297 Alternative invocation: 2298 >>> c.publish_dataset(df, name='my_dataset') # doctest: +SKIP 2299
2300 Receiving client: 2301
2302 >>> c.list_datasets() # doctest: +SKIP 2303 ['my_dataset'] 2304 >>> df2 = c.get_dataset('my_dataset') # doctest: +SKIP 2305
2306 Returns 2307 ------- 2308 None 2309
2310 See Also 2311 -------- 2312 Client.list_datasets 2313 Client.get_dataset 2314 Client.unpublish_dataset 2315 Client.persist 2316 """ 2317 return self.sync(self._publish_dataset, *args, **kwargs) 2318
2319 def unpublish_dataset(self, name, **kwargs): 2320 """ 2321 Remove named datasets from scheduler 2322
2323 Examples 2324 -------- 2325 >>> c.list_datasets() # doctest: +SKIP 2326 ['my_dataset'] 2327 >>> c.unpublish_dataset('my_dataset') # doctest: +SKIP 2328 >>> c.list_datasets() # doctest: +SKIP 2329 [] 2330
2331 See Also 2332 -------- 2333 Client.publish_dataset 2334 """ 2335 return self.sync(self.scheduler.publish_delete, name=name, **kwargs) 2336
2337 def list_datasets(self, **kwargs): 2338 """ 2339 List named datasets available on the scheduler 2340
2341 See Also 2342 -------- 2343 Client.publish_dataset 2344 Client.get_dataset 2345 """ 2346 return self.sync(self.scheduler.publish_list, **kwargs) 2347
2348 async def _get_dataset(self, name, default=NO_DEFAULT_PLACEHOLDER): 2349 with self.as_current(): 2350 out = await self.scheduler.publish_get(name=name, client=self.id) 2351
2352 if out is None: 2353 if default is NO_DEFAULT_PLACEHOLDER: 2354 raise KeyError(f"Dataset '{name}' not found") 2355 else: 2356 return default 2357 return out["data"] 2358
2359 def get_dataset(self, name, default=NO_DEFAULT_PLACEHOLDER, **kwargs): 2360 """ 2361 Get named dataset from the scheduler if present. 2362 Return the default or raise a KeyError if not present. 2363
2364 Parameters 2365 ---------- 2366 name : name of the dataset to retrieve 2367 default : optional, not set by default 2368 If set, do not raise a KeyError if the name is not present but return this default 2369 kwargs : dict 2370 additional arguments to _get_dataset 2371
2372 See Also 2373 -------- 2374 Client.publish_dataset 2375 Client.list_datasets 2376 """ 2377 return self.sync(self._get_dataset, name, default=default, **kwargs) 2378
2379 async def _run_on_scheduler(self, function, *args, wait=True, **kwargs): 2380 response = await self.scheduler.run_function( 2381 function=dumps(function, protocol=4), 2382 args=dumps(args, protocol=4), 2383 kwargs=dumps(kwargs, protocol=4), 2384 wait=wait, 2385 ) 2386 if response["status"] == "error": 2387 typ, exc, tb = clean_exception(**response) 2388 raise exc.with_traceback(tb) 2389 else: 2390 return response["result"] 2391
2392 def run_on_scheduler(self, function, *args, **kwargs): 2393 """Run a function on the scheduler process 2394
2395 This is typically used for live debugging. The function should take a 2396 keyword argument ``dask_scheduler=``, which will be given the scheduler 2397 object itself.
2398 2399 Examples 2400 -------- 2401 >>> def get_number_of_tasks(dask_scheduler=None): 2402 ... return len(dask_scheduler.tasks) 2403
2404 >>> client.run_on_scheduler(get_number_of_tasks) # doctest: +SKIP 2405 100 2406
2407 Run asynchronous functions in the background: 2408
2409 >>> async def print_state(dask_scheduler): # doctest: +SKIP 2410 ... while True: 2411 ... print(dask_scheduler.status) 2412 ... await asyncio.sleep(1) 2413
2414 >>> c.run_on_scheduler(print_state, wait=False) # doctest: +SKIP 2415
2416 See Also 2417 -------- 2418 Client.run : Run a function on all workers 2419 Client.start_ipython_scheduler : Start an IPython session on scheduler 2420 """ 2421 return self.sync(self._run_on_scheduler, function, *args, **kwargs) 2422
2423 async def _run( 2424 self, function, *args, nanny=False, workers=None, wait=True, **kwargs 2425 ): 2426 responses = await self.scheduler.broadcast( 2427 msg=dict( 2428 op="run", 2429 function=dumps(function, protocol=4), 2430 args=dumps(args, protocol=4), 2431 wait=wait, 2432 kwargs=dumps(kwargs, protocol=4), 2433 ), 2434 workers=workers, 2435 nanny=nanny, 2436 ) 2437 results = {} 2438 for key, resp in responses.items(): 2439 if resp["status"] == "OK": 2440 results[key] = resp["result"] 2441 elif resp["status"] == "error": 2442 typ, exc, tb = clean_exception(**resp) 2443 raise exc.with_traceback(tb) 2444 if wait: 2445 return results 2446
2447 def run(self, function, *args, **kwargs): 2448 """ 2449 Run a function on all workers outside of task scheduling system 2450
2451 This calls a function on all currently known workers immediately, 2452 blocks until those results come back, and returns the results as a 2453 dictionary keyed by worker address. This method is 2454 generally used for side effects, such as collecting diagnostic 2455 information or installing libraries. 2456
2457 If your function takes an input argument named ``dask_worker`` then 2458 that variable will be populated with the worker itself. 2459
2460 Parameters 2461 ---------- 2462 function : callable 2463 *args : arguments for remote function 2464 **kwargs : keyword arguments for remote function 2465 workers : list 2466 Workers on which to run the function. Defaults to all known workers. 2467 wait : boolean (optional) 2468 If the function is asynchronous whether or not to wait until that 2469 function finishes. 2470 nanny : bool, default False 2471 Whether to run ``function`` on the nanny. By default, the function 2472 is run on the worker process. If specified, the addresses in 2473 ``workers`` should still be the worker addresses, not the nanny addresses. 2474
2475 Examples 2476 -------- 2477 >>> c.run(os.getpid) # doctest: +SKIP 2478 {'192.168.0.100:9000': 1234, 2479 '192.168.0.101:9000': 4321, 2480 '192.168.0.102:9000': 5555} 2481
2482 Restrict computation to particular workers with the ``workers=`` 2483 keyword argument. 2484
2485 >>> c.run(os.getpid, workers=['192.168.0.100:9000', 2486 ... '192.168.0.101:9000']) # doctest: +SKIP 2487 {'192.168.0.100:9000': 1234, 2488 '192.168.0.101:9000': 4321} 2489
2490 >>> def get_status(dask_worker): 2491 ... return dask_worker.status 2492
2493 >>> c.run(get_status) # doctest: +SKIP 2494 {'192.168.0.100:9000': 'running', 2495 '192.168.0.101:9000': 'running'} 2496
2497 Run asynchronous functions in the background: 2498
2499 >>> async def print_state(dask_worker): # doctest: +SKIP 2500 ... while True: 2501 ... print(dask_worker.status) 2502 ... await asyncio.sleep(1) 2503
2504 >>> c.run(print_state, wait=False) # doctest: +SKIP 2505 """ 2506 return self.sync(self._run, function, *args, **kwargs) 2507
2508 @_deprecated(use_instead="Client.run which detects async functions automatically") 2509 def run_coroutine(self, function, *args, **kwargs): 2510 """ 2511 Spawn a coroutine on all workers. 2512
2513 This spawns a coroutine on all currently known workers and then waits 2514 for the coroutine on each worker. The coroutines' results are returned 2515 as a dictionary keyed by worker address. 2516
2517 Parameters 2518 ---------- 2519 function : a coroutine function 2520 (typically a function wrapped in gen.coroutine or 2521 a Python 3.5+ async function) 2522 *args : arguments for remote function 2523 **kwargs : keyword arguments for remote function 2524 wait : boolean (default True) 2525 Whether to wait for coroutines to end. 2526 workers : list 2527 Workers on which to run the function. Defaults to all known workers. 2528
2529 """ 2530 return self.run(function, *args, **kwargs) 2531
2532 @staticmethod 2533 def _get_computation_code(stacklevel=None) -> str: 2534 """Walk up the stack to the user code and extract the code surrounding 2535 the compute/submit/persist call. All modules encountered which are 2536 blacklisted by the option 2537 `distributed.diagnostics.computations.ignore-modules` will be ignored. 2538 This can be used to blacklist commonly used libraries which wrap 2539 dask/distributed compute calls. 2540
2541 ``stacklevel`` may be used to explicitly indicate from which frame on 2542 the stack to get the source code. 2543 """ 2544
2545 ignore_modules = dask.config.get( 2546 "distributed.diagnostics.computations.ignore-modules" 2547 ) 2548 if not isinstance(ignore_modules, list): 2549 raise TypeError( 2550 "Ignored modules must be a list. Instead got " 2551 f"({type(ignore_modules)}, {ignore_modules})" 2552 ) 2553 if stacklevel is None: 2554 pattern: re.Pattern | None 2555 if ignore_modules: 2556 pattern = re.compile("|".join([f"(?:{mod})" for mod in ignore_modules])) 2557 else: 2558 pattern = None 2559 else: 2560 # stacklevel 0 or less - shows dask internals which likely isn't helpful 2561 stacklevel = stacklevel if stacklevel > 0 else 1 2562 for i, (fr, _) in enumerate(traceback.walk_stack(None), 1): 2563 if stacklevel is not None: 2564 if i != stacklevel: 2565 continue 2566 elif pattern is not None and ( 2567 pattern.match(fr.f_globals.get("__name__", "")) 2568 or fr.f_code.co_name in ("<listcomp>", "<dictcomp>") 2569 ): 2570 continue 2571 try: 2572 return inspect.getsource(fr) 2573 except OSError: 2574 # Try to find the source if we are in %%time or %%timeit magic.
2575 if ( 2576 fr.f_code.co_filename in {"<timed exec>", "<magic-timeit>"} 2577 and "IPython" in sys.modules 2578 ): 2579 from IPython import get_ipython 2580 2581 ip = get_ipython() 2582 if ip is not None: 2583 # The current cell 2584 return ip.history_manager._i00 2585 break 2586 return "<Code not available>" 2587 2588 def _graph_to_futures( 2589 self, 2590 dsk, 2591 keys, 2592 workers=None, 2593 allow_other_workers=None, 2594 priority=None, 2595 user_priority=0, 2596 resources=None, 2597 retries=None, 2598 fifo_timeout=0, 2599 actors=None, 2600 ): 2601 with self._refcount_lock: 2602 if actors is not None and actors is not True and actors is not False: 2603 actors = list(self._expand_key(actors)) 2604 2605 # Make sure `dsk` is a high level graph 2606 if not isinstance(dsk, HighLevelGraph): 2607 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=()) 2608 2609 annotations = {} 2610 if user_priority: 2611 annotations["priority"] = user_priority 2612 if workers: 2613 if not isinstance(workers, (list, tuple, set)): 2614 workers = [workers] 2615 annotations["workers"] = workers 2616 if retries: 2617 annotations["retries"] = retries 2618 if allow_other_workers not in (True, False, None): 2619 raise TypeError("allow_other_workers= must be True, False, or None") 2620 if allow_other_workers: 2621 annotations["allow_other_workers"] = allow_other_workers 2622 if resources: 2623 annotations["resources"] = resources 2624 2625 # Merge global and local annotations 2626 annotations = merge(dask.config.get("annotations", {}), annotations) 2627 2628 # Pack the high level graph before sending it to the scheduler 2629 keyset = set(keys) 2630 dsk = dsk.__dask_distributed_pack__(self, keyset, annotations) 2631 2632 # Create futures before sending graph (helps avoid contention) 2633 futures = {key: Future(key, self, inform=False) for key in keyset} 2634 2635 self._send_to_scheduler( 2636 { 2637 "op": "update-graph-hlg", 2638 "hlg": dsk, 2639 "keys": list(map(stringify, keys)), 2640 "priority": priority, 2641 "submitting_task": getattr(thread_state, "key", None), 2642 "fifo_timeout": fifo_timeout, 2643 "actors": actors, 2644 "code": self._get_computation_code(), 2645 } 2646 ) 2647 return futures 2648 2649 def get( 2650 self, 2651 dsk, 2652 keys, 2653 workers=None, 2654 allow_other_workers=None, 2655 resources=None, 2656 sync=True, 2657 asynchronous=None, 2658 direct=None, 2659 retries=None, 2660 priority=0, 2661 fifo_timeout="60s", 2662 actors=None, 2663 **kwargs, 2664 ): 2665 """Compute dask graph 2666 2667 Parameters 2668 ---------- 2669 dsk : dict 2670 keys : object, or nested lists of objects 2671 workers : string or iterable of strings 2672 A set of worker addresses or hostnames on which computations may be 2673 performed. Leave empty to default to all workers (common case) 2674 allow_other_workers : bool (defaults to False) 2675 Used with ``workers``. Indicates whether or not the computations 2676 may be performed on workers that are not in the `workers` set(s). 2677 retries : int (default to 0) 2678 Number of allowed automatic retries if computing a result fails 2679 priority : Number 2680 Optional prioritization of task. Zero is default. 2681 Higher priorities take precedence 2682 resources : dict (defaults to {}) 2683 Defines the ``resources`` each instance of this mapped task requires 2684 on the worker; e.g. ``{'GPU': 2}``. 2685 See :doc:`worker resources <resources>` for details on defining 2686 resources. 
2687 sync : bool (optional) 2688 Returns Futures if False or concrete values if True (default). 2689 direct : bool 2690 Whether or not to connect directly to the workers, or to ask 2691 the scheduler to serve as intermediary. This can also be set when 2692 creating the Client. 2693 2694 Examples 2695 -------- 2696 >>> from operator import add # doctest: +SKIP 2697 >>> c = Client('127.0.0.1:8787') # doctest: +SKIP 2698 >>> c.get({'x': (add, 1, 2)}, 'x') # doctest: +SKIP 2699 3 2700 2701 See Also 2702 -------- 2703 Client.compute : Compute asynchronous collections 2704 """ 2705 futures = self._graph_to_futures( 2706 dsk, 2707 keys=set(flatten([keys])), 2708 workers=workers, 2709 allow_other_workers=allow_other_workers, 2710 resources=resources, 2711 fifo_timeout=fifo_timeout, 2712 retries=retries, 2713 user_priority=priority, 2714 actors=actors, 2715 ) 2716 packed = pack_data(keys, futures) 2717 if sync: 2718 if getattr(thread_state, "key", False): 2719 try: 2720 secede() 2721 should_rejoin = True 2722 except Exception: 2723 should_rejoin = False 2724 try: 2725 results = self.gather(packed, asynchronous=asynchronous, direct=direct) 2726 finally: 2727 for f in futures.values(): 2728 f.release() 2729 if getattr(thread_state, "key", False) and should_rejoin: 2730 rejoin() 2731 return results 2732 return packed 2733 2734 def _optimize_insert_futures(self, dsk, keys): 2735 """Replace known keys in dask graph with Futures 2736 2737 When given a Dask graph that might have overlapping keys with our known 2738 results we replace the values of that graph with futures. This can be 2739 used as an optimization to avoid recomputation. 2740 2741 This returns the same graph if unchanged but a new graph if any changes 2742 were necessary. 2743 """ 2744 with self._refcount_lock: 2745 changed = False 2746 for key in list(dsk): 2747 if stringify(key) in self.futures: 2748 if not changed: 2749 changed = True 2750 dsk = ensure_dict(dsk) 2751 dsk[key] = Future(key, self, inform=False) 2752 2753 if changed: 2754 dsk, _ = dask.optimization.cull(dsk, keys) 2755 2756 return dsk 2757 2758 def normalize_collection(self, collection): 2759 """ 2760 Replace collection's tasks by already existing futures if they exist 2761 2762 This normalizes the tasks within a collections task graph against the 2763 known futures within the scheduler. It returns a copy of the 2764 collection with a task graph that includes the overlapping futures. 
2765 2766 Examples 2767 -------- 2768 >>> len(x.__dask_graph__()) # x is a dask collection with 100 tasks # doctest: +SKIP 2769 100 2770 >>> set(client.futures).intersection(x.__dask_graph__()) # some overlap exists # doctest: +SKIP 2771 10 2772 2773 >>> x = client.normalize_collection(x) # doctest: +SKIP 2774 >>> len(x.__dask_graph__()) # smaller computational graph # doctest: +SKIP 2775 20 2776 2777 See Also 2778 -------- 2779 Client.persist : trigger computation of collection's tasks 2780 """ 2781 dsk_orig = collection.__dask_graph__() 2782 dsk = self._optimize_insert_futures(dsk_orig, collection.__dask_keys__()) 2783 2784 if dsk is dsk_orig: 2785 return collection 2786 else: 2787 return redict_collection(collection, dsk) 2788 2789 def compute( 2790 self, 2791 collections, 2792 sync=False, 2793 optimize_graph=True, 2794 workers=None, 2795 allow_other_workers=False, 2796 resources=None, 2797 retries=0, 2798 priority=0, 2799 fifo_timeout="60s", 2800 actors=None, 2801 traverse=True, 2802 **kwargs, 2803 ): 2804 """Compute dask collections on cluster 2805 2806 Parameters 2807 ---------- 2808 collections : iterable of dask objects or single dask object 2809 Collections like dask.array or dataframe or dask.value objects 2810 sync : bool (optional) 2811 Returns Futures if False (default) or concrete values if True 2812 optimize_graph : bool 2813 Whether or not to optimize the underlying graphs 2814 workers : string or iterable of strings 2815 A set of worker hostnames on which computations may be performed. 2816 Leave empty to default to all workers (common case) 2817 allow_other_workers : bool (defaults to False) 2818 Used with `workers`. Indicates whether or not the computations 2819 may be performed on workers that are not in the `workers` set(s). 2820 retries : int (default to 0) 2821 Number of allowed automatic retries if computing a result fails 2822 priority : Number 2823 Optional prioritization of task. Zero is default. 2824 Higher priorities take precedence 2825 fifo_timeout : timedelta str (defaults to '60s') 2826 Allowed amount of time between calls to consider the same priority 2827 traverse : bool (defaults to True) 2828 By default dask traverses builtin python collections looking for 2829 dask objects passed to ``compute``. For large collections this can 2830 be expensive. If none of the arguments contain any dask objects, 2831 set ``traverse=False`` to avoid doing this traversal. 2832 resources : dict (defaults to {}) 2833 Defines the `resources` each instance of this mapped task requires 2834 on the worker; e.g. ``{'GPU': 2}``. 2835 See :doc:`worker resources <resources>` for details on defining 2836 resources. 2837 actors : bool or dict (default None) 2838 Whether these tasks should exist on the worker as stateful actors. 2839 Specified on a global (True/False) or per-task (``{'x': True, 2840 'y': False}``) basis. See :doc:`actors` for additional details. 
2841 **kwargs 2842 Options to pass to the graph optimize calls 2843 2844 Returns 2845 ------- 2846 List of Futures if input is a sequence, or a single future otherwise 2847 2848 Examples 2849 -------- 2850 >>> from dask import delayed 2851 >>> from operator import add 2852 >>> x = delayed(add)(1, 2) 2853 >>> y = delayed(add)(x, x) 2854 >>> xx, yy = client.compute([x, y]) # doctest: +SKIP 2855 >>> xx # doctest: +SKIP 2856 <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e> 2857 >>> xx.result() # doctest: +SKIP 2858 3 2859 >>> yy.result() # doctest: +SKIP 2860 6 2861 2862 Also support single arguments 2863 2864 >>> xx = client.compute(x) # doctest: +SKIP 2865 2866 See Also 2867 -------- 2868 Client.get : Normal synchronous dask.get function 2869 """ 2870 if isinstance(collections, (list, tuple, set, frozenset)): 2871 singleton = False 2872 else: 2873 collections = [collections] 2874 singleton = True 2875 2876 if traverse: 2877 collections = tuple( 2878 dask.delayed(a) 2879 if isinstance(a, (list, set, tuple, dict, Iterator)) 2880 else a 2881 for a in collections 2882 ) 2883 2884 variables = [a for a in collections if dask.is_dask_collection(a)] 2885 2886 dsk = self.collections_to_dsk(variables, optimize_graph, **kwargs) 2887 names = ["finalize-%s" % tokenize(v) for v in variables] 2888 dsk2 = {} 2889 for i, (name, v) in enumerate(zip(names, variables)): 2890 func, extra_args = v.__dask_postcompute__() 2891 keys = v.__dask_keys__() 2892 if func is single_key and len(keys) == 1 and not extra_args: 2893 names[i] = keys[0] 2894 else: 2895 dsk2[name] = (func, keys) + extra_args 2896 2897 if not isinstance(dsk, HighLevelGraph): 2898 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=()) 2899 2900 # Let's append the finalize graph to dsk 2901 finalize_name = tokenize(names) 2902 layers = {finalize_name: dsk2} 2903 layers.update(dsk.layers) 2904 dependencies = {finalize_name: set(dsk.layers.keys())} 2905 dependencies.update(dsk.dependencies) 2906 dsk = HighLevelGraph(layers, dependencies) 2907 2908 futures_dict = self._graph_to_futures( 2909 dsk, 2910 names, 2911 workers=workers, 2912 allow_other_workers=allow_other_workers, 2913 resources=resources, 2914 retries=retries, 2915 user_priority=priority, 2916 fifo_timeout=fifo_timeout, 2917 actors=actors, 2918 ) 2919 2920 i = 0 2921 futures = [] 2922 for arg in collections: 2923 if dask.is_dask_collection(arg): 2924 futures.append(futures_dict[names[i]]) 2925 i += 1 2926 else: 2927 futures.append(arg) 2928 2929 if sync: 2930 result = self.gather(futures) 2931 else: 2932 result = futures 2933 2934 if singleton: 2935 return first(result) 2936 else: 2937 return result 2938 2939 def persist( 2940 self, 2941 collections, 2942 optimize_graph=True, 2943 workers=None, 2944 allow_other_workers=None, 2945 resources=None, 2946 retries=None, 2947 priority=0, 2948 fifo_timeout="60s", 2949 actors=None, 2950 **kwargs, 2951 ): 2952 """Persist dask collections on cluster 2953 2954 Starts computation of the collection on the cluster in the background. 2955 Provides a new dask collection that is semantically identical to the 2956 previous one, but now based off of futures currently in execution. 2957 2958 Parameters 2959 ---------- 2960 collections : sequence or single dask object 2961 Collections like dask.array or dataframe or dask.value objects 2962 optimize_graph : bool 2963 Whether or not to optimize the underlying graphs 2964 workers : string or iterable of strings 2965 A set of worker hostnames on which computations may be performed. 
2966 Leave empty to default to all workers (common case) 2967 allow_other_workers : bool (defaults to False) 2968 Used with `workers`. Indicates whether or not the computations 2969 may be performed on workers that are not in the `workers` set(s). 2970 retries : int (default to 0) 2971 Number of allowed automatic retries if computing a result fails 2972 priority : Number 2973 Optional prioritization of task. Zero is default. 2974 Higher priorities take precedence 2975 fifo_timeout : timedelta str (defaults to '60s') 2976 Allowed amount of time between calls to consider the same priority 2977 resources : dict (defaults to {}) 2978 Defines the `resources` each instance of this mapped task requires 2979 on the worker; e.g. ``{'GPU': 2}``. 2980 See :doc:`worker resources <resources>` for details on defining 2981 resources. 2982 actors : bool or dict (default None) 2983 Whether these tasks should exist on the worker as stateful actors. 2984 Specified on a global (True/False) or per-task (``{'x': True, 2985 'y': False}``) basis. See :doc:`actors` for additional details. 2986 **kwargs 2987 Options to pass to the graph optimize calls 2988 2989 Returns 2990 ------- 2991 List of collections, or single collection, depending on type of input. 2992 2993 Examples 2994 -------- 2995 >>> xx = client.persist(x) # doctest: +SKIP 2996 >>> xx, yy = client.persist([x, y]) # doctest: +SKIP 2997 2998 See Also 2999 -------- 3000 Client.compute 3001 """ 3002 if isinstance(collections, (tuple, list, set, frozenset)): 3003 singleton = False 3004 else: 3005 singleton = True 3006 collections = [collections] 3007 3008 assert all(map(dask.is_dask_collection, collections)) 3009 3010 dsk = self.collections_to_dsk(collections, optimize_graph, **kwargs) 3011 3012 names = {k for c in collections for k in flatten(c.__dask_keys__())} 3013 3014 futures = self._graph_to_futures( 3015 dsk, 3016 names, 3017 workers=workers, 3018 allow_other_workers=allow_other_workers, 3019 resources=resources, 3020 retries=retries, 3021 user_priority=priority, 3022 fifo_timeout=fifo_timeout, 3023 actors=actors, 3024 ) 3025 3026 postpersists = [c.__dask_postpersist__() for c in collections] 3027 result = [ 3028 func({k: futures[k] for k in flatten(c.__dask_keys__())}, *args) 3029 for (func, args), c in zip(postpersists, collections) 3030 ] 3031 3032 if singleton: 3033 return first(result) 3034 else: 3035 return result 3036 3037 async def _restart(self, timeout=no_default): 3038 if timeout == no_default: 3039 timeout = self._timeout * 2 3040 if timeout is not None: 3041 timeout = parse_timedelta(timeout, "s") 3042 3043 self._send_to_scheduler({"op": "restart", "timeout": timeout}) 3044 self._restart_event = asyncio.Event() 3045 try: 3046 await asyncio.wait_for(self._restart_event.wait(), timeout) 3047 except TimeoutError: 3048 logger.error("Restart timed out after %.2f seconds", timeout) 3049 3050 self.generation += 1 3051 with self._refcount_lock: 3052 self.refcount.clear() 3053 3054 return self 3055 3056 def restart(self, **kwargs): 3057 """Restart the distributed network 3058 3059 This kills all active work, deletes all data on the network, and 3060 restarts the worker processes. 
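
Examples
--------
A minimal usage sketch; the timeout value below is illustrative only, and any
value accepted by ``dask.utils.parse_timedelta`` should work:

>>> client.restart()  # doctest: +SKIP
>>> client.restart(timeout="60s")  # doctest: +SKIP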
3061 """ 3062 return self.sync(self._restart, **kwargs) 3063 3064 async def _upload_large_file(self, local_filename, remote_filename=None): 3065 if remote_filename is None: 3066 remote_filename = os.path.split(local_filename)[1] 3067 3068 with open(local_filename, "rb") as f: 3069 data = f.read() 3070 3071 [future] = await self._scatter([data]) 3072 key = future.key 3073 await self._replicate(future) 3074 3075 def dump_to_file(dask_worker=None): 3076 if not os.path.isabs(remote_filename): 3077 fn = os.path.join(dask_worker.local_directory, remote_filename) 3078 else: 3079 fn = remote_filename 3080 with open(fn, "wb") as f: 3081 f.write(dask_worker.data[key]) 3082 3083 return len(dask_worker.data[key]) 3084 3085 response = await self._run(dump_to_file) 3086 3087 assert all(len(data) == v for v in response.values()) 3088 3089 def upload_file(self, filename, **kwargs): 3090 """Upload local package to workers 3091 3092 This sends a local file up to all worker nodes. This file is placed 3093 into a temporary directory on Python's system path so any .py, .egg 3094 or .zip files will be importable. 3095 3096 Parameters 3097 ---------- 3098 filename : string 3099 Filename of .py, .egg or .zip file to send to workers 3100 3101 Examples 3102 -------- 3103 >>> client.upload_file('mylibrary.egg') # doctest: +SKIP 3104 >>> from mylibrary import myfunc # doctest: +SKIP 3105 >>> L = client.map(myfunc, seq) # doctest: +SKIP 3106 """ 3107 return self.register_worker_plugin( 3108 UploadFile(filename), 3109 name=filename + str(uuid.uuid4()), 3110 ) 3111 3112 async def _rebalance(self, futures=None, workers=None): 3113 if futures is not None: 3114 await _wait(futures) 3115 keys = list({stringify(f.key) for f in self.futures_of(futures)}) 3116 else: 3117 keys = None 3118 result = await self.scheduler.rebalance(keys=keys, workers=workers) 3119 if result["status"] == "partial-fail": 3120 raise KeyError(f"Could not rebalance keys: {result['keys']}") 3121 assert result["status"] == "OK", result 3122 3123 def rebalance(self, futures=None, workers=None, **kwargs): 3124 """Rebalance data within network 3125 3126 Move data between workers to roughly balance memory burden. This 3127 either affects a subset of the keys/workers or the entire network, 3128 depending on keyword arguments. 3129 3130 For details on the algorithm and configuration options, refer to the matching 3131 scheduler-side method :meth:`~distributed.scheduler.Scheduler.rebalance`. 3132 3133 .. warning:: 3134 This operation is generally not well tested against normal operation of the 3135 scheduler. It is not recommended to use it while waiting on computations. 3136 3137 Parameters 3138 ---------- 3139 futures : list, optional 3140 A list of futures to balance, defaults all data 3141 workers : list, optional 3142 A list of workers on which to balance, defaults to all workers 3143 """ 3144 return self.sync(self._rebalance, futures, workers, **kwargs) 3145 3146 async def _replicate(self, futures, n=None, workers=None, branching_factor=2): 3147 futures = self.futures_of(futures) 3148 await _wait(futures) 3149 keys = {stringify(f.key) for f in futures} 3150 await self.scheduler.replicate( 3151 keys=list(keys), n=n, workers=workers, branching_factor=branching_factor 3152 ) 3153 3154 def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs): 3155 """Set replication of futures within network 3156 3157 Copy data onto many workers. This helps to broadcast frequently 3158 accessed data and it helps to improve resilience. 
3159 3160 This performs a tree copy of the data throughout the network 3161 individually on each piece of data. This operation blocks until 3162 complete. It does not guarantee replication of data to future workers. 3163
3164 Parameters 3165 ---------- 3166 futures : list of futures 3167 Futures we wish to replicate 3168 n : int, optional 3169 Number of processes on the cluster on which to replicate the data. 3170 Defaults to all. 3171 workers : list of worker addresses 3172 Workers on which we want to restrict the replication. 3173 Defaults to all. 3174 branching_factor : int, optional 3175 The number of workers that can copy data in each generation 3176
3177 Examples 3178 -------- 3179 >>> x = c.submit(func, *args) # doctest: +SKIP 3180 >>> c.replicate([x]) # send to all workers # doctest: +SKIP 3181 >>> c.replicate([x], n=3) # send to three workers # doctest: +SKIP 3182 >>> c.replicate([x], workers=['alice', 'bob']) # send to specific # doctest: +SKIP 3183 >>> c.replicate([x], n=1, workers=['alice', 'bob']) # send to one of specific workers # doctest: +SKIP 3184 >>> c.replicate([x], n=1) # reduce replications # doctest: +SKIP 3185
3186 See Also 3187 -------- 3188 Client.rebalance 3189 """ 3190 return self.sync( 3191 self._replicate, 3192 futures, 3193 n=n, 3194 workers=workers, 3195 branching_factor=branching_factor, 3196 **kwargs, 3197 ) 3198
3199 def nthreads(self, workers=None, **kwargs): 3200 """The number of threads/cores available on each worker node 3201
3202 Parameters 3203 ---------- 3204 workers : list (optional) 3205 A list of workers that we care about specifically. 3206 Leave empty to receive information about all workers. 3207
3208 Examples 3209 -------- 3210 >>> c.nthreads() # doctest: +SKIP 3211 {'192.168.1.141:46784': 8, 3212 '192.167.1.142:47548': 8, 3213 '192.167.1.143:47329': 8, 3214 '192.167.1.144:37297': 8} 3215
3216 See Also 3217 -------- 3218 Client.who_has 3219 Client.has_what 3220 """ 3221 if isinstance(workers, tuple) and all( 3222 isinstance(i, (str, tuple)) for i in workers 3223 ): 3224 workers = list(workers) 3225 if workers is not None and not isinstance(workers, (tuple, list, set)): 3226 workers = [workers] 3227 return self.sync(self.scheduler.ncores, workers=workers, **kwargs) 3228
3229 ncores = nthreads 3230
3231 def who_has(self, futures=None, **kwargs): 3232 """The workers storing each future's data 3233
3234 Parameters 3235 ---------- 3236 futures : list (optional) 3237 A list of futures, defaults to all data 3238
3239 Examples 3240 -------- 3241 >>> x, y, z = c.map(inc, [1, 2, 3]) # doctest: +SKIP 3242 >>> wait([x, y, z]) # doctest: +SKIP 3243 >>> c.who_has() # doctest: +SKIP 3244 {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 3245 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'], 3246 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']} 3247
3248 >>> c.who_has([x, y]) # doctest: +SKIP 3249 {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 3250 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']} 3251
3252 See Also 3253 -------- 3254 Client.has_what 3255 Client.nthreads 3256 """ 3257 if futures is not None: 3258 futures = self.futures_of(futures) 3259 keys = list(map(stringify, {f.key for f in futures})) 3260 else: 3261 keys = None 3262
3263 async def _(): 3264 return WhoHas(await self.scheduler.who_has(keys=keys, **kwargs)) 3265
3266 return self.sync(_) 3267
3268 def has_what(self, workers=None, **kwargs): 3269 """Which keys are held by which workers 3270
3271 This returns the
keys of the data that are held in each worker's 3272 memory. 3273 3274 Parameters 3275 ---------- 3276 workers : list (optional) 3277 A list of worker addresses, defaults to all 3278 3279 Examples 3280 -------- 3281 >>> x, y, z = c.map(inc, [1, 2, 3]) # doctest: +SKIP 3282 >>> wait([x, y, z]) # doctest: +SKIP 3283 >>> c.has_what() # doctest: +SKIP 3284 {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 3285 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 3286 'inc-1e297fc27658d7b67b3a758f16bcf47a']} 3287 3288 See Also 3289 -------- 3290 Client.who_has 3291 Client.nthreads 3292 Client.processing 3293 """ 3294 if isinstance(workers, tuple) and all( 3295 isinstance(i, (str, tuple)) for i in workers 3296 ): 3297 workers = list(workers) 3298 if workers is not None and not isinstance(workers, (tuple, list, set)): 3299 workers = [workers] 3300 3301 async def _(): 3302 return HasWhat(await self.scheduler.has_what(workers=workers, **kwargs)) 3303 3304 return self.sync(_) 3305 3306 def processing(self, workers=None): 3307 """The tasks currently running on each worker 3308 3309 Parameters 3310 ---------- 3311 workers : list (optional) 3312 A list of worker addresses, defaults to all 3313 3314 Examples 3315 -------- 3316 >>> x, y, z = c.map(inc, [1, 2, 3]) # doctest: +SKIP 3317 >>> c.processing() # doctest: +SKIP 3318 {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 3319 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 3320 'inc-1e297fc27658d7b67b3a758f16bcf47a']} 3321 3322 See Also 3323 -------- 3324 Client.who_has 3325 Client.has_what 3326 Client.nthreads 3327 """ 3328 if isinstance(workers, tuple) and all( 3329 isinstance(i, (str, tuple)) for i in workers 3330 ): 3331 workers = list(workers) 3332 if workers is not None and not isinstance(workers, (tuple, list, set)): 3333 workers = [workers] 3334 return self.sync(self.scheduler.processing, workers=workers) 3335 3336 def nbytes(self, keys=None, summary=True, **kwargs): 3337 """The bytes taken up by each key on the cluster 3338 3339 This is as measured by ``sys.getsizeof`` which may not accurately 3340 reflect the true cost. 3341 3342 Parameters 3343 ---------- 3344 keys : list (optional) 3345 A list of keys, defaults to all keys 3346 summary : boolean, (optional) 3347 Summarize keys into key types 3348 3349 Examples 3350 -------- 3351 >>> x, y, z = c.map(inc, [1, 2, 3]) # doctest: +SKIP 3352 >>> c.nbytes(summary=False) # doctest: +SKIP 3353 {'inc-1c8dd6be1c21646c71f76c16d09304ea': 28, 3354 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28, 3355 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28} 3356 3357 >>> c.nbytes(summary=True) # doctest: +SKIP 3358 {'inc': 84} 3359 3360 See Also 3361 -------- 3362 Client.who_has 3363 """ 3364 return self.sync(self.scheduler.nbytes, keys=keys, summary=summary, **kwargs) 3365 3366 def call_stack(self, futures=None, keys=None): 3367 """The actively running call stack of all relevant keys 3368 3369 You can specify data of interest either by providing futures or 3370 collections in the ``futures=`` keyword or a list of explicit keys in 3371 the ``keys=`` keyword. If neither are provided then all call stacks 3372 will be returned. 
3373 3374 Parameters 3375 ---------- 3376 futures : list (optional) 3377 List of futures, defaults to all data 3378 keys : list (optional) 3379 List of key names, defaults to all data 3380 3381 Examples 3382 -------- 3383 >>> df = dd.read_parquet(...).persist() # doctest: +SKIP 3384 >>> client.call_stack(df) # call on collections 3385 3386 >>> client.call_stack() # Or call with no arguments for all activity # doctest: +SKIP 3387 """ 3388 keys = keys or [] 3389 if futures is not None: 3390 futures = self.futures_of(futures) 3391 keys += list(map(stringify, {f.key for f in futures})) 3392 return self.sync(self.scheduler.call_stack, keys=keys or None) 3393 3394 def profile( 3395 self, 3396 key=None, 3397 start=None, 3398 stop=None, 3399 workers=None, 3400 merge_workers=True, 3401 plot=False, 3402 filename=None, 3403 server=False, 3404 scheduler=False, 3405 ): 3406 """Collect statistical profiling information about recent work 3407 3408 Parameters 3409 ---------- 3410 key : str 3411 Key prefix to select, this is typically a function name like 'inc' 3412 Leave as None to collect all data 3413 start : time 3414 stop : time 3415 workers : list 3416 List of workers to restrict profile information 3417 server : bool 3418 If true, return the profile of the worker's administrative thread 3419 rather than the worker threads. 3420 This is useful when profiling Dask itself, rather than user code. 3421 scheduler : bool 3422 If true, return the profile information from the scheduler's 3423 administrative thread rather than the workers. 3424 This is useful when profiling Dask's scheduling itself. 3425 plot : boolean or string 3426 Whether or not to return a plot object 3427 filename : str 3428 Filename to save the plot 3429 3430 Examples 3431 -------- 3432 >>> client.profile() # call on collections 3433 >>> client.profile(filename='dask-profile.html') # save to html file 3434 """ 3435 return self.sync( 3436 self._profile, 3437 key=key, 3438 workers=workers, 3439 merge_workers=merge_workers, 3440 start=start, 3441 stop=stop, 3442 plot=plot, 3443 filename=filename, 3444 server=server, 3445 scheduler=scheduler, 3446 ) 3447 3448 async def _profile( 3449 self, 3450 key=None, 3451 start=None, 3452 stop=None, 3453 workers=None, 3454 merge_workers=True, 3455 plot=False, 3456 filename=None, 3457 server=False, 3458 scheduler=False, 3459 ): 3460 if isinstance(workers, (str, Number)): 3461 workers = [workers] 3462 3463 state = await self.scheduler.profile( 3464 key=key, 3465 workers=workers, 3466 merge_workers=merge_workers, 3467 start=start, 3468 stop=stop, 3469 server=server, 3470 scheduler=scheduler, 3471 ) 3472 3473 if filename: 3474 plot = True 3475 3476 if plot: 3477 from . 
import profile 3478
3479 data = profile.plot_data(state) 3480 figure, source = profile.plot_figure(data, sizing_mode="stretch_both") 3481
3482 if plot == "save" and not filename: 3483 filename = "dask-profile.html" 3484
3485 if filename: 3486 from bokeh.plotting import output_file, save 3487
3488 output_file(filename=filename, title="Dask Profile") 3489 save(figure, filename=filename) 3490 return (state, figure) 3491
3492 else: 3493 return state 3494
3495 def scheduler_info(self, **kwargs): 3496 """Basic information about the workers in the cluster 3497
3498 Examples 3499 -------- 3500 >>> c.scheduler_info() # doctest: +SKIP 3501 {'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996', 3502 'services': {}, 3503 'type': 'Scheduler', 3504 'workers': {'127.0.0.1:40575': {'active': 0, 3505 'last-seen': 1472038237.4845693, 3506 'name': '127.0.0.1:40575', 3507 'services': {}, 3508 'stored': 0, 3509 'time-delay': 0.0061032772064208984}}} 3510 """ 3511 if not self.asynchronous: 3512 self.sync(self._update_scheduler_info) 3513 return self._scheduler_identity 3514
3515 async def _dump_cluster_state( 3516 self, 3517 filename: str, 3518 exclude: Sequence[str] = None, 3519 format: Literal["msgpack"] | Literal["yaml"] = "msgpack", 3520 ) -> None: 3521
3522 scheduler_info = self.scheduler.dump_state() 3523
3524 worker_info = self.scheduler.broadcast( 3525 msg=dict( 3526 op="dump_state", 3527 exclude=exclude, 3528 ), 3529 ) 3530 versions = self._get_versions() 3531 scheduler_info, worker_info, versions_info = await asyncio.gather( 3532 scheduler_info, worker_info, versions 3533 ) 3534
3535 state = { 3536 "scheduler": scheduler_info, 3537 "workers": worker_info, 3538 "versions": versions_info, 3539 } 3540 filename = str(filename) 3541 if format == "msgpack": 3542 suffix = ".msgpack.gz" 3543 if not filename.endswith(suffix): 3544 filename += suffix 3545 import gzip 3546
3547 import msgpack 3548 import yaml 3549
3550 with gzip.open(filename, "wb") as fdg: 3551 msgpack.pack(state, fdg) 3552 elif format == "yaml": 3553 suffix = ".yaml" 3554 if not filename.endswith(suffix): 3555 filename += suffix 3556 import yaml 3557
3558 with open(filename, "w") as fd: 3559 yaml.dump(state, fd) 3560 else: 3561 raise ValueError( 3562 f"Unsupported format {format}. Possible values are `msgpack` or `yaml`" 3563 ) 3564
3565 def dump_cluster_state( 3566 self, 3567 filename: str = "dask-cluster-dump", 3568 exclude: Sequence[str] = None, 3569 format: Literal["msgpack"] | Literal["yaml"] = "msgpack", 3570 ) -> Awaitable | None: 3571 """Extract a dump of the entire cluster state and persist to disk. 3572 This is intended for debugging purposes only. 3573
3574 Warning: Memory usage on client side can be large. 3575
3576 Results will be stored in a dict:: 3577
3578 { 3579 "scheduler": {...}, 3580 "workers": { 3581 worker_addr: {...}, # worker attributes 3582 ... 3583 }, 3584 "versions": {...} } 3585
3586 Parameters 3587 ---------- 3588 filename: 3589 The output filename. The appropriate file suffix (`.msgpack.gz` or 3590 `.yaml`) will be appended automatically. 3591 exclude: 3592 A sequence of attribute names which are supposed to be blacklisted 3593 from the dump, e.g. to exclude code, tracebacks, logs, etc. 3594 format: 3595 Either msgpack or yaml. If msgpack is used (default), the output 3596 will be stored in a gzipped file as msgpack.
3597 3598 To read:: 3599
3600 import gzip, msgpack 3601 with gzip.open("filename") as fd: 3602 state = msgpack.unpack(fd) 3603
3604 or:: 3605
3606 import yaml 3607 try: 3608 from yaml import CLoader as Loader 3609 except ImportError: 3610 from yaml import Loader 3611 with open("filename") as fd: 3612 state = yaml.load(fd, Loader=Loader) 3613
3614 """ 3615 return self.sync( 3616 self._dump_cluster_state, 3617 filename=filename, 3618 format=format, 3619 exclude=exclude, 3620 ) 3621
3622 def write_scheduler_file(self, scheduler_file): 3623 """Write the scheduler information to a json file. 3624
3625 This facilitates easy sharing of scheduler information using a file 3626 system. The scheduler file can be used to instantiate a second Client 3627 using the same scheduler. 3628
3629 Parameters 3630 ---------- 3631 scheduler_file : str 3632 Path to write the scheduler file. 3633
3634 Examples 3635 -------- 3636 >>> client = Client() # doctest: +SKIP 3637 >>> client.write_scheduler_file('scheduler.json') # doctest: +SKIP 3638 # connect to previous client's scheduler 3639 >>> client2 = Client(scheduler_file='scheduler.json') # doctest: +SKIP 3640 """ 3641 if self.scheduler_file: 3642 raise ValueError("Scheduler file already set") 3643 else: 3644 self.scheduler_file = scheduler_file 3645
3646 with open(self.scheduler_file, "w") as f: 3647 json.dump(self.scheduler_info(), f, indent=2) 3648
3649 def get_metadata(self, keys, default=no_default): 3650 """Get arbitrary metadata from scheduler 3651
3652 See set_metadata for the full docstring with examples 3653
3654 Parameters 3655 ---------- 3656 keys : key or list 3657 Key to access. If a list then gets within a nested collection 3658 default : optional 3659 If the key does not exist then return this value instead. 3660 If not provided then this raises a KeyError if the key is not 3661 present 3662
3663 See Also 3664 -------- 3665 Client.set_metadata 3666 """ 3667 if not isinstance(keys, (list, tuple)): 3668 keys = (keys,) 3669 return self.sync(self.scheduler.get_metadata, keys=keys, default=default) 3670
3671 def get_scheduler_logs(self, n=None): 3672 """Get logs from scheduler 3673
3674 Parameters 3675 ---------- 3676 n : int 3677 Number of logs to retrieve. Maxes out at 10000 by default, 3678 configurable via the ``distributed.admin.log-length`` 3679 configuration value. 3680
3681 Returns 3682 ------- 3683 Logs in reversed order (newest first) 3684 """ 3685 return self.sync(self.scheduler.logs, n=n) 3686
3687 def get_worker_logs(self, n=None, workers=None, nanny=False): 3688 """Get logs from workers 3689
3690 Parameters 3691 ---------- 3692 n : int 3693 Number of logs to retrieve. Maxes out at 10000 by default, 3694 configurable via the ``distributed.admin.log-length`` 3695 configuration value. 3696 workers : iterable 3697 List of worker addresses to retrieve. Gets all workers by default. 3698 nanny : bool, default False 3699 Whether to get the logs from the workers (False) or the nannies (True). If 3700 specified, the addresses in `workers` should still be the worker addresses, 3701 not the nanny addresses. 3702
3703 Returns 3704 ------- 3705 Dictionary mapping worker address to logs. 3706 Logs are returned in reversed order (newest first) 3707 """ 3708 return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny) 3709
3710 def log_event(self, topic, msg): 3711 """Log an event under a given topic 3712
3713 Parameters 3714 ---------- 3715 topic : str, list 3716 Name of the topic under which to log an event.
To log the same 3717 event under multiple topics, pass a list of topic names. 3718 msg 3719 Event message to log. Note this must be msgpack serializable. 3720
3721 Examples 3722 -------- 3723 >>> from time import time 3724 >>> client.log_event("current-time", time()) 3725 """ 3726 return self.sync(self.scheduler.log_event, topic=topic, msg=msg) 3727
3728 def get_events(self, topic: str = None): 3729 """Retrieve structured topic logs 3730
3731 Parameters 3732 ---------- 3733 topic : str, optional 3734 Name of topic log to retrieve events for. If no ``topic`` is 3735 provided, then logs for all topics will be returned. 3736 """ 3737 return self.sync(self.scheduler.events, topic=topic) 3738
3739 async def _handle_event(self, topic, event): 3740 if topic not in self._event_handlers: 3741 self.unsubscribe_topic(topic) 3742 return 3743 handler = self._event_handlers[topic] 3744 ret = handler(event) 3745 if inspect.isawaitable(ret): 3746 await ret 3747
3748 def subscribe_topic(self, topic, handler): 3749 """Subscribe to a topic and execute a handler for every received event 3750
3751 Parameters 3752 ---------- 3753 topic: str 3754 The topic name 3755 handler: callable or coroutine function 3756 A handler called for every received event. The handler must accept a 3757 single argument `event` which is a tuple `(timestamp, msg)` where 3758 timestamp refers to the clock on the scheduler. 3759
3760 Examples 3761 -------- 3762
3763 >>> import logging 3764 >>> logger = logging.getLogger("myLogger") # Log config not shown 3765 >>> client.subscribe_topic("topic-name", lambda event: logger.info(event)) 3766
3767 See Also 3768 -------- 3769 dask.distributed.Client.unsubscribe_topic 3770 dask.distributed.Client.get_events 3771 dask.distributed.Client.log_event 3772 """ 3773 if topic in self._event_handlers: 3774 logger.info("Handler for %s already set. Overwriting.", topic) 3775 self._event_handlers[topic] = handler 3776 msg = {"op": "subscribe-topic", "topic": topic, "client": self.id} 3777 self._send_to_scheduler(msg) 3778
3779 def unsubscribe_topic(self, topic): 3780 """Unsubscribe from a topic and remove event handler 3781
3782 See Also 3783 -------- 3784 dask.distributed.Client.subscribe_topic 3785 dask.distributed.Client.get_events 3786 dask.distributed.Client.log_event 3787 """ 3788 if topic in self._event_handlers: 3789 msg = {"op": "unsubscribe-topic", "topic": topic, "client": self.id} 3790 self._send_to_scheduler(msg) 3791 else: 3792 raise ValueError(f"No event handler known for topic {topic}.") 3793
3794 def retire_workers(self, workers=None, close_workers=True, **kwargs): 3795 """Retire certain workers on the scheduler 3796
3797 See dask.distributed.Scheduler.retire_workers for the full docstring. 3798
3799 Examples 3800 -------- 3801 You can get information about active workers using the following: 3802
3803 >>> workers = client.scheduler_info()['workers'] 3804
3805 From that list you may want to select some workers to close 3806
3807 >>> client.retire_workers(workers=['tcp://address:port', ...]) 3808
3809 See Also 3810 -------- 3811 dask.distributed.Scheduler.retire_workers 3812 """ 3813 return self.sync( 3814 self.scheduler.retire_workers, 3815 workers=workers, 3816 close_workers=close_workers, 3817 **kwargs, 3818 ) 3819
3820 def set_metadata(self, key, value): 3821 """Set arbitrary metadata in the scheduler 3822
3823 This allows you to store small amounts of data on the central scheduler 3824 process for administrative purposes.
Data should be msgpack 3825 serializable (ints, strings, lists, dicts) 3826 3827 If the key corresponds to a task then that key will be cleaned up when 3828 the task is forgotten by the scheduler. 3829 3830 If the key is a list then it will be assumed that you want to index 3831 into a nested dictionary structure using those keys. For example if 3832 you call the following:: 3833 3834 >>> client.set_metadata(['a', 'b', 'c'], 123) 3835 3836 Then this is the same as setting 3837 3838 >>> scheduler.task_metadata['a']['b']['c'] = 123 3839 3840 The lower level dictionaries will be created on demand. 3841 3842 Examples 3843 -------- 3844 >>> client.set_metadata('x', 123) # doctest: +SKIP 3845 >>> client.get_metadata('x') # doctest: +SKIP 3846 123 3847 3848 >>> client.set_metadata(['x', 'y'], 123) # doctest: +SKIP 3849 >>> client.get_metadata('x') # doctest: +SKIP 3850 {'y': 123} 3851 3852 >>> client.set_metadata(['x', 'w', 'z'], 456) # doctest: +SKIP 3853 >>> client.get_metadata('x') # doctest: +SKIP 3854 {'y': 123, 'w': {'z': 456}} 3855 3856 >>> client.get_metadata(['x', 'w']) # doctest: +SKIP 3857 {'z': 456} 3858 3859 See Also 3860 -------- 3861 get_metadata 3862 """ 3863 if not isinstance(key, list): 3864 key = (key,) 3865 return self.sync(self.scheduler.set_metadata, keys=key, value=value) 3866 3867 def get_versions(self, check=False, packages=[]): 3868 """Return version info for the scheduler, all workers and myself 3869 3870 Parameters 3871 ---------- 3872 check : boolean, default False 3873 raise ValueError if all required & optional packages 3874 do not match 3875 packages : List[str] 3876 Extra package names to check 3877 3878 Examples 3879 -------- 3880 >>> c.get_versions() # doctest: +SKIP 3881 3882 >>> c.get_versions(packages=['sklearn', 'geopandas']) # doctest: +SKIP 3883 """ 3884 return self.sync(self._get_versions, check=check, packages=packages) 3885 3886 async def _get_versions(self, check=False, packages=[]): 3887 client = version_module.get_versions(packages=packages) 3888 try: 3889 scheduler = await self.scheduler.versions(packages=packages) 3890 except KeyError: 3891 scheduler = None 3892 except TypeError: # packages keyword not supported 3893 scheduler = await self.scheduler.versions() # this raises 3894 3895 workers = await self.scheduler.broadcast( 3896 msg={"op": "versions", "packages": packages} 3897 ) 3898 result = {"scheduler": scheduler, "workers": workers, "client": client} 3899 3900 if check: 3901 msg = version_module.error_message(scheduler, workers, client) 3902 if msg["warning"]: 3903 warnings.warn(msg["warning"]) 3904 if msg["error"]: 3905 raise ValueError(msg["error"]) 3906 3907 return result 3908 3909 def futures_of(self, futures): 3910 return futures_of(futures, client=self) 3911 3912 def start_ipython(self, *args, **kwargs): 3913 """Deprecated - Method moved to start_ipython_workers""" 3914 raise Exception("Method moved to start_ipython_workers") 3915 3916 async def _start_ipython_workers(self, workers): 3917 if workers is None: 3918 workers = await self.scheduler.ncores() 3919 3920 responses = await self.scheduler.broadcast( 3921 msg=dict(op="start_ipython"), workers=workers 3922 ) 3923 return workers, responses 3924 3925 def start_ipython_workers( 3926 self, workers=None, magic_names=False, qtconsole=False, qtconsole_args=None 3927 ): 3928 """Start IPython kernels on workers 3929 3930 Parameters 3931 ---------- 3932 workers : list (optional) 3933 A list of worker addresses, defaults to all 3934 magic_names : str or list(str) (optional) 3935 If defined, 
register IPython magics with these names for 3936 executing code on the workers. If string has asterix then expand 3937 asterix into 0, 1, ..., n for n workers 3938 qtconsole : bool (optional) 3939 If True, launch a Jupyter QtConsole connected to the worker(s). 3940 qtconsole_args : list(str) (optional) 3941 Additional arguments to pass to the qtconsole on startup. 3942 3943 Examples 3944 -------- 3945 >>> info = c.start_ipython_workers() # doctest: +SKIP 3946 >>> %remote info['192.168.1.101:5752'] worker.data # doctest: +SKIP 3947 {'x': 1, 'y': 100} 3948 3949 >>> c.start_ipython_workers('192.168.1.101:5752', magic_names='w') # doctest: +SKIP 3950 >>> %w worker.data # doctest: +SKIP 3951 {'x': 1, 'y': 100} 3952 3953 >>> c.start_ipython_workers('192.168.1.101:5752', qtconsole=True) # doctest: +SKIP 3954 3955 Add asterix * in magic names to add one magic per worker 3956 3957 >>> c.start_ipython_workers(magic_names='w_*') # doctest: +SKIP 3958 >>> %w_0 worker.data # doctest: +SKIP 3959 {'x': 1, 'y': 100} 3960 >>> %w_1 worker.data # doctest: +SKIP 3961 {'z': 5} 3962 3963 Returns 3964 ------- 3965 iter_connection_info: list 3966 List of connection_info dicts containing info necessary 3967 to connect Jupyter clients to the workers. 3968 3969 See Also 3970 -------- 3971 Client.start_ipython_scheduler : start ipython on the scheduler 3972 """ 3973 if isinstance(workers, (str, Number)): 3974 workers = [workers] 3975 3976 (workers, info_dict) = sync(self.loop, self._start_ipython_workers, workers) 3977 3978 if magic_names and isinstance(magic_names, str): 3979 if "*" in magic_names: 3980 magic_names = [ 3981 magic_names.replace("*", str(i)) for i in range(len(workers)) 3982 ] 3983 else: 3984 magic_names = [magic_names] 3985 3986 if "IPython" in sys.modules: 3987 from ._ipython_utils import register_remote_magic 3988 3989 register_remote_magic() 3990 if magic_names: 3991 from ._ipython_utils import register_worker_magic 3992 3993 for worker, magic_name in zip(workers, magic_names): 3994 connection_info = info_dict[worker] 3995 register_worker_magic(connection_info, magic_name) 3996 if qtconsole: 3997 from ._ipython_utils import connect_qtconsole 3998 3999 for worker, connection_info in info_dict.items(): 4000 name = "dask-" + worker.replace(":", "-").replace("/", "-") 4001 connect_qtconsole(connection_info, name=name, extra_args=qtconsole_args) 4002 return info_dict 4003 4004 def start_ipython_scheduler( 4005 self, magic_name="scheduler_if_ipython", qtconsole=False, qtconsole_args=None 4006 ): 4007 """Start IPython kernel on the scheduler 4008 4009 Parameters 4010 ---------- 4011 magic_name : str or None (optional) 4012 If defined, register IPython magic with this name for 4013 executing code on the scheduler. 4014 If not defined, register %scheduler magic if IPython is running. 4015 qtconsole : bool (optional) 4016 If True, launch a Jupyter QtConsole connected to the worker(s). 4017 qtconsole_args : list(str) (optional) 4018 Additional arguments to pass to the qtconsole on startup. 4019 4020 Examples 4021 -------- 4022 >>> c.start_ipython_scheduler() # doctest: +SKIP 4023 >>> %scheduler scheduler.processing # doctest: +SKIP 4024 {'127.0.0.1:3595': {'inc-1', 'inc-2'}, 4025 '127.0.0.1:53589': {'inc-2', 'add-5'}} 4026 4027 >>> c.start_ipython_scheduler(qtconsole=True) # doctest: +SKIP 4028 4029 Returns 4030 ------- 4031 connection_info: dict 4032 connection_info dict containing info necessary 4033 to connect Jupyter clients to the scheduler. 
4034 4035 See Also 4036 -------- 4037 Client.start_ipython_workers : Start IPython on the workers 4038 """ 4039 info = sync(self.loop, self.scheduler.start_ipython) 4040 if magic_name == "scheduler_if_ipython": 4041 # default to %scheduler if in IPython, no magic otherwise 4042 in_ipython = False 4043 if "IPython" in sys.modules: 4044 from IPython import get_ipython 4045 4046 in_ipython = bool(get_ipython()) 4047 if in_ipython: 4048 magic_name = "scheduler" 4049 else: 4050 magic_name = None 4051 if magic_name: 4052 from ._ipython_utils import register_worker_magic 4053 4054 register_worker_magic(info, magic_name) 4055 if qtconsole: 4056 from ._ipython_utils import connect_qtconsole 4057 4058 connect_qtconsole(info, name="dask-scheduler", extra_args=qtconsole_args) 4059 return info 4060 4061 @classmethod 4062 def _expand_key(cls, k): 4063 """ 4064 Expand a user-provided task key specification, e.g. in a resources 4065 or retries dictionary. 4066 """ 4067 if not isinstance(k, tuple): 4068 k = (k,) 4069 for kk in k: 4070 if dask.is_dask_collection(kk): 4071 for kkk in kk.__dask_keys__(): 4072 yield stringify(kkk) 4073 else: 4074 yield stringify(kk) 4075 4076 @staticmethod 4077 def collections_to_dsk(collections, *args, **kwargs): 4078 """Convert many collections into a single dask graph, after optimization""" 4079 return collections_to_dsk(collections, *args, **kwargs) 4080 4081 def get_task_stream( 4082 self, 4083 start=None, 4084 stop=None, 4085 count=None, 4086 plot=False, 4087 filename="task-stream.html", 4088 bokeh_resources=None, 4089 ): 4090 """Get task stream data from scheduler 4091 4092 This collects the data present in the diagnostic "Task Stream" plot on 4093 the dashboard. It includes the start, stop, transfer, and 4094 deserialization time of every task for a particular duration. 4095 4096 Note that the task stream diagnostic does not run by default. You may 4097 wish to call this function once before you start work to ensure that 4098 things start recording, and then again after you have completed. 4099 4100 Parameters 4101 ---------- 4102 start : Number or string 4103 When you want to start recording 4104 If a number it should be the result of calling time() 4105 If a string then it should be a time difference before now, 4106 like '60s' or '500 ms' 4107 stop : Number or string 4108 When you want to stop recording 4109 count : int 4110 The number of desired records, ignored if both start and stop are 4111 specified 4112 plot : boolean, str 4113 If true then also return a Bokeh figure 4114 If plot == 'save' then save the figure to a file 4115 filename : str (optional) 4116 The filename to save to if you set ``plot='save'`` 4117 bokeh_resources : bokeh.resources.Resources (optional) 4118 Specifies if the resource component is INLINE or CDN 4119 4120 Examples 4121 -------- 4122 >>> client.get_task_stream() # prime plugin if not already connected 4123 >>> x.compute() # do some work 4124 >>> client.get_task_stream() 4125 [{'task': ..., 4126 'type': ..., 4127 'thread': ..., 4128 ...}] 4129 4130 Pass the ``plot=True`` or ``plot='save'`` keywords to get back a Bokeh 4131 figure 4132 4133 >>> data, figure = client.get_task_stream(plot='save', filename='myfile.html') 4134 4135 Alternatively consider the context manager 4136 4137 >>> from dask.distributed import get_task_stream 4138 >>> with get_task_stream() as ts: 4139 ... x.compute() 4140 >>> ts.data 4141 [...] 
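        You can also cap how many records come back with the ``count``
        keyword documented above (an illustrative sketch; the exact records
        depend on what has run):

        >>> client.get_task_stream(count=100)  # doctest: +SKIP
        [...]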
4142 4143 Returns 4144 ------- 4145 L: List[Dict] 4146 4147 See Also 4148 -------- 4149 get_task_stream : a context manager version of this method 4150 """ 4151 return self.sync( 4152 self._get_task_stream, 4153 start=start, 4154 stop=stop, 4155 count=count, 4156 plot=plot, 4157 filename=filename, 4158 bokeh_resources=bokeh_resources, 4159 ) 4160 4161 async def _get_task_stream( 4162 self, 4163 start=None, 4164 stop=None, 4165 count=None, 4166 plot=False, 4167 filename="task-stream.html", 4168 bokeh_resources=None, 4169 ): 4170 msgs = await self.scheduler.get_task_stream(start=start, stop=stop, count=count) 4171 if plot: 4172 from .diagnostics.task_stream import rectangles 4173 4174 rects = rectangles(msgs) 4175 from .dashboard.components.scheduler import task_stream_figure 4176 4177 source, figure = task_stream_figure(sizing_mode="stretch_both") 4178 source.data.update(rects) 4179 if plot == "save": 4180 from bokeh.plotting import output_file, save 4181 4182 output_file(filename=filename, title="Dask Task Stream") 4183 save(figure, filename=filename, resources=bokeh_resources) 4184 return (msgs, figure) 4185 else: 4186 return msgs 4187 4188 async def _register_scheduler_plugin(self, plugin, name, **kwargs): 4189 if isinstance(plugin, type): 4190 plugin = plugin(**kwargs) 4191 4192 return await self.scheduler.register_scheduler_plugin( 4193 plugin=dumps(plugin, protocol=4), 4194 name=name, 4195 ) 4196 4197 def register_scheduler_plugin(self, plugin, name=None, **kwargs): 4198 """Register a scheduler plugin. 4199 4200 See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins 4201 4202 Parameters 4203 ---------- 4204 plugin : SchedulerPlugin 4205 Plugin class or object to pass to the scheduler. 4206 name : str 4207 Name for the plugin; if None, a name is taken from the 4208 plugin instance or automatically generated if not present. 4209 **kwargs : Any 4210 Arguments passed to the Plugin class (if Plugin is an 4211 instance kwargs are unused). 4212 4213 """ 4214 if name is None: 4215 name = _get_plugin_name(plugin) 4216 4217 return self.sync( 4218 self._register_scheduler_plugin, 4219 plugin=plugin, 4220 name=name, 4221 **kwargs, 4222 ) 4223 4224 def register_worker_callbacks(self, setup=None): 4225 """ 4226 Registers a setup callback function for all current and future workers. 4227 4228 This registers a new setup function for workers in this cluster. The 4229 function will run immediately on all currently connected workers. It 4230 will also be run upon connection by any workers that are added in the 4231 future. Multiple setup functions can be registered - these will be 4232 called in the order they were added. 4233 4234 If the function takes an input argument named ``dask_worker`` then 4235 that variable will be populated with the worker itself. 
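        As a minimal sketch, a setup callback that tweaks logging on every
        worker might look like this (``configure_logging`` is an illustrative
        name, not part of the API)::

            def configure_logging(dask_worker):
                import logging
                logging.getLogger("distributed.worker").setLevel(logging.INFO)

            client.register_worker_callbacks(setup=configure_logging)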
4236 4237 Parameters 4238 ---------- 4239 setup : callable(dask_worker: Worker) -> None 4240 Function to register and run on all workers 4241 """ 4242 return self.register_worker_plugin(_WorkerSetupPlugin(setup)) 4243 4244 async def _register_worker_plugin(self, plugin=None, name=None, nanny=None): 4245 if nanny or nanny is None and isinstance(plugin, NannyPlugin): 4246 method = self.scheduler.register_nanny_plugin 4247 else: 4248 method = self.scheduler.register_worker_plugin 4249 4250 responses = await method(plugin=dumps(plugin, protocol=4), name=name) 4251 for response in responses.values(): 4252 if response["status"] == "error": 4253 _, exc, tb = clean_exception( 4254 response["exception"], response["traceback"] 4255 ) 4256 raise exc.with_traceback(tb) 4257 return responses 4258 4259 def register_worker_plugin(self, plugin=None, name=None, nanny=None, **kwargs): 4260 """ 4261 Registers a lifecycle worker plugin for all current and future workers. 4262 4263 This registers a new object to handle setup, task state transitions and 4264 teardown for workers in this cluster. The plugin will instantiate itself 4265 on all currently connected workers. It will also be run on any worker 4266 that connects in the future. 4267 4268 The plugin may include methods ``setup``, ``teardown``, ``transition``, 4269 and ``release_key``. See the 4270 ``dask.distributed.WorkerPlugin`` class or the examples below for the 4271 interface and docstrings. It must be serializable with the pickle or 4272 cloudpickle modules. 4273 4274 If the plugin has a ``name`` attribute, or if the ``name=`` keyword is 4275 used then that will control idempotency. If a plugin with that name has 4276 already been registered then any future plugins will not run. 4277 4278 For alternatives to plugins, you may also wish to look into preload 4279 scripts. 4280 4281 Parameters 4282 ---------- 4283 plugin : WorkerPlugin or NannyPlugin 4284 The plugin object to register. 4285 name : str, optional 4286 A name for the plugin. 4287 Registering a plugin with the same name will have no effect. 4288 If plugin has no name attribute a random name is used. 4289 nanny : bool, optional 4290 Whether to register the plugin with workers or nannies. 4291 **kwargs : optional 4292 If you pass a class as the plugin, instead of a class instance, then the 4293 class will be instantiated with any extra keyword arguments. 4294 4295 Examples 4296 -------- 4297 >>> class MyPlugin(WorkerPlugin): 4298 ... def __init__(self, *args, **kwargs): 4299 ... pass # the constructor is up to you 4300 ... def setup(self, worker: dask.distributed.Worker): 4301 ... pass 4302 ... def teardown(self, worker: dask.distributed.Worker): 4303 ... pass 4304 ... def transition(self, key: str, start: str, finish: str, **kwargs): 4305 ... pass 4306 ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): 4307 ... pass 4308 4309 >>> plugin = MyPlugin(1, 2, 3) 4310 >>> client.register_worker_plugin(plugin) 4311 4312 You can get access to the plugin with the ``get_worker`` function 4313 4314 >>> client.register_worker_plugin(other_plugin, name='my-plugin') 4315 >>> def f(): 4316 ... worker = get_worker() 4317 ... plugin = worker.plugins['my-plugin'] 4318 ... 
return plugin.my_state 4319 4320 >>> future = client.run(f) 4321 4322 See Also 4323 -------- 4324 distributed.WorkerPlugin 4325 unregister_worker_plugin 4326 """ 4327 if isinstance(plugin, type): 4328 plugin = plugin(**kwargs) 4329 4330 if name is None: 4331 name = _get_plugin_name(plugin) 4332 4333 assert name 4334 4335 return self.sync( 4336 self._register_worker_plugin, plugin=plugin, name=name, nanny=nanny 4337 ) 4338 4339 async def _unregister_worker_plugin(self, name, nanny=None): 4340 if nanny: 4341 responses = await self.scheduler.unregister_nanny_plugin(name=name) 4342 else: 4343 responses = await self.scheduler.unregister_worker_plugin(name=name) 4344 4345 for response in responses.values(): 4346 if response["status"] == "error": 4347 exc = response["exception"] 4348 tb = response["traceback"] 4349 raise exc.with_traceback(tb) 4350 return responses 4351 4352 def unregister_worker_plugin(self, name, nanny=None): 4353 """Unregisters a lifecycle worker plugin 4354 4355 This unregisters an existing worker plugin. As part of the unregistration process 4356 the plugin's ``teardown`` method will be called. 4357 4358 Parameters 4359 ---------- 4360 name : str 4361 Name of the plugin to unregister. See the :meth:`Client.register_worker_plugin` 4362 docstring for more information. 4363 4364 Examples 4365 -------- 4366 >>> class MyPlugin(WorkerPlugin): 4367 ... def __init__(self, *args, **kwargs): 4368 ... pass # the constructor is up to you 4369 ... def setup(self, worker: dask.distributed.Worker): 4370 ... pass 4371 ... def teardown(self, worker: dask.distributed.Worker): 4372 ... pass 4373 ... def transition(self, key: str, start: str, finish: str, **kwargs): 4374 ... pass 4375 ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): 4376 ... 
pass 4377 4378 >>> plugin = MyPlugin(1, 2, 3) 4379 >>> client.register_worker_plugin(plugin, name='foo') 4380 >>> client.unregister_worker_plugin(name='foo') 4381 4382 See Also 4383 -------- 4384 register_worker_plugin 4385 """ 4386 return self.sync(self._unregister_worker_plugin, name=name, nanny=nanny) 4387 4388 @property 4389 def amm(self): 4390 """Convenience accessors for the :doc:`active_memory_manager`""" 4391 from .active_memory_manager import AMMClientProxy 4392 4393 return AMMClientProxy(self) 4394 4395 4396class _WorkerSetupPlugin(WorkerPlugin): 4397 """This is used to support older setup functions as callbacks""" 4398 4399 def __init__(self, setup): 4400 self._setup = setup 4401 4402 def setup(self, worker): 4403 if has_keyword(self._setup, "dask_worker"): 4404 return self._setup(dask_worker=worker) 4405 else: 4406 return self._setup() 4407 4408 4409class Executor(Client): 4410 """Deprecated: see Client""" 4411 4412 def __init__(self, *args, **kwargs): 4413 warnings.warn("Executor has been renamed to Client") 4414 super().__init__(*args, **kwargs) 4415 4416 4417def CompatibleExecutor(*args, **kwargs): 4418 raise Exception("This has been moved to the Client.get_executor() method") 4419 4420 4421ALL_COMPLETED = "ALL_COMPLETED" 4422FIRST_COMPLETED = "FIRST_COMPLETED" 4423 4424 4425async def _wait(fs, timeout=None, return_when=ALL_COMPLETED): 4426 if timeout is not None and not isinstance(timeout, Number): 4427 raise TypeError( 4428 "timeout= keyword received a non-numeric value.\n" 4429 "Beware that wait expects a list of values\n" 4430 " Bad: wait(x, y, z)\n" 4431 " Good: wait([x, y, z])" 4432 ) 4433 fs = futures_of(fs) 4434 if return_when == ALL_COMPLETED: 4435 wait_for = All 4436 elif return_when == FIRST_COMPLETED: 4437 wait_for = Any 4438 else: 4439 raise NotImplementedError( 4440 "Only return_when='ALL_COMPLETED' and 'FIRST_COMPLETED' are supported" 4441 ) 4442 4443 future = wait_for({f._state.wait() for f in fs}) 4444 if timeout is not None: 4445 future = asyncio.wait_for(future, timeout) 4446 await future 4447 4448 done, not_done = ( 4449 {fu for fu in fs if fu.status != "pending"}, 4450 {fu for fu in fs if fu.status == "pending"}, 4451 ) 4452 cancelled = [f.key for f in done if f.status == "cancelled"] 4453 if cancelled: 4454 raise CancelledError(cancelled) 4455 4456 return DoneAndNotDoneFutures(done, not_done) 4457 4458 4459def wait(fs, timeout=None, return_when=ALL_COMPLETED): 4460 """Wait until all/any futures are finished 4461 4462 Parameters 4463 ---------- 4464 fs : list of futures 4465 timeout : number, optional 4466 Time in seconds after which to raise a ``dask.distributed.TimeoutError`` 4467 return_when : str, optional 4468 One of `ALL_COMPLETED` or `FIRST_COMPLETED` 4469 4470 Returns 4471 ------- 4472 Named tuple of completed, not completed 4473 """ 4474 client = default_client() 4475 result = client.sync(_wait, fs, timeout=timeout, return_when=return_when) 4476 return result 4477 4478 4479async def _as_completed(fs, queue): 4480 fs = futures_of(fs) 4481 groups = groupby(lambda f: f.key, fs) 4482 firsts = [v[0] for v in groups.values()] 4483 wait_iterator = gen.WaitIterator( 4484 *map(asyncio.ensure_future, [f._state.wait() for f in firsts]) 4485 ) 4486 4487 while not wait_iterator.done(): 4488 await wait_iterator.next() 4489 # TODO: handle case of restarted futures 4490 future = firsts[wait_iterator.current_index] 4491 for f in groups[future.key]: 4492 queue.put_nowait(f) 4493 4494 4495async def _first_completed(futures): 4496 """Return a single completed 
future 4497 4498 See Also: 4499 _as_completed 4500 """ 4501 q = asyncio.Queue() 4502 await _as_completed(futures, q) 4503 result = await q.get() 4504 return result 4505 4506 4507class as_completed: 4508 """ 4509 Return futures in the order in which they complete 4510 4511 This returns an iterator that yields the input future objects in the order 4512 in which they complete. Calling ``next`` on the iterator will block until 4513 the next future completes, irrespective of order. 4514 4515 Additionally, you can also add more futures to this object during 4516 computation with the ``.add`` method 4517 4518 Parameters 4519 ---------- 4520 futures: Collection of futures 4521 A list of Future objects to be iterated over in the order in which they 4522 complete 4523 with_results: bool (False) 4524 Whether to wait and include results of futures as well; 4525 in this case `as_completed` yields a tuple of (future, result) 4526 raise_errors: bool (True) 4527 Whether we should raise when the result of a future raises an exception; 4528 only affects behavior when `with_results=True`. 4529 4530 Examples 4531 -------- 4532 >>> x, y, z = client.map(inc, [1, 2, 3]) # doctest: +SKIP 4533 >>> for future in as_completed([x, y, z]): # doctest: +SKIP 4534 ... print(future.result()) # doctest: +SKIP 4535 3 4536 2 4537 4 4538 4539 Add more futures during computation 4540 4541 >>> x, y, z = client.map(inc, [1, 2, 3]) # doctest: +SKIP 4542 >>> ac = as_completed([x, y, z]) # doctest: +SKIP 4543 >>> for future in ac: # doctest: +SKIP 4544 ... print(future.result()) # doctest: +SKIP 4545 ... if random.random() < 0.5: # doctest: +SKIP 4546 ... ac.add(c.submit(double, future)) # doctest: +SKIP 4547 4 4548 2 4549 8 4550 3 4551 6 4552 12 4553 24 4554 4555 Optionally wait until the result has been gathered as well 4556 4557 >>> ac = as_completed([x, y, z], with_results=True) # doctest: +SKIP 4558 >>> for future, result in ac: # doctest: +SKIP 4559 ... print(result) # doctest: +SKIP 4560 2 4561 4 4562 3 4563 """ 4564 4565 def __init__(self, futures=None, loop=None, with_results=False, raise_errors=True): 4566 if futures is None: 4567 futures = [] 4568 self.futures = defaultdict(lambda: 0) 4569 self.queue = pyQueue() 4570 self.lock = threading.Lock() 4571 self.loop = loop or default_client().loop 4572 self.thread_condition = threading.Condition() 4573 self.with_results = with_results 4574 self.raise_errors = raise_errors 4575 4576 if futures: 4577 self.update(futures) 4578 4579 @property 4580 def condition(self): 4581 try: 4582 return self._condition 4583 except AttributeError: 4584 self._condition = asyncio.Condition() 4585 return self._condition 4586 4587 async def _track_future(self, future): 4588 try: 4589 await _wait(future) 4590 except CancelledError: 4591 pass 4592 if self.with_results: 4593 try: 4594 result = await future._result(raiseit=False) 4595 except CancelledError as exc: 4596 result = exc 4597 with self.lock: 4598 if future in self.futures: 4599 self.futures[future] -= 1 4600 if not self.futures[future]: 4601 del self.futures[future] 4602 if self.with_results: 4603 self.queue.put_nowait((future, result)) 4604 else: 4605 self.queue.put_nowait(future) 4606 async with self.condition: 4607 self.condition.notify() 4608 with self.thread_condition: 4609 self.thread_condition.notify() 4610 4611 def update(self, futures): 4612 """Add multiple futures to the collection. 
4613 4614 The added futures will emit from the iterator once they finish""" 4615 from .actor import ActorFuture 4616 4617 with self.lock: 4618 for f in futures: 4619 if not isinstance(f, (Future, ActorFuture)): 4620 raise TypeError("Input must be a future, got %s" % f) 4621 self.futures[f] += 1 4622 self.loop.add_callback(self._track_future, f) 4623 4624 def add(self, future): 4625 """Add a future to the collection 4626 4627 This future will emit from the iterator once it finishes 4628 """ 4629 self.update((future,)) 4630 4631 def is_empty(self): 4632 """Returns True if there no completed or computing futures""" 4633 return not self.count() 4634 4635 def has_ready(self): 4636 """Returns True if there are completed futures available.""" 4637 return not self.queue.empty() 4638 4639 def count(self): 4640 """Return the number of futures yet to be returned 4641 4642 This includes both the number of futures still computing, as well as 4643 those that are finished, but have not yet been returned from this 4644 iterator. 4645 """ 4646 with self.lock: 4647 return len(self.futures) + len(self.queue.queue) 4648 4649 def __repr__(self): 4650 return "<as_completed: waiting={} done={}>".format( 4651 len(self.futures), len(self.queue.queue) 4652 ) 4653 4654 def __iter__(self): 4655 return self 4656 4657 def __aiter__(self): 4658 return self 4659 4660 def _get_and_raise(self): 4661 res = self.queue.get() 4662 if self.with_results: 4663 future, result = res 4664 if self.raise_errors and future.status == "error": 4665 typ, exc, tb = result 4666 raise exc.with_traceback(tb) 4667 return res 4668 4669 def __next__(self): 4670 while self.queue.empty(): 4671 if self.is_empty(): 4672 raise StopIteration() 4673 with self.thread_condition: 4674 self.thread_condition.wait(timeout=0.100) 4675 return self._get_and_raise() 4676 4677 async def __anext__(self): 4678 if not self.futures and self.queue.empty(): 4679 raise StopAsyncIteration 4680 while self.queue.empty(): 4681 if not self.futures: 4682 raise StopAsyncIteration 4683 async with self.condition: 4684 await self.condition.wait() 4685 4686 return self._get_and_raise() 4687 4688 next = __next__ 4689 4690 def next_batch(self, block=True): 4691 """Get the next batch of completed futures. 4692 4693 Parameters 4694 ---------- 4695 block : bool, optional 4696 If True then wait until we have some result, otherwise return 4697 immediately, even with an empty list. Defaults to True. 4698 4699 Examples 4700 -------- 4701 >>> ac = as_completed(futures) # doctest: +SKIP 4702 >>> client.gather(ac.next_batch()) # doctest: +SKIP 4703 [4, 1, 3] 4704 4705 >>> client.gather(ac.next_batch(block=False)) # doctest: +SKIP 4706 [] 4707 4708 Returns 4709 ------- 4710 List of futures or (future, result) tuples 4711 """ 4712 if block: 4713 batch = [next(self)] 4714 else: 4715 batch = [] 4716 while not self.queue.empty(): 4717 batch.append(self.queue.get()) 4718 return batch 4719 4720 def batches(self): 4721 """ 4722 Yield all finished futures at once rather than one-by-one 4723 4724 This returns an iterator of lists of futures or lists of 4725 (future, result) tuples rather than individual futures or individual 4726 (future, result) tuples. It will yield these as soon as possible 4727 without waiting. 4728 4729 Examples 4730 -------- 4731 >>> for batch in as_completed(futures).batches(): # doctest: +SKIP 4732 ... results = client.gather(batch) 4733 ... 
print(results) 4734 [4, 2] 4735 [1, 3, 7] 4736 [5] 4737 [6] 4738 """ 4739 while True: 4740 try: 4741 yield self.next_batch(block=True) 4742 except StopIteration: 4743 return 4744 4745 def clear(self): 4746 """Clear out all submitted futures""" 4747 with self.lock: 4748 self.futures.clear() 4749 while not self.queue.empty(): 4750 self.queue.get() 4751 4752 4753def AsCompleted(*args, **kwargs): 4754 raise Exception("This has moved to as_completed") 4755 4756 4757def default_client(c=None): 4758 """Return a client if one has started""" 4759 c = c or _get_global_client() 4760 if c: 4761 return c 4762 else: 4763 raise ValueError( 4764 "No clients found\n" 4765 "Start a client and point it to the scheduler address\n" 4766 " from distributed import Client\n" 4767 " client = Client('ip-addr-of-scheduler:8786')\n" 4768 ) 4769 4770 4771def ensure_default_get(client): 4772 dask.config.set(scheduler="dask.distributed") 4773 _set_global_client(client) 4774 4775 4776def redict_collection(c, dsk): 4777 from dask.delayed import Delayed 4778 4779 if isinstance(c, Delayed): 4780 return Delayed(c.key, dsk) 4781 else: 4782 cc = copy.copy(c) 4783 cc.dask = dsk 4784 return cc 4785 4786 4787def futures_of(o, client=None): 4788 """Future objects in a collection 4789 4790 Parameters 4791 ---------- 4792 o : collection 4793 A possibly nested collection of Dask objects 4794 4795 Examples 4796 -------- 4797 >>> futures_of(my_dask_dataframe) 4798 [<Future: finished key: ...>, 4799 <Future: pending key: ...>] 4800 4801 Returns 4802 ------- 4803 futures : List[Future] 4804 A list of futures held by those collections 4805 """ 4806 stack = [o] 4807 seen = set() 4808 futures = list() 4809 while stack: 4810 x = stack.pop() 4811 if type(x) in (tuple, set, list): 4812 stack.extend(x) 4813 elif type(x) is dict: 4814 stack.extend(x.values()) 4815 elif type(x) is SubgraphCallable: 4816 stack.extend(x.dsk.values()) 4817 elif isinstance(x, Future): 4818 if x not in seen: 4819 seen.add(x) 4820 futures.append(x) 4821 elif dask.is_dask_collection(x): 4822 stack.extend(x.__dask_graph__().values()) 4823 4824 if client is not None: 4825 bad = {f for f in futures if f.cancelled()} 4826 if bad: 4827 raise CancelledError(bad) 4828 4829 return futures[::-1] 4830 4831 4832def fire_and_forget(obj): 4833 """Run tasks at least once, even if we release the futures 4834 4835 Under normal operation Dask will not run any tasks for which there is not 4836 an active future (this avoids unnecessary work in many situations). 4837 However sometimes you want to just fire off a task, not track its future, 4838 and expect it to finish eventually. You can use this function on a future 4839 or collection of futures to ask Dask to complete the task even if no active 4840 client is tracking it. 4841 4842 The results will not be kept in memory after the task completes (unless 4843 there is an active future) so this is only useful for tasks that depend on 4844 side effects. 
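    A minimal sketch (``process_file`` and ``path`` are hypothetical user
    objects)::

        future = client.submit(process_file, path)
        fire_and_forget(future)
        del future  # the task still runs to completion for its side effects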
4845 4846 Parameters 4847 ---------- 4848 obj : Future, list, dict, dask collection 4849 The futures that you want to run at least once 4850 4851 Examples 4852 -------- 4853 >>> fire_and_forget(client.submit(func, *args)) # doctest: +SKIP 4854 """ 4855 futures = futures_of(obj) 4856 for future in futures: 4857 future.client._send_to_scheduler( 4858 { 4859 "op": "client-desires-keys", 4860 "keys": [stringify(future.key)], 4861 "client": "fire-and-forget", 4862 } 4863 ) 4864 4865 4866class get_task_stream: 4867 """ 4868 Collect task stream within a context block 4869 4870 This provides diagnostic information about every task that was run during 4871 the time when this block was active. 4872 4873 This must be used as a context manager. 4874 4875 Parameters 4876 ---------- 4877 plot: boolean, str 4878 If true then also return a Bokeh figure 4879 If plot == 'save' then save the figure to a file 4880 filename: str (optional) 4881 The filename to save to if you set ``plot='save'`` 4882 4883 Examples 4884 -------- 4885 >>> with get_task_stream() as ts: 4886 ... x.compute() 4887 >>> ts.data 4888 [...] 4889 4890 Get back a Bokeh figure and optionally save to a file 4891 4892 >>> with get_task_stream(plot='save', filename='task-stream.html') as ts: 4893 ... x.compute() 4894 >>> ts.figure 4895 <Bokeh Figure> 4896 4897 To share this file with others you may wish to upload and serve it online. 4898 A common way to do this is to upload the file as a gist, and then serve it 4899 on https://raw.githack.com :: 4900 4901 $ python -m pip install gist 4902 $ gist task-stream.html 4903 https://gist.github.com/8a5b3c74b10b413f612bb5e250856ceb 4904 4905 You can then navigate to that site, click the "Raw" button to the right of 4906 the ``task-stream.html`` file, and then provide that URL to 4907 https://raw.githack.com . This process should provide a sharable link that 4908 others can use to see your task stream plot. 4909 4910 See Also 4911 -------- 4912 Client.get_task_stream: Function version of this context manager 4913 """ 4914 4915 def __init__(self, client=None, plot=False, filename="task-stream.html"): 4916 self.data = [] 4917 self._plot = plot 4918 self._filename = filename 4919 self.figure = None 4920 self.client = client or default_client() 4921 self.client.get_task_stream(start=0, stop=0) # ensure plugin 4922 4923 def __enter__(self): 4924 self.start = time() 4925 return self 4926 4927 def __exit__(self, typ, value, traceback): 4928 L = self.client.get_task_stream( 4929 start=self.start, plot=self._plot, filename=self._filename 4930 ) 4931 if self._plot: 4932 L, self.figure = L 4933 self.data.extend(L) 4934 4935 async def __aenter__(self): 4936 return self 4937 4938 async def __aexit__(self, typ, value, traceback): 4939 L = await self.client.get_task_stream( 4940 start=self.start, plot=self._plot, filename=self._filename 4941 ) 4942 if self._plot: 4943 L, self.figure = L 4944 self.data.extend(L) 4945 4946 4947class performance_report: 4948 """Gather performance report 4949 4950 This creates a static HTML file that includes many of the same plots of the 4951 dashboard for later viewing. 4952 4953 The resulting file uses JavaScript, and so must be viewed with a web 4954 browser. Locally we recommend using ``python -m http.server`` or hosting 4955 the file live online. 
4956 4957 Parameters 4958 ---------- 4959 filename: str, optional 4960 The filename to save the performance report locally 4961 4962 stacklevel: int, optional 4963 The code execution frame utilized for populating the Calling Code section 4964 of the report. Defaults to `1` which is the frame calling ``performance_report`` 4965 4966 mode: str, optional 4967 Mode parameter to pass to :func:`bokeh.io.output.output_file`. Defaults to ``None``. 4968 4969 Examples 4970 -------- 4971 >>> with performance_report(filename="myfile.html", stacklevel=1): 4972 ... x.compute() 4973 4974 $ python -m http.server 4975 $ open myfile.html 4976 """ 4977 4978 def __init__(self, filename="dask-report.html", stacklevel=1, mode=None): 4979 self.filename = filename 4980 # stacklevel 0 or less - shows dask internals which likely isn't helpful 4981 self._stacklevel = stacklevel if stacklevel > 0 else 1 4982 self.mode = mode 4983 4984 async def __aenter__(self): 4985 self.start = time() 4986 self.last_count = await get_client().run_on_scheduler( 4987 lambda dask_scheduler: dask_scheduler.monitor.count 4988 ) 4989 await get_client().get_task_stream(start=0, stop=0) # ensure plugin 4990 4991 async def __aexit__(self, typ, value, traceback, code=None): 4992 client = get_client() 4993 if code is None: 4994 code = client._get_computation_code(self._stacklevel + 1) 4995 data = await client.scheduler.performance_report( 4996 start=self.start, last_count=self.last_count, code=code, mode=self.mode 4997 ) 4998 with open(self.filename, "w") as f: 4999 f.write(data) 5000 5001 def __enter__(self): 5002 get_client().sync(self.__aenter__) 5003 5004 def __exit__(self, typ, value, traceback): 5005 client = get_client() 5006 code = client._get_computation_code(self._stacklevel + 1) 5007 client.sync(self.__aexit__, type, value, traceback, code=code) 5008 5009 5010class get_task_metadata: 5011 """Collect task metadata within a context block 5012 5013 This gathers ``TaskState`` metadata and final state from the scheduler 5014 for tasks which are submitted and finished within the scope of this 5015 context manager. 5016 5017 Examples 5018 -------- 5019 >>> with get_task_metadata() as tasks: 5020 ... x.compute() 5021 >>> tasks.metadata 5022 {...} 5023 >>> tasks.state 5024 {...} 5025 """ 5026 5027 def __init__(self): 5028 self.name = f"task-metadata-{uuid.uuid4().hex}" 5029 self.keys = set() 5030 self.metadata = None 5031 self.state = None 5032 5033 async def __aenter__(self): 5034 await get_client().scheduler.start_task_metadata(name=self.name) 5035 return self 5036 5037 async def __aexit__(self, typ, value, traceback): 5038 response = await get_client().scheduler.stop_task_metadata(name=self.name) 5039 self.metadata = response["metadata"] 5040 self.state = response["state"] 5041 5042 def __enter__(self): 5043 return get_client().sync(self.__aenter__) 5044 5045 def __exit__(self, typ, value, traceback): 5046 return get_client().sync(self.__aexit__, type, value, traceback) 5047 5048 5049@contextmanager 5050def temp_default_client(c): 5051 """Set the default client for the duration of the context 5052 5053 .. note:: 5054 This function should be used exclusively for unit testing the default client 5055 functionality. In all other cases, please use ``Client.as_current`` instead. 5056 5057 .. note:: 5058 Unlike ``Client.as_current``, this context manager is neither thread-local nor 5059 task-local. 5060 5061 Parameters 5062 ---------- 5063 c : Client 5064 This is what default_client() will return within the with-block. 
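    Examples
    --------
    A minimal sketch, assuming ``c`` is an existing ``Client`` and ``inc`` is
    a user-defined function:

    >>> with temp_default_client(c):  # doctest: +SKIP
    ...     future = default_client().submit(inc, 1)  # ``c`` is the default here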
5065 """ 5066 old_exec = default_client() 5067 _set_global_client(c) 5068 try: 5069 yield 5070 finally: 5071 _set_global_client(old_exec) 5072 5073 5074def _close_global_client(): 5075 """ 5076 Force close of global client. This cleans up when a client 5077 wasn't close explicitly, e.g. interactive sessions. 5078 """ 5079 c = _get_global_client() 5080 if c is not None: 5081 c._should_close_loop = False 5082 with suppress(TimeoutError, RuntimeError): 5083 if c.asynchronous: 5084 c.loop.add_callback(c.close, timeout=3) 5085 else: 5086 c.close(timeout=3) 5087 5088 5089atexit.register(_close_global_client) 5090
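# Illustrative end-to-end sketch (comments only): how the helpers defined in
# this module are commonly combined.  Assumes a reachable scheduler; ``inc``
# is a user-defined placeholder function.
#
#     from distributed import Client, as_completed, fire_and_forget, wait
#
#     client = Client()                  # typically registers itself as the default client
#     futures = client.map(inc, range(4))
#     wait(futures)                      # block until every future finishes
#     for fut in as_completed(futures):  # or consume results as they complete
#         print(fut.result())
#     fire_and_forget(client.submit(inc, 100))  # keeps running even after the future is released
#     client.close()                     # otherwise _close_global_client() runs at exit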