1import math
2from types import TracebackType
3from typing import Optional, Type
4from warnings import warn
5
6from ..abc._tasks import TaskGroup, TaskStatus
7from ._compat import DeprecatedAsyncContextManager, DeprecatedAwaitable, DeprecatedAwaitableFloat
8from ._eventloop import get_asynclib
9
10
class _IgnoredTaskStatus(TaskStatus):
    """Task status implementation that discards the :meth:`started` notification."""

    def started(self, value: object = None) -> None:
        """Accept the start-up notification and do nothing with ``value``."""
        return None


# Module-level instance of the no-op task status defined above.
TASK_STATUS_IGNORED = _IgnoredTaskStatus()
17
18
class CancelScope(DeprecatedAsyncContextManager['CancelScope']):
    """
    A unit of work that can be cancelled separately from the code around it.

    Instantiating this class returns the ``CancelScope`` object built by the module that
    ``get_asynclib()`` provides. Every other member declared here raises
    :exc:`NotImplementedError`.

    :param deadline: The time (clock value) when this scope is cancelled automatically
    :param shield: ``True`` to shield the cancel scope from external cancellation
    """

    def __new__(cls, *, deadline: float = math.inf, shield: bool = False) -> 'CancelScope':
        # Construction is delegated entirely to the backend's own CancelScope class.
        backend = get_asynclib()
        return backend.CancelScope(deadline=deadline, shield=shield)

    def cancel(self) -> DeprecatedAwaitable:
        """Cancel this scope immediately."""
        raise NotImplementedError()

    @property
    def deadline(self) -> float:
        """
        The clock value at which this scope is cancelled automatically.

        This is ``float('inf')`` when no timeout has been set.

        """
        raise NotImplementedError()

    @deadline.setter
    def deadline(self, value: float) -> None:
        raise NotImplementedError()

    @property
    def cancel_called(self) -> bool:
        """``True`` once :meth:`cancel` has been called."""
        raise NotImplementedError()

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        A shielded scope does not receive cancellations originating outside of it.

        """
        raise NotImplementedError()

    @shield.setter
    def shield(self, value: bool) -> None:
        raise NotImplementedError()

    def __enter__(self) -> 'CancelScope':
        raise NotImplementedError()

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> Optional[bool]:
        raise NotImplementedError()
74
75
def open_cancel_scope(*, shield: bool = False) -> CancelScope:
    """
    Open a cancel scope.

    Issues a :exc:`DeprecationWarning` when called.

    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    .. deprecated:: 3.0
       Use :class:`~CancelScope` directly.

    """
    # stacklevel=2 attributes the warning to the line that called this function instead of to
    # this module, so the deprecated call site can actually be located.
    warn('open_cancel_scope() is deprecated -- use CancelScope() directly', DeprecationWarning,
         stacklevel=2)
    return get_asynclib().CancelScope(shield=shield)
89
90
class FailAfterContextManager(DeprecatedAsyncContextManager[CancelScope]):
    """
    Context manager that wraps a cancel scope and raises :exc:`TimeoutError` on exit if the
    scope's ``cancel_called`` flag is set.

    :param cancel_scope: the cancel scope to enter and exit on behalf of the caller
    """

    def __init__(self, cancel_scope: CancelScope) -> None:
        self._cancel_scope = cancel_scope

    def __enter__(self) -> CancelScope:
        """Enter the wrapped cancel scope and return whatever its ``__enter__`` returns."""
        return self._cancel_scope.__enter__()

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> Optional[bool]:
        """
        Exit the wrapped cancel scope.

        :return: the value returned by the wrapped scope's ``__exit__``
        :raises TimeoutError: if the wrapped scope reports ``cancel_called`` after exiting

        """
        # The wrapped scope must run its own exit logic before its state is inspected.
        retval = self._cancel_scope.__exit__(exc_type, exc_val, exc_tb)
        if self._cancel_scope.cancel_called:
            raise TimeoutError

        return retval
106
107
def fail_after(delay: Optional[float], shield: bool = False) -> FailAfterContextManager:
    """
    Create a context manager which raises a :class:`TimeoutError` if the enclosed block does
    not finish in time.

    :param delay: maximum allowed time (in seconds) before raising the exception, or ``None`` to
        disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.abc.CancelScope`\\]

    """
    backend = get_asynclib()
    if delay is None:
        # No timeout requested: the scope never expires on its own.
        expires_at = math.inf
    else:
        expires_at = backend.current_time() + delay

    scope = backend.CancelScope(deadline=expires_at, shield=shield)
    return FailAfterContextManager(scope)
122
123
def move_on_after(delay: Optional[float], shield: bool = False) -> CancelScope:
    """
    Create a cancel scope whose deadline lies ``delay`` seconds after the current time.

    :param delay: maximum allowed time (in seconds) before exiting the context block, or ``None``
        to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    """
    backend = get_asynclib()
    if delay is None:
        # No timeout requested: the scope never expires on its own.
        expires_at = math.inf
    else:
        expires_at = backend.current_time() + delay

    return backend.CancelScope(deadline=expires_at, shield=shield)
136
137
def current_effective_deadline() -> DeprecatedAwaitableFloat:
    """
    Return the nearest deadline among all the cancel scopes effective for the current task.

    The value is obtained from the backend returned by ``get_asynclib()`` and wrapped in a
    :class:`DeprecatedAwaitableFloat` together with a reference to this function.

    :return: a clock value from the event loop's internal clock (``float('inf')`` if there is no
        deadline in effect)
    :rtype: float

    """
    backend = get_asynclib()
    nearest_deadline = backend.current_effective_deadline()
    return DeprecatedAwaitableFloat(nearest_deadline, current_effective_deadline)
149
150
def create_task_group() -> 'TaskGroup':
    """
    Create a task group using the backend returned by ``get_asynclib()``.

    :return: a task group

    """
    backend = get_asynclib()
    return backend.TaskGroup()
159