1import logging
2
3from .conftest import make_logging_logger
4
5from loguru import logger
6
7
8class InterceptHandler(logging.Handler):
9    def emit(self, record):
10        # Get corresponding Loguru level if it exists
11        try:
12            level = logger.level(record.levelname).name
13        except ValueError:
14            level = record.levelno
15
16        # Find caller from where originated the logged message
17        frame, depth = logging.currentframe(), 2
18        while frame.f_code.co_filename == logging.__file__:
19            frame = frame.f_back
20            depth += 1
21
22        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
23
24
25def test_formatting(writer):
26    fmt = "{name} - {file.name} - {function} - {level.name} - {level.no} - {line} - {module} - {message}"
27    expected = "tests.test_interception - test_interception.py - test_formatting - DEBUG - 10 - 31 - test_interception - This is the message\n"
28
29    with make_logging_logger("tests", InterceptHandler()) as logging_logger:
30        logger.add(writer, format=fmt)
31        logging_logger.debug("This is the %s", "message")
32
33    result = writer.read()
34    assert result == expected
35
36
37def test_intercept(writer):
38    with make_logging_logger(None, InterceptHandler()) as logging_logger:
39        logging_logger.info("Nope")
40        logger.add(writer, format="{message}")
41        logging_logger.info("Test")
42
43    result = writer.read()
44    assert result == "Test\n"
45
46
47def test_add_before_intercept(writer):
48    logger.add(writer, format="{message}")
49
50    with make_logging_logger(None, InterceptHandler()) as logging_logger:
51        logging_logger.info("Test")
52
53    result = writer.read()
54    assert result == "Test\n"
55
56
57def test_remove_interception(writer):
58    h = InterceptHandler()
59
60    with make_logging_logger("foobar", h) as logging_logger:
61        logger.add(writer, format="{message}")
62        logging_logger.debug("1")
63        logging_logger.removeHandler(h)
64        logging_logger.debug("2")
65
66    result = writer.read()
67    assert result == "1\n"
68
69
70def test_intercept_too_low(writer):
71    with make_logging_logger("tests.test_interception", InterceptHandler()):
72        logger.add(writer, format="{message}")
73        logging.getLogger("tests").error("Nope 1")
74        logging.getLogger("foobar").error("Nope 2")
75
76    result = writer.read()
77    assert result == ""
78
79
80def test_multiple_intercept(writer):
81    with make_logging_logger("test_1", InterceptHandler()) as logging_logger_1:
82        with make_logging_logger("test_2", InterceptHandler()) as logging_logger_2:
83            logger.add(writer, format="{message}")
84            logging_logger_1.info("1")
85            logging_logger_2.info("2")
86
87    result = writer.read()
88    assert result == "1\n2\n"
89
90
91def test_exception(writer):
92    with make_logging_logger("tests.test_interception", InterceptHandler()) as logging_logger:
93        logger.add(writer, format="{message}")
94
95        try:
96            1 / 0
97        except:
98            logging_logger.exception("Oops...")
99
100    lines = writer.read().strip().splitlines()
101    assert lines[0] == "Oops..."
102    assert lines[-1] == "ZeroDivisionError: division by zero"
103    assert sum(line.startswith("> ") for line in lines) == 1
104
105
106def test_level_is_no(writer):
107    with make_logging_logger("tests", InterceptHandler()) as logging_logger:
108        logger.add(writer, format="<lvl>{level.no} - {level.name} - {message}</lvl>", colorize=True)
109        logging_logger.log(12, "Hop")
110
111    result = writer.read()
112    assert result == "12 - Level 12 - Hop\x1b[0m\n"
113
114
115def test_level_does_not_exist(writer):
116    logging.addLevelName(152, "FANCY_LEVEL")
117
118    with make_logging_logger("tests", InterceptHandler()) as logging_logger:
119        logger.add(writer, format="<lvl>{level.no} - {level.name} - {message}</lvl>", colorize=True)
120        logging_logger.log(152, "Nop")
121
122    result = writer.read()
123    assert result == "152 - Level 152 - Nop\x1b[0m\n"
124
125
126def test_level_exist_builtin(writer):
127    with make_logging_logger("tests", InterceptHandler()) as logging_logger:
128        logger.add(writer, format="<lvl>{level.no} - {level.name} - {message}</lvl>", colorize=True)
129        logging_logger.error("Error...")
130
131    result = writer.read()
132    assert result == "\x1b[31m\x1b[1m40 - ERROR - Error...\x1b[0m\n"
133
134
135def test_level_exists_custom(writer):
136    logging.addLevelName(99, "ANOTHER_FANCY_LEVEL")
137    logger.level("ANOTHER_FANCY_LEVEL", no=99, color="<green>", icon="")
138
139    with make_logging_logger("tests", InterceptHandler()) as logging_logger:
140        logger.add(writer, format="<lvl>{level.no} - {level.name} - {message}</lvl>", colorize=True)
141        logging_logger.log(99, "Yep!")
142
143    result = writer.read()
144    assert result == "\x1b[32m99 - ANOTHER_FANCY_LEVEL - Yep!\x1b[0m\n"
145
146
147def test_using_logging_function(writer):
148    with make_logging_logger(None, InterceptHandler()):
149        logger.add(writer, format="{function} {line} {module} {file.name} {message}")
150        logging.warning("ABC")
151
152    result = writer.read()
153    assert result == "test_using_logging_function 150 test_interception test_interception.py ABC\n"
154