import asyncio
import asyncio.events
import functools
import inspect
import os
import re
import sys
import threading
from contextlib import contextmanager
from glob import has_magic

from .callbacks import _DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .spec import AbstractFileSystem
from .utils import PY36, is_exception, other_paths

# matches names with exactly one leading underscore ("_ls", but not "__init__")
private = re.compile("_[^_]")


async def _runner(event, coro, result, timeout=None):
    """Await ``coro`` and hand its outcome back to a waiting thread.

    The return value, or the ``Exception`` raised, is stored in ``result[0]``
    and ``event`` is always set when the coroutine finishes, so that the
    thread blocked in ``sync()`` can wake up.
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        # exceptions are passed back as values and re-raised by sync()
        result[0] = ex
    finally:
        event.set()


if PY36:
    grl = asyncio.events._get_running_loop
else:
    grl = asyncio.events.get_running_loop


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in other thread

    Parameters
    ----------
    loop: event loop on which to schedule the coroutine; must not be closed
    func: coroutine function, called as ``func(*args, **kwargs)``
    timeout: number or None
        Seconds to wait; 0 or None means no timeout.

    Raises
    ------
    RuntimeError: if ``loop`` is None or closed
    NotImplementedError: if called from within the running ``loop`` itself
    FSTimeoutError: if the timeout elapsed
    Any exception raised by the coroutine is re-raised in the calling thread.
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = grl()
    except RuntimeError:
        # no loop is running in this thread
        loop0 = None
    # This check must live outside the ``try`` above: NotImplementedError is a
    # subclass of RuntimeError, so raising it inside would be swallowed and
    # the call below would block forever waiting on its own loop.
    if loop0 is loop:
        raise NotImplementedError("Calling sync() from within a running loop")
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loops allows thread to get interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result


iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
lock = threading.Lock()  # for setting exactly one thread


def sync_wrapper(func, obj=None):
    """Given a function, make so can be called in async or blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # the loop is taken from the bound instance, or else from the first
        # positional argument (``self`` when defined within a class)
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper


@contextmanager
def _selector_policy():
    """Temporarily use the Windows selector event loop policy, if applicable.

    Only on Python >= 3.8 on Windows, and when asyncio provides
    ``WindowsSelectorEventLoopPolicy``, is the policy replaced; in every case
    the policy found on entry is reinstated on exit.
    """
    original_policy = asyncio.get_event_loop_policy()
    try:
        if (
            sys.version_info >= (3, 8)
            and os.name == "nt"
            and hasattr(asyncio, "WindowsSelectorEventLoopPolicy")
        ):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_running_loop():
    """Return the running event loop, raising RuntimeError if there is none.

    Falls back to the private ``asyncio._get_running_loop`` when
    ``asyncio.get_running_loop`` is not available.
    """
    if hasattr(asyncio, "get_running_loop"):
        return asyncio.get_running_loop()
    else:
        loop = asyncio._get_running_loop()
        if loop is None:
            raise RuntimeError("no running event loop")
        else:
            return loop


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with lock:
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]


@contextmanager
def fsspec_loop():
    """Temporarily switch the current event loop to the fsspec's
    own loop, and then revert it back after the context gets
    terminated.
    """
    try:
        original_loop = get_running_loop()
    except RuntimeError:
        original_loop = None

    fsspec_loop = get_loop()
    try:
        asyncio._set_running_loop(fsspec_loop)
        yield fsspec_loop
    finally:
        asyncio._set_running_loop(original_loop)


try:
    import resource
except ImportError:
    resource = None
    ResourceError = OSError
