1# Copyright 2001-2019 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2019 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import copy
29import datetime
30import pathlib
31import pickle
32import io
33import gc
34import json
35import os
36import queue
37import random
38import re
39import socket
40import struct
41import sys
42import tempfile
43from test.support.script_helper import assert_python_ok, assert_python_failure
44from test import support
45from test.support import os_helper
46from test.support import socket_helper
47from test.support import threading_helper
48from test.support import warnings_helper
49from test.support.logging_helper import TestHandler
50import textwrap
51import threading
52import time
53import unittest
54import warnings
55import weakref
56
57from http.server import HTTPServer, BaseHTTPRequestHandler
58from urllib.parse import urlparse, parse_qs
59from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
60                          ThreadingTCPServer, StreamRequestHandler)
61
62with warnings.catch_warnings():
63    warnings.simplefilter('ignore', DeprecationWarning)
64    import asyncore
65    import smtpd
66
67try:
68    import win32evtlog, win32evtlogutil, pywintypes
69except ImportError:
70    win32evtlog = win32evtlogutil = pywintypes = None
71
72try:
73    import zlib
74except ImportError:
75    pass
76
77class BaseTest(unittest.TestCase):
78
79    """Base class for logging tests."""
80
81    log_format = "%(name)s -> %(levelname)s: %(message)s"
82    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
83    message_num = 0
84
85    def setUp(self):
86        """Setup the default logging stream to an internal StringIO instance,
87        so that we can examine log output as we want."""
88        self._threading_key = threading_helper.threading_setup()
89
90        logger_dict = logging.getLogger().manager.loggerDict
91        logging._acquireLock()
92        try:
93            self.saved_handlers = logging._handlers.copy()
94            self.saved_handler_list = logging._handlerList[:]
95            self.saved_loggers = saved_loggers = logger_dict.copy()
96            self.saved_name_to_level = logging._nameToLevel.copy()
97            self.saved_level_to_name = logging._levelToName.copy()
98            self.logger_states = logger_states = {}
99            for name in saved_loggers:
100                logger_states[name] = getattr(saved_loggers[name],
101                                              'disabled', None)
102        finally:
103            logging._releaseLock()
104
105        # Set two unused loggers
106        self.logger1 = logging.getLogger("\xab\xd7\xbb")
107        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
108
109        self.root_logger = logging.getLogger("")
110        self.original_logging_level = self.root_logger.getEffectiveLevel()
111
112        self.stream = io.StringIO()
113        self.root_logger.setLevel(logging.DEBUG)
114        self.root_hdlr = logging.StreamHandler(self.stream)
115        self.root_formatter = logging.Formatter(self.log_format)
116        self.root_hdlr.setFormatter(self.root_formatter)
117        if self.logger1.hasHandlers():
118            hlist = self.logger1.handlers + self.root_logger.handlers
119            raise AssertionError('Unexpected handlers: %s' % hlist)
120        if self.logger2.hasHandlers():
121            hlist = self.logger2.handlers + self.root_logger.handlers
122            raise AssertionError('Unexpected handlers: %s' % hlist)
123        self.root_logger.addHandler(self.root_hdlr)
124        self.assertTrue(self.logger1.hasHandlers())
125        self.assertTrue(self.logger2.hasHandlers())
126
127    def tearDown(self):
128        """Remove our logging stream, and restore the original logging
129        level."""
130        self.stream.close()
131        self.root_logger.removeHandler(self.root_hdlr)
132        while self.root_logger.handlers:
133            h = self.root_logger.handlers[0]
134            self.root_logger.removeHandler(h)
135            h.close()
136        self.root_logger.setLevel(self.original_logging_level)
137        logging._acquireLock()
138        try:
139            logging._levelToName.clear()
140            logging._levelToName.update(self.saved_level_to_name)
141            logging._nameToLevel.clear()
142            logging._nameToLevel.update(self.saved_name_to_level)
143            logging._handlers.clear()
144            logging._handlers.update(self.saved_handlers)
145            logging._handlerList[:] = self.saved_handler_list
146            manager = logging.getLogger().manager
147            manager.disable = 0
148            loggerDict = manager.loggerDict
149            loggerDict.clear()
150            loggerDict.update(self.saved_loggers)
151            logger_states = self.logger_states
152            for name in self.logger_states:
153                if logger_states[name] is not None:
154                    self.saved_loggers[name].disabled = logger_states[name]
155        finally:
156            logging._releaseLock()
157
158        self.doCleanups()
159        threading_helper.threading_cleanup(*self._threading_key)
160
161    def assert_log_lines(self, expected_values, stream=None, pat=None):
162        """Match the collected log lines against the regular expression
163        self.expected_log_pat, and compare the extracted group values to
164        the expected_values list of tuples."""
165        stream = stream or self.stream
166        pat = re.compile(pat or self.expected_log_pat)
167        actual_lines = stream.getvalue().splitlines()
168        self.assertEqual(len(actual_lines), len(expected_values))
169        for actual, expected in zip(actual_lines, expected_values):
170            match = pat.search(actual)
171            if not match:
172                self.fail("Log line does not match expected pattern:\n" +
173                            actual)
174            self.assertEqual(tuple(match.groups()), expected)
175        s = stream.read()
176        if s:
177            self.fail("Remaining output at end of log stream:\n" + s)
178
179    def next_message(self):
180        """Generate a message consisting solely of an auto-incrementing
181        integer."""
182        self.message_num += 1
183        return "%d" % self.message_num
184
185
186class BuiltinLevelsTest(BaseTest):
187    """Test builtin levels and their inheritance."""
188
189    def test_flat(self):
190        # Logging levels in a flat logger namespace.
191        m = self.next_message
192
193        ERR = logging.getLogger("ERR")
194        ERR.setLevel(logging.ERROR)
195        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
196        INF.setLevel(logging.INFO)
197        DEB = logging.getLogger("DEB")
198        DEB.setLevel(logging.DEBUG)
199
200        # These should log.
201        ERR.log(logging.CRITICAL, m())
202        ERR.error(m())
203
204        INF.log(logging.CRITICAL, m())
205        INF.error(m())
206        INF.warning(m())
207        INF.info(m())
208
209        DEB.log(logging.CRITICAL, m())
210        DEB.error(m())
211        DEB.warning(m())
212        DEB.info(m())
213        DEB.debug(m())
214
215        # These should not log.
216        ERR.warning(m())
217        ERR.info(m())
218        ERR.debug(m())
219
220        INF.debug(m())
221
222        self.assert_log_lines([
223            ('ERR', 'CRITICAL', '1'),
224            ('ERR', 'ERROR', '2'),
225            ('INF', 'CRITICAL', '3'),
226            ('INF', 'ERROR', '4'),
227            ('INF', 'WARNING', '5'),
228            ('INF', 'INFO', '6'),
229            ('DEB', 'CRITICAL', '7'),
230            ('DEB', 'ERROR', '8'),
231            ('DEB', 'WARNING', '9'),
232            ('DEB', 'INFO', '10'),
233            ('DEB', 'DEBUG', '11'),
234        ])
235
236    def test_nested_explicit(self):
237        # Logging levels in a nested namespace, all explicitly set.
238        m = self.next_message
239
240        INF = logging.getLogger("INF")
241        INF.setLevel(logging.INFO)
242        INF_ERR  = logging.getLogger("INF.ERR")
243        INF_ERR.setLevel(logging.ERROR)
244
245        # These should log.
246        INF_ERR.log(logging.CRITICAL, m())
247        INF_ERR.error(m())
248
249        # These should not log.
250        INF_ERR.warning(m())
251        INF_ERR.info(m())
252        INF_ERR.debug(m())
253
254        self.assert_log_lines([
255            ('INF.ERR', 'CRITICAL', '1'),
256            ('INF.ERR', 'ERROR', '2'),
257        ])
258
259    def test_nested_inherited(self):
260        # Logging levels in a nested namespace, inherited from parent loggers.
261        m = self.next_message
262
263        INF = logging.getLogger("INF")
264        INF.setLevel(logging.INFO)
265        INF_ERR  = logging.getLogger("INF.ERR")
266        INF_ERR.setLevel(logging.ERROR)
267        INF_UNDEF = logging.getLogger("INF.UNDEF")
268        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
269        UNDEF = logging.getLogger("UNDEF")
270
271        # These should log.
272        INF_UNDEF.log(logging.CRITICAL, m())
273        INF_UNDEF.error(m())
274        INF_UNDEF.warning(m())
275        INF_UNDEF.info(m())
276        INF_ERR_UNDEF.log(logging.CRITICAL, m())
277        INF_ERR_UNDEF.error(m())
278
279        # These should not log.
280        INF_UNDEF.debug(m())
281        INF_ERR_UNDEF.warning(m())
282        INF_ERR_UNDEF.info(m())
283        INF_ERR_UNDEF.debug(m())
284
285        self.assert_log_lines([
286            ('INF.UNDEF', 'CRITICAL', '1'),
287            ('INF.UNDEF', 'ERROR', '2'),
288            ('INF.UNDEF', 'WARNING', '3'),
289            ('INF.UNDEF', 'INFO', '4'),
290            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
291            ('INF.ERR.UNDEF', 'ERROR', '6'),
292        ])
293
294    def test_nested_with_virtual_parent(self):
295        # Logging levels when some parent does not exist yet.
296        m = self.next_message
297
298        INF = logging.getLogger("INF")
299        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
300        CHILD = logging.getLogger("INF.BADPARENT")
301        INF.setLevel(logging.INFO)
302
303        # These should log.
304        GRANDCHILD.log(logging.FATAL, m())
305        GRANDCHILD.info(m())
306        CHILD.log(logging.FATAL, m())
307        CHILD.info(m())
308
309        # These should not log.
310        GRANDCHILD.debug(m())
311        CHILD.debug(m())
312
313        self.assert_log_lines([
314            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
315            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
316            ('INF.BADPARENT', 'CRITICAL', '3'),
317            ('INF.BADPARENT', 'INFO', '4'),
318        ])
319
320    def test_regression_22386(self):
321        """See issue #22386 for more information."""
322        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
323        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
324
325    def test_issue27935(self):
326        fatal = logging.getLevelName('FATAL')
327        self.assertEqual(fatal, logging.FATAL)
328
329    def test_regression_29220(self):
330        """See issue #29220 for more information."""
331        logging.addLevelName(logging.INFO, '')
332        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
333        self.assertEqual(logging.getLevelName(logging.INFO), '')
334        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
335        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
336
337class BasicFilterTest(BaseTest):
338
339    """Test the bundled Filter class."""
340
341    def test_filter(self):
342        # Only messages satisfying the specified criteria pass through the
343        #  filter.
344        filter_ = logging.Filter("spam.eggs")
345        handler = self.root_logger.handlers[0]
346        try:
347            handler.addFilter(filter_)
348            spam = logging.getLogger("spam")
349            spam_eggs = logging.getLogger("spam.eggs")
350            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
351            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
352
353            spam.info(self.next_message())
354            spam_eggs.info(self.next_message())  # Good.
355            spam_eggs_fish.info(self.next_message())  # Good.
356            spam_bakedbeans.info(self.next_message())
357
358            self.assert_log_lines([
359                ('spam.eggs', 'INFO', '2'),
360                ('spam.eggs.fish', 'INFO', '3'),
361            ])
362        finally:
363            handler.removeFilter(filter_)
364
365    def test_callable_filter(self):
366        # Only messages satisfying the specified criteria pass through the
367        #  filter.
368
369        def filterfunc(record):
370            parts = record.name.split('.')
371            prefix = '.'.join(parts[:2])
372            return prefix == 'spam.eggs'
373
374        handler = self.root_logger.handlers[0]
375        try:
376            handler.addFilter(filterfunc)
377            spam = logging.getLogger("spam")
378            spam_eggs = logging.getLogger("spam.eggs")
379            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
380            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
381
382            spam.info(self.next_message())
383            spam_eggs.info(self.next_message())  # Good.
384            spam_eggs_fish.info(self.next_message())  # Good.
385            spam_bakedbeans.info(self.next_message())
386
387            self.assert_log_lines([
388                ('spam.eggs', 'INFO', '2'),
389                ('spam.eggs.fish', 'INFO', '3'),
390            ])
391        finally:
392            handler.removeFilter(filterfunc)
393
394    def test_empty_filter(self):
395        f = logging.Filter()
396        r = logging.makeLogRecord({'name': 'spam.eggs'})
397        self.assertTrue(f.filter(r))
398
399#
400#   First, we define our levels. There can be as many as you want - the only
401#     limitations are that they should be integers, the lowest should be > 0 and
402#   larger values mean less information being logged. If you need specific
403#   level values which do not fit into these limitations, you can use a
404#   mapping dictionary to convert between your application levels and the
405#   logging system.
406#
407SILENT      = 120
408TACITURN    = 119
409TERSE       = 118
410EFFUSIVE    = 117
411SOCIABLE    = 116
412VERBOSE     = 115
413TALKATIVE   = 114
414GARRULOUS   = 113
415CHATTERBOX  = 112
416BORING      = 111
417
418LEVEL_RANGE = range(BORING, SILENT + 1)
419
420#
421#   Next, we define names for our levels. You don't need to do this - in which
422#   case the system will use "Level n" to denote the text for the level.
423#
424my_logging_levels = {
425    SILENT      : 'Silent',
426    TACITURN    : 'Taciturn',
427    TERSE       : 'Terse',
428    EFFUSIVE    : 'Effusive',
429    SOCIABLE    : 'Sociable',
430    VERBOSE     : 'Verbose',
431    TALKATIVE   : 'Talkative',
432    GARRULOUS   : 'Garrulous',
433    CHATTERBOX  : 'Chatterbox',
434    BORING      : 'Boring',
435}
436
437class GarrulousFilter(logging.Filter):
438
439    """A filter which blocks garrulous messages."""
440
441    def filter(self, record):
442        return record.levelno != GARRULOUS
443
444class VerySpecificFilter(logging.Filter):
445
446    """A filter which blocks sociable and taciturn messages."""
447
448    def filter(self, record):
449        return record.levelno not in [SOCIABLE, TACITURN]
450
451
452class CustomLevelsAndFiltersTest(BaseTest):
453
454    """Test various filtering possibilities with custom logging levels."""
455
456    # Skip the logger name group.
457    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
458
459    def setUp(self):
460        BaseTest.setUp(self)
461        for k, v in my_logging_levels.items():
462            logging.addLevelName(k, v)
463
464    def log_at_all_levels(self, logger):
465        for lvl in LEVEL_RANGE:
466            logger.log(lvl, self.next_message())
467
468    def test_logger_filter(self):
469        # Filter at logger level.
470        self.root_logger.setLevel(VERBOSE)
471        # Levels >= 'Verbose' are good.
472        self.log_at_all_levels(self.root_logger)
473        self.assert_log_lines([
474            ('Verbose', '5'),
475            ('Sociable', '6'),
476            ('Effusive', '7'),
477            ('Terse', '8'),
478            ('Taciturn', '9'),
479            ('Silent', '10'),
480        ])
481
482    def test_handler_filter(self):
483        # Filter at handler level.
484        self.root_logger.handlers[0].setLevel(SOCIABLE)
485        try:
486            # Levels >= 'Sociable' are good.
487            self.log_at_all_levels(self.root_logger)
488            self.assert_log_lines([
489                ('Sociable', '6'),
490                ('Effusive', '7'),
491                ('Terse', '8'),
492                ('Taciturn', '9'),
493                ('Silent', '10'),
494            ])
495        finally:
496            self.root_logger.handlers[0].setLevel(logging.NOTSET)
497
498    def test_specific_filters(self):
499        # Set a specific filter object on the handler, and then add another
500        #  filter object on the logger itself.
501        handler = self.root_logger.handlers[0]
502        specific_filter = None
503        garr = GarrulousFilter()
504        handler.addFilter(garr)
505        try:
506            self.log_at_all_levels(self.root_logger)
507            first_lines = [
508                # Notice how 'Garrulous' is missing
509                ('Boring', '1'),
510                ('Chatterbox', '2'),
511                ('Talkative', '4'),
512                ('Verbose', '5'),
513                ('Sociable', '6'),
514                ('Effusive', '7'),
515                ('Terse', '8'),
516                ('Taciturn', '9'),
517                ('Silent', '10'),
518            ]
519            self.assert_log_lines(first_lines)
520
521            specific_filter = VerySpecificFilter()
522            self.root_logger.addFilter(specific_filter)
523            self.log_at_all_levels(self.root_logger)
524            self.assert_log_lines(first_lines + [
525                # Not only 'Garrulous' is still missing, but also 'Sociable'
526                # and 'Taciturn'
527                ('Boring', '11'),
528                ('Chatterbox', '12'),
529                ('Talkative', '14'),
530                ('Verbose', '15'),
531                ('Effusive', '17'),
532                ('Terse', '18'),
533                ('Silent', '20'),
534        ])
535        finally:
536            if specific_filter:
537                self.root_logger.removeFilter(specific_filter)
538            handler.removeFilter(garr)
539
540
541class HandlerTest(BaseTest):
542    def test_name(self):
543        h = logging.Handler()
544        h.name = 'generic'
545        self.assertEqual(h.name, 'generic')
546        h.name = 'anothergeneric'
547        self.assertEqual(h.name, 'anothergeneric')
548        self.assertRaises(NotImplementedError, h.emit, None)
549
550    def test_builtin_handlers(self):
551        # We can't actually *use* too many handlers in the tests,
552        # but we can try instantiating them with various options
553        if sys.platform in ('linux', 'darwin'):
554            for existing in (True, False):
555                fd, fn = tempfile.mkstemp()
556                os.close(fd)
557                if not existing:
558                    os.unlink(fn)
559                h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True)
560                if existing:
561                    dev, ino = h.dev, h.ino
562                    self.assertEqual(dev, -1)
563                    self.assertEqual(ino, -1)
564                    r = logging.makeLogRecord({'msg': 'Test'})
565                    h.handle(r)
566                    # Now remove the file.
567                    os.unlink(fn)
568                    self.assertFalse(os.path.exists(fn))
569                    # The next call should recreate the file.
570                    h.handle(r)
571                    self.assertTrue(os.path.exists(fn))
572                else:
573                    self.assertEqual(h.dev, -1)
574                    self.assertEqual(h.ino, -1)
575                h.close()
576                if existing:
577                    os.unlink(fn)
578            if sys.platform == 'darwin':
579                sockname = '/var/run/syslog'
580            else:
581                sockname = '/dev/log'
582            try:
583                h = logging.handlers.SysLogHandler(sockname)
584                self.assertEqual(h.facility, h.LOG_USER)
585                self.assertTrue(h.unixsocket)
586                h.close()
587            except OSError: # syslogd might not be available
588                pass
589        for method in ('GET', 'POST', 'PUT'):
590            if method == 'PUT':
591                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
592                                  'localhost', '/log', method)
593            else:
594                h = logging.handlers.HTTPHandler('localhost', '/log', method)
595                h.close()
596        h = logging.handlers.BufferingHandler(0)
597        r = logging.makeLogRecord({})
598        self.assertTrue(h.shouldFlush(r))
599        h.close()
600        h = logging.handlers.BufferingHandler(1)
601        self.assertFalse(h.shouldFlush(r))
602        h.close()
603
604    def test_path_objects(self):
605        """
606        Test that Path objects are accepted as filename arguments to handlers.
607
608        See Issue #27493.
609        """
610        fd, fn = tempfile.mkstemp()
611        os.close(fd)
612        os.unlink(fn)
613        pfn = pathlib.Path(fn)
614        cases = (
615                    (logging.FileHandler, (pfn, 'w')),
616                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
617                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
618                )
619        if sys.platform in ('linux', 'darwin'):
620            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
621        for cls, args in cases:
622            h = cls(*args, encoding="utf-8")
623            self.assertTrue(os.path.exists(fn))
624            h.close()
625            os.unlink(fn)
626
627    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
628    def test_race(self):
629        # Issue #14632 refers.
630        def remove_loop(fname, tries):
631            for _ in range(tries):
632                try:
633                    os.unlink(fname)
634                    self.deletion_time = time.time()
635                except OSError:
636                    pass
637                time.sleep(0.004 * random.randint(0, 4))
638
639        del_count = 500
640        log_count = 500
641
642        self.handle_time = None
643        self.deletion_time = None
644
645        for delay in (False, True):
646            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
647            os.close(fd)
648            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
649            remover.daemon = True
650            remover.start()
651            h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay)
652            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
653            h.setFormatter(f)
654            try:
655                for _ in range(log_count):
656                    time.sleep(0.005)
657                    r = logging.makeLogRecord({'msg': 'testing' })
658                    try:
659                        self.handle_time = time.time()
660                        h.handle(r)
661                    except Exception:
662                        print('Deleted at %s, '
663                              'opened at %s' % (self.deletion_time,
664                                                self.handle_time))
665                        raise
666            finally:
667                remover.join()
668                h.close()
669                if os.path.exists(fn):
670                    os.unlink(fn)
671
672    # The implementation relies on os.register_at_fork existing, but we test
673    # based on os.fork existing because that is what users and this test use.
674    # This helps ensure that when fork exists (the important concept) that the
675    # register_at_fork mechanism is also present and used.
676    @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
677    def test_post_fork_child_no_deadlock(self):
678        """Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
679        class _OurHandler(logging.Handler):
680            def __init__(self):
681                super().__init__()
682                self.sub_handler = logging.StreamHandler(
683                    stream=open('/dev/null', 'wt', encoding='utf-8'))
684
685            def emit(self, record):
686                self.sub_handler.acquire()
687                try:
688                    self.sub_handler.emit(record)
689                finally:
690                    self.sub_handler.release()
691
692        self.assertEqual(len(logging._handlers), 0)
693        refed_h = _OurHandler()
694        self.addCleanup(refed_h.sub_handler.stream.close)
695        refed_h.name = 'because we need at least one for this test'
696        self.assertGreater(len(logging._handlers), 0)
697        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
698        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
699        test_logger.addHandler(refed_h)
700        test_logger.setLevel(logging.DEBUG)
701
702        locks_held__ready_to_fork = threading.Event()
703        fork_happened__release_locks_and_end_thread = threading.Event()
704
705        def lock_holder_thread_fn():
706            logging._acquireLock()
707            try:
708                refed_h.acquire()
709                try:
710                    # Tell the main thread to do the fork.
711                    locks_held__ready_to_fork.set()
712
713                    # If the deadlock bug exists, the fork will happen
714                    # without dealing with the locks we hold, deadlocking
715                    # the child.
716
717                    # Wait for a successful fork or an unreasonable amount of
718                    # time before releasing our locks.  To avoid a timing based
719                    # test we'd need communication from os.fork() as to when it
720                    # has actually happened.  Given this is a regression test
721                    # for a fixed issue, potentially less reliably detecting
722                    # regression via timing is acceptable for simplicity.
723                    # The test will always take at least this long. :(
724                    fork_happened__release_locks_and_end_thread.wait(0.5)
725                finally:
726                    refed_h.release()
727            finally:
728                logging._releaseLock()
729
730        lock_holder_thread = threading.Thread(
731                target=lock_holder_thread_fn,
732                name='test_post_fork_child_no_deadlock lock holder')
733        lock_holder_thread.start()
734
735        locks_held__ready_to_fork.wait()
736        pid = os.fork()
737        if pid == 0:
738            # Child process
739            try:
740                test_logger.info(r'Child process did not deadlock. \o/')
741            finally:
742                os._exit(0)
743        else:
744            # Parent process
745            test_logger.info(r'Parent process returned from fork. \o/')
746            fork_happened__release_locks_and_end_thread.set()
747            lock_holder_thread.join()
748
749            support.wait_process(pid, exitcode=0)
750
751
752class BadStream(object):
753    def write(self, data):
754        raise RuntimeError('deliberate mistake')
755
756class TestStreamHandler(logging.StreamHandler):
757    def handleError(self, record):
758        self.error_record = record
759
760class StreamWithIntName(object):
761    level = logging.NOTSET
762    name = 2
763
764class StreamHandlerTest(BaseTest):
765    def test_error_handling(self):
766        h = TestStreamHandler(BadStream())
767        r = logging.makeLogRecord({})
768        old_raise = logging.raiseExceptions
769
770        try:
771            h.handle(r)
772            self.assertIs(h.error_record, r)
773
774            h = logging.StreamHandler(BadStream())
775            with support.captured_stderr() as stderr:
776                h.handle(r)
777                msg = '\nRuntimeError: deliberate mistake\n'
778                self.assertIn(msg, stderr.getvalue())
779
780            logging.raiseExceptions = False
781            with support.captured_stderr() as stderr:
782                h.handle(r)
783                self.assertEqual('', stderr.getvalue())
784        finally:
785            logging.raiseExceptions = old_raise
786
787    def test_stream_setting(self):
788        """
789        Test setting the handler's stream
790        """
791        h = logging.StreamHandler()
792        stream = io.StringIO()
793        old = h.setStream(stream)
794        self.assertIs(old, sys.stderr)
795        actual = h.setStream(old)
796        self.assertIs(actual, stream)
797        # test that setting to existing value returns None
798        actual = h.setStream(old)
799        self.assertIsNone(actual)
800
801    def test_can_represent_stream_with_int_name(self):
802        h = logging.StreamHandler(StreamWithIntName())
803        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')
804
805# -- The following section could be moved into a server_helper.py module
806# -- if it proves to be of wider utility than just test_logging
807
808class TestSMTPServer(smtpd.SMTPServer):
809    """
810    This class implements a test SMTP server.
811
812    :param addr: A (host, port) tuple which the server listens on.
813                 You can specify a port value of zero: the server's
814                 *port* attribute will hold the actual port number
815                 used, which can be used in client connections.
816    :param handler: A callable which will be called to process
817                    incoming messages. The handler will be passed
818                    the client address tuple, who the message is from,
819                    a list of recipients and the message data.
820    :param poll_interval: The interval, in seconds, used in the underlying
821                          :func:`select` or :func:`poll` call by
822                          :func:`asyncore.loop`.
823    :param sockmap: A dictionary which will be used to hold
824                    :class:`asyncore.dispatcher` instances used by
825                    :func:`asyncore.loop`. This avoids changing the
826                    :mod:`asyncore` module's global state.
827    """
828
829    def __init__(self, addr, handler, poll_interval, sockmap):
830        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
831                                  decode_data=True)
832        self.port = self.socket.getsockname()[1]
833        self._handler = handler
834        self._thread = None
835        self._quit = False
836        self.poll_interval = poll_interval
837
838    def process_message(self, peer, mailfrom, rcpttos, data):
839        """
840        Delegates to the handler passed in to the server's constructor.
841
842        Typically, this will be a test case method.
843        :param peer: The client (host, port) tuple.
844        :param mailfrom: The address of the sender.
845        :param rcpttos: The addresses of the recipients.
846        :param data: The message.
847        """
848        self._handler(peer, mailfrom, rcpttos, data)
849
850    def start(self):
851        """
852        Start the server running on a separate daemon thread.
853        """
854        self._thread = t = threading.Thread(target=self.serve_forever,
855                                            args=(self.poll_interval,))
856        t.daemon = True
857        t.start()
858
859    def serve_forever(self, poll_interval):
860        """
861        Run the :mod:`asyncore` loop until normal termination
862        conditions arise.
863        :param poll_interval: The interval, in seconds, used in the underlying
864                              :func:`select` or :func:`poll` call by
865                              :func:`asyncore.loop`.
866        """
867        while not self._quit:
868            asyncore.loop(poll_interval, map=self._map, count=1)
869
870    def stop(self):
871        """
872        Stop the thread by closing the server instance.
873        Wait for the server thread to terminate.
874        """
875        self._quit = True
876        threading_helper.join_thread(self._thread)
877        self._thread = None
878        self.close()
879        asyncore.close_all(map=self._map, ignore_all=True)
880
881
882class ControlMixin(object):
883    """
884    This mixin is used to start a server on a separate thread, and
885    shut it down programmatically. Request handling is simplified - instead
886    of needing to derive a suitable RequestHandler subclass, you just
887    provide a callable which will be passed each received request to be
888    processed.
889
890    :param handler: A handler callable which will be called with a
891                    single parameter - the request - in order to
892                    process the request. This handler is called on the
893                    server thread, effectively meaning that requests are
894                    processed serially. While not quite web scale ;-),
895                    this should be fine for testing applications.
896    :param poll_interval: The polling interval in seconds.
897    """
898    def __init__(self, handler, poll_interval):
899        self._thread = None
900        self.poll_interval = poll_interval
901        self._handler = handler
902        self.ready = threading.Event()
903
904    def start(self):
905        """
906        Create a daemon thread to run the server, and start it.
907        """
908        self._thread = t = threading.Thread(target=self.serve_forever,
909                                            args=(self.poll_interval,))
910        t.daemon = True
911        t.start()
912
913    def serve_forever(self, poll_interval):
914        """
915        Run the server. Set the ready flag before entering the
916        service loop.
917        """
918        self.ready.set()
919        super(ControlMixin, self).serve_forever(poll_interval)
920
921    def stop(self):
922        """
923        Tell the server thread to stop, and wait for it to do so.
924        """
925        self.shutdown()
926        if self._thread is not None:
927            threading_helper.join_thread(self._thread)
928            self._thread = None
929        self.server_close()
930        self.ready.clear()
931
932class TestHTTPServer(ControlMixin, HTTPServer):
933    """
934    An HTTP server which is controllable using :class:`ControlMixin`.
935
936    :param addr: A tuple with the IP address and port to listen on.
937    :param handler: A handler callable which will be called with a
938                    single parameter - the request - in order to
939                    process the request.
940    :param poll_interval: The polling interval in seconds.
941    :param log: Pass ``True`` to enable log messages.
942    """
943    def __init__(self, addr, handler, poll_interval=0.5,
944                 log=False, sslctx=None):
945        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
946            def __getattr__(self, name, default=None):
947                if name.startswith('do_'):
948                    return self.process_request
949                raise AttributeError(name)
950
951            def process_request(self):
952                self.server._handler(self)
953
954            def log_message(self, format, *args):
955                if log:
956                    super(DelegatingHTTPRequestHandler,
957                          self).log_message(format, *args)
958        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
959        ControlMixin.__init__(self, handler, poll_interval)
960        self.sslctx = sslctx
961
962    def get_request(self):
963        try:
964            sock, addr = self.socket.accept()
965            if self.sslctx:
966                sock = self.sslctx.wrap_socket(sock, server_side=True)
967        except OSError as e:
968            # socket errors are silenced by the caller, print them here
969            sys.stderr.write("Got an error:\n%s\n" % e)
970            raise
971        return sock, addr
972
973class TestTCPServer(ControlMixin, ThreadingTCPServer):
974    """
975    A TCP server which is controllable using :class:`ControlMixin`.
976
977    :param addr: A tuple with the IP address and port to listen on.
978    :param handler: A handler callable which will be called with a single
979                    parameter - the request - in order to process the request.
980    :param poll_interval: The polling interval in seconds.
981    :bind_and_activate: If True (the default), binds the server and starts it
982                        listening. If False, you need to call
983                        :meth:`server_bind` and :meth:`server_activate` at
984                        some later time before calling :meth:`start`, so that
985                        the server will set up the socket and listen on it.
986    """
987
988    allow_reuse_address = True
989
990    def __init__(self, addr, handler, poll_interval=0.5,
991                 bind_and_activate=True):
992        class DelegatingTCPRequestHandler(StreamRequestHandler):
993
994            def handle(self):
995                self.server._handler(self)
996        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
997                                    bind_and_activate)
998        ControlMixin.__init__(self, handler, poll_interval)
999
1000    def server_bind(self):
1001        super(TestTCPServer, self).server_bind()
1002        self.port = self.socket.getsockname()[1]
1003
1004class TestUDPServer(ControlMixin, ThreadingUDPServer):
1005    """
1006    A UDP server which is controllable using :class:`ControlMixin`.
1007
1008    :param addr: A tuple with the IP address and port to listen on.
1009    :param handler: A handler callable which will be called with a
1010                    single parameter - the request - in order to
1011                    process the request.
1012    :param poll_interval: The polling interval for shutdown requests,
1013                          in seconds.
1014    :bind_and_activate: If True (the default), binds the server and
1015                        starts it listening. If False, you need to
1016                        call :meth:`server_bind` and
1017                        :meth:`server_activate` at some later time
1018                        before calling :meth:`start`, so that the server will
1019                        set up the socket and listen on it.
1020    """
1021    def __init__(self, addr, handler, poll_interval=0.5,
1022                 bind_and_activate=True):
1023        class DelegatingUDPRequestHandler(DatagramRequestHandler):
1024
1025            def handle(self):
1026                self.server._handler(self)
1027
1028            def finish(self):
1029                data = self.wfile.getvalue()
1030                if data:
1031                    try:
1032                        super(DelegatingUDPRequestHandler, self).finish()
1033                    except OSError:
1034                        if not self.server._closed:
1035                            raise
1036
1037        ThreadingUDPServer.__init__(self, addr,
1038                                    DelegatingUDPRequestHandler,
1039                                    bind_and_activate)
1040        ControlMixin.__init__(self, handler, poll_interval)
1041        self._closed = False
1042
1043    def server_bind(self):
1044        super(TestUDPServer, self).server_bind()
1045        self.port = self.socket.getsockname()[1]
1046
1047    def server_close(self):
1048        super(TestUDPServer, self).server_close()
1049        self._closed = True
1050
1051if hasattr(socket, "AF_UNIX"):
1052    class TestUnixStreamServer(TestTCPServer):
1053        address_family = socket.AF_UNIX
1054
1055    class TestUnixDatagramServer(TestUDPServer):
1056        address_family = socket.AF_UNIX
1057
1058# - end of server_helper section
1059
1060class SMTPHandlerTest(BaseTest):
1061    # bpo-14314, bpo-19665, bpo-34092: don't wait forever
1062    TIMEOUT = support.LONG_TIMEOUT
1063
1064    def test_basic(self):
1065        sockmap = {}
1066        server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
1067                                sockmap)
1068        server.start()
1069        addr = (socket_helper.HOST, server.port)
1070        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
1071                                         timeout=self.TIMEOUT)
1072        self.assertEqual(h.toaddrs, ['you'])
1073        self.messages = []
1074        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
1075        self.handled = threading.Event()
1076        h.handle(r)
1077        self.handled.wait(self.TIMEOUT)
1078        server.stop()
1079        self.assertTrue(self.handled.is_set())
1080        self.assertEqual(len(self.messages), 1)
1081        peer, mailfrom, rcpttos, data = self.messages[0]
1082        self.assertEqual(mailfrom, 'me')
1083        self.assertEqual(rcpttos, ['you'])
1084        self.assertIn('\nSubject: Log\n', data)
1085        self.assertTrue(data.endswith('\n\nHello \u2713'))
1086        h.close()
1087
1088    def process_message(self, *args):
1089        self.messages.append(args)
1090        self.handled.set()
1091
1092class MemoryHandlerTest(BaseTest):
1093
1094    """Tests for the MemoryHandler."""
1095
1096    # Do not bother with a logger name group.
1097    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
1098
1099    def setUp(self):
1100        BaseTest.setUp(self)
1101        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1102                                                       self.root_hdlr)
1103        self.mem_logger = logging.getLogger('mem')
1104        self.mem_logger.propagate = 0
1105        self.mem_logger.addHandler(self.mem_hdlr)
1106
1107    def tearDown(self):
1108        self.mem_hdlr.close()
1109        BaseTest.tearDown(self)
1110
1111    def test_flush(self):
1112        # The memory handler flushes to its target handler based on specific
1113        #  criteria (message count and message level).
1114        self.mem_logger.debug(self.next_message())
1115        self.assert_log_lines([])
1116        self.mem_logger.info(self.next_message())
1117        self.assert_log_lines([])
1118        # This will flush because the level is >= logging.WARNING
1119        self.mem_logger.warning(self.next_message())
1120        lines = [
1121            ('DEBUG', '1'),
1122            ('INFO', '2'),
1123            ('WARNING', '3'),
1124        ]
1125        self.assert_log_lines(lines)
1126        for n in (4, 14):
1127            for i in range(9):
1128                self.mem_logger.debug(self.next_message())
1129            self.assert_log_lines(lines)
1130            # This will flush because it's the 10th message since the last
1131            #  flush.
1132            self.mem_logger.debug(self.next_message())
1133            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
1134            self.assert_log_lines(lines)
1135
1136        self.mem_logger.debug(self.next_message())
1137        self.assert_log_lines(lines)
1138
1139    def test_flush_on_close(self):
1140        """
1141        Test that the flush-on-close configuration works as expected.
1142        """
1143        self.mem_logger.debug(self.next_message())
1144        self.assert_log_lines([])
1145        self.mem_logger.info(self.next_message())
1146        self.assert_log_lines([])
1147        self.mem_logger.removeHandler(self.mem_hdlr)
1148        # Default behaviour is to flush on close. Check that it happens.
1149        self.mem_hdlr.close()
1150        lines = [
1151            ('DEBUG', '1'),
1152            ('INFO', '2'),
1153        ]
1154        self.assert_log_lines(lines)
1155        # Now configure for flushing not to be done on close.
1156        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1157                                                       self.root_hdlr,
1158                                                       False)
1159        self.mem_logger.addHandler(self.mem_hdlr)
1160        self.mem_logger.debug(self.next_message())
1161        self.assert_log_lines(lines)  # no change
1162        self.mem_logger.info(self.next_message())
1163        self.assert_log_lines(lines)  # no change
1164        self.mem_logger.removeHandler(self.mem_hdlr)
1165        self.mem_hdlr.close()
1166        # assert that no new lines have been added
1167        self.assert_log_lines(lines)  # no change
1168
1169    def test_race_between_set_target_and_flush(self):
1170        class MockRaceConditionHandler:
1171            def __init__(self, mem_hdlr):
1172                self.mem_hdlr = mem_hdlr
1173                self.threads = []
1174
1175            def removeTarget(self):
1176                self.mem_hdlr.setTarget(None)
1177
1178            def handle(self, msg):
1179                thread = threading.Thread(target=self.removeTarget)
1180                self.threads.append(thread)
1181                thread.start()
1182
1183        target = MockRaceConditionHandler(self.mem_hdlr)
1184        try:
1185            self.mem_hdlr.setTarget(target)
1186
1187            for _ in range(10):
1188                time.sleep(0.005)
1189                self.mem_logger.info("not flushed")
1190                self.mem_logger.warning("flushed")
1191        finally:
1192            for thread in target.threads:
1193                threading_helper.join_thread(thread)
1194
1195
1196class ExceptionFormatter(logging.Formatter):
1197    """A special exception formatter."""
1198    def formatException(self, ei):
1199        return "Got a [%s]" % ei[0].__name__
1200
1201
1202class ConfigFileTest(BaseTest):
1203
1204    """Reading logging config from a .ini-style config file."""
1205
1206    check_no_resource_warning = warnings_helper.check_no_resource_warning
1207    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1208
1209    # config0 is a standard configuration.
1210    config0 = """
1211    [loggers]
1212    keys=root
1213
1214    [handlers]
1215    keys=hand1
1216
1217    [formatters]
1218    keys=form1
1219
1220    [logger_root]
1221    level=WARNING
1222    handlers=hand1
1223
1224    [handler_hand1]
1225    class=StreamHandler
1226    level=NOTSET
1227    formatter=form1
1228    args=(sys.stdout,)
1229
1230    [formatter_form1]
1231    format=%(levelname)s ++ %(message)s
1232    datefmt=
1233    """
1234
1235    # config1 adds a little to the standard configuration.
1236    config1 = """
1237    [loggers]
1238    keys=root,parser
1239
1240    [handlers]
1241    keys=hand1
1242
1243    [formatters]
1244    keys=form1
1245
1246    [logger_root]
1247    level=WARNING
1248    handlers=
1249
1250    [logger_parser]
1251    level=DEBUG
1252    handlers=hand1
1253    propagate=1
1254    qualname=compiler.parser
1255
1256    [handler_hand1]
1257    class=StreamHandler
1258    level=NOTSET
1259    formatter=form1
1260    args=(sys.stdout,)
1261
1262    [formatter_form1]
1263    format=%(levelname)s ++ %(message)s
1264    datefmt=
1265    """
1266
1267    # config1a moves the handler to the root.
1268    config1a = """
1269    [loggers]
1270    keys=root,parser
1271
1272    [handlers]
1273    keys=hand1
1274
1275    [formatters]
1276    keys=form1
1277
1278    [logger_root]
1279    level=WARNING
1280    handlers=hand1
1281
1282    [logger_parser]
1283    level=DEBUG
1284    handlers=
1285    propagate=1
1286    qualname=compiler.parser
1287
1288    [handler_hand1]
1289    class=StreamHandler
1290    level=NOTSET
1291    formatter=form1
1292    args=(sys.stdout,)
1293
1294    [formatter_form1]
1295    format=%(levelname)s ++ %(message)s
1296    datefmt=
1297    """
1298
1299    # config2 has a subtle configuration error that should be reported
1300    config2 = config1.replace("sys.stdout", "sys.stbout")
1301
1302    # config3 has a less subtle configuration error
1303    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1304
1305    # config4 specifies a custom formatter class to be loaded
1306    config4 = """
1307    [loggers]
1308    keys=root
1309
1310    [handlers]
1311    keys=hand1
1312
1313    [formatters]
1314    keys=form1
1315
1316    [logger_root]
1317    level=NOTSET
1318    handlers=hand1
1319
1320    [handler_hand1]
1321    class=StreamHandler
1322    level=NOTSET
1323    formatter=form1
1324    args=(sys.stdout,)
1325
1326    [formatter_form1]
1327    class=""" + __name__ + """.ExceptionFormatter
1328    format=%(levelname)s:%(name)s:%(message)s
1329    datefmt=
1330    """
1331
1332    # config5 specifies a custom handler class to be loaded
1333    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1334
1335    # config6 uses ', ' delimiters in the handlers and formatters sections
1336    config6 = """
1337    [loggers]
1338    keys=root,parser
1339
1340    [handlers]
1341    keys=hand1, hand2
1342
1343    [formatters]
1344    keys=form1, form2
1345
1346    [logger_root]
1347    level=WARNING
1348    handlers=
1349
1350    [logger_parser]
1351    level=DEBUG
1352    handlers=hand1
1353    propagate=1
1354    qualname=compiler.parser
1355
1356    [handler_hand1]
1357    class=StreamHandler
1358    level=NOTSET
1359    formatter=form1
1360    args=(sys.stdout,)
1361
1362    [handler_hand2]
1363    class=StreamHandler
1364    level=NOTSET
1365    formatter=form1
1366    args=(sys.stderr,)
1367
1368    [formatter_form1]
1369    format=%(levelname)s ++ %(message)s
1370    datefmt=
1371
1372    [formatter_form2]
1373    format=%(message)s
1374    datefmt=
1375    """
1376
1377    # config7 adds a compiler logger, and uses kwargs instead of args.
1378    config7 = """
1379    [loggers]
1380    keys=root,parser,compiler
1381
1382    [handlers]
1383    keys=hand1
1384
1385    [formatters]
1386    keys=form1
1387
1388    [logger_root]
1389    level=WARNING
1390    handlers=hand1
1391
1392    [logger_compiler]
1393    level=DEBUG
1394    handlers=
1395    propagate=1
1396    qualname=compiler
1397
1398    [logger_parser]
1399    level=DEBUG
1400    handlers=
1401    propagate=1
1402    qualname=compiler.parser
1403
1404    [handler_hand1]
1405    class=StreamHandler
1406    level=NOTSET
1407    formatter=form1
1408    kwargs={'stream': sys.stdout,}
1409
1410    [formatter_form1]
1411    format=%(levelname)s ++ %(message)s
1412    datefmt=
1413    """
1414
1415    # config 8, check for resource warning
1416    config8 = r"""
1417    [loggers]
1418    keys=root
1419
1420    [handlers]
1421    keys=file
1422
1423    [formatters]
1424    keys=
1425
1426    [logger_root]
1427    level=DEBUG
1428    handlers=file
1429
1430    [handler_file]
1431    class=FileHandler
1432    level=DEBUG
1433    args=("{tempfile}",)
1434    kwargs={{"encoding": "utf-8"}}
1435    """
1436
1437    disable_test = """
1438    [loggers]
1439    keys=root
1440
1441    [handlers]
1442    keys=screen
1443
1444    [formatters]
1445    keys=
1446
1447    [logger_root]
1448    level=DEBUG
1449    handlers=screen
1450
1451    [handler_screen]
1452    level=DEBUG
1453    class=StreamHandler
1454    args=(sys.stdout,)
1455    formatter=
1456    """
1457
1458    def apply_config(self, conf, **kwargs):
1459        file = io.StringIO(textwrap.dedent(conf))
1460        logging.config.fileConfig(file, encoding="utf-8", **kwargs)
1461
1462    def test_config0_ok(self):
1463        # A simple config file which overrides the default settings.
1464        with support.captured_stdout() as output:
1465            self.apply_config(self.config0)
1466            logger = logging.getLogger()
1467            # Won't output anything
1468            logger.info(self.next_message())
1469            # Outputs a message
1470            logger.error(self.next_message())
1471            self.assert_log_lines([
1472                ('ERROR', '2'),
1473            ], stream=output)
1474            # Original logger output is empty.
1475            self.assert_log_lines([])
1476
1477    def test_config0_using_cp_ok(self):
1478        # A simple config file which overrides the default settings.
1479        with support.captured_stdout() as output:
1480            file = io.StringIO(textwrap.dedent(self.config0))
1481            cp = configparser.ConfigParser()
1482            cp.read_file(file)
1483            logging.config.fileConfig(cp)
1484            logger = logging.getLogger()
1485            # Won't output anything
1486            logger.info(self.next_message())
1487            # Outputs a message
1488            logger.error(self.next_message())
1489            self.assert_log_lines([
1490                ('ERROR', '2'),
1491            ], stream=output)
1492            # Original logger output is empty.
1493            self.assert_log_lines([])
1494
1495    def test_config1_ok(self, config=config1):
1496        # A config file defining a sub-parser as well.
1497        with support.captured_stdout() as output:
1498            self.apply_config(config)
1499            logger = logging.getLogger("compiler.parser")
1500            # Both will output a message
1501            logger.info(self.next_message())
1502            logger.error(self.next_message())
1503            self.assert_log_lines([
1504                ('INFO', '1'),
1505                ('ERROR', '2'),
1506            ], stream=output)
1507            # Original logger output is empty.
1508            self.assert_log_lines([])
1509
1510    def test_config2_failure(self):
1511        # A simple config file which overrides the default settings.
1512        self.assertRaises(Exception, self.apply_config, self.config2)
1513
1514    def test_config3_failure(self):
1515        # A simple config file which overrides the default settings.
1516        self.assertRaises(Exception, self.apply_config, self.config3)
1517
1518    def test_config4_ok(self):
1519        # A config file specifying a custom formatter class.
1520        with support.captured_stdout() as output:
1521            self.apply_config(self.config4)
1522            logger = logging.getLogger()
1523            try:
1524                raise RuntimeError()
1525            except RuntimeError:
1526                logging.exception("just testing")
1527            sys.stdout.seek(0)
1528            self.assertEqual(output.getvalue(),
1529                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1530            # Original logger output is empty
1531            self.assert_log_lines([])
1532
1533    def test_config5_ok(self):
1534        self.test_config1_ok(config=self.config5)
1535
1536    def test_config6_ok(self):
1537        self.test_config1_ok(config=self.config6)
1538
1539    def test_config7_ok(self):
1540        with support.captured_stdout() as output:
1541            self.apply_config(self.config1a)
1542            logger = logging.getLogger("compiler.parser")
1543            # See issue #11424. compiler-hyphenated sorts
1544            # between compiler and compiler.xyz and this
1545            # was preventing compiler.xyz from being included
1546            # in the child loggers of compiler because of an
1547            # overzealous loop termination condition.
1548            hyphenated = logging.getLogger('compiler-hyphenated')
1549            # All will output a message
1550            logger.info(self.next_message())
1551            logger.error(self.next_message())
1552            hyphenated.critical(self.next_message())
1553            self.assert_log_lines([
1554                ('INFO', '1'),
1555                ('ERROR', '2'),
1556                ('CRITICAL', '3'),
1557            ], stream=output)
1558            # Original logger output is empty.
1559            self.assert_log_lines([])
1560        with support.captured_stdout() as output:
1561            self.apply_config(self.config7)
1562            logger = logging.getLogger("compiler.parser")
1563            self.assertFalse(logger.disabled)
1564            # Both will output a message
1565            logger.info(self.next_message())
1566            logger.error(self.next_message())
1567            logger = logging.getLogger("compiler.lexer")
1568            # Both will output a message
1569            logger.info(self.next_message())
1570            logger.error(self.next_message())
1571            # Will not appear
1572            hyphenated.critical(self.next_message())
1573            self.assert_log_lines([
1574                ('INFO', '4'),
1575                ('ERROR', '5'),
1576                ('INFO', '6'),
1577                ('ERROR', '7'),
1578            ], stream=output)
1579            # Original logger output is empty.
1580            self.assert_log_lines([])
1581
1582    def test_config8_ok(self):
1583
1584        def cleanup(h1, fn):
1585            h1.close()
1586            os.remove(fn)
1587
1588        with self.check_no_resource_warning():
1589            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
1590            os.close(fd)
1591
            # Replace single backslashes with double backslashes on Windows
            # to avoid a unicode error during string formatting below
1594            if os.name == "nt":
1595                fn = fn.replace("\\", "\\\\")
1596
1597            config8 = self.config8.format(tempfile=fn)
1598
1599            self.apply_config(config8)
1600            self.apply_config(config8)
1601
1602        handler = logging.root.handlers[0]
1603        self.addCleanup(cleanup, handler, fn)
1604
1605    def test_logger_disabling(self):
1606        self.apply_config(self.disable_test)
1607        logger = logging.getLogger('some_pristine_logger')
1608        self.assertFalse(logger.disabled)
1609        self.apply_config(self.disable_test)
1610        self.assertTrue(logger.disabled)
1611        self.apply_config(self.disable_test, disable_existing_loggers=False)
1612        self.assertFalse(logger.disabled)
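
    # Note on the behaviour exercised above: by default, fileConfig() disables
    # any logger that already existed before the call and is not named in the
    # new configuration; passing disable_existing_loggers=False leaves such
    # loggers enabled.  A minimal sketch (cfg_file and 'some.logger' are just
    # placeholders, with 'some.logger' assumed not to appear in cfg_file):
    #
    #     lgr = logging.getLogger('some.logger')
    #     logging.config.fileConfig(cfg_file)
    #     # lgr.disabled is now True
    #     logging.config.fileConfig(cfg_file, disable_existing_loggers=False)
    #     # lgr.disabled is False again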
1613
1614    def test_config_set_handler_names(self):
1615        test_config = """
1616            [loggers]
1617            keys=root
1618
1619            [handlers]
1620            keys=hand1
1621
1622            [formatters]
1623            keys=form1
1624
1625            [logger_root]
1626            handlers=hand1
1627
1628            [handler_hand1]
1629            class=StreamHandler
1630            formatter=form1
1631
1632            [formatter_form1]
1633            format=%(levelname)s ++ %(message)s
1634            """
1635        self.apply_config(test_config)
1636        self.assertEqual(logging.getLogger().handlers[0].name, 'hand1')
1637
1638    def test_defaults_do_no_interpolation(self):
1639        """bpo-33802 defaults should not get interpolated"""
1640        ini = textwrap.dedent("""
1641            [formatters]
1642            keys=default
1643
1644            [formatter_default]
1645
1646            [handlers]
1647            keys=console
1648
1649            [handler_console]
1650            class=logging.StreamHandler
1651            args=tuple()
1652
1653            [loggers]
1654            keys=root
1655
1656            [logger_root]
1657            formatter=default
1658            handlers=console
1659            """).strip()
1660        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
1661        try:
1662            os.write(fd, ini.encode('ascii'))
1663            os.close(fd)
1664            logging.config.fileConfig(
1665                fn,
1666                encoding="utf-8",
1667                defaults=dict(
1668                    version=1,
1669                    disable_existing_loggers=False,
1670                    formatters={
1671                        "generic": {
1672                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
1673                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
1674                            "class": "logging.Formatter"
1675                        },
1676                    },
1677                )
1678            )
1679        finally:
1680            os.unlink(fn)
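
    # For context on the test above: a plain ConfigParser interpolates
    # %(name)s references against its defaults, e.g. (a throwaway sketch,
    # the names are made up):
    #
    #     cp = configparser.ConfigParser(defaults={'who': 'world'})
    #     cp.read_string('[sec]\ngreeting = hello %(who)s\n')
    #     cp['sec']['greeting']   # -> 'hello world'
    #
    # bpo-33802 (see the docstring above) is about the `defaults` mapping
    # passed to fileConfig() not being interpolated into the configuration
    # being read, even when it contains non-string values such as the nested
    # dict used above.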
1681
1682
1683class SocketHandlerTest(BaseTest):
1684
1685    """Test for SocketHandler objects."""
1686
1687    server_class = TestTCPServer
1688    address = ('localhost', 0)
1689
1690    def setUp(self):
1691        """Set up a TCP server to receive log messages, and a SocketHandler
1692        pointing to that server's address and port."""
1693        BaseTest.setUp(self)
1694        # Issue #29177: deal with errors that happen during setup
1695        self.server = self.sock_hdlr = self.server_exception = None
1696        try:
1697            self.server = server = self.server_class(self.address,
1698                                                     self.handle_socket, 0.01)
1699            server.start()
1700            # Uncomment next line to test error recovery in setUp()
1701            # raise OSError('dummy error raised')
1702        except OSError as e:
1703            self.server_exception = e
1704            return
1705        server.ready.wait()
1706        hcls = logging.handlers.SocketHandler
1707        if isinstance(server.server_address, tuple):
1708            self.sock_hdlr = hcls('localhost', server.port)
1709        else:
1710            self.sock_hdlr = hcls(server.server_address, None)
1711        self.log_output = ''
1712        self.root_logger.removeHandler(self.root_logger.handlers[0])
1713        self.root_logger.addHandler(self.sock_hdlr)
1714        self.handled = threading.Semaphore(0)
1715
1716    def tearDown(self):
1717        """Shutdown the TCP server."""
1718        try:
1719            if self.sock_hdlr:
1720                self.root_logger.removeHandler(self.sock_hdlr)
1721                self.sock_hdlr.close()
1722            if self.server:
1723                self.server.stop()
1724        finally:
1725            BaseTest.tearDown(self)
1726
1727    def handle_socket(self, request):
1728        conn = request.connection
1729        while True:
1730            chunk = conn.recv(4)
1731            if len(chunk) < 4:
1732                break
1733            slen = struct.unpack(">L", chunk)[0]
1734            chunk = conn.recv(slen)
1735            while len(chunk) < slen:
1736                chunk = chunk + conn.recv(slen - len(chunk))
1737            obj = pickle.loads(chunk)
1738            record = logging.makeLogRecord(obj)
1739            self.log_output += record.msg + '\n'
1740            self.handled.release()
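
    # handle_socket() above decodes SocketHandler's wire format: each record
    # arrives as a 4-byte big-endian length prefix followed by a pickle of the
    # LogRecord's attribute dict.  A minimal sketch of building such a frame
    # by hand ('demo' and 'hello' are made-up values; the tests let the
    # handler do this itself):
    #
    #     rec = logging.LogRecord('demo', logging.INFO, __file__, 0,
    #                             'hello', None, None)
    #     payload = pickle.dumps(rec.__dict__, 1)
    #     frame = struct.pack('>L', len(payload)) + payload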
1741
1742    def test_output(self):
1743        # The log message sent to the SocketHandler is properly received.
1744        if self.server_exception:
1745            self.skipTest(self.server_exception)
1746        logger = logging.getLogger("tcp")
1747        logger.error("spam")
1748        self.handled.acquire()
1749        logger.debug("eggs")
1750        self.handled.acquire()
1751        self.assertEqual(self.log_output, "spam\neggs\n")
1752
1753    def test_noserver(self):
1754        if self.server_exception:
1755            self.skipTest(self.server_exception)
1756        # Avoid timing-related failures due to SocketHandler's own hard-wired
1757        # one-second timeout on socket.create_connection() (issue #16264).
1758        self.sock_hdlr.retryStart = 2.5
1759        # Kill the server
1760        self.server.stop()
1761        # The logging call should try to connect, which should fail
1762        try:
1763            raise RuntimeError('Deliberate mistake')
1764        except RuntimeError:
1765            self.root_logger.exception('Never sent')
1766        self.root_logger.error('Never sent, either')
1767        now = time.time()
1768        self.assertGreater(self.sock_hdlr.retryTime, now)
1769        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
1770        self.root_logger.error('Nor this')
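
    # test_noserver() above relies on SocketHandler's reconnection back-off:
    # after a failed connection attempt the handler records a retryTime and
    # refuses to reconnect before then, growing the delay by retryFactor up to
    # retryMax on repeated failures.  The knobs are plain attributes on the
    # handler (the values below are illustrative; 2.5 matches the test above):
    #
    #     hdlr.retryStart = 2.5     # initial delay after the first failure
    #     hdlr.retryFactor = 2.0    # growth factor for subsequent delays
    #     hdlr.retryMax = 30.0      # upper bound on the delay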
1771
1772def _get_temp_domain_socket():
1773    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
1774    os.close(fd)
    # We only need a unique name; the file must not exist when the socket is
    # bound, or we'll get an 'address already in use' error.
1777    os.remove(fn)
1778    return fn
1779
1780@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1781class UnixSocketHandlerTest(SocketHandlerTest):
1782
1783    """Test for SocketHandler with unix sockets."""
1784
1785    if hasattr(socket, "AF_UNIX"):
1786        server_class = TestUnixStreamServer
1787
1788    def setUp(self):
1789        # override the definition in the base class
1790        self.address = _get_temp_domain_socket()
1791        SocketHandlerTest.setUp(self)
1792
1793    def tearDown(self):
1794        SocketHandlerTest.tearDown(self)
1795        os_helper.unlink(self.address)
1796
1797class DatagramHandlerTest(BaseTest):
1798
1799    """Test for DatagramHandler."""
1800
1801    server_class = TestUDPServer
1802    address = ('localhost', 0)
1803
1804    def setUp(self):
1805        """Set up a UDP server to receive log messages, and a DatagramHandler
1806        pointing to that server's address and port."""
1807        BaseTest.setUp(self)
1808        # Issue #29177: deal with errors that happen during setup
1809        self.server = self.sock_hdlr = self.server_exception = None
1810        try:
1811            self.server = server = self.server_class(self.address,
1812                                                     self.handle_datagram, 0.01)
1813            server.start()
1814            # Uncomment next line to test error recovery in setUp()
1815            # raise OSError('dummy error raised')
1816        except OSError as e:
1817            self.server_exception = e
1818            return
1819        server.ready.wait()
1820        hcls = logging.handlers.DatagramHandler
1821        if isinstance(server.server_address, tuple):
1822            self.sock_hdlr = hcls('localhost', server.port)
1823        else:
1824            self.sock_hdlr = hcls(server.server_address, None)
1825        self.log_output = ''
1826        self.root_logger.removeHandler(self.root_logger.handlers[0])
1827        self.root_logger.addHandler(self.sock_hdlr)
1828        self.handled = threading.Event()
1829
1830    def tearDown(self):
1831        """Shutdown the UDP server."""
1832        try:
1833            if self.server:
1834                self.server.stop()
1835            if self.sock_hdlr:
1836                self.root_logger.removeHandler(self.sock_hdlr)
1837                self.sock_hdlr.close()
1838        finally:
1839            BaseTest.tearDown(self)
1840
1841    def handle_datagram(self, request):
        # DatagramHandler still prepends a 4-byte length prefix; compute its
        # size and skip it (each datagram carries exactly one record).
        slen = struct.pack('>L', 0)
        packet = request.packet[len(slen):]
1844        obj = pickle.loads(packet)
1845        record = logging.makeLogRecord(obj)
1846        self.log_output += record.msg + '\n'
1847        self.handled.set()
1848
1849    def test_output(self):
1850        # The log message sent to the DatagramHandler is properly received.
1851        if self.server_exception:
1852            self.skipTest(self.server_exception)
1853        logger = logging.getLogger("udp")
1854        logger.error("spam")
1855        self.handled.wait()
1856        self.handled.clear()
1857        logger.error("eggs")
1858        self.handled.wait()
1859        self.assertEqual(self.log_output, "spam\neggs\n")
1860
1861@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1862class UnixDatagramHandlerTest(DatagramHandlerTest):
1863
1864    """Test for DatagramHandler using Unix sockets."""
1865
1866    if hasattr(socket, "AF_UNIX"):
1867        server_class = TestUnixDatagramServer
1868
1869    def setUp(self):
1870        # override the definition in the base class
1871        self.address = _get_temp_domain_socket()
1872        DatagramHandlerTest.setUp(self)
1873
1874    def tearDown(self):
1875        DatagramHandlerTest.tearDown(self)
1876        os_helper.unlink(self.address)
1877
1878class SysLogHandlerTest(BaseTest):
1879
1880    """Test for SysLogHandler using UDP."""
1881
1882    server_class = TestUDPServer
1883    address = ('localhost', 0)
1884
1885    def setUp(self):
1886        """Set up a UDP server to receive log messages, and a SysLogHandler
1887        pointing to that server's address and port."""
1888        BaseTest.setUp(self)
1889        # Issue #29177: deal with errors that happen during setup
1890        self.server = self.sl_hdlr = self.server_exception = None
1891        try:
1892            self.server = server = self.server_class(self.address,
1893                                                     self.handle_datagram, 0.01)
1894            server.start()
1895            # Uncomment next line to test error recovery in setUp()
1896            # raise OSError('dummy error raised')
1897        except OSError as e:
1898            self.server_exception = e
1899            return
1900        server.ready.wait()
1901        hcls = logging.handlers.SysLogHandler
1902        if isinstance(server.server_address, tuple):
1903            self.sl_hdlr = hcls((server.server_address[0], server.port))
1904        else:
1905            self.sl_hdlr = hcls(server.server_address)
1906        self.log_output = ''
1907        self.root_logger.removeHandler(self.root_logger.handlers[0])
1908        self.root_logger.addHandler(self.sl_hdlr)
1909        self.handled = threading.Event()
1910
1911    def tearDown(self):
1912        """Shutdown the server."""
1913        try:
1914            if self.server:
1915                self.server.stop()
1916            if self.sl_hdlr:
1917                self.root_logger.removeHandler(self.sl_hdlr)
1918                self.sl_hdlr.close()
1919        finally:
1920            BaseTest.tearDown(self)
1921
1922    def handle_datagram(self, request):
1923        self.log_output = request.packet
1924        self.handled.set()
1925
1926    def test_output(self):
1927        if self.server_exception:
1928            self.skipTest(self.server_exception)
1929        # The log message sent to the SysLogHandler is properly received.
1930        logger = logging.getLogger("slh")
1931        logger.error("sp\xe4m")
1932        self.handled.wait()
1933        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1934        self.handled.clear()
1935        self.sl_hdlr.append_nul = False
1936        logger.error("sp\xe4m")
1937        self.handled.wait()
1938        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
1939        self.handled.clear()
1940        self.sl_hdlr.ident = "h\xe4m-"
1941        logger.error("sp\xe4m")
1942        self.handled.wait()
1943        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
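
    # The b'<11>' prefix checked above is the syslog PRI field, computed as
    # facility * 8 + severity; with the handler defaults this is
    # LOG_USER (1) * 8 + LOG_ERR (3) == 11 for a logger.error() call.  The
    # same value can be obtained from the handler itself (illustrative):
    #
    #     self.sl_hdlr.encodePriority('user', 'error')   # -> 11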
1944
    def test_udp_reconnection(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("slh")
1947        self.sl_hdlr.close()
1948        self.handled.clear()
1949        logger.error("sp\xe4m")
1950        self.handled.wait(0.1)
1951        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1952
1953@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1954class UnixSysLogHandlerTest(SysLogHandlerTest):
1955
1956    """Test for SysLogHandler with Unix sockets."""
1957
1958    if hasattr(socket, "AF_UNIX"):
1959        server_class = TestUnixDatagramServer
1960
1961    def setUp(self):
1962        # override the definition in the base class
1963        self.address = _get_temp_domain_socket()
1964        SysLogHandlerTest.setUp(self)
1965
1966    def tearDown(self):
1967        SysLogHandlerTest.tearDown(self)
1968        os_helper.unlink(self.address)
1969
1970@unittest.skipUnless(socket_helper.IPV6_ENABLED,
1971                     'IPv6 support required for this test.')
1972class IPv6SysLogHandlerTest(SysLogHandlerTest):
1973
1974    """Test for SysLogHandler with IPv6 host."""
1975
1976    server_class = TestUDPServer
1977    address = ('::1', 0)
1978
1979    def setUp(self):
1980        self.server_class.address_family = socket.AF_INET6
        super().setUp()
1982
1983    def tearDown(self):
1984        self.server_class.address_family = socket.AF_INET
        super().tearDown()
1986
1987class HTTPHandlerTest(BaseTest):
1988    """Test for HTTPHandler."""
1989
1990    def setUp(self):
1991        """Set up an HTTP server to receive log messages, and a HTTPHandler
1992        pointing to that server's address and port."""
1993        BaseTest.setUp(self)
1994        self.handled = threading.Event()
1995
1996    def handle_request(self, request):
1997        self.command = request.command
1998        self.log_data = urlparse(request.path)
1999        if self.command == 'POST':
2000            try:
2001                rlen = int(request.headers['Content-Length'])
2002                self.post_data = request.rfile.read(rlen)
            except Exception:
2004                self.post_data = None
2005        request.send_response(200)
2006        request.end_headers()
2007        self.handled.set()
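
    # handle_request() above works because HTTPHandler sends the LogRecord's
    # attribute dict as URL-encoded form data: in the query string for GET and
    # in the request body for POST, which is why parse_qs() can recover fields
    # such as 'name', 'msg' and 'funcName' in test_output() below.  Roughly
    # (an illustrative sketch, not what the tests execute):
    #
    #     from urllib.parse import urlencode
    #     urlencode({'name': 'http', 'msg': 'sp\xe4m'})   # 'name=http&msg=sp%C3%A4m'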
2008
2009    def test_output(self):
2010        # The log message sent to the HTTPHandler is properly received.
2011        logger = logging.getLogger("http")
2012        root_logger = self.root_logger
2013        root_logger.removeHandler(self.root_logger.handlers[0])
2014        for secure in (False, True):
2015            addr = ('localhost', 0)
2016            if secure:
2017                try:
2018                    import ssl
2019                except ImportError:
2020                    sslctx = None
2021                else:
2022                    here = os.path.dirname(__file__)
2023                    localhost_cert = os.path.join(here, "keycert.pem")
2024                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2025                    sslctx.load_cert_chain(localhost_cert)
2026
2027                    context = ssl.create_default_context(cafile=localhost_cert)
2028            else:
2029                sslctx = None
2030                context = None
2031            self.server = server = TestHTTPServer(addr, self.handle_request,
2032                                                    0.01, sslctx=sslctx)
2033            server.start()
2034            server.ready.wait()
2035            host = 'localhost:%d' % server.server_port
2036            secure_client = secure and sslctx
2037            self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
2038                                                       secure=secure_client,
2039                                                       context=context,
2040                                                       credentials=('foo', 'bar'))
2041            self.log_data = None
2042            root_logger.addHandler(self.h_hdlr)
2043
2044            for method in ('GET', 'POST'):
2045                self.h_hdlr.method = method
2046                self.handled.clear()
2047                msg = "sp\xe4m"
2048                logger.error(msg)
2049                self.handled.wait()
2050                self.assertEqual(self.log_data.path, '/frob')
2051                self.assertEqual(self.command, method)
2052                if method == 'GET':
2053                    d = parse_qs(self.log_data.query)
2054                else:
2055                    d = parse_qs(self.post_data.decode('utf-8'))
2056                self.assertEqual(d['name'], ['http'])
2057                self.assertEqual(d['funcName'], ['test_output'])
2058                self.assertEqual(d['msg'], [msg])
2059
2060            self.server.stop()
2061            self.root_logger.removeHandler(self.h_hdlr)
2062            self.h_hdlr.close()
2063
2064class MemoryTest(BaseTest):
2065
2066    """Test memory persistence of logger objects."""
2067
2068    def setUp(self):
2069        """Create a dict to remember potentially destroyed objects."""
2070        BaseTest.setUp(self)
2071        self._survivors = {}
2072
2073    def _watch_for_survival(self, *args):
2074        """Watch the given objects for survival, by creating weakrefs to
2075        them."""
2076        for obj in args:
2077            key = id(obj), repr(obj)
2078            self._survivors[key] = weakref.ref(obj)
2079
2080    def _assertTruesurvival(self):
2081        """Assert that all objects watched for survival have survived."""
2082        # Trigger cycle breaking.
2083        gc.collect()
2084        dead = []
2085        for (id_, repr_), ref in self._survivors.items():
2086            if ref() is None:
2087                dead.append(repr_)
2088        if dead:
2089            self.fail("%d objects should have survived "
2090                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
2091
2092    def test_persistent_loggers(self):
2093        # Logger objects are persistent and retain their configuration, even
2094        #  if visible references are destroyed.
2095        self.root_logger.setLevel(logging.INFO)
2096        foo = logging.getLogger("foo")
2097        self._watch_for_survival(foo)
2098        foo.setLevel(logging.DEBUG)
2099        self.root_logger.debug(self.next_message())
2100        foo.debug(self.next_message())
2101        self.assert_log_lines([
2102            ('foo', 'DEBUG', '2'),
2103        ])
2104        del foo
2105        # foo has survived.
2106        self._assertTruesurvival()
2107        # foo has retained its settings.
2108        bar = logging.getLogger("foo")
2109        bar.debug(self.next_message())
2110        self.assert_log_lines([
2111            ('foo', 'DEBUG', '2'),
2112            ('foo', 'DEBUG', '3'),
2113        ])
2114
2115
2116class EncodingTest(BaseTest):
2117    def test_encoding_plain_file(self):
        # Non-ASCII data written through a FileHandler opened with an explicit
        # encoding should round-trip unchanged.
2119        log = logging.getLogger("test")
2120        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
2121        os.close(fd)
        # The non-ASCII data we write to the log.
2123        data = "foo\x80"
2124        try:
2125            handler = logging.FileHandler(fn, encoding="utf-8")
2126            log.addHandler(handler)
2127            try:
2128                # write non-ascii data to the log.
2129                log.warning(data)
2130            finally:
2131                log.removeHandler(handler)
2132                handler.close()
2133            # check we wrote exactly those bytes, ignoring trailing \n etc
2134            f = open(fn, encoding="utf-8")
2135            try:
2136                self.assertEqual(f.read().rstrip(), data)
2137            finally:
2138                f.close()
2139        finally:
2140            if os.path.isfile(fn):
2141                os.remove(fn)
2142
2143    def test_encoding_cyrillic_unicode(self):
2144        log = logging.getLogger("test")
2145        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
2146        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
2147        # Ensure it's written in a Cyrillic encoding
2148        writer_class = codecs.getwriter('cp1251')
2149        writer_class.encoding = 'cp1251'
2150        stream = io.BytesIO()
2151        writer = writer_class(stream, 'strict')
2152        handler = logging.StreamHandler(writer)
2153        log.addHandler(handler)
2154        try:
2155            log.warning(message)
2156        finally:
2157            log.removeHandler(handler)
2158            handler.close()
2159        # check we wrote exactly those bytes, ignoring trailing \n etc
2160        s = stream.getvalue()
2161        # Compare against what the data should be when encoded in CP-1251
2162        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
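
    # The expected bytes above are just the cp1251 encoding of the message
    # plus the '\n' terminator appended by StreamHandler; the same bytes can
    # be produced directly (illustrative):
    #
    #     ('\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
    #      + '\n').encode('cp1251')
    #     # -> b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n'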
2163
2164
2165class WarningsTest(BaseTest):
2166
2167    def test_warnings(self):
2168        with warnings.catch_warnings():
2169            logging.captureWarnings(True)
2170            self.addCleanup(logging.captureWarnings, False)
2171            warnings.filterwarnings("always", category=UserWarning)
2172            stream = io.StringIO()
2173            h = logging.StreamHandler(stream)
2174            logger = logging.getLogger("py.warnings")
2175            logger.addHandler(h)
2176            warnings.warn("I'm warning you...")
2177            logger.removeHandler(h)
2178            s = stream.getvalue()
2179            h.close()
2180            self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
2181
2182            # See if an explicit file uses the original implementation
2183            a_file = io.StringIO()
2184            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
2185                                 a_file, "Dummy line")
2186            s = a_file.getvalue()
2187            a_file.close()
2188            self.assertEqual(s,
2189                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
2190
2191    def test_warnings_no_handlers(self):
2192        with warnings.catch_warnings():
2193            logging.captureWarnings(True)
2194            self.addCleanup(logging.captureWarnings, False)
2195
            # confirm our assumption: the 'py.warnings' logger has no handlers yet
2197            logger = logging.getLogger("py.warnings")
2198            self.assertEqual(logger.handlers, [])
2199
2200            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
2201            self.assertEqual(len(logger.handlers), 1)
2202            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
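
    # Background for the two tests above: captureWarnings(True) replaces
    # warnings.showwarning with a function that logs each warning to the
    # 'py.warnings' logger, and if that logger has no handlers of its own a
    # NullHandler is added (presumably so the record is not pushed out via the
    # last-resort stderr handler), which is exactly what the assertions check.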
2203
2204
2205def formatFunc(format, datefmt=None):
2206    return logging.Formatter(format, datefmt)
2207
2208class myCustomFormatter:
2209    def __init__(self, fmt, datefmt=None):
2210        pass
2211
2212def handlerFunc():
2213    return logging.StreamHandler()
2214
2215class CustomHandler(logging.StreamHandler):
2216    pass
2217
2218class ConfigDictTest(BaseTest):
2219
2220    """Reading logging config from a dictionary."""
2221
2222    check_no_resource_warning = warnings_helper.check_no_resource_warning
2223    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2224
2225    # config0 is a standard configuration.
2226    config0 = {
2227        'version': 1,
2228        'formatters': {
2229            'form1' : {
2230                'format' : '%(levelname)s ++ %(message)s',
2231            },
2232        },
2233        'handlers' : {
2234            'hand1' : {
2235                'class' : 'logging.StreamHandler',
2236                'formatter' : 'form1',
2237                'level' : 'NOTSET',
2238                'stream'  : 'ext://sys.stdout',
2239            },
2240        },
2241        'root' : {
2242            'level' : 'WARNING',
2243            'handlers' : ['hand1'],
2244        },
2245    }
2246
2247    # config1 adds a little to the standard configuration.
2248    config1 = {
2249        'version': 1,
2250        'formatters': {
2251            'form1' : {
2252                'format' : '%(levelname)s ++ %(message)s',
2253            },
2254        },
2255        'handlers' : {
2256            'hand1' : {
2257                'class' : 'logging.StreamHandler',
2258                'formatter' : 'form1',
2259                'level' : 'NOTSET',
2260                'stream'  : 'ext://sys.stdout',
2261            },
2262        },
2263        'loggers' : {
2264            'compiler.parser' : {
2265                'level' : 'DEBUG',
2266                'handlers' : ['hand1'],
2267            },
2268        },
2269        'root' : {
2270            'level' : 'WARNING',
2271        },
2272    }
2273
2274    # config1a moves the handler to the root. Used with config8a
2275    config1a = {
2276        'version': 1,
2277        'formatters': {
2278            'form1' : {
2279                'format' : '%(levelname)s ++ %(message)s',
2280            },
2281        },
2282        'handlers' : {
2283            'hand1' : {
2284                'class' : 'logging.StreamHandler',
2285                'formatter' : 'form1',
2286                'level' : 'NOTSET',
2287                'stream'  : 'ext://sys.stdout',
2288            },
2289        },
2290        'loggers' : {
2291            'compiler.parser' : {
2292                'level' : 'DEBUG',
2293            },
2294        },
2295        'root' : {
2296            'level' : 'WARNING',
2297            'handlers' : ['hand1'],
2298        },
2299    }
2300
2301    # config2 has a subtle configuration error that should be reported
2302    config2 = {
2303        'version': 1,
2304        'formatters': {
2305            'form1' : {
2306                'format' : '%(levelname)s ++ %(message)s',
2307            },
2308        },
2309        'handlers' : {
2310            'hand1' : {
2311                'class' : 'logging.StreamHandler',
2312                'formatter' : 'form1',
2313                'level' : 'NOTSET',
2314                'stream'  : 'ext://sys.stdbout',
2315            },
2316        },
2317        'loggers' : {
2318            'compiler.parser' : {
2319                'level' : 'DEBUG',
2320                'handlers' : ['hand1'],
2321            },
2322        },
2323        'root' : {
2324            'level' : 'WARNING',
2325        },
2326    }
2327
2328    # As config1 but with a misspelt level on a handler
2329    config2a = {
2330        'version': 1,
2331        'formatters': {
2332            'form1' : {
2333                'format' : '%(levelname)s ++ %(message)s',
2334            },
2335        },
2336        'handlers' : {
2337            'hand1' : {
2338                'class' : 'logging.StreamHandler',
2339                'formatter' : 'form1',
2340                'level' : 'NTOSET',
2341                'stream'  : 'ext://sys.stdout',
2342            },
2343        },
2344        'loggers' : {
2345            'compiler.parser' : {
2346                'level' : 'DEBUG',
2347                'handlers' : ['hand1'],
2348            },
2349        },
2350        'root' : {
2351            'level' : 'WARNING',
2352        },
2353    }
2354
2355
2356    # As config1 but with a misspelt level on a logger
2357    config2b = {
2358        'version': 1,
2359        'formatters': {
2360            'form1' : {
2361                'format' : '%(levelname)s ++ %(message)s',
2362            },
2363        },
2364        'handlers' : {
2365            'hand1' : {
2366                'class' : 'logging.StreamHandler',
2367                'formatter' : 'form1',
2368                'level' : 'NOTSET',
2369                'stream'  : 'ext://sys.stdout',
2370            },
2371        },
2372        'loggers' : {
2373            'compiler.parser' : {
2374                'level' : 'DEBUG',
2375                'handlers' : ['hand1'],
2376            },
2377        },
2378        'root' : {
2379            'level' : 'WRANING',
2380        },
2381    }
2382
2383    # config3 has a less subtle configuration error
2384    config3 = {
2385        'version': 1,
2386        'formatters': {
2387            'form1' : {
2388                'format' : '%(levelname)s ++ %(message)s',
2389            },
2390        },
2391        'handlers' : {
2392            'hand1' : {
2393                'class' : 'logging.StreamHandler',
2394                'formatter' : 'misspelled_name',
2395                'level' : 'NOTSET',
2396                'stream'  : 'ext://sys.stdout',
2397            },
2398        },
2399        'loggers' : {
2400            'compiler.parser' : {
2401                'level' : 'DEBUG',
2402                'handlers' : ['hand1'],
2403            },
2404        },
2405        'root' : {
2406            'level' : 'WARNING',
2407        },
2408    }
2409
2410    # config4 specifies a custom formatter class to be loaded
2411    config4 = {
2412        'version': 1,
2413        'formatters': {
2414            'form1' : {
2415                '()' : __name__ + '.ExceptionFormatter',
2416                'format' : '%(levelname)s:%(name)s:%(message)s',
2417            },
2418        },
2419        'handlers' : {
2420            'hand1' : {
2421                'class' : 'logging.StreamHandler',
2422                'formatter' : 'form1',
2423                'level' : 'NOTSET',
2424                'stream'  : 'ext://sys.stdout',
2425            },
2426        },
2427        'root' : {
2428            'level' : 'NOTSET',
2429                'handlers' : ['hand1'],
2430        },
2431    }
2432
2433    # As config4 but using an actual callable rather than a string
2434    config4a = {
2435        'version': 1,
2436        'formatters': {
2437            'form1' : {
2438                '()' : ExceptionFormatter,
2439                'format' : '%(levelname)s:%(name)s:%(message)s',
2440            },
2441            'form2' : {
2442                '()' : __name__ + '.formatFunc',
2443                'format' : '%(levelname)s:%(name)s:%(message)s',
2444            },
2445            'form3' : {
2446                '()' : formatFunc,
2447                'format' : '%(levelname)s:%(name)s:%(message)s',
2448            },
2449        },
2450        'handlers' : {
2451            'hand1' : {
2452                'class' : 'logging.StreamHandler',
2453                'formatter' : 'form1',
2454                'level' : 'NOTSET',
2455                'stream'  : 'ext://sys.stdout',
2456            },
2457            'hand2' : {
2458                '()' : handlerFunc,
2459            },
2460        },
2461        'root' : {
2462            'level' : 'NOTSET',
2463                'handlers' : ['hand1'],
2464        },
2465    }
2466
2467    # config5 specifies a custom handler class to be loaded
2468    config5 = {
2469        'version': 1,
2470        'formatters': {
2471            'form1' : {
2472                'format' : '%(levelname)s ++ %(message)s',
2473            },
2474        },
2475        'handlers' : {
2476            'hand1' : {
2477                'class' : __name__ + '.CustomHandler',
2478                'formatter' : 'form1',
2479                'level' : 'NOTSET',
2480                'stream'  : 'ext://sys.stdout',
2481            },
2482        },
2483        'loggers' : {
2484            'compiler.parser' : {
2485                'level' : 'DEBUG',
2486                'handlers' : ['hand1'],
2487            },
2488        },
2489        'root' : {
2490            'level' : 'WARNING',
2491        },
2492    }
2493
2494    # config6 specifies a custom handler class to be loaded
2495    # but has bad arguments
2496    config6 = {
2497        'version': 1,
2498        'formatters': {
2499            'form1' : {
2500                'format' : '%(levelname)s ++ %(message)s',
2501            },
2502        },
2503        'handlers' : {
2504            'hand1' : {
2505                'class' : __name__ + '.CustomHandler',
2506                'formatter' : 'form1',
2507                'level' : 'NOTSET',
2508                'stream'  : 'ext://sys.stdout',
2509                '9' : 'invalid parameter name',
2510            },
2511        },
2512        'loggers' : {
2513            'compiler.parser' : {
2514                'level' : 'DEBUG',
2515                'handlers' : ['hand1'],
2516            },
2517        },
2518        'root' : {
2519            'level' : 'WARNING',
2520        },
2521    }
2522
2523    # config 7 does not define compiler.parser but defines compiler.lexer
2524    # so compiler.parser should be disabled after applying it
2525    config7 = {
2526        'version': 1,
2527        'formatters': {
2528            'form1' : {
2529                'format' : '%(levelname)s ++ %(message)s',
2530            },
2531        },
2532        'handlers' : {
2533            'hand1' : {
2534                'class' : 'logging.StreamHandler',
2535                'formatter' : 'form1',
2536                'level' : 'NOTSET',
2537                'stream'  : 'ext://sys.stdout',
2538            },
2539        },
2540        'loggers' : {
2541            'compiler.lexer' : {
2542                'level' : 'DEBUG',
2543                'handlers' : ['hand1'],
2544            },
2545        },
2546        'root' : {
2547            'level' : 'WARNING',
2548        },
2549    }
2550
    # config8 sets disable_existing_loggers to False and defines both
    # compiler and compiler.lexer, so the previously created compiler.parser
    # logger should not be disabled
2554    config8 = {
2555        'version': 1,
2556        'disable_existing_loggers' : False,
2557        'formatters': {
2558            'form1' : {
2559                'format' : '%(levelname)s ++ %(message)s',
2560            },
2561        },
2562        'handlers' : {
2563            'hand1' : {
2564                'class' : 'logging.StreamHandler',
2565                'formatter' : 'form1',
2566                'level' : 'NOTSET',
2567                'stream'  : 'ext://sys.stdout',
2568            },
2569        },
2570        'loggers' : {
2571            'compiler' : {
2572                'level' : 'DEBUG',
2573                'handlers' : ['hand1'],
2574            },
2575            'compiler.lexer' : {
2576            },
2577        },
2578        'root' : {
2579            'level' : 'WARNING',
2580        },
2581    }
2582
2583    # config8a disables existing loggers
2584    config8a = {
2585        'version': 1,
2586        'disable_existing_loggers' : True,
2587        'formatters': {
2588            'form1' : {
2589                'format' : '%(levelname)s ++ %(message)s',
2590            },
2591        },
2592        'handlers' : {
2593            'hand1' : {
2594                'class' : 'logging.StreamHandler',
2595                'formatter' : 'form1',
2596                'level' : 'NOTSET',
2597                'stream'  : 'ext://sys.stdout',
2598            },
2599        },
2600        'loggers' : {
2601            'compiler' : {
2602                'level' : 'DEBUG',
2603                'handlers' : ['hand1'],
2604            },
2605            'compiler.lexer' : {
2606            },
2607        },
2608        'root' : {
2609            'level' : 'WARNING',
2610        },
2611    }
2612
2613    config9 = {
2614        'version': 1,
2615        'formatters': {
2616            'form1' : {
2617                'format' : '%(levelname)s ++ %(message)s',
2618            },
2619        },
2620        'handlers' : {
2621            'hand1' : {
2622                'class' : 'logging.StreamHandler',
2623                'formatter' : 'form1',
2624                'level' : 'WARNING',
2625                'stream'  : 'ext://sys.stdout',
2626            },
2627        },
2628        'loggers' : {
2629            'compiler.parser' : {
2630                'level' : 'WARNING',
2631                'handlers' : ['hand1'],
2632            },
2633        },
2634        'root' : {
2635            'level' : 'NOTSET',
2636        },
2637    }
2638
2639    config9a = {
2640        'version': 1,
2641        'incremental' : True,
2642        'handlers' : {
2643            'hand1' : {
2644                'level' : 'WARNING',
2645            },
2646        },
2647        'loggers' : {
2648            'compiler.parser' : {
2649                'level' : 'INFO',
2650            },
2651        },
2652    }
2653
2654    config9b = {
2655        'version': 1,
2656        'incremental' : True,
2657        'handlers' : {
2658            'hand1' : {
2659                'level' : 'INFO',
2660            },
2661        },
2662        'loggers' : {
2663            'compiler.parser' : {
2664                'level' : 'INFO',
2665            },
2666        },
2667    }
2668
2669    # As config1 but with a filter added
2670    config10 = {
2671        'version': 1,
2672        'formatters': {
2673            'form1' : {
2674                'format' : '%(levelname)s ++ %(message)s',
2675            },
2676        },
2677        'filters' : {
2678            'filt1' : {
2679                'name' : 'compiler.parser',
2680            },
2681        },
2682        'handlers' : {
2683            'hand1' : {
2684                'class' : 'logging.StreamHandler',
2685                'formatter' : 'form1',
2686                'level' : 'NOTSET',
2687                'stream'  : 'ext://sys.stdout',
2688                'filters' : ['filt1'],
2689            },
2690        },
2691        'loggers' : {
2692            'compiler.parser' : {
2693                'level' : 'DEBUG',
2694                'filters' : ['filt1'],
2695            },
2696        },
2697        'root' : {
2698            'level' : 'WARNING',
2699            'handlers' : ['hand1'],
2700        },
2701    }
2702
2703    # As config1 but using cfg:// references
2704    config11 = {
2705        'version': 1,
2706        'true_formatters': {
2707            'form1' : {
2708                'format' : '%(levelname)s ++ %(message)s',
2709            },
2710        },
2711        'handler_configs': {
2712            'hand1' : {
2713                'class' : 'logging.StreamHandler',
2714                'formatter' : 'form1',
2715                'level' : 'NOTSET',
2716                'stream'  : 'ext://sys.stdout',
2717            },
2718        },
2719        'formatters' : 'cfg://true_formatters',
2720        'handlers' : {
2721            'hand1' : 'cfg://handler_configs[hand1]',
2722        },
2723        'loggers' : {
2724            'compiler.parser' : {
2725                'level' : 'DEBUG',
2726                'handlers' : ['hand1'],
2727            },
2728        },
2729        'root' : {
2730            'level' : 'WARNING',
2731        },
2732    }
2733
2734    # As config11 but missing the version key
2735    config12 = {
2736        'true_formatters': {
2737            'form1' : {
2738                'format' : '%(levelname)s ++ %(message)s',
2739            },
2740        },
2741        'handler_configs': {
2742            'hand1' : {
2743                'class' : 'logging.StreamHandler',
2744                'formatter' : 'form1',
2745                'level' : 'NOTSET',
2746                'stream'  : 'ext://sys.stdout',
2747            },
2748        },
2749        'formatters' : 'cfg://true_formatters',
2750        'handlers' : {
2751            'hand1' : 'cfg://handler_configs[hand1]',
2752        },
2753        'loggers' : {
2754            'compiler.parser' : {
2755                'level' : 'DEBUG',
2756                'handlers' : ['hand1'],
2757            },
2758        },
2759        'root' : {
2760            'level' : 'WARNING',
2761        },
2762    }
2763
2764    # As config11 but using an unsupported version
2765    config13 = {
2766        'version': 2,
2767        'true_formatters': {
2768            'form1' : {
2769                'format' : '%(levelname)s ++ %(message)s',
2770            },
2771        },
2772        'handler_configs': {
2773            'hand1' : {
2774                'class' : 'logging.StreamHandler',
2775                'formatter' : 'form1',
2776                'level' : 'NOTSET',
2777                'stream'  : 'ext://sys.stdout',
2778            },
2779        },
2780        'formatters' : 'cfg://true_formatters',
2781        'handlers' : {
2782            'hand1' : 'cfg://handler_configs[hand1]',
2783        },
2784        'loggers' : {
2785            'compiler.parser' : {
2786                'level' : 'DEBUG',
2787                'handlers' : ['hand1'],
2788            },
2789        },
2790        'root' : {
2791            'level' : 'WARNING',
2792        },
2793    }
2794
2795    # As config0, but with properties
2796    config14 = {
2797        'version': 1,
2798        'formatters': {
2799            'form1' : {
2800                'format' : '%(levelname)s ++ %(message)s',
2801            },
2802        },
2803        'handlers' : {
2804            'hand1' : {
2805                'class' : 'logging.StreamHandler',
2806                'formatter' : 'form1',
2807                'level' : 'NOTSET',
2808                'stream'  : 'ext://sys.stdout',
2809                '.': {
2810                    'foo': 'bar',
2811                    'terminator': '!\n',
2812                }
2813            },
2814        },
2815        'root' : {
2816            'level' : 'WARNING',
2817            'handlers' : ['hand1'],
2818        },
2819    }
2820
2821    out_of_order = {
2822        "version": 1,
2823        "formatters": {
2824            "mySimpleFormatter": {
2825                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2826                "style": "$"
2827            }
2828        },
2829        "handlers": {
2830            "fileGlobal": {
2831                "class": "logging.StreamHandler",
2832                "level": "DEBUG",
2833                "formatter": "mySimpleFormatter"
2834            },
2835            "bufferGlobal": {
2836                "class": "logging.handlers.MemoryHandler",
2837                "capacity": 5,
2838                "formatter": "mySimpleFormatter",
2839                "target": "fileGlobal",
2840                "level": "DEBUG"
2841                }
2842        },
2843        "loggers": {
2844            "mymodule": {
2845                "level": "DEBUG",
2846                "handlers": ["bufferGlobal"],
2847                "propagate": "true"
2848            }
2849        }
2850    }
2851
2852    # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False
2853    custom_formatter_class_validate = {
2854        'version': 1,
2855        'formatters': {
2856            'form1': {
2857                '()': __name__ + '.ExceptionFormatter',
2858                'format': '%(levelname)s:%(name)s:%(message)s',
2859                'validate': False,
2860            },
2861        },
2862        'handlers' : {
2863            'hand1' : {
2864                'class': 'logging.StreamHandler',
2865                'formatter': 'form1',
2866                'level': 'NOTSET',
2867                'stream': 'ext://sys.stdout',
2868            },
2869        },
2870        "loggers": {
2871            "my_test_logger_custom_formatter": {
2872                "level": "DEBUG",
2873                "handlers": ["hand1"],
2874                "propagate": "true"
2875            }
2876        }
2877    }
2878
2879    # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False
2880    custom_formatter_class_validate2 = {
2881        'version': 1,
2882        'formatters': {
2883            'form1': {
2884                'class': __name__ + '.ExceptionFormatter',
2885                'format': '%(levelname)s:%(name)s:%(message)s',
2886                'validate': False,
2887            },
2888        },
2889        'handlers' : {
2890            'hand1' : {
2891                'class': 'logging.StreamHandler',
2892                'formatter': 'form1',
2893                'level': 'NOTSET',
2894                'stream': 'ext://sys.stdout',
2895            },
2896        },
2897        "loggers": {
2898            "my_test_logger_custom_formatter": {
2899                "level": "DEBUG",
2900                "handlers": ["hand1"],
2901                "propagate": "true"
2902            }
2903        }
2904    }
2905
2906    # Configuration with custom class that is not inherited from logging.Formatter
2907    custom_formatter_class_validate3 = {
2908        'version': 1,
2909        'formatters': {
2910            'form1': {
2911                'class': __name__ + '.myCustomFormatter',
2912                'format': '%(levelname)s:%(name)s:%(message)s',
2913                'validate': False,
2914            },
2915        },
2916        'handlers' : {
2917            'hand1' : {
2918                'class': 'logging.StreamHandler',
2919                'formatter': 'form1',
2920                'level': 'NOTSET',
2921                'stream': 'ext://sys.stdout',
2922            },
2923        },
2924        "loggers": {
2925            "my_test_logger_custom_formatter": {
2926                "level": "DEBUG",
2927                "handlers": ["hand1"],
2928                "propagate": "true"
2929            }
2930        }
2931    }
2932
2933    # Configuration with custom function and 'validate' set to False
2934    custom_formatter_with_function = {
2935        'version': 1,
2936        'formatters': {
2937            'form1': {
2938                '()': formatFunc,
2939                'format': '%(levelname)s:%(name)s:%(message)s',
2940                'validate': False,
2941            },
2942        },
2943        'handlers' : {
2944            'hand1' : {
2945                'class': 'logging.StreamHandler',
2946                'formatter': 'form1',
2947                'level': 'NOTSET',
2948                'stream': 'ext://sys.stdout',
2949            },
2950        },
2951        "loggers": {
2952            "my_test_logger_custom_formatter": {
2953                "level": "DEBUG",
2954                "handlers": ["hand1"],
2955                "propagate": "true"
2956            }
2957        }
2958    }
2959
2960    def apply_config(self, conf):
2961        logging.config.dictConfig(conf)
2962
2963    def test_config0_ok(self):
2964        # A simple config which overrides the default settings.
2965        with support.captured_stdout() as output:
2966            self.apply_config(self.config0)
2967            logger = logging.getLogger()
2968            # Won't output anything
2969            logger.info(self.next_message())
2970            # Outputs a message
2971            logger.error(self.next_message())
2972            self.assert_log_lines([
2973                ('ERROR', '2'),
2974            ], stream=output)
2975            # Original logger output is empty.
2976            self.assert_log_lines([])
2977
2978    def test_config1_ok(self, config=config1):
2979        # A config defining a sub-parser as well.
2980        with support.captured_stdout() as output:
2981            self.apply_config(config)
2982            logger = logging.getLogger("compiler.parser")
2983            # Both will output a message
2984            logger.info(self.next_message())
2985            logger.error(self.next_message())
2986            self.assert_log_lines([
2987                ('INFO', '1'),
2988                ('ERROR', '2'),
2989            ], stream=output)
2990            # Original logger output is empty.
2991            self.assert_log_lines([])
2992
2993    def test_config2_failure(self):
        # config2 misspells the handler's stream ('ext://sys.stdbout'), so
        # applying it should fail.
2995        self.assertRaises(Exception, self.apply_config, self.config2)
2996
2997    def test_config2a_failure(self):
        # config2a misspells a handler level ('NTOSET'), so applying it
        # should fail.
2999        self.assertRaises(Exception, self.apply_config, self.config2a)
3000
3001    def test_config2b_failure(self):
        # config2b misspells the root logger's level ('WRANING'), so
        # applying it should fail.
3003        self.assertRaises(Exception, self.apply_config, self.config2b)
3004
3005    def test_config3_failure(self):
        # config3 references a formatter that isn't defined, so applying it
        # should fail.
3007        self.assertRaises(Exception, self.apply_config, self.config3)
3008
3009    def test_config4_ok(self):
3010        # A config specifying a custom formatter class.
3011        with support.captured_stdout() as output:
3012            self.apply_config(self.config4)
3014            try:
3015                raise RuntimeError()
3016            except RuntimeError:
3017                logging.exception("just testing")
3018            sys.stdout.seek(0)
3019            self.assertEqual(output.getvalue(),
3020                "ERROR:root:just testing\nGot a [RuntimeError]\n")
3021            # Original logger output is empty
3022            self.assert_log_lines([])
3023
3024    def test_config4a_ok(self):
3025        # A config specifying a custom formatter class.
3026        with support.captured_stdout() as output:
3027            self.apply_config(self.config4a)
3029            try:
3030                raise RuntimeError()
3031            except RuntimeError:
3032                logging.exception("just testing")
3033            sys.stdout.seek(0)
3034            self.assertEqual(output.getvalue(),
3035                "ERROR:root:just testing\nGot a [RuntimeError]\n")
3036            # Original logger output is empty
3037            self.assert_log_lines([])
3038
3039    def test_config5_ok(self):
3040        self.test_config1_ok(config=self.config5)
3041
3042    def test_config6_failure(self):
3043        self.assertRaises(Exception, self.apply_config, self.config6)
3044
3045    def test_config7_ok(self):
3046        with support.captured_stdout() as output:
3047            self.apply_config(self.config1)
3048            logger = logging.getLogger("compiler.parser")
3049            # Both will output a message
3050            logger.info(self.next_message())
3051            logger.error(self.next_message())
3052            self.assert_log_lines([
3053                ('INFO', '1'),
3054                ('ERROR', '2'),
3055            ], stream=output)
3056            # Original logger output is empty.
3057            self.assert_log_lines([])
3058        with support.captured_stdout() as output:
3059            self.apply_config(self.config7)
3060            logger = logging.getLogger("compiler.parser")
3061            self.assertTrue(logger.disabled)
3062            logger = logging.getLogger("compiler.lexer")
3063            # Both will output a message
3064            logger.info(self.next_message())
3065            logger.error(self.next_message())
3066            self.assert_log_lines([
3067                ('INFO', '3'),
3068                ('ERROR', '4'),
3069            ], stream=output)
3070            # Original logger output is empty.
3071            self.assert_log_lines([])
3072
    # Same as test_config7_ok, but don't disable old loggers.
3074    def test_config_8_ok(self):
3075        with support.captured_stdout() as output:
3076            self.apply_config(self.config1)
3077            logger = logging.getLogger("compiler.parser")
3078            # All will output a message
3079            logger.info(self.next_message())
3080            logger.error(self.next_message())
3081            self.assert_log_lines([
3082                ('INFO', '1'),
3083                ('ERROR', '2'),
3084            ], stream=output)
3085            # Original logger output is empty.
3086            self.assert_log_lines([])
3087        with support.captured_stdout() as output:
3088            self.apply_config(self.config8)
3089            logger = logging.getLogger("compiler.parser")
3090            self.assertFalse(logger.disabled)
3091            # Both will output a message
3092            logger.info(self.next_message())
3093            logger.error(self.next_message())
3094            logger = logging.getLogger("compiler.lexer")
3095            # Both will output a message
3096            logger.info(self.next_message())
3097            logger.error(self.next_message())
3098            self.assert_log_lines([
3099                ('INFO', '3'),
3100                ('ERROR', '4'),
3101                ('INFO', '5'),
3102                ('ERROR', '6'),
3103            ], stream=output)
3104            # Original logger output is empty.
3105            self.assert_log_lines([])
3106
3107    def test_config_8a_ok(self):
3108        with support.captured_stdout() as output:
3109            self.apply_config(self.config1a)
3110            logger = logging.getLogger("compiler.parser")
3111            # See issue #11424. compiler-hyphenated sorts
3112            # between compiler and compiler.xyz and this
3113            # was preventing compiler.xyz from being included
3114            # in the child loggers of compiler because of an
3115            # overzealous loop termination condition.
3116            hyphenated = logging.getLogger('compiler-hyphenated')
3117            # All will output a message
3118            logger.info(self.next_message())
3119            logger.error(self.next_message())
3120            hyphenated.critical(self.next_message())
3121            self.assert_log_lines([
3122                ('INFO', '1'),
3123                ('ERROR', '2'),
3124                ('CRITICAL', '3'),
3125            ], stream=output)
3126            # Original logger output is empty.
3127            self.assert_log_lines([])
3128        with support.captured_stdout() as output:
3129            self.apply_config(self.config8a)
3130            logger = logging.getLogger("compiler.parser")
3131            self.assertFalse(logger.disabled)
3132            # Both will output a message
3133            logger.info(self.next_message())
3134            logger.error(self.next_message())
3135            logger = logging.getLogger("compiler.lexer")
3136            # Both will output a message
3137            logger.info(self.next_message())
3138            logger.error(self.next_message())
3139            # Will not appear
3140            hyphenated.critical(self.next_message())
3141            self.assert_log_lines([
3142                ('INFO', '4'),
3143                ('ERROR', '5'),
3144                ('INFO', '6'),
3145                ('ERROR', '7'),
3146            ], stream=output)
3147            # Original logger output is empty.
3148            self.assert_log_lines([])
3149
3150    def test_config_9_ok(self):
3151        with support.captured_stdout() as output:
3152            self.apply_config(self.config9)
3153            logger = logging.getLogger("compiler.parser")
3154            # Nothing will be output since both handler and logger are set to WARNING
3155            logger.info(self.next_message())
3156            self.assert_log_lines([], stream=output)
3157            self.apply_config(self.config9a)
3158            # Nothing will be output since handler is still set to WARNING
3159            logger.info(self.next_message())
3160            self.assert_log_lines([], stream=output)
3161            self.apply_config(self.config9b)
3162            # Message should now be output
3163            logger.info(self.next_message())
3164            self.assert_log_lines([
3165                ('INFO', '3'),
3166            ], stream=output)
3167
3168    def test_config_10_ok(self):
3169        with support.captured_stdout() as output:
3170            self.apply_config(self.config10)
3171            logger = logging.getLogger("compiler.parser")
3172            logger.warning(self.next_message())
3173            logger = logging.getLogger('compiler')
3174            # Not output, because filtered
3175            logger.warning(self.next_message())
3176            logger = logging.getLogger('compiler.lexer')
3177            # Not output, because filtered
3178            logger.warning(self.next_message())
3179            logger = logging.getLogger("compiler.parser.codegen")
3180            # Output, as not filtered
3181            logger.error(self.next_message())
3182            self.assert_log_lines([
3183                ('WARNING', '1'),
3184                ('ERROR', '4'),
3185            ], stream=output)
3186
3187    def test_config11_ok(self):
3188        self.test_config1_ok(self.config11)
3189
3190    def test_config12_failure(self):
3191        self.assertRaises(Exception, self.apply_config, self.config12)
3192
3193    def test_config13_failure(self):
3194        self.assertRaises(Exception, self.apply_config, self.config13)
3195
3196    def test_config14_ok(self):
3197        with support.captured_stdout() as output:
3198            self.apply_config(self.config14)
3199            h = logging._handlers['hand1']
3200            self.assertEqual(h.foo, 'bar')
3201            self.assertEqual(h.terminator, '!\n')
3202            logging.warning('Exclamation')
3203            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3204
3205    def test_config15_ok(self):
3206
3207        def cleanup(h1, fn):
3208            h1.close()
3209            os.remove(fn)
3210
3211        with self.check_no_resource_warning():
3212            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
3213            os.close(fd)
3214
3215            config = {
3216                "version": 1,
3217                "handlers": {
3218                    "file": {
3219                        "class": "logging.FileHandler",
3220                        "filename": fn,
3221                        "encoding": "utf-8",
3222                    }
3223                },
3224                "root": {
3225                    "handlers": ["file"]
3226                }
3227            }
3228
3229            self.apply_config(config)
3230            self.apply_config(config)
3231
3232        handler = logging.root.handlers[0]
3233        self.addCleanup(cleanup, handler, fn)
3234
3235    def setup_via_listener(self, text, verify=None):
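        """Start logging.config.listen() on an ephemeral port, send *text* as
        a length-prefixed payload, then shut the listener down. If *verify*
        is given, it is passed to listen() and applied server-side to the
        received bytes (returning None rejects the configuration)."""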
3236        text = text.encode("utf-8")
3237        # Ask for a randomly assigned port (by using port 0)
3238        t = logging.config.listen(0, verify)
3239        t.start()
3240        t.ready.wait()
3241        # Now get the port allocated
3242        port = t.port
3243        t.ready.clear()
3244        try:
3245            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3246            sock.settimeout(2.0)
3247            sock.connect(('localhost', port))
3248
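            # logging.config.listen() expects each configuration payload to
            # be preceded by its length as a 4-byte big-endian unsigned int.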
3249            slen = struct.pack('>L', len(text))
3250            s = slen + text
3251            sentsofar = 0
3252            left = len(s)
3253            while left > 0:
3254                sent = sock.send(s[sentsofar:])
3255                sentsofar += sent
3256                left -= sent
3257            sock.close()
3258        finally:
3259            t.ready.wait(2.0)
3260            logging.config.stopListening()
3261            threading_helper.join_thread(t)
3262
3263    def test_listen_config_10_ok(self):
3264        with support.captured_stdout() as output:
3265            self.setup_via_listener(json.dumps(self.config10))
3266            logger = logging.getLogger("compiler.parser")
3267            logger.warning(self.next_message())
3268            logger = logging.getLogger('compiler')
3269            # Not output, because filtered
3270            logger.warning(self.next_message())
3271            logger = logging.getLogger('compiler.lexer')
3272            # Not output, because filtered
3273            logger.warning(self.next_message())
3274            logger = logging.getLogger("compiler.parser.codegen")
3275            # Output, as not filtered
3276            logger.error(self.next_message())
3277            self.assert_log_lines([
3278                ('WARNING', '1'),
3279                ('ERROR', '4'),
3280            ], stream=output)
3281
3282    def test_listen_config_1_ok(self):
3283        with support.captured_stdout() as output:
3284            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
3285            logger = logging.getLogger("compiler.parser")
3286            # Both will output a message
3287            logger.info(self.next_message())
3288            logger.error(self.next_message())
3289            self.assert_log_lines([
3290                ('INFO', '1'),
3291                ('ERROR', '2'),
3292            ], stream=output)
3293            # Original logger output is empty.
3294            self.assert_log_lines([])
3295
3296    def test_listen_verify(self):
3297
3298        def verify_fail(stuff):
3299            return None
3300
3301        def verify_reverse(stuff):
3302            return stuff[::-1]
3303
3304        logger = logging.getLogger("compiler.parser")
3305        to_send = textwrap.dedent(ConfigFileTest.config1)
3306        # First, specify a verification function that will fail.
3307        # We expect to see no output, since our configuration
3308        # never took effect.
3309        with support.captured_stdout() as output:
3310            self.setup_via_listener(to_send, verify_fail)
3311            # Both will output a message
3312            logger.info(self.next_message())
3313            logger.error(self.next_message())
3314        self.assert_log_lines([], stream=output)
3315        # Original logger output has the stuff we logged.
3316        self.assert_log_lines([
3317            ('INFO', '1'),
3318            ('ERROR', '2'),
3319        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3320
3321        # Now, perform no verification. Our configuration
3322        # should take effect.
3323
3324        with support.captured_stdout() as output:
3325            self.setup_via_listener(to_send)    # no verify callable specified
3326            logger = logging.getLogger("compiler.parser")
3327            # Both will output a message
3328            logger.info(self.next_message())
3329            logger.error(self.next_message())
3330        self.assert_log_lines([
3331            ('INFO', '3'),
3332            ('ERROR', '4'),
3333        ], stream=output)
3334        # Original logger output still has the stuff we logged before.
3335        self.assert_log_lines([
3336            ('INFO', '1'),
3337            ('ERROR', '2'),
3338        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3339
3340        # Now, perform verification which transforms the bytes.
3341
3342        with support.captured_stdout() as output:
3343            self.setup_via_listener(to_send[::-1], verify_reverse)
3344            logger = logging.getLogger("compiler.parser")
3345            # Both will output a message
3346            logger.info(self.next_message())
3347            logger.error(self.next_message())
3348        self.assert_log_lines([
3349            ('INFO', '5'),
3350            ('ERROR', '6'),
3351        ], stream=output)
3352        # Original logger output still has the stuff we logged before.
3353        self.assert_log_lines([
3354            ('INFO', '1'),
3355            ('ERROR', '2'),
3356        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3357
3358    def test_out_of_order(self):
3359        self.assertRaises(ValueError, self.apply_config, self.out_of_order)
3360
3361    def test_out_of_order_with_dollar_style(self):
3362        config = copy.deepcopy(self.out_of_order)
3363        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"
3364
3365        self.apply_config(config)
3366        handler = logging.getLogger('mymodule').handlers[0]
3367        self.assertIsInstance(handler.target, logging.Handler)
3368        self.assertIsInstance(handler.formatter._style,
3369                              logging.StringTemplateStyle)
3370
3371    def test_custom_formatter_class_with_validate(self):
3372        self.apply_config(self.custom_formatter_class_validate)
3373        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3374        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3375
3376    def test_custom_formatter_class_with_validate2(self):
3377        self.apply_config(self.custom_formatter_class_validate2)
3378        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3379        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3380
3381    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
        # Deep-copy so the shared class-level config dict is not mutated.
        config = copy.deepcopy(self.custom_formatter_class_validate)
3383        config['formatters']['form1']['style'] = "$"
3384
        # No exception should be raised, as 'validate' is configured to False
3386        self.apply_config(config)
3387        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3388        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3389
3390    def test_custom_formatter_class_with_validate3(self):
3391        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)
3392
3393    def test_custom_formatter_function_with_validate(self):
3394        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)
3395
3396    def test_baseconfig(self):
3397        d = {
3398            'atuple': (1, 2, 3),
3399            'alist': ['a', 'b', 'c'],
3400            'adict': {'d': 'e', 'f': 3 },
3401            'nest1': ('g', ('h', 'i'), 'j'),
3402            'nest2': ['k', ['l', 'm'], 'n'],
3403            'nest3': ['o', 'cfg://alist', 'p'],
3404        }
3405        bc = logging.config.BaseConfigurator(d)
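        # BaseConfigurator.convert() resolves 'cfg://' references against the
        # configuration dict, supporting attribute-style access ('adict.d'),
        # indexing ('alist[1]', 'adict[f]') and nested combinations, and it
        # converts 'cfg://' strings embedded in lists on access ('nest3').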
3406        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
3407        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
3408        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
3409        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
3410        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
3411        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
3412        v = bc.convert('cfg://nest3')
3413        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
3414        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
3415        self.assertRaises(ValueError, bc.convert, 'cfg://!')
3416        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
3417
3418    def test_namedtuple(self):
3419        # see bpo-39142
3420        from collections import namedtuple
3421
3422        class MyHandler(logging.StreamHandler):
3423            def __init__(self, resource, *args, **kwargs):
3424                super().__init__(*args, **kwargs)
3425                self.resource: namedtuple = resource
3426
3427            def emit(self, record):
3428                record.msg += f' {self.resource.type}'
3429                return super().emit(record)
3430
3431        Resource = namedtuple('Resource', ['type', 'labels'])
3432        resource = Resource(type='my_type', labels=['a'])
3433
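        # In dictConfig(), a '()' key names a callable to be used as the
        # factory for the handler; the remaining keys ('resource') are passed
        # to it as keyword arguments (see bpo-39142).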
3434        config = {
3435            'version': 1,
3436            'handlers': {
3437                'myhandler': {
3438                    '()': MyHandler,
3439                    'resource': resource
3440                }
3441            },
3442            'root':  {'level': 'INFO', 'handlers': ['myhandler']},
3443        }
3444        with support.captured_stderr() as stderr:
3445            self.apply_config(config)
3446            logging.info('some log')
3447        self.assertEqual(stderr.getvalue(), 'some log my_type\n')
3448
3449class ManagerTest(BaseTest):
3450    def test_manager_loggerclass(self):
3451        logged = []
3452
3453        class MyLogger(logging.Logger):
3454            def _log(self, level, msg, args, exc_info=None, extra=None):
3455                logged.append(msg)
3456
3457        man = logging.Manager(None)
3458        self.assertRaises(TypeError, man.setLoggerClass, int)
3459        man.setLoggerClass(MyLogger)
3460        logger = man.getLogger('test')
3461        logger.warning('should appear in logged')
3462        logging.warning('should not appear in logged')
3463
3464        self.assertEqual(logged, ['should appear in logged'])
3465
3466    def test_set_log_record_factory(self):
3467        man = logging.Manager(None)
3468        expected = object()
3469        man.setLogRecordFactory(expected)
3470        self.assertEqual(man.logRecordFactory, expected)
3471
3472class ChildLoggerTest(BaseTest):
3473    def test_child_loggers(self):
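        # Logger.getChild(suffix) returns the logger named '<parent>.<suffix>';
        # for the root logger the suffix is used as-is, so r.getChild('xyz')
        # is the same object as logging.getLogger('xyz').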
3474        r = logging.getLogger()
3475        l1 = logging.getLogger('abc')
3476        l2 = logging.getLogger('def.ghi')
3477        c1 = r.getChild('xyz')
3478        c2 = r.getChild('uvw.xyz')
3479        self.assertIs(c1, logging.getLogger('xyz'))
3480        self.assertIs(c2, logging.getLogger('uvw.xyz'))
3481        c1 = l1.getChild('def')
3482        c2 = c1.getChild('ghi')
3483        c3 = l1.getChild('def.ghi')
3484        self.assertIs(c1, logging.getLogger('abc.def'))
3485        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
3486        self.assertIs(c2, c3)
3487
3488
3489class DerivedLogRecord(logging.LogRecord):
3490    pass
3491
3492class LogRecordFactoryTest(BaseTest):
3493
3494    def setUp(self):
3495        class CheckingFilter(logging.Filter):
3496            def __init__(self, cls):
3497                self.cls = cls
3498
3499            def filter(self, record):
3500                t = type(record)
3501                if t is not self.cls:
3502                    msg = 'Unexpected LogRecord type %s, expected %s' % (t,
3503                            self.cls)
3504                    raise TypeError(msg)
3505                return True
3506
3507        BaseTest.setUp(self)
3508        self.filter = CheckingFilter(DerivedLogRecord)
3509        self.root_logger.addFilter(self.filter)
3510        self.orig_factory = logging.getLogRecordFactory()
3511
3512    def tearDown(self):
3513        self.root_logger.removeFilter(self.filter)
3514        BaseTest.tearDown(self)
3515        logging.setLogRecordFactory(self.orig_factory)
3516
3517    def test_logrecord_class(self):
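        # The first call uses the default LogRecord factory, so the
        # CheckingFilter raises TypeError; once DerivedLogRecord is installed
        # as the factory, records pass the filter and reach the handler.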
3518        self.assertRaises(TypeError, self.root_logger.warning,
3519                          self.next_message())
3520        logging.setLogRecordFactory(DerivedLogRecord)
3521        self.root_logger.error(self.next_message())
3522        self.assert_log_lines([
3523           ('root', 'ERROR', '2'),
3524        ])
3525
3526
3527class QueueHandlerTest(BaseTest):
3528    # Do not bother with a logger name group.
3529    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
3530
3531    def setUp(self):
3532        BaseTest.setUp(self)
3533        self.queue = queue.Queue(-1)
3534        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
3535        self.name = 'que'
3536        self.que_logger = logging.getLogger('que')
3537        self.que_logger.propagate = False
3538        self.que_logger.setLevel(logging.WARNING)
3539        self.que_logger.addHandler(self.que_hdlr)
3540
3541    def tearDown(self):
3542        self.que_hdlr.close()
3543        BaseTest.tearDown(self)
3544
3545    def test_queue_handler(self):
3546        self.que_logger.debug(self.next_message())
3547        self.assertRaises(queue.Empty, self.queue.get_nowait)
3548        self.que_logger.info(self.next_message())
3549        self.assertRaises(queue.Empty, self.queue.get_nowait)
3550        msg = self.next_message()
3551        self.que_logger.warning(msg)
3552        data = self.queue.get_nowait()
        self.assertIsInstance(data, logging.LogRecord)
3554        self.assertEqual(data.name, self.que_logger.name)
3555        self.assertEqual((data.msg, data.args), (msg, None))
3556
3557    def test_formatting(self):
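        # QueueHandler.prepare() formats the record before enqueueing it and
        # stores the result in both record.msg and record.message, so queue
        # consumers do not need access to the original format arguments.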
3558        msg = self.next_message()
3559        levelname = logging.getLevelName(logging.WARNING)
3560        log_format_str = '{name} -> {levelname}: {message}'
3561        formatted_msg = log_format_str.format(name=self.name,
3562                                              levelname=levelname, message=msg)
3563        formatter = logging.Formatter(self.log_format)
3564        self.que_hdlr.setFormatter(formatter)
3565        self.que_logger.warning(msg)
3566        log_record = self.queue.get_nowait()
3567        self.assertEqual(formatted_msg, log_record.msg)
3568        self.assertEqual(formatted_msg, log_record.message)
3569
3570    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3571                         'logging.handlers.QueueListener required for this test')
3572    def test_queue_listener(self):
3573        handler = TestHandler(support.Matcher())
3574        listener = logging.handlers.QueueListener(self.queue, handler)
3575        listener.start()
3576        try:
3577            self.que_logger.warning(self.next_message())
3578            self.que_logger.error(self.next_message())
3579            self.que_logger.critical(self.next_message())
3580        finally:
3581            listener.stop()
3582        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
3583        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
3584        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
3585        handler.close()
3586
3587        # Now test with respect_handler_level set
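        # With respect_handler_level=True, the listener compares each record's
        # level with the handler's own level before calling its handle(), so
        # WARNING and ERROR records are dropped by the CRITICAL-level handler.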
3588
3589        handler = TestHandler(support.Matcher())
3590        handler.setLevel(logging.CRITICAL)
3591        listener = logging.handlers.QueueListener(self.queue, handler,
3592                                                  respect_handler_level=True)
3593        listener.start()
3594        try:
3595            self.que_logger.warning(self.next_message())
3596            self.que_logger.error(self.next_message())
3597            self.que_logger.critical(self.next_message())
3598        finally:
3599            listener.stop()
3600        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
3601        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
3602        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
3603        handler.close()
3604
3605    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3606                         'logging.handlers.QueueListener required for this test')
3607    def test_queue_listener_with_StreamHandler(self):
        # Test that the traceback is appended only once (bpo-34334).
3609        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
3610        listener.start()
3611        try:
3612            1 / 0
3613        except ZeroDivisionError as e:
3614            exc = e
3615            self.que_logger.exception(self.next_message(), exc_info=exc)
3616        listener.stop()
3617        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
3618
3619    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3620                         'logging.handlers.QueueListener required for this test')
3621    def test_queue_listener_with_multiple_handlers(self):
3622        # Test that queue handler format doesn't affect other handler formats (bpo-35726).
3623        self.que_hdlr.setFormatter(self.root_formatter)
3624        self.que_logger.addHandler(self.root_hdlr)
3625
3626        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
3627        listener.start()
3628        self.que_logger.error("error")
3629        listener.stop()
3630        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
3631
3632if hasattr(logging.handlers, 'QueueListener'):
3633    import multiprocessing
3634    from unittest.mock import patch
3635
3636    class QueueListenerTest(BaseTest):
3637        """
3638        Tests based on patch submitted for issue #27930. Ensure that
3639        QueueListener handles all log messages.
3640        """
3641
3642        repeat = 20
3643
3644        @staticmethod
3645        def setup_and_log(log_queue, ident):
3646            """
3647            Creates a logger with a QueueHandler that logs to a queue read by a
3648            QueueListener. Starts the listener, logs five messages, and stops
3649            the listener.
3650            """
3651            logger = logging.getLogger('test_logger_with_id_%s' % ident)
3652            logger.setLevel(logging.DEBUG)
3653            handler = logging.handlers.QueueHandler(log_queue)
3654            logger.addHandler(handler)
3655            listener = logging.handlers.QueueListener(log_queue)
3656            listener.start()
3657
3658            logger.info('one')
3659            logger.info('two')
3660            logger.info('three')
3661            logger.info('four')
3662            logger.info('five')
3663
3664            listener.stop()
3665            logger.removeHandler(handler)
3666            handler.close()
3667
3668        @patch.object(logging.handlers.QueueListener, 'handle')
3669        def test_handle_called_with_queue_queue(self, mock_handle):
3670            for i in range(self.repeat):
3671                log_queue = queue.Queue()
3672                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3673            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3674                             'correct number of handled log messages')
3675
3676        @patch.object(logging.handlers.QueueListener, 'handle')
3677        def test_handle_called_with_mp_queue(self, mock_handle):
3678            # bpo-28668: The multiprocessing (mp) module is not functional
3679            # when the mp.synchronize module cannot be imported.
3680            support.skip_if_broken_multiprocessing_synchronize()
3681            for i in range(self.repeat):
3682                log_queue = multiprocessing.Queue()
3683                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3684                log_queue.close()
3685                log_queue.join_thread()
3686            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3687                             'correct number of handled log messages')
3688
3689        @staticmethod
3690        def get_all_from_queue(log_queue):
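            # Drain helper: yield records until the queue raises queue.Empty.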
3691            try:
3692                while True:
3693                    yield log_queue.get_nowait()
3694            except queue.Empty:
3695                return []
3696
3697        def test_no_messages_in_queue_after_stop(self):
            """
            Five messages are logged and then the QueueListener is stopped.
            The test then drains the queue. Failure of this test indicates
            that messages were not placed on the queue until _after_ the
            QueueListener had stopped.
            """
3704            # bpo-28668: The multiprocessing (mp) module is not functional
3705            # when the mp.synchronize module cannot be imported.
3706            support.skip_if_broken_multiprocessing_synchronize()
3707            for i in range(self.repeat):
3708                queue = multiprocessing.Queue()
                self.setup_and_log(queue, '%s_%s' % (self.id(), i))
3711                items = list(self.get_all_from_queue(queue))
3712                queue.close()
3713                queue.join_thread()
3714
3715                expected = [[], [logging.handlers.QueueListener._sentinel]]
3716                self.assertIn(items, expected,
3717                              'Found unexpected messages in queue: %s' % (
3718                                    [m.msg if isinstance(m, logging.LogRecord)
3719                                     else m for m in items]))
3720
3721        def test_calls_task_done_after_stop(self):
3722            # Issue 36813: Make sure queue.join does not deadlock.
3723            log_queue = queue.Queue()
3724            listener = logging.handlers.QueueListener(log_queue)
3725            listener.start()
3726            listener.stop()
3727            with self.assertRaises(ValueError):
3728                # Make sure all tasks are done and .join won't block.
3729                log_queue.task_done()
3730
3731
3732ZERO = datetime.timedelta(0)
3733
3734class UTC(datetime.tzinfo):
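    """Minimal fixed-offset tzinfo representing UTC, used by the formatter
    time-formatting tests below."""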
3735    def utcoffset(self, dt):
3736        return ZERO
3737
3738    dst = utcoffset
3739
3740    def tzname(self, dt):
3741        return 'UTC'
3742
3743utc = UTC()
3744
3745class AssertErrorMessage:
3746
3747    def assert_error_message(self, exception, message, *args, **kwargs):
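        # An empty tuple of expected exceptions means assertRaises() catches
        # nothing: the exception raised by the callable propagates out and is
        # caught by the 'except exception' clause below, where its message is
        # compared with the expected text.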
3748        try:
3749            self.assertRaises((), *args, **kwargs)
3750        except exception as e:
3751            self.assertEqual(message, str(e))
3752
3753class FormatterTest(unittest.TestCase, AssertErrorMessage):
3754    def setUp(self):
3755        self.common = {
3756            'name': 'formatter.test',
3757            'level': logging.DEBUG,
3758            'pathname': os.path.join('path', 'to', 'dummy.ext'),
3759            'lineno': 42,
3760            'exc_info': None,
3761            'func': None,
3762            'msg': 'Message with %d %s',
3763            'args': (2, 'placeholders'),
3764        }
3765        self.variants = {
3766            'custom': {
3767                'custom': 1234
3768            }
3769        }
3770
3771    def get_record(self, name=None):
3772        result = dict(self.common)
3773        if name is not None:
3774            result.update(self.variants[name])
3775        return logging.makeLogRecord(result)
3776
3777    def test_percent(self):
3778        # Test %-formatting
3779        r = self.get_record()
3780        f = logging.Formatter('${%(message)s}')
3781        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
3782        f = logging.Formatter('%(random)s')
3783        self.assertRaises(ValueError, f.format, r)
3784        self.assertFalse(f.usesTime())
3785        f = logging.Formatter('%(asctime)s')
3786        self.assertTrue(f.usesTime())
3787        f = logging.Formatter('%(asctime)-15s')
3788        self.assertTrue(f.usesTime())
3789        f = logging.Formatter('%(asctime)#15s')
3790        self.assertTrue(f.usesTime())
3791
3792    def test_braces(self):
3793        # Test {}-formatting
3794        r = self.get_record()
3795        f = logging.Formatter('$%{message}%$', style='{')
3796        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3797        f = logging.Formatter('{random}', style='{')
3798        self.assertRaises(ValueError, f.format, r)
3799        f = logging.Formatter("{message}", style='{')
3800        self.assertFalse(f.usesTime())
3801        f = logging.Formatter('{asctime}', style='{')
3802        self.assertTrue(f.usesTime())
3803        f = logging.Formatter('{asctime!s:15}', style='{')
3804        self.assertTrue(f.usesTime())
3805        f = logging.Formatter('{asctime:15}', style='{')
3806        self.assertTrue(f.usesTime())
3807
3808    def test_dollars(self):
3809        # Test $-formatting
3810        r = self.get_record()
3811        f = logging.Formatter('${message}', style='$')
3812        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3813        f = logging.Formatter('$message', style='$')
3814        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3815        f = logging.Formatter('$$%${message}%$$', style='$')
3816        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3817        f = logging.Formatter('${random}', style='$')
3818        self.assertRaises(ValueError, f.format, r)
3819        self.assertFalse(f.usesTime())
3820        f = logging.Formatter('${asctime}', style='$')
3821        self.assertTrue(f.usesTime())
3822        f = logging.Formatter('$asctime', style='$')
3823        self.assertTrue(f.usesTime())
3824        f = logging.Formatter('${message}', style='$')
3825        self.assertFalse(f.usesTime())
3826        f = logging.Formatter('${asctime}--', style='$')
3827        self.assertTrue(f.usesTime())
3828
3829    def test_format_validate(self):
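        # Since Python 3.8, logging.Formatter validates the format string
        # against the chosen style at construction time (validate=True by
        # default), so malformed or mismatched formats raise ValueError
        # directly from the constructor.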
        # Check that well-formed format strings are accepted
3831        # Percentage style
3832        f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3833        self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3834        f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3835        self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3836        f = logging.Formatter("%(process)#+027.23X")
3837        self.assertEqual(f._fmt, "%(process)#+027.23X")
3838        f = logging.Formatter("%(foo)#.*g")
3839        self.assertEqual(f._fmt, "%(foo)#.*g")
3840
3841        # StrFormat Style
3842        f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{")
3843        self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}")
3844        f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{")
3845        self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}")
3846        f = logging.Formatter("{customfield!s:#<30}", style="{")
3847        self.assertEqual(f._fmt, "{customfield!s:#<30}")
3848        f = logging.Formatter("{message!r}", style="{")
3849        self.assertEqual(f._fmt, "{message!r}")
3850        f = logging.Formatter("{message!s}", style="{")
3851        self.assertEqual(f._fmt, "{message!s}")
3852        f = logging.Formatter("{message!a}", style="{")
3853        self.assertEqual(f._fmt, "{message!a}")
3854        f = logging.Formatter("{process!r:4.2}", style="{")
3855        self.assertEqual(f._fmt, "{process!r:4.2}")
3856        f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{")
3857        self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}")
3858        f = logging.Formatter("{process!s:{w},.{p}}", style="{")
3859        self.assertEqual(f._fmt, "{process!s:{w},.{p}}")
3860        f = logging.Formatter("{foo:12.{p}}", style="{")
3861        self.assertEqual(f._fmt, "{foo:12.{p}}")
3862        f = logging.Formatter("{foo:{w}.6}", style="{")
3863        self.assertEqual(f._fmt, "{foo:{w}.6}")
3864        f = logging.Formatter("{foo[0].bar[1].baz}", style="{")
3865        self.assertEqual(f._fmt, "{foo[0].bar[1].baz}")
3866        f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{")
3867        self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}")
3868        f = logging.Formatter("{12[k1].bar[k2].baz}", style="{")
3869        self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}")
3870
3871        # Dollar style
3872        f = logging.Formatter("${asctime} - $message", style="$")
3873        self.assertEqual(f._fmt, "${asctime} - $message")
3874        f = logging.Formatter("$bar $$", style="$")
3875        self.assertEqual(f._fmt, "$bar $$")
3876        f = logging.Formatter("$bar $$$$", style="$")
        self.assertEqual(f._fmt, "$bar $$$$")  # '$$$$' escapes to two literal '$' characters
3878
        # Test that ValueError is raised for malformed format strings
3880        # Percentage Style
3881        self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z")
3882        self.assertRaises(ValueError, logging.Formatter, "%(asctime)b")
3883        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*")
3884        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s")
3885        self.assertRaises(ValueError, logging.Formatter, "%(asctime)_")
3886        self.assertRaises(ValueError, logging.Formatter, '{asctime}')
3887        self.assertRaises(ValueError, logging.Formatter, '${message}')
3888        self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f')  # with both * and decimal number as precision
3889        self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f')
3890
3891        # StrFormat Style
3892        # Testing failure for '-' in field name
3893        self.assert_error_message(
3894            ValueError,
3895            "invalid format: invalid field name/expression: 'name-thing'",
3896            logging.Formatter, "{name-thing}", style="{"
3897        )
3898        # Testing failure for style mismatch
3899        self.assert_error_message(
3900            ValueError,
3901            "invalid format: no fields",
3902            logging.Formatter, '%(asctime)s', style='{'
3903        )
3904        # Testing failure for invalid conversion
3905        self.assert_error_message(
3906            ValueError,
3907            "invalid conversion: 'Z'"
3908        )
3909        self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{')
3910        self.assert_error_message(
3911            ValueError,
3912            "invalid format: expected ':' after conversion specifier",
3913            logging.Formatter, '{asctime!aa:15}', style='{'
3914        )
3915        # Testing failure for invalid spec
3916        self.assert_error_message(
3917            ValueError,
3918            "invalid format: bad specifier: '.2ff'",
3919            logging.Formatter, '{process:.2ff}', style='{'
3920        )
3921        self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{')
3922        self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{')
3923        self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{')
3924        self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{')
        # Testing failure for mismatched braces
3926        self.assert_error_message(
3927            ValueError,
3928            "invalid format: expected '}' before end of string",
3929            logging.Formatter, '{process', style='{'
3930        )
3931        self.assert_error_message(
3932            ValueError,
3933            "invalid format: Single '}' encountered in format string",
3934            logging.Formatter, 'process}', style='{'
3935        )
3936        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{')
3937        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{')
3938        self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{')
3939        self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{')
3940        self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{')
3941        self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{')
3942        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{')
3943        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{')
3944        self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{')
3945
3946        # Dollar style
        # Testing failure for a bare '$'
3948        self.assert_error_message(
3949            ValueError,
3950            "invalid format: bare \'$\' not allowed",
3951            logging.Formatter, '$bar $$$', style='$'
3952        )
3953        self.assert_error_message(
3954            ValueError,
3955            "invalid format: bare \'$\' not allowed",
3956            logging.Formatter, 'bar $', style='$'
3957        )
3958        self.assert_error_message(
3959            ValueError,
3960            "invalid format: bare \'$\' not allowed",
3961            logging.Formatter, 'foo $.', style='$'
3962        )
        # Testing failure for mismatched style
3964        self.assert_error_message(
3965            ValueError,
3966            "invalid format: no fields",
3967            logging.Formatter, '{asctime}', style='$'
3968        )
3969        self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$')
3970
3971        # Testing failure for incorrect fields
3972        self.assert_error_message(
3973            ValueError,
3974            "invalid format: no fields",
3975            logging.Formatter, 'foo', style='$'
3976        )
3977        self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$')
3978
3979    def test_defaults_parameter(self):
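        # The 'defaults' argument (added in Python 3.10) supplies values for
        # fields that are missing from the record; values present on the
        # record take precedence over the defaults.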
3980        fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message']
3981        styles = ['%', '{', '$']
3982        for fmt, style in zip(fmts, styles):
3983            f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'})
3984            r = self.get_record()
3985            self.assertEqual(f.format(r), 'Default Message with 2 placeholders')
3986            r = self.get_record("custom")
3987            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')
3988
3989            # Without default
3990            f = logging.Formatter(fmt, style=style)
3991            r = self.get_record()
3992            self.assertRaises(ValueError, f.format, r)
3993
3994            # Non-existing default is ignored
3995            f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'})
3996            r = self.get_record("custom")
3997            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')
3998
3999    def test_invalid_style(self):
4000        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
4001
4002    def test_time(self):
4003        r = self.get_record()
4004        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
4005        # We use None to indicate we want the local timezone
4006        # We're essentially converting a UTC time to local time
4007        r.created = time.mktime(dt.astimezone(None).timetuple())
4008        r.msecs = 123
4009        f = logging.Formatter('%(asctime)s %(message)s')
4010        f.converter = time.gmtime
4011        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
4012        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
4013        f.format(r)
4014        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
4015
4016    def test_default_msec_format_none(self):
4017        class NoMsecFormatter(logging.Formatter):
4018            default_msec_format = None
4019            default_time_format = '%d/%m/%Y %H:%M:%S'
4020
4021        r = self.get_record()
4022        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc)
4023        r.created = time.mktime(dt.astimezone(None).timetuple())
4024        f = NoMsecFormatter()
4025        f.converter = time.gmtime
4026        self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00')
4027
4028
4029class TestBufferingFormatter(logging.BufferingFormatter):
4030    def formatHeader(self, records):
4031        return '[(%d)' % len(records)
4032
4033    def formatFooter(self, records):
4034        return '(%d)]' % len(records)
4035
4036class BufferingFormatterTest(unittest.TestCase):
4037    def setUp(self):
4038        self.records = [
4039            logging.makeLogRecord({'msg': 'one'}),
4040            logging.makeLogRecord({'msg': 'two'}),
4041        ]
4042
4043    def test_default(self):
4044        f = logging.BufferingFormatter()
4045        self.assertEqual('', f.format([]))
4046        self.assertEqual('onetwo', f.format(self.records))
4047
4048    def test_custom(self):
4049        f = TestBufferingFormatter()
4050        self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
4051        lf = logging.Formatter('<%(message)s>')
4052        f = TestBufferingFormatter(lf)
4053        self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
4054
4055class ExceptionTest(BaseTest):
4056    def test_formatting(self):
4057        r = self.root_logger
4058        h = RecordingHandler()
4059        r.addHandler(h)
4060        try:
4061            raise RuntimeError('deliberate mistake')
4062        except:
4063            logging.exception('failed', stack_info=True)
4064        r.removeHandler(h)
4065        h.close()
4066        r = h.records[0]
4067        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
4068                                              'call last):\n'))
4069        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
4070                                            'deliberate mistake'))
4071        self.assertTrue(r.stack_info.startswith('Stack (most recent '
4072                                              'call last):\n'))
4073        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
4074                                            'stack_info=True)'))
4075
4076
4077class LastResortTest(BaseTest):
4078    def test_last_resort(self):
4079        # Test the last resort handler
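        # logging.lastResort is a stderr handler with level WARNING which
        # emits the bare, unformatted message when a logger has no handlers.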
4080        root = self.root_logger
4081        root.removeHandler(self.root_hdlr)
4082        old_lastresort = logging.lastResort
4083        old_raise_exceptions = logging.raiseExceptions
4084
4085        try:
4086            with support.captured_stderr() as stderr:
4087                root.debug('This should not appear')
4088                self.assertEqual(stderr.getvalue(), '')
4089                root.warning('Final chance!')
4090                self.assertEqual(stderr.getvalue(), 'Final chance!\n')
4091
4092            # No handlers and no last resort, so 'No handlers' message
4093            logging.lastResort = None
4094            with support.captured_stderr() as stderr:
4095                root.warning('Final chance!')
4096                msg = 'No handlers could be found for logger "root"\n'
4097                self.assertEqual(stderr.getvalue(), msg)
4098
4099            # 'No handlers' message only printed once
4100            with support.captured_stderr() as stderr:
4101                root.warning('Final chance!')
4102                self.assertEqual(stderr.getvalue(), '')
4103
4104            # If raiseExceptions is False, no message is printed
4105            root.manager.emittedNoHandlerWarning = False
4106            logging.raiseExceptions = False
4107            with support.captured_stderr() as stderr:
4108                root.warning('Final chance!')
4109                self.assertEqual(stderr.getvalue(), '')
4110        finally:
4111            root.addHandler(self.root_hdlr)
4112            logging.lastResort = old_lastresort
4113            logging.raiseExceptions = old_raise_exceptions
4114
4115
4116class FakeHandler:
4117
4118    def __init__(self, identifier, called):
4119        for method in ('acquire', 'flush', 'close', 'release'):
4120            setattr(self, method, self.record_call(identifier, method, called))
4121
4122    def record_call(self, identifier, method_name, called):
4123        def inner():
4124            called.append('{} - {}'.format(identifier, method_name))
4125        return inner
4126
4127
4128class RecordingHandler(logging.NullHandler):
4129
4130    def __init__(self, *args, **kwargs):
4131        super(RecordingHandler, self).__init__(*args, **kwargs)
4132        self.records = []
4133
4134    def handle(self, record):
4135        """Keep track of all the emitted records."""
4136        self.records.append(record)
4137
4138
4139class ShutdownTest(BaseTest):
4140
4141    """Test suite for the shutdown method."""
4142
4143    def setUp(self):
4144        super(ShutdownTest, self).setUp()
4145        self.called = []
4146
4147        raise_exceptions = logging.raiseExceptions
4148        self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
4149
4150    def raise_error(self, error):
4151        def inner():
4152            raise error()
4153        return inner
4154
4155    def test_no_failure(self):
4156        # create some fake handlers
4157        handler0 = FakeHandler(0, self.called)
4158        handler1 = FakeHandler(1, self.called)
4159        handler2 = FakeHandler(2, self.called)
4160
        # create live weakrefs to those handlers
4162        handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
4163
4164        logging.shutdown(handlerList=list(handlers))
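        # logging.shutdown() walks the handler list in reverse order, so the
        # most recently registered handler (2) is flushed and closed first.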
4165
4166        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
4167                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
4168                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
4169        self.assertEqual(expected, self.called)
4170
4171    def _test_with_failure_in_method(self, method, error):
4172        handler = FakeHandler(0, self.called)
4173        setattr(handler, method, self.raise_error(error))
4174        handlers = [logging.weakref.ref(handler)]
4175
4176        logging.shutdown(handlerList=list(handlers))
4177
4178        self.assertEqual('0 - release', self.called[-1])
4179
4180    def test_with_ioerror_in_acquire(self):
4181        self._test_with_failure_in_method('acquire', OSError)
4182
4183    def test_with_ioerror_in_flush(self):
4184        self._test_with_failure_in_method('flush', OSError)
4185
4186    def test_with_ioerror_in_close(self):
4187        self._test_with_failure_in_method('close', OSError)
4188
4189    def test_with_valueerror_in_acquire(self):
4190        self._test_with_failure_in_method('acquire', ValueError)
4191
4192    def test_with_valueerror_in_flush(self):
4193        self._test_with_failure_in_method('flush', ValueError)
4194
4195    def test_with_valueerror_in_close(self):
4196        self._test_with_failure_in_method('close', ValueError)
4197
4198    def test_with_other_error_in_acquire_without_raise(self):
4199        logging.raiseExceptions = False
4200        self._test_with_failure_in_method('acquire', IndexError)
4201
4202    def test_with_other_error_in_flush_without_raise(self):
4203        logging.raiseExceptions = False
4204        self._test_with_failure_in_method('flush', IndexError)
4205
4206    def test_with_other_error_in_close_without_raise(self):
4207        logging.raiseExceptions = False
4208        self._test_with_failure_in_method('close', IndexError)
4209
4210    def test_with_other_error_in_acquire_with_raise(self):
4211        logging.raiseExceptions = True
4212        self.assertRaises(IndexError, self._test_with_failure_in_method,
4213                          'acquire', IndexError)
4214
4215    def test_with_other_error_in_flush_with_raise(self):
4216        logging.raiseExceptions = True
4217        self.assertRaises(IndexError, self._test_with_failure_in_method,
4218                          'flush', IndexError)
4219
4220    def test_with_other_error_in_close_with_raise(self):
4221        logging.raiseExceptions = True
4222        self.assertRaises(IndexError, self._test_with_failure_in_method,
4223                          'close', IndexError)
4224
4225
4226class ModuleLevelMiscTest(BaseTest):
4227
4228    """Test suite for some module level methods."""
4229
4230    def test_disable(self):
4231        old_disable = logging.root.manager.disable
4232        # confirm our assumptions are correct
4233        self.assertEqual(old_disable, 0)
4234        self.addCleanup(logging.disable, old_disable)
4235
4236        logging.disable(83)
4237        self.assertEqual(logging.root.manager.disable, 83)
4238
4239        self.assertRaises(ValueError, logging.disable, "doesnotexists")
4240
4241        class _NotAnIntOrString:
4242            pass
4243
4244        self.assertRaises(TypeError, logging.disable, _NotAnIntOrString())
4245
4246        logging.disable("WARN")
4247
4248        # test the default value introduced in 3.7
4249        # (Issue #28524)
4250        logging.disable()
4251        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)
4252
4253    def _test_log(self, method, level=None):
4254        called = []
4255        support.patch(self, logging, 'basicConfig',
4256                      lambda *a, **kw: called.append((a, kw)))
4257
4258        recording = RecordingHandler()
4259        logging.root.addHandler(recording)
4260
4261        log_method = getattr(logging, method)
4262        if level is not None:
4263            log_method(level, "test me: %r", recording)
4264        else:
4265            log_method("test me: %r", recording)
4266
4267        self.assertEqual(len(recording.records), 1)
4268        record = recording.records[0]
4269        self.assertEqual(record.getMessage(), "test me: %r" % recording)
4270
4271        expected_level = level if level is not None else getattr(logging, method.upper())
4272        self.assertEqual(record.levelno, expected_level)
4273
4274        # basicConfig was not called!
4275        self.assertEqual(called, [])
4276
4277    def test_log(self):
4278        self._test_log('log', logging.ERROR)
4279
4280    def test_debug(self):
4281        self._test_log('debug')
4282
4283    def test_info(self):
4284        self._test_log('info')
4285
4286    def test_warning(self):
4287        self._test_log('warning')
4288
4289    def test_error(self):
4290        self._test_log('error')
4291
4292    def test_critical(self):
4293        self._test_log('critical')
4294
4295    def test_set_logger_class(self):
4296        self.assertRaises(TypeError, logging.setLoggerClass, object)
4297
4298        class MyLogger(logging.Logger):
4299            pass
4300
4301        logging.setLoggerClass(MyLogger)
4302        self.assertEqual(logging.getLoggerClass(), MyLogger)
4303
4304        logging.setLoggerClass(logging.Logger)
4305        self.assertEqual(logging.getLoggerClass(), logging.Logger)
4306
4307    def test_subclass_logger_cache(self):
4308        # bpo-37258
4309        message = []
4310
4311        class MyLogger(logging.getLoggerClass()):
4312            def __init__(self, name='MyLogger', level=logging.NOTSET):
4313                super().__init__(name, level)
4314                message.append('initialized')
4315
4316        logging.setLoggerClass(MyLogger)
4317        logger = logging.getLogger('just_some_logger')
4318        self.assertEqual(message, ['initialized'])
4319        stream = io.StringIO()
4320        h = logging.StreamHandler(stream)
4321        logger.addHandler(h)
4322        try:
4323            logger.setLevel(logging.DEBUG)
4324            logger.debug("hello")
4325            self.assertEqual(stream.getvalue().strip(), "hello")
4326
4327            stream.truncate(0)
4328            stream.seek(0)
4329
4330            logger.setLevel(logging.INFO)
4331            logger.debug("hello")
4332            self.assertEqual(stream.getvalue(), "")
4333        finally:
4334            logger.removeHandler(h)
4335            h.close()
4336            logging.setLoggerClass(logging.Logger)
4337
4338    def test_logging_at_shutdown(self):
4339        # bpo-20037: Doing text I/O late at interpreter shutdown must not crash
4340        code = textwrap.dedent("""
4341            import logging
4342
4343            class A:
4344                def __del__(self):
4345                    try:
4346                        raise ValueError("some error")
4347                    except Exception:
4348                        logging.exception("exception in __del__")
4349
4350            a = A()
4351        """)
4352        rc, out, err = assert_python_ok("-c", code)
4353        err = err.decode()
4354        self.assertIn("exception in __del__", err)
4355        self.assertIn("ValueError: some error", err)
4356
4357    def test_logging_at_shutdown_open(self):
4358        # bpo-26789: FileHandler keeps a reference to the builtin open()
4359        # function to be able to open or reopen the file during Python
4360        # finalization.
4361        filename = os_helper.TESTFN
4362        self.addCleanup(os_helper.unlink, filename)
4363
4364        code = textwrap.dedent(f"""
4365            import builtins
4366            import logging
4367
4368            class A:
4369                def __del__(self):
4370                    logging.error("log in __del__")
4371
4372            # basicConfig() opens the file, but logging.shutdown() closes
4373            # it at Python exit. When A.__del__() is called,
4374            # FileHandler._open() must be called again to re-open the file.
4375            logging.basicConfig(filename={filename!r}, encoding="utf-8")
4376
4377            a = A()
4378
4379            # Simulate the Python finalization which removes the builtin
4380            # open() function.
4381            del builtins.open
4382        """)
4383        assert_python_ok("-c", code)
4384
4385        with open(filename, encoding="utf-8") as fp:
4386            self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__")
4387
4388    def test_recursion_error(self):
4389        # Issue 36272
4390        code = textwrap.dedent("""
4391            import logging
4392
4393            def rec():
4394                logging.error("foo")
4395                rec()
4396
4397            rec()
4398        """)
4399        rc, out, err = assert_python_failure("-c", code)
4400        err = err.decode()
4401        self.assertNotIn("Cannot recover from stack overflow.", err)
4402        self.assertEqual(rc, 1)
4403
4404    def test_get_level_names_mapping(self):
4405        mapping = logging.getLevelNamesMapping()
4406        self.assertEqual(logging._nameToLevel, mapping)  # value is equivalent
4407        self.assertIsNot(logging._nameToLevel, mapping)  # but not the internal data
4408        new_mapping = logging.getLevelNamesMapping()     # another call -> another copy
4409        self.assertIsNot(mapping, new_mapping)           # verify not the same object as before
4410        self.assertEqual(mapping, new_mapping)           # but equivalent in value
4411
4412
4413class LogRecordTest(BaseTest):
4414    def test_str_rep(self):
4415        r = logging.makeLogRecord({})
4416        s = str(r)
4417        self.assertTrue(s.startswith('<LogRecord: '))
4418        self.assertTrue(s.endswith('>'))
4419
4420    def test_dict_arg(self):
4421        h = RecordingHandler()
4422        r = logging.getLogger()
4423        r.addHandler(h)
4424        d = {'less' : 'more' }
4425        logging.warning('less is %(less)s', d)
4426        self.assertIs(h.records[0].args, d)
4427        self.assertEqual(h.records[0].message, 'less is more')
4428        r.removeHandler(h)
4429        h.close()
4430
4431    @staticmethod # pickled as target of child process in the following test
4432    def _extract_logrecord_process_name(key, logMultiprocessing, conn=None):
4433        prev_logMultiprocessing = logging.logMultiprocessing
4434        logging.logMultiprocessing = logMultiprocessing
4435        try:
4436            import multiprocessing as mp
4437            name = mp.current_process().name
4438
4439            r1 = logging.makeLogRecord({'msg': f'msg1_{key}'})
4440
4441            # https://bugs.python.org/issue45128
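            # If the multiprocessing module cannot be imported, processName
            # should fall back to the default 'MainProcess' (asserted below).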
4442            with support.swap_item(sys.modules, 'multiprocessing', None):
4443                r2 = logging.makeLogRecord({'msg': f'msg2_{key}'})
4444
4445            results = {'processName'  : name,
4446                       'r1.processName': r1.processName,
4447                       'r2.processName': r2.processName,
4448                      }
4449        finally:
4450            logging.logMultiprocessing = prev_logMultiprocessing
4451        if conn:
4452            conn.send(results)
4453        else:
4454            return results
4455
4456    def test_multiprocessing(self):
4457        multiprocessing_imported = 'multiprocessing' in sys.modules
4458        try:
4459            # logMultiprocessing is True by default
4460            self.assertEqual(logging.logMultiprocessing, True)
4461
4462            LOG_MULTI_PROCESSING = True
4463            # When logMultiprocessing == True:
4464            # In the main process processName = 'MainProcess'
4465            r = logging.makeLogRecord({})
4466            self.assertEqual(r.processName, 'MainProcess')
4467
4468            results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING)
4469            self.assertEqual('MainProcess', results['processName'])
4470            self.assertEqual('MainProcess', results['r1.processName'])
4471            self.assertEqual('MainProcess', results['r2.processName'])
4472
            # In other processes, processName is correct when multiprocessing is imported,
            # but it (incorrectly) defaults to 'MainProcess' otherwise (bpo-38762).
4475            import multiprocessing
4476            parent_conn, child_conn = multiprocessing.Pipe()
4477            p = multiprocessing.Process(
4478                target=self._extract_logrecord_process_name,
4479                args=(2, LOG_MULTI_PROCESSING, child_conn,)
4480            )
4481            p.start()
4482            results = parent_conn.recv()
4483            self.assertNotEqual('MainProcess', results['processName'])
4484            self.assertEqual(results['processName'], results['r1.processName'])
4485            self.assertEqual('MainProcess', results['r2.processName'])
4486            p.join()
4487
4488        finally:
4489            if multiprocessing_imported:
4490                import multiprocessing
4491
4492    def test_optional(self):
4493        r = logging.makeLogRecord({})
4494        NOT_NONE = self.assertIsNotNone
4495        NOT_NONE(r.thread)
4496        NOT_NONE(r.threadName)
4497        NOT_NONE(r.process)
4498        NOT_NONE(r.processName)
4499        log_threads = logging.logThreads
4500        log_processes = logging.logProcesses
4501        log_multiprocessing = logging.logMultiprocessing
4502        try:
4503            logging.logThreads = False
4504            logging.logProcesses = False
4505            logging.logMultiprocessing = False
4506            r = logging.makeLogRecord({})
4507            NONE = self.assertIsNone
4508            NONE(r.thread)
4509            NONE(r.threadName)
4510            NONE(r.process)
4511            NONE(r.processName)
4512        finally:
4513            logging.logThreads = log_threads
4514            logging.logProcesses = log_processes
4515            logging.logMultiprocessing = log_multiprocessing
4516
4517class BasicConfigTest(unittest.TestCase):
4518
4519    """Test suite for logging.basicConfig."""
4520
4521    def setUp(self):
4522        super(BasicConfigTest, self).setUp()
4523        self.handlers = logging.root.handlers
4524        self.saved_handlers = logging._handlers.copy()
4525        self.saved_handler_list = logging._handlerList[:]
4526        self.original_logging_level = logging.root.level
4527        self.addCleanup(self.cleanup)
4528        logging.root.handlers = []
4529
4530    def tearDown(self):
4531        for h in logging.root.handlers[:]:
4532            logging.root.removeHandler(h)
4533            h.close()
4534        super(BasicConfigTest, self).tearDown()
4535
4536    def cleanup(self):
4537        setattr(logging.root, 'handlers', self.handlers)
4538        logging._handlers.clear()
4539        logging._handlers.update(self.saved_handlers)
4540        logging._handlerList[:] = self.saved_handler_list
4541        logging.root.setLevel(self.original_logging_level)
4542
4543    def test_no_kwargs(self):
4544        logging.basicConfig()
4545
4546        # handler defaults to a StreamHandler to sys.stderr
4547        self.assertEqual(len(logging.root.handlers), 1)
4548        handler = logging.root.handlers[0]
4549        self.assertIsInstance(handler, logging.StreamHandler)
4550        self.assertEqual(handler.stream, sys.stderr)
4551
4552        formatter = handler.formatter
4553        # format defaults to logging.BASIC_FORMAT
4554        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
4555        # datefmt defaults to None
4556        self.assertIsNone(formatter.datefmt)
4557        # style defaults to %
4558        self.assertIsInstance(formatter._style, logging.PercentStyle)
4559
4560        # level is not explicitly set
4561        self.assertEqual(logging.root.level, self.original_logging_level)
4562
4563    def test_strformatstyle(self):
4564        with support.captured_stdout() as output:
4565            logging.basicConfig(stream=sys.stdout, style="{")
4566            logging.error("Log an error")
4567            sys.stdout.seek(0)
4568            self.assertEqual(output.getvalue().strip(),
4569                "ERROR:root:Log an error")
4570
4571    def test_stringtemplatestyle(self):
4572        with support.captured_stdout() as output:
4573            logging.basicConfig(stream=sys.stdout, style="$")
4574            logging.error("Log an error")
4575            sys.stdout.seek(0)
4576            self.assertEqual(output.getvalue().strip(),
4577                "ERROR:root:Log an error")
4578
4579    def test_filename(self):
4580
4581        def cleanup(h1, h2, fn):
4582            h1.close()
4583            h2.close()
4584            os.remove(fn)
4585
4586        logging.basicConfig(filename='test.log', encoding='utf-8')
4587
4588        self.assertEqual(len(logging.root.handlers), 1)
4589        handler = logging.root.handlers[0]
4590        self.assertIsInstance(handler, logging.FileHandler)
4591
4592        expected = logging.FileHandler('test.log', 'a', encoding='utf-8')
4593        self.assertEqual(handler.stream.mode, expected.stream.mode)
4594        self.assertEqual(handler.stream.name, expected.stream.name)
4595        self.addCleanup(cleanup, handler, expected, 'test.log')
4596
4597    def test_filemode(self):
4598
4599        def cleanup(h1, h2, fn):
4600            h1.close()
4601            h2.close()
4602            os.remove(fn)
4603
4604        logging.basicConfig(filename='test.log', filemode='wb')
4605
4606        handler = logging.root.handlers[0]
4607        expected = logging.FileHandler('test.log', 'wb')
4608        self.assertEqual(handler.stream.mode, expected.stream.mode)
4609        self.addCleanup(cleanup, handler, expected, 'test.log')
4610
4611    def test_stream(self):
4612        stream = io.StringIO()
4613        self.addCleanup(stream.close)
4614        logging.basicConfig(stream=stream)
4615
4616        self.assertEqual(len(logging.root.handlers), 1)
4617        handler = logging.root.handlers[0]
4618        self.assertIsInstance(handler, logging.StreamHandler)
4619        self.assertEqual(handler.stream, stream)
4620
4621    def test_format(self):
4622        logging.basicConfig(format='%(asctime)s - %(message)s')
4623
4624        formatter = logging.root.handlers[0].formatter
4625        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')
4626
4627    def test_datefmt(self):
4628        logging.basicConfig(datefmt='bar')
4629
4630        formatter = logging.root.handlers[0].formatter
4631        self.assertEqual(formatter.datefmt, 'bar')
4632
4633    def test_style(self):
4634        logging.basicConfig(style='$')
4635
4636        formatter = logging.root.handlers[0].formatter
4637        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
4638
4639    def test_level(self):
4640        old_level = logging.root.level
4641        self.addCleanup(logging.root.setLevel, old_level)
4642
4643        logging.basicConfig(level=57)
4644        self.assertEqual(logging.root.level, 57)
        # A second call has no effect: the root logger already has a handler
        # configured, so basicConfig() is a no-op unless force=True is passed.
4646        logging.basicConfig(level=58)
4647        self.assertEqual(logging.root.level, 57)
4648
4649    def test_incompatible(self):
4650        assertRaises = self.assertRaises
4651        handlers = [logging.StreamHandler()]
4652        stream = sys.stderr
4653        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4654                                                      stream=stream)
4655        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4656                                                      handlers=handlers)
4657        assertRaises(ValueError, logging.basicConfig, stream=stream,
4658                                                      handlers=handlers)
4659        # Issue 23207: test for invalid kwargs
4660        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
4661        # Should pop both filename and filemode even if filename is None
4662        logging.basicConfig(filename=None, filemode='a')
4663
4664    def test_handlers(self):
4665        handlers = [
4666            logging.StreamHandler(),
4667            logging.StreamHandler(sys.stdout),
4668            logging.StreamHandler(),
4669        ]
4670        f = logging.Formatter()
4671        handlers[2].setFormatter(f)
4672        logging.basicConfig(handlers=handlers)
4673        self.assertIs(handlers[0], logging.root.handlers[0])
4674        self.assertIs(handlers[1], logging.root.handlers[1])
4675        self.assertIs(handlers[2], logging.root.handlers[2])
4676        self.assertIsNotNone(handlers[0].formatter)
4677        self.assertIsNotNone(handlers[1].formatter)
4678        self.assertIs(handlers[2].formatter, f)
4679        self.assertIs(handlers[0].formatter, handlers[1].formatter)
4680
4681    def test_force(self):
4682        old_string_io = io.StringIO()
4683        new_string_io = io.StringIO()
4684        old_handlers = [logging.StreamHandler(old_string_io)]
4685        new_handlers = [logging.StreamHandler(new_string_io)]
4686        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
4687        logging.warning('warn')
4688        logging.info('info')
4689        logging.debug('debug')
4690        self.assertEqual(len(logging.root.handlers), 1)
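        # force=True removes and closes the existing root handlers before the
        # new configuration is applied, so this second basicConfig() call
        # takes effect.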
4691        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
4692                            force=True)
4693        logging.warning('warn')
4694        logging.info('info')
4695        logging.debug('debug')
4696        self.assertEqual(len(logging.root.handlers), 1)
4697        self.assertEqual(old_string_io.getvalue().strip(),
4698                         'WARNING:root:warn')
4699        self.assertEqual(new_string_io.getvalue().strip(),
4700                         'WARNING:root:warn\nINFO:root:info')
4701
4702    def test_encoding(self):
4703        try:
4704            encoding = 'utf-8'
4705            logging.basicConfig(filename='test.log', encoding=encoding,
4706                                errors='strict',
4707                                format='%(message)s', level=logging.DEBUG)
4708
4709            self.assertEqual(len(logging.root.handlers), 1)
4710            handler = logging.root.handlers[0]
4711            self.assertIsInstance(handler, logging.FileHandler)
4712            self.assertEqual(handler.encoding, encoding)
4713            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4714        finally:
4715            handler.close()
4716            with open('test.log', encoding='utf-8') as f:
4717                data = f.read().strip()
4718            os.remove('test.log')
4719            self.assertEqual(data,
4720                             'The Øresund Bridge joins Copenhagen to Malmö')
4721
4722    def test_encoding_errors(self):
4723        try:
4724            encoding = 'ascii'
4725            logging.basicConfig(filename='test.log', encoding=encoding,
4726                                errors='ignore',
4727                                format='%(message)s', level=logging.DEBUG)
4728
4729            self.assertEqual(len(logging.root.handlers), 1)
4730            handler = logging.root.handlers[0]
4731            self.assertIsInstance(handler, logging.FileHandler)
4732            self.assertEqual(handler.encoding, encoding)
4733            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4734        finally:
4735            handler.close()
4736            with open('test.log', encoding='utf-8') as f:
4737                data = f.read().strip()
4738            os.remove('test.log')
4739            self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm')
4740
4741    def test_encoding_errors_default(self):
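        # When errors= is not passed, basicConfig() uses 'backslashreplace',
        # so unencodable characters are written as backslash escape sequences
        # (see the assertions below).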
4742        try:
4743            encoding = 'ascii'
4744            logging.basicConfig(filename='test.log', encoding=encoding,
4745                                format='%(message)s', level=logging.DEBUG)
4746
4747            self.assertEqual(len(logging.root.handlers), 1)
4748            handler = logging.root.handlers[0]
4749            self.assertIsInstance(handler, logging.FileHandler)
4750            self.assertEqual(handler.encoding, encoding)
4751            self.assertEqual(handler.errors, 'backslashreplace')
            logging.debug('😂: ☃️: The Øresund Bridge joins Copenhagen to Malmö')
4753        finally:
4754            handler.close()
4755            with open('test.log', encoding='utf-8') as f:
4756                data = f.read().strip()
4757            os.remove('test.log')
4758            self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund '
4759                                   r'Bridge joins Copenhagen to Malm\xf6')
4760
4761    def test_encoding_errors_none(self):
4762        # Specifying None should behave as 'strict'
4763        try:
4764            encoding = 'ascii'
4765            logging.basicConfig(filename='test.log', encoding=encoding,
4766                                errors=None,
4767                                format='%(message)s', level=logging.DEBUG)
4768
4769            self.assertEqual(len(logging.root.handlers), 1)
4770            handler = logging.root.handlers[0]
4771            self.assertIsInstance(handler, logging.FileHandler)
4772            self.assertEqual(handler.encoding, encoding)
4773            self.assertIsNone(handler.errors)
4774
4775            message = []
4776
4777            def dummy_handle_error(record):
4778                _, v, _ = sys.exc_info()
4779                message.append(str(v))
4780
4781            handler.handleError = dummy_handle_error
4782            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4783            self.assertTrue(message)
4784            self.assertIn("'ascii' codec can't encode "
4785                          "character '\\xd8' in position 4:", message[0])
4786        finally:
4787            handler.close()
4788            with open('test.log', encoding='utf-8') as f:
4789                data = f.read().strip()
4790            os.remove('test.log')
4791            # didn't write anything due to the encoding error
4792            self.assertEqual(data, r'')
4793
4794
4795    def _test_log(self, method, level=None):
4796        # logging.root has no handlers so basicConfig should be called
4797        called = []
4798
4799        old_basic_config = logging.basicConfig
4800        def my_basic_config(*a, **kw):
4801            old_basic_config()
4802            old_level = logging.root.level
4803            logging.root.setLevel(100)  # avoid having messages in stderr
4804            self.addCleanup(logging.root.setLevel, old_level)
4805            called.append((a, kw))
4806
4807        support.patch(self, logging, 'basicConfig', my_basic_config)
4808
4809        log_method = getattr(logging, method)
4810        if level is not None:
4811            log_method(level, "test me")
4812        else:
4813            log_method("test me")
4814
4815        # basicConfig was called with no arguments
4816        self.assertEqual(called, [((), {})])
4817
4818    def test_log(self):
4819        self._test_log('log', logging.WARNING)
4820
4821    def test_debug(self):
4822        self._test_log('debug')
4823
4824    def test_info(self):
4825        self._test_log('info')
4826
4827    def test_warning(self):
4828        self._test_log('warning')
4829
4830    def test_error(self):
4831        self._test_log('error')
4832
4833    def test_critical(self):
4834        self._test_log('critical')
4835
4836
4837class LoggerAdapterTest(unittest.TestCase):
4838    def setUp(self):
4839        super(LoggerAdapterTest, self).setUp()
4840        old_handler_list = logging._handlerList[:]
4841
4842        self.recording = RecordingHandler()
4843        self.logger = logging.root
4844        self.logger.addHandler(self.recording)
4845        self.addCleanup(self.logger.removeHandler, self.recording)
4846        self.addCleanup(self.recording.close)
4847
4848        def cleanup():
4849            logging._handlerList[:] = old_handler_list
4850
4851        self.addCleanup(cleanup)
4852        self.addCleanup(logging.shutdown)
4853        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
4854
4855    def test_exception(self):
4856        msg = 'testing exception: %r'
4857        exc = None
4858        try:
4859            1 / 0
4860        except ZeroDivisionError as e:
4861            exc = e
4862            self.adapter.exception(msg, self.recording)
4863
4864        self.assertEqual(len(self.recording.records), 1)
4865        record = self.recording.records[0]
4866        self.assertEqual(record.levelno, logging.ERROR)
4867        self.assertEqual(record.msg, msg)
4868        self.assertEqual(record.args, (self.recording,))
4869        self.assertEqual(record.exc_info,
4870                         (exc.__class__, exc, exc.__traceback__))
4871
4872    def test_exception_excinfo(self):
4873        try:
4874            1 / 0
4875        except ZeroDivisionError as e:
4876            exc = e
4877
4878        self.adapter.exception('exc_info test', exc_info=exc)
4879
4880        self.assertEqual(len(self.recording.records), 1)
4881        record = self.recording.records[0]
4882        self.assertEqual(record.exc_info,
4883                         (exc.__class__, exc, exc.__traceback__))
4884
4885    def test_critical(self):
4886        msg = 'critical test! %r'
4887        self.adapter.critical(msg, self.recording)
4888
4889        self.assertEqual(len(self.recording.records), 1)
4890        record = self.recording.records[0]
4891        self.assertEqual(record.levelno, logging.CRITICAL)
4892        self.assertEqual(record.msg, msg)
4893        self.assertEqual(record.args, (self.recording,))
4894
4895    def test_is_enabled_for(self):
4896        old_disable = self.adapter.logger.manager.disable
4897        self.adapter.logger.manager.disable = 33
4898        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
4899                        old_disable)
4900        self.assertFalse(self.adapter.isEnabledFor(32))
4901
4902    def test_has_handlers(self):
4903        self.assertTrue(self.adapter.hasHandlers())
4904
4905        for handler in self.logger.handlers:
4906            self.logger.removeHandler(handler)
4907
4908        self.assertFalse(self.logger.hasHandlers())
4909        self.assertFalse(self.adapter.hasHandlers())
4910
4911    def test_nested(self):
4912        class Adapter(logging.LoggerAdapter):
4913            prefix = 'Adapter'
4914
4915            def process(self, msg, kwargs):
4916                return f"{self.prefix} {msg}", kwargs
4917
4918        msg = 'Adapters can be nested, yo.'
4919        adapter = Adapter(logger=self.logger, extra=None)
4920        adapter_adapter = Adapter(logger=adapter, extra=None)
4921        adapter_adapter.prefix = 'AdapterAdapter'
4922        self.assertEqual(repr(adapter), repr(adapter_adapter))
4923        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
4924        self.assertEqual(len(self.recording.records), 1)
4925        record = self.recording.records[0]
4926        self.assertEqual(record.levelno, logging.CRITICAL)
4927        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
4928        self.assertEqual(record.args, (self.recording,))
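        # The 'manager' attribute is delegated through nested adapters to the
        # underlying logger, so assigning it on the outer adapter is visible
        # on the inner adapter and on the logger itself.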
4929        orig_manager = adapter_adapter.manager
4930        self.assertIs(adapter.manager, orig_manager)
4931        self.assertIs(self.logger.manager, orig_manager)
4932        temp_manager = object()
4933        try:
4934            adapter_adapter.manager = temp_manager
4935            self.assertIs(adapter_adapter.manager, temp_manager)
4936            self.assertIs(adapter.manager, temp_manager)
4937            self.assertIs(self.logger.manager, temp_manager)
4938        finally:
4939            adapter_adapter.manager = orig_manager
4940        self.assertIs(adapter_adapter.manager, orig_manager)
4941        self.assertIs(adapter.manager, orig_manager)
4942        self.assertIs(self.logger.manager, orig_manager)
4943
4944
4945class LoggerTest(BaseTest, AssertErrorMessage):
4946
4947    def setUp(self):
4948        super(LoggerTest, self).setUp()
4949        self.recording = RecordingHandler()
4950        self.logger = logging.Logger(name='blah')
4951        self.logger.addHandler(self.recording)
4952        self.addCleanup(self.logger.removeHandler, self.recording)
4953        self.addCleanup(self.recording.close)
4954        self.addCleanup(logging.shutdown)
4955
4956    def test_set_invalid_level(self):
4957        self.assert_error_message(
4958            TypeError, 'Level not an integer or a valid string: None',
4959            self.logger.setLevel, None)
4960        self.assert_error_message(
4961            TypeError, 'Level not an integer or a valid string: (0, 0)',
4962            self.logger.setLevel, (0, 0))
4963
4964    def test_exception(self):
4965        msg = 'testing exception: %r'
4966        exc = None
4967        try:
4968            1 / 0
4969        except ZeroDivisionError as e:
4970            exc = e
4971            self.logger.exception(msg, self.recording)
4972
4973        self.assertEqual(len(self.recording.records), 1)
4974        record = self.recording.records[0]
4975        self.assertEqual(record.levelno, logging.ERROR)
4976        self.assertEqual(record.msg, msg)
4977        self.assertEqual(record.args, (self.recording,))
4978        self.assertEqual(record.exc_info,
4979                         (exc.__class__, exc, exc.__traceback__))
4980
4981    def test_log_invalid_level_with_raise(self):
4982        with support.swap_attr(logging, 'raiseExceptions', True):
4983            self.assertRaises(TypeError, self.logger.log, '10', 'test message')
4984
4985    def test_log_invalid_level_no_raise(self):
4986        with support.swap_attr(logging, 'raiseExceptions', False):
4987            self.logger.log('10', 'test message')  # no exception happens
4988
4989    def test_find_caller_with_stack_info(self):
4990        called = []
4991        support.patch(self, logging.traceback, 'print_stack',
4992                      lambda f, file: called.append(file.getvalue()))
4993
4994        self.logger.findCaller(stack_info=True)
4995
4996        self.assertEqual(len(called), 1)
4997        self.assertEqual('Stack (most recent call last):\n', called[0])
4998
4999    def test_find_caller_with_stacklevel(self):
5000        the_level = 1
5001
5002        def innermost():
5003            self.logger.warning('test', stacklevel=the_level)
5004
5005        def inner():
5006            innermost()
5007
5008        def outer():
5009            inner()
5010
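        # Each increase of stacklevel attributes the log call to a frame one
        # level further out: innermost, then inner, then outer, then this
        # test method itself.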
5011        records = self.recording.records
5012        outer()
5013        self.assertEqual(records[-1].funcName, 'innermost')
5014        lineno = records[-1].lineno
5015        the_level += 1
5016        outer()
5017        self.assertEqual(records[-1].funcName, 'inner')
5018        self.assertGreater(records[-1].lineno, lineno)
5019        lineno = records[-1].lineno
5020        the_level += 1
5021        outer()
5022        self.assertEqual(records[-1].funcName, 'outer')
5023        self.assertGreater(records[-1].lineno, lineno)
5024        lineno = records[-1].lineno
5025        the_level += 1
5026        outer()
5027        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
5028        self.assertGreater(records[-1].lineno, lineno)
5029
5030    def test_make_record_with_extra_overwrite(self):
5031        name = 'my record'
5032        level = 13
5033        fn = lno = msg = args = exc_info = func = sinfo = None
5034        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
5035                                       exc_info, func, sinfo)
5036
5037        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
5038            extra = {key: 'some value'}
5039            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
5040                              fn, lno, msg, args, exc_info,
5041                              extra=extra, sinfo=sinfo)
5042
5043    def test_make_record_with_extra_no_overwrite(self):
5044        name = 'my record'
5045        level = 13
5046        fn = lno = msg = args = exc_info = func = sinfo = None
5047        extra = {'valid_key': 'some value'}
5048        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
5049                                        exc_info, extra=extra, sinfo=sinfo)
5050        self.assertIn('valid_key', result.__dict__)
5051
5052    def test_has_handlers(self):
5053        self.assertTrue(self.logger.hasHandlers())
5054
5055        for handler in self.logger.handlers:
5056            self.logger.removeHandler(handler)
5057        self.assertFalse(self.logger.hasHandlers())
5058
5059    def test_has_handlers_no_propagate(self):
5060        child_logger = logging.getLogger('blah.child')
5061        child_logger.propagate = False
5062        self.assertFalse(child_logger.hasHandlers())
5063
5064    def test_is_enabled_for(self):
5065        old_disable = self.logger.manager.disable
5066        self.logger.manager.disable = 23
5067        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
5068        self.assertFalse(self.logger.isEnabledFor(22))
5069
5070    def test_is_enabled_for_disabled_logger(self):
5071        old_disabled = self.logger.disabled
5072        old_disable = self.logger.manager.disable
5073
5074        self.logger.disabled = True
5075        self.logger.manager.disable = 21
5076
5077        self.addCleanup(setattr, self.logger, 'disabled', old_disabled)
5078        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
5079
5080        self.assertFalse(self.logger.isEnabledFor(22))
5081
5082    def test_root_logger_aliases(self):
5083        root = logging.getLogger()
5084        self.assertIs(root, logging.root)
5085        self.assertIs(root, logging.getLogger(None))
5086        self.assertIs(root, logging.getLogger(''))
5087        self.assertIs(root, logging.getLogger('root'))
5088        self.assertIs(root, logging.getLogger('foo').root)
5089        self.assertIs(root, logging.getLogger('foo.bar').root)
5090        self.assertIs(root, logging.getLogger('foo').parent)
5091
5092        self.assertIsNot(root, logging.getLogger('\0'))
5093        self.assertIsNot(root, logging.getLogger('foo.bar').parent)
5094
5095    def test_invalid_names(self):
5096        self.assertRaises(TypeError, logging.getLogger, any)
5097        self.assertRaises(TypeError, logging.getLogger, b'foo')
5098
5099    def test_pickling(self):
5100        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
5101            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
5102                logger = logging.getLogger(name)
5103                s = pickle.dumps(logger, proto)
5104                unpickled = pickle.loads(s)
5105                self.assertIs(unpickled, logger)
5106
5107    def test_caching(self):
5108        root = self.root_logger
5109        logger1 = logging.getLogger("abc")
5110        logger2 = logging.getLogger("abc.def")
5111
5112        # Set root logger level and ensure cache is empty
5113        root.setLevel(logging.ERROR)
5114        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
5115        self.assertEqual(logger2._cache, {})
5116
5117        # Ensure cache is populated and calls are consistent
5118        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
5119        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
5120        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
5121        self.assertEqual(root._cache, {})
5122        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
5123
5124        # Ensure root cache gets populated
5125        self.assertEqual(root._cache, {})
5126        self.assertTrue(root.isEnabledFor(logging.ERROR))
5127        self.assertEqual(root._cache, {logging.ERROR: True})
5128
5129        # Set parent logger level and ensure caches are emptied
5130        logger1.setLevel(logging.CRITICAL)
5131        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5132        self.assertEqual(logger2._cache, {})
5133
5134        # Ensure logger2 uses parent logger's effective level
5135        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
5136
5137        # Set level to NOTSET and ensure caches are empty
5138        logger2.setLevel(logging.NOTSET)
5139        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5140        self.assertEqual(logger2._cache, {})
5141        self.assertEqual(logger1._cache, {})
5142        self.assertEqual(root._cache, {})
5143
5144        # Verify logger2 follows parent and not root
5145        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
5146        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
5147        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
5148        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
5149        self.assertTrue(root.isEnabledFor(logging.ERROR))
5150
5151        # Disable logging in manager and ensure caches are clear
5152        logging.disable()
5153        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5154        self.assertEqual(logger2._cache, {})
5155        self.assertEqual(logger1._cache, {})
5156        self.assertEqual(root._cache, {})
5157
5158        # Ensure no loggers are enabled
5159        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
5160        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
5161        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
5162
5163
5164class BaseFileTest(BaseTest):
5165    "Base class for handler tests that write log files"
5166
5167    def setUp(self):
5168        BaseTest.setUp(self)
5169        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
5170        os.close(fd)
5171        self.rmfiles = []
5172
5173    def tearDown(self):
5174        for fn in self.rmfiles:
5175            os.unlink(fn)
5176        if os.path.exists(self.fn):
5177            os.unlink(self.fn)
5178        BaseTest.tearDown(self)
5179
5180    def assertLogFile(self, filename):
5181        "Assert a log file is there and register it for deletion"
5182        self.assertTrue(os.path.exists(filename),
5183                        msg="Log file %r does not exist" % filename)
5184        self.rmfiles.append(filename)
5185
5186    def next_rec(self):
5187        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
5188                                 self.next_message(), None, None, None)
5189
5190class FileHandlerTest(BaseFileTest):
5191    def test_delay(self):
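        # With delay=True the file must not be opened (or created) until the
        # first record is handled.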
5192        os.unlink(self.fn)
5193        fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True)
5194        self.assertIsNone(fh.stream)
5195        self.assertFalse(os.path.exists(self.fn))
5196        fh.handle(logging.makeLogRecord({}))
5197        self.assertIsNotNone(fh.stream)
5198        self.assertTrue(os.path.exists(self.fn))
5199        fh.close()
5200
5201    def test_emit_after_closing_in_write_mode(self):
5202        # Issue #42378
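        # Emitting a record after close() must not re-open and truncate a
        # file that was opened in 'w' mode; only the record written before
        # close() should remain in the file.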
5203        os.unlink(self.fn)
5204        fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w')
5205        fh.setFormatter(logging.Formatter('%(message)s'))
5206        fh.emit(self.next_rec())    # '1'
5207        fh.close()
5208        fh.emit(self.next_rec())    # '2'
5209        with open(self.fn) as fp:
5210            self.assertEqual(fp.read().strip(), '1')
5211
5212class RotatingFileHandlerTest(BaseFileTest):
5213    def test_should_not_rollover(self):
        # If maxBytes is zero, rollover never occurs
5215        rh = logging.handlers.RotatingFileHandler(
5216                self.fn, encoding="utf-8", maxBytes=0)
5217        self.assertFalse(rh.shouldRollover(None))
5218        rh.close()
5219        # bpo-45401 - test with special file
5220        # We set maxBytes to 1 so that rollover would normally happen, except
5221        # for the check for regular files
5222        rh = logging.handlers.RotatingFileHandler(
5223                os.devnull, encoding="utf-8", maxBytes=1)
5224        self.assertFalse(rh.shouldRollover(self.next_rec()))
5225        rh.close()
5226
5227    def test_should_rollover(self):
5228        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1)
5229        self.assertTrue(rh.shouldRollover(self.next_rec()))
5230        rh.close()
5231
5232    def test_file_created(self):
5233        # checks that the file is created and assumes it was created
5234        # by us
5235        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8")
5236        rh.emit(self.next_rec())
5237        self.assertLogFile(self.fn)
5238        rh.close()
5239
5240    def test_rollover_filenames(self):
5241        def namer(name):
5242            return name + ".test"
5243        rh = logging.handlers.RotatingFileHandler(
5244            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5245        rh.namer = namer
5246        rh.emit(self.next_rec())
5247        self.assertLogFile(self.fn)
5248        rh.emit(self.next_rec())
5249        self.assertLogFile(namer(self.fn + ".1"))
5250        rh.emit(self.next_rec())
5251        self.assertLogFile(namer(self.fn + ".2"))
5252        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
5253        rh.close()
5254
5255    def test_namer_rotator_inheritance(self):
5256        class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler):
5257            def namer(self, name):
5258                return name + ".test"
5259
5260            def rotator(self, source, dest):
5261                if os.path.exists(source):
5262                    os.replace(source, dest + ".rotated")
5263
5264        rh = HandlerWithNamerAndRotator(
5265            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5266        self.assertEqual(rh.namer(self.fn), self.fn + ".test")
5267        rh.emit(self.next_rec())
5268        self.assertLogFile(self.fn)
5269        rh.emit(self.next_rec())
5270        self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated")
5271        self.assertFalse(os.path.exists(rh.namer(self.fn + ".1")))
5272        rh.close()
5273
5274    @support.requires_zlib()
5275    def test_rotator(self):
5276        def namer(name):
5277            return name + ".gz"
5278
5279        def rotator(source, dest):
5280            with open(source, "rb") as sf:
5281                data = sf.read()
5282                compressed = zlib.compress(data, 9)
5283                with open(dest, "wb") as df:
5284                    df.write(compressed)
5285            os.remove(source)
5286
5287        rh = logging.handlers.RotatingFileHandler(
5288            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5289        rh.rotator = rotator
5290        rh.namer = namer
5291        m1 = self.next_rec()
5292        rh.emit(m1)
5293        self.assertLogFile(self.fn)
5294        m2 = self.next_rec()
5295        rh.emit(m2)
5296        fn = namer(self.fn + ".1")
5297        self.assertLogFile(fn)
5298        newline = os.linesep
5299        with open(fn, "rb") as f:
5300            compressed = f.read()
5301            data = zlib.decompress(compressed)
5302            self.assertEqual(data.decode("ascii"), m1.msg + newline)
5303        rh.emit(self.next_rec())
5304        fn = namer(self.fn + ".2")
5305        self.assertLogFile(fn)
5306        with open(fn, "rb") as f:
5307            compressed = f.read()
5308            data = zlib.decompress(compressed)
5309            self.assertEqual(data.decode("ascii"), m1.msg + newline)
5310        rh.emit(self.next_rec())
5311        fn = namer(self.fn + ".2")
5312        with open(fn, "rb") as f:
5313            compressed = f.read()
5314            data = zlib.decompress(compressed)
5315            self.assertEqual(data.decode("ascii"), m2.msg + newline)
5316        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
5317        rh.close()
5318
5319class TimedRotatingFileHandlerTest(BaseFileTest):
5320    def test_should_not_rollover(self):
        # See bpo-45401. Rollover should only ever happen for regular files
5322        fh = logging.handlers.TimedRotatingFileHandler(
5323                os.devnull, 'S', encoding="utf-8", backupCount=1)
5324        time.sleep(1.1)    # a little over a second ...
5325        r = logging.makeLogRecord({'msg': 'testing - device file'})
5326        self.assertFalse(fh.shouldRollover(r))
5327        fh.close()
5328
    # Additional test_compute_rollover_* methods are attached to this class
    # below, after the class definition.
5330    def test_rollover(self):
5331        fh = logging.handlers.TimedRotatingFileHandler(
5332                self.fn, 'S', encoding="utf-8", backupCount=1)
5333        fmt = logging.Formatter('%(asctime)s %(message)s')
5334        fh.setFormatter(fmt)
5335        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
5336        fh.emit(r1)
5337        self.assertLogFile(self.fn)
5338        time.sleep(1.1)    # a little over a second ...
5339        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
5340        fh.emit(r2)
5341        fh.close()
5342        # At this point, we should have a recent rotated file which we
5343        # can test for the existence of. However, in practice, on some
5344        # machines which run really slowly, we don't know how far back
5345        # in time to go to look for the log file. So, we go back a fair
5346        # bit, and stop as soon as we see a rotated file. In theory this
5347        # could of course still fail, but the chances are lower.
5348        found = False
5349        now = datetime.datetime.now()
5350        GO_BACK = 5 * 60 # seconds
5351        for secs in range(GO_BACK):
5352            prev = now - datetime.timedelta(seconds=secs)
5353            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
5354            found = os.path.exists(fn)
5355            if found:
5356                self.rmfiles.append(fn)
5357                break
5358        msg = 'No rotated files found, went back %d seconds' % GO_BACK
5359        if not found:
5360            # print additional diagnostics
5361            dn, fn = os.path.split(self.fn)
5362            files = [f for f in os.listdir(dn) if f.startswith(fn)]
5363            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
5364            print('The only matching files are: %s' % files, file=sys.stderr)
            for f in files:
                print('Contents of %s:' % f, file=sys.stderr)
                path = os.path.join(dn, f)
                with open(path, 'r', encoding='utf-8') as tf:
                    print(tf.read(), file=sys.stderr)
5370        self.assertTrue(found, msg=msg)
5371
5372    def test_invalid(self):
5373        assertRaises = self.assertRaises
5374        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5375                     self.fn, 'X', encoding="utf-8", delay=True)
5376        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5377                     self.fn, 'W', encoding="utf-8", delay=True)
5378        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5379                     self.fn, 'W7', encoding="utf-8", delay=True)
5380
5381    def test_compute_rollover_daily_attime(self):
5382        currentTime = 0
5383        atTime = datetime.time(12, 0, 0)
5384        rh = logging.handlers.TimedRotatingFileHandler(
5385            self.fn, encoding="utf-8", when='MIDNIGHT', interval=1, backupCount=0,
5386            utc=True, atTime=atTime)
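        # currentTime == 0 is midnight UTC, so with atTime=12:00 the first
        # rollover is 12 hours away; asking again 13 hours later (after 12:00
        # has passed) gives the next day's 12:00, i.e. 36 hours from the epoch.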
5387        try:
5388            actual = rh.computeRollover(currentTime)
5389            self.assertEqual(actual, currentTime + 12 * 60 * 60)
5390
5391            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
5392            self.assertEqual(actual, currentTime + 36 * 60 * 60)
5393        finally:
5394            rh.close()
5395
5396    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
5397    def test_compute_rollover_weekly_attime(self):
5398        currentTime = int(time.time())
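        # Strip the time-of-day part: 'today' is midnight UTC at the start of
        # the current day, in epoch seconds.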
5399        today = currentTime - currentTime % 86400
5400
5401        atTime = datetime.time(12, 0, 0)
5402
5403        wday = time.gmtime(today).tm_wday
5404        for day in range(7):
5405            rh = logging.handlers.TimedRotatingFileHandler(
5406                self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0,
5407                utc=True, atTime=atTime)
5408            try:
5409                if wday > day:
5410                    # The rollover day has already passed this week, so we
5411                    # go over into next week
5412                    expected = (7 - wday + day)
5413                else:
5414                    expected = (day - wday)
5415                # At this point expected is in days from now, convert to seconds
5416                expected *= 24 * 60 * 60
5417                # Add in the rollover time
5418                expected += 12 * 60 * 60
5419                # Add in adjustment for today
5420                expected += today
5421                actual = rh.computeRollover(today)
5422                if actual != expected:
5423                    print('failed in timezone: %d' % time.timezone)
5424                    print('local vars: %s' % locals())
5425                self.assertEqual(actual, expected)
5426                if day == wday:
5427                    # goes into following week
5428                    expected += 7 * 24 * 60 * 60
5429                actual = rh.computeRollover(today + 13 * 60 * 60)
5430                if actual != expected:
5431                    print('failed in timezone: %d' % time.timezone)
5432                    print('local vars: %s' % locals())
5433                self.assertEqual(actual, expected)
5434            finally:
5435                rh.close()
5436
5437
5438def secs(**kw):
5439    return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
5440
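# Each (when, expected-offset) pair below is turned into a separate
# test_compute_rollover_<when> method and attached to
# TimedRotatingFileHandlerTest via setattr() at the end of the loop body.
# For 'W0', secs(days=4, hours=24) == 5 * 86400: four days from the epoch's
# Thursday to Monday, plus the 24 hours to the end of that Monday.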
5441for when, exp in (('S', 1),
5442                  ('M', 60),
5443                  ('H', 60 * 60),
5444                  ('D', 60 * 60 * 24),
5445                  ('MIDNIGHT', 60 * 60 * 24),
5446                  # current time (epoch start) is a Thursday, W0 means Monday
5447                  ('W0', secs(days=4, hours=24)),
5448                 ):
5449    def test_compute_rollover(self, when=when, exp=exp):
5450        rh = logging.handlers.TimedRotatingFileHandler(
5451            self.fn, encoding="utf-8", when=when, interval=1, backupCount=0, utc=True)
5452        currentTime = 0.0
5453        actual = rh.computeRollover(currentTime)
5454        if exp != actual:
5455            # Failures occur on some systems for MIDNIGHT and W0.
5456            # Print detailed calculation for MIDNIGHT so we can try to see
5457            # what's going on
5458            if when == 'MIDNIGHT':
5459                try:
5460                    if rh.utc:
5461                        t = time.gmtime(currentTime)
5462                    else:
5463                        t = time.localtime(currentTime)
5464                    currentHour = t[3]
5465                    currentMinute = t[4]
5466                    currentSecond = t[5]
5467                    # r is the number of seconds left between now and midnight
5468                    r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
5469                                                       currentMinute) * 60 +
5470                            currentSecond)
5471                    result = currentTime + r
5472                    print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
5473                    print('currentHour: %s' % currentHour, file=sys.stderr)
5474                    print('currentMinute: %s' % currentMinute, file=sys.stderr)
5475                    print('currentSecond: %s' % currentSecond, file=sys.stderr)
5476                    print('r: %s' % r, file=sys.stderr)
5477                    print('result: %s' % result, file=sys.stderr)
5478                except Exception:
5479                    print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
5480        self.assertEqual(exp, actual)
5481        rh.close()
5482    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
5483
5484
5485@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
5486class NTEventLogHandlerTest(BaseTest):
5487    def test_basic(self):
5488        logtype = 'Application'
5489        elh = win32evtlog.OpenEventLog(None, logtype)
5490        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
5491
5492        try:
5493            h = logging.handlers.NTEventLogHandler('test_logging')
5494        except pywintypes.error as e:
5495            if e.winerror == 5:  # access denied
5496                raise unittest.SkipTest('Insufficient privileges to run test')
5497            raise
5498
5499        r = logging.makeLogRecord({'msg': 'Test Log Message'})
5500        h.handle(r)
5501        h.close()
5502        # Now see if the event is recorded
5503        self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
5504        flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
5505                win32evtlog.EVENTLOG_SEQUENTIAL_READ
5506        found = False
5507        GO_BACK = 100
5508        events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
5509        for e in events:
5510            if e.SourceName != 'test_logging':
5511                continue
5512            msg = win32evtlogutil.SafeFormatMessage(e, logtype)
5513            if msg != 'Test Log Message\r\n':
5514                continue
5515            found = True
5516            break
5517        msg = 'Record not found in event log, went back %d records' % GO_BACK
5518        self.assertTrue(found, msg=msg)
5519
5520
5521class MiscTestCase(unittest.TestCase):
5522    def test__all__(self):
5523        not_exported = {
5524            'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe',
5525            'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
5526            'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root',
5527            'threading'}
5528        support.check__all__(self, logging, not_exported=not_exported)
5529
5530
5531# Set the locale to the platform-dependent default.  I have no idea
5532# why the test does this, but in any case we save the current locale
5533# first and restore it at the end.
5534def setUpModule():
5535    cm = support.run_with_locale('LC_ALL', '')
5536    cm.__enter__()
5537    unittest.addModuleCleanup(cm.__exit__, None, None, None)
5538
5539
5540if __name__ == "__main__":
5541    unittest.main()
5542