import asyncio
import asyncio.events
import functools
import inspect
import os
import re
import sys
import threading
from contextlib import contextmanager
from glob import has_magic

from .callbacks import _DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .spec import AbstractFileSystem
from .utils import PY36, is_exception, other_paths

private = re.compile("_[^_]")


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


if PY36:
    grl = asyncio.events._get_running_loop
else:
    grl = asyncio.events.get_running_loop


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the loop run the coroutine until it returns; the loop itself runs in
    another thread, and this call blocks until the result is available.
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = grl()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the calling thread to be interrupted, e.g. by Ctrl-C
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
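

# Illustrative sketch of calling an async function from blocking code via the
# shared fsspec loop; ``_touch`` and its URL are hypothetical, and ``get_loop``
# is defined further below in this module:
#
#     async def _touch(url):
#         ...  # some awaitable work
#
#     result = sync(get_loop(), _touch, "proto://bucket/key", timeout=5)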


iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
lock = threading.Lock()  # for setting exactly one thread


def sync_wrapper(func, obj=None):
    """Given a function, make it so that it can be called in async or blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
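

# Minimal sketch of the two attachment styles described in the docstring above
# (the class, its methods and ``_touch_coro`` are all hypothetical):
#
#     class MyFS:
#         loop = get_loop()
#
#         async def _cat_file(self, path):
#             ...
#
#         cat_file = sync_wrapper(_cat_file)  # obj=None: self taken from args[0]
#
#     fs = MyFS()
#     fs.touch = sync_wrapper(_touch_coro, obj=fs)  # bound to this instance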


@contextmanager
def _selector_policy():
    original_policy = asyncio.get_event_loop_policy()
    try:
        if (
            sys.version_info >= (3, 8)
            and os.name == "nt"
            and hasattr(asyncio, "WindowsSelectorEventLoopPolicy")
        ):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_running_loop():
    if hasattr(asyncio, "get_running_loop"):
        return asyncio.get_running_loop()
    else:
        loop = asyncio._get_running_loop()
        if loop is None:
            raise RuntimeError("no running event loop")
        else:
            return loop


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with lock:
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]


@contextmanager
def fsspec_loop():
    """Temporarily switch the current event loop to fsspec's own loop,
    and then revert back after the context terminates.
    """
    try:
        original_loop = get_running_loop()
    except RuntimeError:
        original_loop = None

    fsspec_loop = get_loop()
    try:
        asyncio._set_running_loop(fsspec_loop)
        yield fsspec_loop
    finally:
        asyncio._set_running_loop(original_loop)
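

# Illustrative usage sketch: inside the block, code that inspects the
# "running" loop sees fsspec's dedicated loop:
#
#     with fsspec_loop() as lp:
#         assert get_running_loop() is lp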


try:
    import resource
except ImportError:
    resource = None
    ResourceError = OSError
else:
    ResourceError = resource.error

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
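

# The defaults above can be overridden globally through fsspec's configuration
# dict; a sketch of pinning both batch sizes:
#
#     from fsspec.config import conf
#     conf["gather_batch_size"] = 32            # operations touching local files
#     conf["nofiles_gather_batch_size"] = 256   # metadata-only operations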


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=_DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there will be no throttling. If None, it will be
        inferred from _get_batch_size().
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0
    results = []
    for start in range(0, len(coros), batch_size):
        chunk = [
            asyncio.Task(asyncio.wait_for(c, timeout=timeout))
            for c in coros[start : start + batch_size]
        ]
        if callback is not _DEFAULT_CALLBACK:
            [
                t.add_done_callback(
                    lambda *_, **__: callback.call("relative_update", 1)
                )
                for t in chunk
            ]
        results.extend(
            await asyncio.gather(*chunk, return_exceptions=return_exceptions),
        )
    return results
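

# Minimal sketch of throttled gathering (values are illustrative):
#
#     async def _demo():
#         coros = [asyncio.sleep(0.01, result=i) for i in range(100)]
#         return await _run_coros_in_chunks(coros, batch_size=10)
#
#     sync(get_loop(), _demo)  # from blocking code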


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in path],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive)
        path2 = other_paths(paths, path2)
        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, int):
                end -= 1  # bytes range is inclusive
        return "bytes=%s-%s" % (start, end)
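    # Worked examples of the header string built above (the size is only
    # fetched when a negative offset must be resolved against it):
    #   start=0,   end=100  -> "bytes=0-99"   (end is exclusive on input)
    #   start=5,   end=None -> "bytes=5-"     (to end of file)
    #   start=-10, end=None -> "bytes=-10"    (suffix: last 10 bytes)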

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self, paths, starts, ends, max_gap=None, batch_size=None, **kwargs
    ):
        # TODO: on_error
        if max_gap is not None:
            # to be implemented in utils
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, list):
            starts = [starts] * len(paths)
        if not isinstance(ends, list):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(coros, batch_size=batch_size, nofiles=True)

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        batch_size=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system's open-files
        limit.
        """
        from .implementations.local import LocalFileSystem, make_path_posix

        rpath = self._strip_protocol(rpath)
        if isinstance(lpath, str):
            lpath = make_path_posix(lpath)
        fs = LocalFileSystem()
        lpaths = fs.expand_path(lpath, recursive=recursive)
        rpaths = other_paths(
            lpaths, rpath, exists=isinstance(rpath, str) and await self._isdir(rpath)
        )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.call("set_size", len(file_pairs))
        for lfile, rfile in file_pairs:
            callback.branch(lfile, rfile, kwargs)
            coros.append(self._put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )
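    # Illustrative call (``fs`` is a hypothetical instance of an async-capable
    # subclass):
    #     await fs._put("local/data", "remote/data/", recursive=True, batch_size=16)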

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self, rpath, lpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded
        concurrently. The default can be set for this instance by passing
        "batch_size" in the constructor, or for all instances by setting the
        "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th
        of the system's open-files limit.
        """
        from fsspec.implementations.local import make_path_posix

        rpath = self._strip_protocol(rpath)
        lpath = make_path_posix(lpath)
        rpaths = await self._expand_path(rpath, recursive=recursive)
        lpaths = other_paths(rpaths, lpath)
        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.lazy_call("set_size", len, lpaths)
        for lpath, rpath in zip(lpaths, rpaths):
            callback.branch(rpath, lpath, kwargs)
            coros.append(self._get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )
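    # Illustrative call; glob patterns in rpath are expanded as documented
    # (``fs`` and the paths are hypothetical):
    #     await fs._get("remote/logs/*.txt", "local/logs/", batch_size=8)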

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except IOError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path):
        try:
            await self._info(path)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, **kwargs):
        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, IOError):
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part, but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[pathname] = info
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in full_dirs:
            async for _ in self._walk(d, maxdepth=maxdepth, detail=detail, **kwargs):
                yield _

    async def _glob(self, path, **kwargs):
        ends = path.endswith("/")
        path = self._strip_protocol(path)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indques = path.find("?") if path.find("?") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        ind = min(indstar, indques, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif await self._exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
                .replace("?", ".")
            )
            + "$"
        )
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)
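    # Illustrative pattern translations performed above:
    #   "data/*.csv"    -> root "data/", depth 1, regex ^data/[^/]*\.csv$
    #   "data/**/*.csv" -> root "data/", unlimited depth, "**" becomes ".*"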

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = dict()
        detail = kwargs.pop("detail", False)
        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            # reduce depth on each recursion level unless None or 0
            maxdepth = maxdepth if not maxdepth else maxdepth - 1
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p))
                    out |= bit
                    if recursive:
                        out |= set(
                            await self._expand_path(
                                list(bit), recursive=recursive, maxdepth=maxdepth
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return list(sorted(out))
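    # Illustrative expansions (``fs`` and the paths are hypothetical):
    #   await fs._expand_path("bucket/*.json")               # glob matches only
    #   await fs._expand_path("bucket/dir", recursive=True)  # dir plus descendants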

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, a sync version will be created if the name refers to an
    async method (coroutine) and there is no override in the child class; an
    async method will be created for the corresponding sync method if there is
    no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
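

# Sketch of the effect (``fs`` is a hypothetical instance whose class defines
# only the coroutine form):
#
#     mirror_sync_methods(fs)  # fs implements _ls as a coroutine
#     fs.ls("bucket/path")     # now runs _ls on fs.loop via sync()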


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    if PY36:
        raise NotImplementedError("Do not call this on Py 3.6")

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out
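

# Debugging sketch: inspect what is stuck on the shared loop without
# cancelling anything:
#
#     info = _dump_running_tasks(printout=False, cancel=False)
#     print([i["file"] for i in info])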