1# This file is part of Scapy
2# See http://www.secdev.org/projects/scapy for more information
3# Copyright (C) Philippe Biondi <phil@secdev.org>
4# This program is published under a GPLv2 license
5
6"""
7Logging subsystem and basic exception class.
8"""
9
10#############################
11#     Logging subsystem     #
12#############################
13
14
15import logging
16import traceback
17import time
18import warnings
19
20from scapy.consts import WINDOWS
21import scapy.modules.six as six
22
23# Typing imports
24from logging import LogRecord
25from scapy.compat import (
26    Any,
27    Dict,
28    Tuple,
29)
30
31
32class Scapy_Exception(Exception):
33    pass
34
35
36class ScapyInvalidPlatformException(Scapy_Exception):
37    pass
38
39
40class ScapyNoDstMacException(Scapy_Exception):
41    pass
42
43
44class ScapyFreqFilter(logging.Filter):
45    def __init__(self):
46        # type: () -> None
47        logging.Filter.__init__(self)
48        self.warning_table = {}  # type: Dict[int, Tuple[float, int]]  # noqa: E501
49
50    def filter(self, record):
51        # type: (LogRecord) -> bool
52        from scapy.config import conf
53        # Levels below INFO are not covered
54        if record.levelno <= logging.INFO:
55            return True
56        wt = conf.warning_threshold
57        if wt > 0:
58            stk = traceback.extract_stack()
59            caller = 0  # type: int
60            for _, l, n, _ in stk:
61                if n == 'warning':
62                    break
63                caller = l
64            tm, nb = self.warning_table.get(caller, (0, 0))
65            ltm = time.time()
66            if ltm - tm > wt:
67                tm = ltm
68                nb = 0
69            else:
70                if nb < 2:
71                    nb += 1
72                    if nb == 2:
73                        record.msg = "more " + record.msg
74                else:
75                    return False
76            self.warning_table[caller] = (tm, nb)
77        return True
78
79
80class ScapyColoredFormatter(logging.Formatter):
81    """A subclass of logging.Formatter that handles colors."""
82    levels_colored = {
83        'DEBUG': 'reset',
84        'INFO': 'reset',
85        'WARNING': 'bold+yellow',
86        'ERROR': 'bold+red',
87        'CRITICAL': 'bold+white+bg_red'
88    }
89
90    def format(self, record):
91        # type: (LogRecord) -> str
92        message = super(ScapyColoredFormatter, self).format(record)
93        from scapy.config import conf
94        message = conf.color_theme.format(
95            message,
96            self.levels_colored[record.levelname]
97        )
98        return message
99
100
101if WINDOWS:
102    # colorama is bundled within IPython, but
103    # logging.StreamHandler will be overwritten when called,
104    # so we can't wait for IPython to call it
105    try:
106        import colorama
107        colorama.init()
108    except ImportError:
109        pass
110
111# get Scapy's master logger
112log_scapy = logging.getLogger("scapy")
113# override the level if not already set
114if log_scapy.level == logging.NOTSET:
115    log_scapy.setLevel(logging.WARNING)
116# add a custom handler controlled by Scapy's config
117_handler = logging.StreamHandler()
118_handler.setFormatter(
119    ScapyColoredFormatter(
120        "%(levelname)s: %(message)s",
121    )
122)
123log_scapy.addHandler(_handler)
124# logs at runtime
125log_runtime = logging.getLogger("scapy.runtime")
126log_runtime.addFilter(ScapyFreqFilter())
127# logs in interactive functions
128log_interactive = logging.getLogger("scapy.interactive")
129log_interactive.setLevel(logging.DEBUG)
130# logs when loading Scapy
131log_loading = logging.getLogger("scapy.loading")
132
133# Apply warnings filters for python 2
134if six.PY2:
135    try:
136        with warnings.catch_warnings():
137            warnings.simplefilter("ignore")
138            from cryptography import CryptographyDeprecationWarning
139        warnings.filterwarnings("ignore",
140                                category=CryptographyDeprecationWarning)
141    except ImportError:
142        pass
143
144
145def warning(x, *args, **kargs):
146    # type: (str, *Any, **Any) -> None
147    """
148    Prints a warning during runtime.
149    """
150    log_runtime.warning(x, *args, **kargs)
151