1import linecache
2import traceback
3
4from . import base_futures
5from . import coroutines
6
7
8def _task_repr_info(task):
9    info = base_futures._future_repr_info(task)
10
11    if task._must_cancel:
12        # replace status
13        info[0] = 'cancelling'
14
15    info.insert(1, 'name=%r' % task.get_name())
16
17    coro = coroutines._format_coroutine(task._coro)
18    info.insert(2, f'coro=<{coro}>')
19
20    if task._fut_waiter is not None:
21        info.insert(3, f'wait_for={task._fut_waiter!r}')
22    return info
23
24
25def _task_get_stack(task, limit):
26    frames = []
27    if hasattr(task._coro, 'cr_frame'):
28        # case 1: 'async def' coroutines
29        f = task._coro.cr_frame
30    elif hasattr(task._coro, 'gi_frame'):
31        # case 2: legacy coroutines
32        f = task._coro.gi_frame
33    elif hasattr(task._coro, 'ag_frame'):
34        # case 3: async generators
35        f = task._coro.ag_frame
36    else:
37        # case 4: unknown objects
38        f = None
39    if f is not None:
40        while f is not None:
41            if limit is not None:
42                if limit <= 0:
43                    break
44                limit -= 1
45            frames.append(f)
46            f = f.f_back
47        frames.reverse()
48    elif task._exception is not None:
49        tb = task._exception.__traceback__
50        while tb is not None:
51            if limit is not None:
52                if limit <= 0:
53                    break
54                limit -= 1
55            frames.append(tb.tb_frame)
56            tb = tb.tb_next
57    return frames
58
59
60def _task_print_stack(task, limit, file):
61    extracted_list = []
62    checked = set()
63    for f in task.get_stack(limit=limit):
64        lineno = f.f_lineno
65        co = f.f_code
66        filename = co.co_filename
67        name = co.co_name
68        if filename not in checked:
69            checked.add(filename)
70            linecache.checkcache(filename)
71        line = linecache.getline(filename, lineno, f.f_globals)
72        extracted_list.append((filename, lineno, name, line))
73
74    exc = task._exception
75    if not extracted_list:
76        print(f'No stack for {task!r}', file=file)
77    elif exc is not None:
78        print(f'Traceback for {task!r} (most recent call last):', file=file)
79    else:
80        print(f'Stack for {task!r} (most recent call last):', file=file)
81
82    traceback.print_list(extracted_list, file=file)
83    if exc is not None:
84        for line in traceback.format_exception_only(exc.__class__, exc):
85            print(line, file=file, end='')
86