1"""Python part of the warnings subsystem."""
2
3import sys
4
5
# Public API: the names exported by "from warnings import *".
__all__ = ["warn", "warn_explicit", "showwarning",
           "formatwarning", "filterwarnings", "simplefilter",
           "resetwarnings", "catch_warnings"]
9
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Report a warning by writing it to a file; replace if you like.

    The arguments are bundled into a WarningMessage and handed to the
    default writer, _showwarnmsg_impl().
    """
    _showwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, file, line))
14
def formatwarning(message, category, filename, lineno, line=None):
    """Return the standard text rendering of a warning.

    The arguments are bundled into a WarningMessage (with no target file)
    and formatted by _formatwarnmsg_impl().
    """
    return _formatwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, None, line))
19
def _showwarnmsg_impl(msg):
    """Write the formatted warning *msg* to msg.file, or to sys.stderr.

    The warning is silently dropped when no stream is available or when
    writing to the stream fails with OSError.
    """
    stream = msg.file
    if stream is None:
        stream = sys.stderr
    if stream is None:
        # sys.stderr is None when run with pythonw.exe:
        # warnings get lost
        return
    rendered = _formatwarnmsg(msg)
    try:
        stream.write(rendered)
    except OSError:
        # the file (probably stderr) is invalid - this warning gets lost.
        pass
34
def _formatwarnmsg_impl(msg):
    """Return the standard text rendering of the warning *msg*.

    The result always starts with a "filename:lineno: Category: message"
    line.  It is followed by the stripped source line when one is available
    (msg.line if given, otherwise whatever linecache returns for
    msg.filename/msg.lineno).  When msg.source is not None, the tracemalloc
    allocation traceback of that object is appended, or a hint to enable
    tracemalloc when it is importable but not tracing.

    The optional linecache/tracemalloc lookups never raise: any failure
    there just drops the corresponding part of the output.
    """
    def getline(filename, lineno):
        # linecache is imported on demand, independently of whether msg.line
        # was supplied, so that traceback frames below can always be
        # annotated with their source text.
        try:
            import linecache
            return linecache.getline(filename, lineno)
        except Exception:
            # When a warning is logged during Python shutdown, linecache
            # and the import machinery don't work anymore
            return None

    category = msg.category.__name__
    parts = [f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"]

    line = msg.line
    if line is None:
        line = getline(msg.filename, msg.lineno)
    if line:
        parts.append("  %s\n" % line.strip())

    if msg.source is not None:
        try:
            import tracemalloc
            tracing = tracemalloc.is_tracing()
        # Logging a warning should not raise a new exception:
        # catch Exception, not only ImportError and RecursionError.
        except Exception:
            # don't suggest to enable tracemalloc if it's not available
            tracing = True
            tb = None
        else:
            try:
                tb = tracemalloc.get_object_traceback(msg.source)
            except Exception:
                # When a warning is logged during Python shutdown, tracemalloc
                # and the import machinery don't work anymore
                tb = None

        if tb is not None:
            parts.append('Object allocated at (most recent call last):\n')
            for frame in tb:
                parts.append('  File "%s", lineno %s\n'
                             % (frame.filename, frame.lineno))
                line = getline(frame.filename, frame.lineno)
                if line:
                    parts.append('    %s\n' % line.strip())
        elif not tracing:
            parts.append(f'{category}: Enable tracemalloc to get the object '
                         f'allocation traceback\n')
    return "".join(parts)
92
# Keep a reference to the default implementation so that _showwarnmsg() can
# check whether warnings.showwarning was replaced
_showwarning_orig = showwarning
95
def _showwarnmsg(msg):
    """Report *msg*, honouring a replaced warnings.showwarning().

    If the module-level showwarning is still the original function (or the
    name no longer exists), the default writer _showwarnmsg_impl() is used.
    Otherwise the replacement is called with the individual fields of *msg*;
    a non-callable replacement raises TypeError.
    """
    try:
        hook = showwarning
    except NameError:
        hook = None
        replaced = False
    else:
        replaced = hook is not _showwarning_orig

    if not replaced:
        _showwarnmsg_impl(msg)
        return

    # warnings.showwarning() was replaced
    if not callable(hook):
        raise TypeError("warnings.showwarning() must be set to a "
                        "function or method")
    hook(msg.message, msg.category, msg.filename, msg.lineno,
         msg.file, msg.line)
113
# Keep a reference to the default implementation so that _formatwarnmsg() can
# check whether warnings.formatwarning was replaced
_formatwarning_orig = formatwarning
116
def _formatwarnmsg(msg):
    """Format *msg*, honouring a replaced warnings.formatwarning().

    If the module-level formatwarning is still the original function (or the
    name no longer exists), _formatwarnmsg_impl() produces the text.
    Otherwise the replacement is called with the individual fields of *msg*.
    """
    try:
        hook = formatwarning
    except NameError:
        hook = None
        replaced = False
    else:
        replaced = hook is not _formatwarning_orig

    if replaced:
        # warnings.formatwarning() was replaced
        return hook(msg.message, msg.category,
                    msg.filename, msg.lineno, msg.line)
    return _formatwarnmsg_impl(msg)
129
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
                 (compiled case-insensitively)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    An empty 'message' or 'module' is stored as None in the filter entry.
    """
    valid_actions = ("error", "ignore", "always", "default", "module",
                     "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"

    message_re = None
    module_re = None
    if message or module:
        # Only pay for importing re when a pattern was actually given.
        import re
        if message:
            message_re = re.compile(message, re.I)
        if module:
            module_re = re.compile(module)

    _add_filter(action, message_re, category, module_re, lineno,
                append=append)
164
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Insert a simple entry into the list of warnings filters (at the front).

    A simple filter matches all modules and messages: its message and
    module patterns are stored as None.
    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'category' -- a class that the warning must be a subclass of
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    valid_actions = ("error", "ignore", "always", "default", "module",
                     "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"
    entry = (action, None, category, None, lineno)
    _add_filter(*entry, append=append)
180
def _add_filter(*item, append):
    """Add the filter tuple *item* to the module-level filters list.

    With append false the filter is moved/inserted to the front, after
    removing one existing equal entry if there is one.  With append true it
    is added at the end only when no equal entry exists.  In both cases
    _filters_mutated() is called afterwards.
    """
    if append:
        # An existing duplicate keeps its position; nothing to add then.
        if item not in filters:
            filters.append(item)
    else:
        # Remove possible duplicate filters, so new one will be placed
        # in correct place.
        try:
            filters.remove(item)
        except ValueError:
            pass
        filters.insert(0, item)
    _filters_mutated()
194
def resetwarnings():
    """Clear the list of warning filters, so that no filters are active.

    The list is emptied in place and _filters_mutated() is called.
    """
    del filters[:]
    _filters_mutated()
199
class _OptionError(Exception):
    """Exception used by option processing helpers."""
203
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
    """Apply each option string in *args* via _setoption().

    An option rejected with _OptionError is reported on sys.stderr and
    skipped; processing continues with the remaining options.
    """
    for option in args:
        try:
            _setoption(option)
        except _OptionError as problem:
            print("Invalid -W option ignored:", problem, file=sys.stderr)
211
# Helper for _processoptions()
def _setoption(arg):
    """Parse one "action:message:category:module:lineno" option and install it.

    Missing trailing fields default to the empty string and every field is
    stripped.  The message and module fields are matched literally
    (regex-escaped), the module additionally anchored at its end.  Raises
    _OptionError for too many fields or an invalid line number.
    """
    fields = arg.split(':')
    if len(fields) > 5:
        raise _OptionError("too many fields (max 5): %r" % (arg,))
    # Pad to exactly five fields.
    fields.extend([''] * (5 - len(fields)))
    action, message, category, module, lineno = (f.strip() for f in fields)
    action = _getaction(action)
    category = _getcategory(category)
    if message or module:
        import re
    if message:
        message = re.escape(message)
    if module:
        module = re.escape(module) + r'\Z'
    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _OptionError("invalid lineno %r" % (lineno,)) from None
    filterwarnings(action, message, category, module, lineno)
239
# Helper for _setoption()
def _getaction(action):
    """Expand an (optionally abbreviated) action name to its full name.

    An empty string means "default" and "all" is an alias for "always".
    Otherwise the first known action that starts with *action* is returned;
    _OptionError is raised when none does.
    """
    if not action:
        return "default"
    if action == "all":
        return "always"  # Alias
    known = ('default', 'always', 'ignore', 'module', 'once', 'error')
    match = next((name for name in known if name.startswith(action)), None)
    if match is None:
        raise _OptionError("invalid action: %r" % (action,))
    return match
249
# Helper for _setoption()
def _getcategory(category):
    """Resolve a category name from a -W option to a Warning subclass.

    An empty name means Warning.  A name without a dot is looked up in
    builtins; a dotted name is imported as "module.Class".  Every way the
    lookup can fail is reported as _OptionError, so that a malformed option
    is handled like any other invalid option instead of escaping as an
    unrelated exception.
    """
    if not category:
        return Warning
    if '.' not in category:
        import builtins as m
        klass = category
    else:
        module, _, klass = category.rpartition('.')
        try:
            m = __import__(module, None, None, [klass])
        except (ImportError, ValueError):
            # ValueError is what __import__ raises for an empty module
            # name, e.g. for the category ".Foo".
            raise _OptionError("invalid module name: %r" % (module,)) from None
    try:
        cat = getattr(m, klass)
    except AttributeError:
        raise _OptionError("unknown warning category: %r" % (category,)) from None
    # issubclass() raises TypeError for non-classes (e.g. the builtin "len"),
    # so check that the attribute is a class first.
    if not (isinstance(cat, type) and issubclass(cat, Warning)):
        raise _OptionError("invalid warning category: %r" % (category,))
    return cat
270
271
def _is_internal_frame(frame):
    """Signal whether the frame is an internal CPython implementation detail.

    A frame counts as internal when its code's filename contains both
    'importlib' and '_bootstrap'.
    """
    path = frame.f_code.co_filename
    return all(marker in path for marker in ('importlib', '_bootstrap'))
276
277
def _next_external_frame(frame):
    """Find the next frame that doesn't involve CPython internals.

    Walks outwards from frame.f_back and returns the first frame for which
    _is_internal_frame() is false, or None when the stack is exhausted.
    """
    candidate = frame.f_back
    while candidate is not None:
        if not _is_internal_frame(candidate):
            break
        candidate = candidate.f_back
    return candidate
284
285
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1, source=None):
    """Issue a warning, or maybe ignore it or raise an exception.

    'message' -- the warning text, or a Warning instance (whose class then
                 overrides 'category')
    'category' -- a Warning subclass; defaults to UserWarning
    'stacklevel' -- how many frames up the warning should be attributed to;
                    1 means the caller of warn()
    'source' -- passed through unchanged to warn_explicit()

    Raises TypeError when 'category' is not a Warning subclass.  The
    filename, line number, module name and warning registry are taken from
    the globals of the selected frame and handed to warn_explicit().
    """
    # Check if message is already a Warning object
    if isinstance(message, Warning):
        category = message.__class__
    # Check category argument
    if category is None:
        category = UserWarning
    if not (isinstance(category, type) and issubclass(category, Warning)):
        raise TypeError("category must be a Warning subclass, "
                        "not '{:s}'".format(type(category).__name__))
    # Get context information.  The frame lookups must stay in this function:
    # the depths passed to sys._getframe() are relative to warn() itself.
    try:
        if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
            # If frame is too small to care or if the warning originated in
            # internal code, then do not try to hide any frames.
            frame = sys._getframe(stacklevel)
        else:
            frame = sys._getframe(1)
            # Look for one frame less since the above line starts us off.
            for _ in range(stacklevel - 1):
                frame = _next_external_frame(frame)
                if frame is None:
                    raise ValueError
    except ValueError:
        # The requested frame does not exist: attribute the warning to sys.
        caller_globals = sys.__dict__
        lineno = 1
    else:
        caller_globals = frame.f_globals
        lineno = frame.f_lineno
    module = caller_globals.get('__name__', "<string>")
    filename = caller_globals.get('__file__')
    if filename:
        # Report the source file rather than the compiled one.
        if filename.lower().endswith(".pyc"):
            filename = filename[:-1]
    else:
        if module == "__main__":
            try:
                filename = sys.argv[0]
            except (AttributeError, IndexError, TypeError):
                # embedded interpreters don't have sys.argv, see bug #839151;
                # it may also be empty (IndexError) or not a sequence at
                # all, e.g. None (TypeError).
                filename = '__main__'
        if not filename:
            filename = module
    registry = caller_globals.setdefault("__warningregistry__", {})
    warn_explicit(message, category, filename, lineno, module, registry,
                  caller_globals, source)
338
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None,
                  source=None):
    """Issue a warning with explicitly supplied context, applying the filters.

    'message' -- the warning text, or a Warning instance (whose class then
                 overrides 'category')
    'filename', 'lineno' -- the location the warning is attributed to
    'module' -- module name matched against the filters; derived from
                'filename' when None
    'registry' -- dict remembering already-shown warnings; it is cleared
                  when its 'version' entry differs from _filters_version
    'module_globals' -- passed to linecache.getlines() to prime the cache
    'source' -- stored on the WarningMessage as its 'source' attribute

    The first matching entry of the filters list (or defaultaction) decides
    whether the warning is ignored, raised as an exception, or shown via
    _showwarnmsg().
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3] # XXX What about leading pathname?
    if registry is None:
        registry = {}
    if registry.get('version', 0) != _filters_version:
        registry.clear()
        registry['version'] = _filters_version
    if isinstance(message, Warning):
        text = str(message)
        category = message.__class__
    else:
        text = message
        message = category(message)
    key = (text, category, lineno)
    # Quick test for common case
    if registry.get(key):
        return
    # Search the filters
    for item in filters:
        action, msg, cat, mod, ln = item
        if ((msg is None or msg.match(text)) and
            issubclass(category, cat) and
            (mod is None or mod.match(module)) and
            (ln == 0 or lineno == ln)):
            break
    else:
        action = defaultaction
    # Early exit actions
    if action == "ignore":
        return

    # Prime the linecache for formatting, in case the
    # "file" is actually in a zipfile or something.
    import linecache
    linecache.getlines(filename, module_globals)

    if action == "error":
        raise message
    # Other actions
    if action == "once":
        registry[key] = 1
        oncekey = (text, category)
        if onceregistry.get(oncekey):
            return
        onceregistry[oncekey] = 1
    elif action == "always":
        pass
    elif action == "module":
        registry[key] = 1
        altkey = (text, category, 0)
        if registry.get(altkey):
            return
        registry[altkey] = 1
    elif action == "default":
        registry[key] = 1
    else:
        # Unrecognized actions are errors
        raise RuntimeError(
              "Unrecognized action (%r) in warnings.filters:\n %s" %
              (action, item))
    # Print message and context.  'source' must be passed by keyword: the
    # fifth positional parameter of WarningMessage is 'file', not 'source'.
    msg = WarningMessage(message, category, filename, lineno, source=source)
    _showwarnmsg(msg)
408
409
class WarningMessage:
    """Holds the details of one warning: the fields in _WARNING_DETAILS."""

    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line", "source")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None, source=None):
        """Store every argument as the attribute of the same name."""
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.file = file
        self.line = line
        self.source = source
        # Cache the category's name for __str__; a false category gives None.
        if category:
            self._category_name = category.__name__
        else:
            self._category_name = None

    def __str__(self):
        """Return a dict-like summary; 'file' and 'source' are not shown."""
        template = ("{message : %r, category : %r, filename : %r, lineno : %s, "
                    "line : %r}")
        values = (self.message, self._category_name, self.filename,
                  self.lineno, self.line)
        return template % values
430
431
class catch_warnings:

    """A context manager that copies and restores the warnings filter upon
    exiting the context.

    On entry the module's filters list is replaced by a copy; on exit the
    original list, showwarning() and _showwarnmsg_impl() are put back.

    The 'record' argument specifies whether warnings should be captured by a
    custom implementation of warnings.showwarning() and be appended to a list
    returned by the context manager. Otherwise None is returned by the context
    manager. The objects appended to the list are arguments whose attributes
    mirror the arguments to showwarning().

    The 'module' argument is to specify an alternative module to the module
    named 'warnings' and imported under that name. This argument is only useful
    when testing the warnings module itself.

    """

    def __init__(self, *, record=False, module=None):
        """Specify whether to record warnings and if an alternative module
        should be used other than sys.modules['warnings'].

        For compatibility with Python 3.0, please consider all arguments to be
        keyword-only.

        """
        self._record = record
        if module is None:
            module = sys.modules['warnings']
        self._module = module
        self._entered = False

    def __repr__(self):
        shown = []
        if self._record:
            shown.append("record=True")
        if self._module is not sys.modules['warnings']:
            shown.append("module=%r" % self._module)
        return "%s(%s)" % (type(self).__name__, ", ".join(shown))

    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True
        mod = self._module
        # Swap in a copy of the filters so changes made inside the context
        # never touch the saved original list.
        self._filters = mod.filters
        mod.filters = self._filters[:]
        mod._filters_mutated()
        self._showwarning = mod.showwarning
        self._showwarnmsg_impl = mod._showwarnmsg_impl
        if not self._record:
            return None
        log = []
        mod._showwarnmsg_impl = log.append
        # Reset showwarning() to the default implementation to make sure
        # that _showwarnmsg() calls _showwarnmsg_impl()
        mod.showwarning = mod._showwarning_orig
        return log

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        mod = self._module
        mod.filters = self._filters
        mod._filters_mutated()
        mod.showwarning = self._showwarning
        mod._showwarnmsg_impl = self._showwarnmsg_impl
496
497
# Private utility function called by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
    """Issue a RuntimeWarning saying that *coro* was never awaited.

    When coro.cr_origin is not None, the creation stack it describes is
    appended to the message, formatted like a traceback.
    """
    chunks = [f"coroutine '{coro.__qualname__}' was never awaited\n"]
    origin = coro.cr_origin
    if origin is not None:
        import linecache
        import traceback
        chunks.append("Coroutine created at (most recent call last)\n")
        # cr_origin is reversed here so the entries read like a traceback.
        entries = [
            (filename, lineno, funcname, linecache.getline(filename, lineno))
            for filename, lineno, funcname in reversed(origin)
        ]
        chunks += traceback.format_list(entries)
    text = "".join(chunks).rstrip("\n")
    # Passing source= here means that if the user happens to have tracemalloc
    # enabled and tracking where the coroutine was created, the warning will
    # contain that traceback. This does mean that if they have *both*
    # coroutine origin tracking *and* tracemalloc enabled, they'll get two
    # partially-redundant tracebacks. If we wanted to be clever we could
    # probably detect this case and avoid it, but for now we don't bother.
    # The call stays directly in this function so stacklevel=2 keeps
    # referring to the same frame.
    warn(text, category=RuntimeWarning, stacklevel=2, source=coro)
519
520
521# filters contains a sequence of filter 5-tuples
522# The components of the 5-tuple are:
523# - an action: error, ignore, always, default, module, or once
524# - a compiled regex that must match the warning message
525# - a class representing the warning category
526# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
# Use the _warnings module when it can be imported: it supplies the shared
# filter state and rebinds warn() and warn_explicit(), replacing the
# pure-Python versions defined earlier in this file.
try:
    from _warnings import (filters, _defaultaction, _onceregistry,
                           warn, warn_explicit, _filters_mutated)
    defaultaction = _defaultaction
    onceregistry = _onceregistry
    # Flag consulted during module initialization: when true, the default
    # filters are not installed from Python.
    _warnings_defaults = True
except ImportError:
    # Pure-Python fallback state.
    filters = []
    defaultaction = "default"
    onceregistry = {}

    # Bumped on every change to the filters; warn_explicit() compares it
    # with the 'version' entry of a registry and clears the registry when
    # they differ.
    _filters_version = 1

    def _filters_mutated():
        """Record a change to the filters by incrementing _filters_version."""
        global _filters_version
        _filters_version += 1

    _warnings_defaults = False
547
548
# Module initialization
# Install the filters described by the option strings in sys.warnoptions.
_processoptions(sys.warnoptions)
if not _warnings_defaults:
    # Several warning categories are ignored by default in regular builds
    # (a build is treated as regular when sys has no 'gettotalrefcount').
    if not hasattr(sys, 'gettotalrefcount'):
        # Appended, so filters installed from sys.warnoptions come first.
        # DeprecationWarning is still shown when attributed to __main__.
        filterwarnings("default", category=DeprecationWarning,
                       module="__main__", append=1)
        simplefilter("ignore", category=DeprecationWarning, append=1)
        simplefilter("ignore", category=PendingDeprecationWarning, append=1)
        simplefilter("ignore", category=ImportWarning, append=1)
        simplefilter("ignore", category=ResourceWarning, append=1)

# The flag was only needed during initialization; remove it from the
# module namespace.
del _warnings_defaults
562