else:
    # the name must match the one used in _get_batch_size's except clause
    ResourceError = resource.error

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    """Pick the default number of coroutines to run concurrently.

    A value set in ``fsspec.config.conf`` wins ("nofiles_gather_batch_size"
    when ``nofiles`` is true, otherwise "gather_batch_size"). Failing that,
    ``nofiles`` operations use ``_NOFILES_DEFAULT_BATCH_SIZE``; other
    operations use 1/8th of the soft open-files limit, -1 (no throttling) if
    that limit is infinite, or ``_DEFAULT_BATCH_SIZE`` if the limit cannot be
    determined.
    """
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=_DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, then it will not be any throttling. If
        None, it will be inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.

    Returns
    -------
    list of results, in the same order as ``coros``
    """
    if not coros:
        # nothing to run; also avoids batch_size == len(coros) == 0 below
        return []

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0
    results = []
    for start in range(0, len(coros), batch_size):
        chunk = [
            asyncio.Task(asyncio.wait_for(c, timeout=timeout))
            for c in coros[start : start + batch_size]
        ]
        if callback is not _DEFAULT_CALLBACK:
            for t in chunk:
                t.add_done_callback(
                    lambda *_, **__: callback.call("relative_update", 1)
                )
        results.extend(
            await asyncio.gather(*chunk, return_exceptions=return_exceptions),
        )
    return results


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods.
    # (mirror_sync_methods only copies the sync docstring when the coroutine
    # has none, so explanatory notes below are plain comments on purpose)

    async_impl = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        # remembered so that use after a fork can be detected in ``loop``
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        # the loop belongs to the process that created this instance
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in path],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # default error policy: tolerate missing files only when recursive
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive)
        path2 = other_paths(paths, path2)
        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        # raise the first exception, except FileNotFoundError under "ignore"
        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # a single path is normalised to the {path: value} mapping form
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, int):
                end -= 1  # bytes range is inclusive
        return "bytes=%s-%s" % (start, end)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(p, **kwargs) for p in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        # a dict is returned unless exactly the one requested path was read
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self, paths, starts, ends, max_gap=None, batch_size=None, **kwargs
    ):
        # TODO: on_error
        if max_gap is not None:
            # to be implemented in utils
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        # scalar limits are broadcast to every path
        if not isinstance(starts, list):
            starts = [starts] * len(paths)
        if not isinstance(ends, list):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(coros, batch_size=batch_size, nofiles=True)

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        batch_size=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        from .implementations.local import LocalFileSystem, make_path_posix

        rpath = self._strip_protocol(rpath)
        if isinstance(lpath, str):
            lpath = make_path_posix(lpath)
        fs = LocalFileSystem()
        lpaths = fs.expand_path(lpath, recursive=recursive)
        rpaths = other_paths(
            lpaths, rpath, exists=isinstance(rpath, str) and await self._isdir(rpath)
        )

        # local directories are created remotely first; only files are uploaded
        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.call("set_size", len(file_pairs))
        for lfile, rfile in file_pairs:
            callback.branch(lfile, rfile, kwargs)
            coros.append(self._put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self, rpath, lpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded
        concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        from fsspec.implementations.local import make_path_posix

        rpath = self._strip_protocol(rpath)
        lpath = make_path_posix(lpath)
        rpaths = await self._expand_path(rpath, recursive=recursive)
        lpaths = other_paths(rpaths, lpath)
        # make sure every local target directory exists before downloading
        for lp in lpaths:
            os.makedirs(os.path.dirname(lp), exist_ok=True)
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.lazy_call("set_size", len, lpaths)
        for lpath, rpath in zip(lpaths, rpaths):
            callback.branch(rpath, lpath, kwargs)
            coros.append(self._get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            # any lookup failure means "not a file"; BaseException subclasses
            # (e.g. KeyboardInterrupt, and task cancellation on Python 3.8+)
            # are deliberately left to propagate
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except IOError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path):
        try:
            await self._info(path)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, **kwargs):
        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, IOError):
            # an unlistable path yields a single empty entry
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[pathname] = info
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as give path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in full_dirs:
            async for _ in self._walk(d, maxdepth=maxdepth, detail=detail, **kwargs):
                yield _

    async def _glob(self, path, **kwargs):
        ends = path.endswith("/")
        path = self._strip_protocol(path)
        # position of the first glob character of each kind (or end of string)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indques = path.find("?") if path.find("?") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        ind = min(indstar, indques, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif await self._exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            # search from the deepest directory that contains no glob character
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
                .replace("?", ".")
            )
            + "$"
        )
        # "**" crosses directory separators, a single "*" does not
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = dict()
        detail = kwargs.pop("detail", False)
        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            # key on the full name from each info, not the walk's short name
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            # reduce depth on each recursion level unless None or 0
            maxdepth = maxdepth if not maxdepth else maxdepth - 1
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p))
                    out |= bit
                    if recursive:
                        out |= set(
                            await self._expand_path(
                                list(bit), recursive=recursive, maxdepth=maxdepth
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return list(sorted(out))

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method will create a sync version if the name refers to an async method
    (coroutine) and there is no override in the child class; will create an async
    method for the corresponding sync method if there is no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            # only replace the sync method if it is still the base-class one
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )


class FSSpecCoroutineCancel(Exception):
    # default exception used by _dump_running_tasks when cancelling tasks
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Describe, and optionally cancel, the unfinished tasks on the fsspec loop.

    Parameters
    ----------
    printout: bool
        If True, print the stack of every unfinished task.
    cancel: bool
        If True, cancel every unfinished task, setting/throwing ``exc``.
    exc: exception class used when cancelling
    with_task: bool
        If True, include the task object itself in each returned record.

    Returns
    -------
    list of dicts with keys "locals", "file", "firstline", "linelo", "stack"
    and "task", one per unfinished task.

    Raises
    ------
    NotImplementedError: on Python 3.6
    """
    import traceback

    if PY36:
        raise NotImplementedError("Do not call this on Py 3.6")

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        for task in tasks:
            task.print_stack()
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            # grab the callbacks before cancelling the task
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            for cb in cbs:
                cb[0](t)  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out