# Copyright 2001-2019 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""Test harness for the logging module. Run all tests.

Copyright (C) 2001-2019 Vinay Sajip. All Rights Reserved.
"""

import logging
import logging.handlers
import logging.config

import codecs
import configparser
import copy
import datetime
import pathlib
import pickle
import io
import gc
import json
import os
import queue
import random
import re
import socket
import struct
import sys
import tempfile
from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
from test.support.logging_helper import TestHandler
import textwrap
import threading
import time
import unittest
import warnings
import weakref

from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
                          ThreadingTCPServer, StreamRequestHandler)

# asyncore and smtpd emit DeprecationWarning on import; silence it so that
# merely importing this module stays quiet.
with warnings.catch_warnings():
    warnings.simplefilter('ignore', DeprecationWarning)
    import asyncore
    import smtpd

try:
    import win32evtlog, win32evtlogutil, pywintypes
except ImportError:
    win32evtlog = win32evtlogutil = pywintypes = None

try:
    import zlib
except ImportError:
    pass

class BaseTest(unittest.TestCase):

    """Base class for logging tests."""

    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        self._threading_key = threading_helper.threading_setup()

        # Snapshot the logging module's global registries under its lock so
        # that tearDown() can put them back exactly as they were.
        logger_dict = logging.getLogger().manager.loggerDict
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = saved_loggers = logger_dict.copy()
            self.saved_name_to_level = logging._nameToLevel.copy()
            self.saved_level_to_name = logging._levelToName.copy()
            self.logger_states = logger_states = {}
            for name in saved_loggers:
                # Entries without a 'disabled' attribute are recorded as None
                # and skipped when restoring.
                logger_states[name] = getattr(saved_loggers[name],
                                              'disabled', None)
        finally:
            logging._releaseLock()

        # Set two unused loggers
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        # Before our handler is attached, neither logger may see a handler.
        if self.logger1.hasHandlers():
            hlist = self.logger1.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        if self.logger2.hasHandlers():
            hlist = self.logger2.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        self.root_logger.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        # Detach and close every handler still attached to the root logger.
        while self.root_logger.handlers:
            h = self.root_logger.handlers[0]
            self.root_logger.removeHandler(h)
            h.close()
        self.root_logger.setLevel(self.original_logging_level)
        # Restore the registries captured in setUp(), again under the lock.
        logging._acquireLock()
        try:
            logging._levelToName.clear()
            logging._levelToName.update(self.saved_level_to_name)
            logging._nameToLevel.clear()
            logging._nameToLevel.update(self.saved_name_to_level)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            manager = logging.getLogger().manager
            manager.disable = 0
            loggerDict = manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
            logger_states = self.logger_states
            for name in self.logger_states:
                if logger_states[name] is not None:
                    self.saved_loggers[name].disabled = logger_states[name]
        finally:
            logging._releaseLock()

        self.doCleanups()
        threading_helper.threading_cleanup(*self._threading_key)

    def assert_log_lines(self, expected_values, stream=None, pat=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples."""
        stream = stream or self.stream
        pat = re.compile(pat or self.expected_log_pat)
        actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                            actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num


class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])

    def test_regression_22386(self):
        """See issue #22386 for more information."""
        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')

    def test_issue27935(self):
        fatal = logging.getLevelName('FATAL')
        self.assertEqual(fatal, logging.FATAL)

    def test_regression_29220(self):
        """See issue #29220 for more information."""
        logging.addLevelName(logging.INFO, '')
        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
        self.assertEqual(logging.getLevelName(logging.INFO), '')
        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)

class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def test_filter(self):
        # Only messages satisfying the specified criteria pass through the
        #  filter.
        filter_ = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filter_)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filter_)

    def test_callable_filter(self):
        # Only messages satisfying the specified criteria pass through the
        #  filter.

        def filterfunc(record):
            parts = record.name.split('.')
            prefix = '.'.join(parts[:2])
            return prefix == 'spam.eggs'

        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filterfunc)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filterfunc)

    def test_empty_filter(self):
        f = logging.Filter()
        r = logging.makeLogRecord({'name': 'spam.eggs'})
        self.assertTrue(f.filter(r))

#
#   First, we define our levels. There can be as many as you want - the only
#     limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged.
If you need specific 403# level values which do not fit into these limitations, you can use a 404# mapping dictionary to convert between your application levels and the 405# logging system. 406# 407SILENT = 120 408TACITURN = 119 409TERSE = 118 410EFFUSIVE = 117 411SOCIABLE = 116 412VERBOSE = 115 413TALKATIVE = 114 414GARRULOUS = 113 415CHATTERBOX = 112 416BORING = 111 417 418LEVEL_RANGE = range(BORING, SILENT + 1) 419 420# 421# Next, we define names for our levels. You don't need to do this - in which 422# case the system will use "Level n" to denote the text for the level. 423# 424my_logging_levels = { 425 SILENT : 'Silent', 426 TACITURN : 'Taciturn', 427 TERSE : 'Terse', 428 EFFUSIVE : 'Effusive', 429 SOCIABLE : 'Sociable', 430 VERBOSE : 'Verbose', 431 TALKATIVE : 'Talkative', 432 GARRULOUS : 'Garrulous', 433 CHATTERBOX : 'Chatterbox', 434 BORING : 'Boring', 435} 436 437class GarrulousFilter(logging.Filter): 438 439 """A filter which blocks garrulous messages.""" 440 441 def filter(self, record): 442 return record.levelno != GARRULOUS 443 444class VerySpecificFilter(logging.Filter): 445 446 """A filter which blocks sociable and taciturn messages.""" 447 448 def filter(self, record): 449 return record.levelno not in [SOCIABLE, TACITURN] 450 451 452class CustomLevelsAndFiltersTest(BaseTest): 453 454 """Test various filtering possibilities with custom logging levels.""" 455 456 # Skip the logger name group. 457 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 458 459 def setUp(self): 460 BaseTest.setUp(self) 461 for k, v in my_logging_levels.items(): 462 logging.addLevelName(k, v) 463 464 def log_at_all_levels(self, logger): 465 for lvl in LEVEL_RANGE: 466 logger.log(lvl, self.next_message()) 467 468 def test_logger_filter(self): 469 # Filter at logger level. 470 self.root_logger.setLevel(VERBOSE) 471 # Levels >= 'Verbose' are good. 
472 self.log_at_all_levels(self.root_logger) 473 self.assert_log_lines([ 474 ('Verbose', '5'), 475 ('Sociable', '6'), 476 ('Effusive', '7'), 477 ('Terse', '8'), 478 ('Taciturn', '9'), 479 ('Silent', '10'), 480 ]) 481 482 def test_handler_filter(self): 483 # Filter at handler level. 484 self.root_logger.handlers[0].setLevel(SOCIABLE) 485 try: 486 # Levels >= 'Sociable' are good. 487 self.log_at_all_levels(self.root_logger) 488 self.assert_log_lines([ 489 ('Sociable', '6'), 490 ('Effusive', '7'), 491 ('Terse', '8'), 492 ('Taciturn', '9'), 493 ('Silent', '10'), 494 ]) 495 finally: 496 self.root_logger.handlers[0].setLevel(logging.NOTSET) 497 498 def test_specific_filters(self): 499 # Set a specific filter object on the handler, and then add another 500 # filter object on the logger itself. 501 handler = self.root_logger.handlers[0] 502 specific_filter = None 503 garr = GarrulousFilter() 504 handler.addFilter(garr) 505 try: 506 self.log_at_all_levels(self.root_logger) 507 first_lines = [ 508 # Notice how 'Garrulous' is missing 509 ('Boring', '1'), 510 ('Chatterbox', '2'), 511 ('Talkative', '4'), 512 ('Verbose', '5'), 513 ('Sociable', '6'), 514 ('Effusive', '7'), 515 ('Terse', '8'), 516 ('Taciturn', '9'), 517 ('Silent', '10'), 518 ] 519 self.assert_log_lines(first_lines) 520 521 specific_filter = VerySpecificFilter() 522 self.root_logger.addFilter(specific_filter) 523 self.log_at_all_levels(self.root_logger) 524 self.assert_log_lines(first_lines + [ 525 # Not only 'Garrulous' is still missing, but also 'Sociable' 526 # and 'Taciturn' 527 ('Boring', '11'), 528 ('Chatterbox', '12'), 529 ('Talkative', '14'), 530 ('Verbose', '15'), 531 ('Effusive', '17'), 532 ('Terse', '18'), 533 ('Silent', '20'), 534 ]) 535 finally: 536 if specific_filter: 537 self.root_logger.removeFilter(specific_filter) 538 handler.removeFilter(garr) 539 540 541class HandlerTest(BaseTest): 542 def test_name(self): 543 h = logging.Handler() 544 h.name = 'generic' 545 self.assertEqual(h.name, 'generic') 
546 h.name = 'anothergeneric' 547 self.assertEqual(h.name, 'anothergeneric') 548 self.assertRaises(NotImplementedError, h.emit, None) 549 550 def test_builtin_handlers(self): 551 # We can't actually *use* too many handlers in the tests, 552 # but we can try instantiating them with various options 553 if sys.platform in ('linux', 'darwin'): 554 for existing in (True, False): 555 fd, fn = tempfile.mkstemp() 556 os.close(fd) 557 if not existing: 558 os.unlink(fn) 559 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True) 560 if existing: 561 dev, ino = h.dev, h.ino 562 self.assertEqual(dev, -1) 563 self.assertEqual(ino, -1) 564 r = logging.makeLogRecord({'msg': 'Test'}) 565 h.handle(r) 566 # Now remove the file. 567 os.unlink(fn) 568 self.assertFalse(os.path.exists(fn)) 569 # The next call should recreate the file. 570 h.handle(r) 571 self.assertTrue(os.path.exists(fn)) 572 else: 573 self.assertEqual(h.dev, -1) 574 self.assertEqual(h.ino, -1) 575 h.close() 576 if existing: 577 os.unlink(fn) 578 if sys.platform == 'darwin': 579 sockname = '/var/run/syslog' 580 else: 581 sockname = '/dev/log' 582 try: 583 h = logging.handlers.SysLogHandler(sockname) 584 self.assertEqual(h.facility, h.LOG_USER) 585 self.assertTrue(h.unixsocket) 586 h.close() 587 except OSError: # syslogd might not be available 588 pass 589 for method in ('GET', 'POST', 'PUT'): 590 if method == 'PUT': 591 self.assertRaises(ValueError, logging.handlers.HTTPHandler, 592 'localhost', '/log', method) 593 else: 594 h = logging.handlers.HTTPHandler('localhost', '/log', method) 595 h.close() 596 h = logging.handlers.BufferingHandler(0) 597 r = logging.makeLogRecord({}) 598 self.assertTrue(h.shouldFlush(r)) 599 h.close() 600 h = logging.handlers.BufferingHandler(1) 601 self.assertFalse(h.shouldFlush(r)) 602 h.close() 603 604 def test_path_objects(self): 605 """ 606 Test that Path objects are accepted as filename arguments to handlers. 607 608 See Issue #27493. 
609 """ 610 fd, fn = tempfile.mkstemp() 611 os.close(fd) 612 os.unlink(fn) 613 pfn = pathlib.Path(fn) 614 cases = ( 615 (logging.FileHandler, (pfn, 'w')), 616 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 617 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 618 ) 619 if sys.platform in ('linux', 'darwin'): 620 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 621 for cls, args in cases: 622 h = cls(*args, encoding="utf-8") 623 self.assertTrue(os.path.exists(fn)) 624 h.close() 625 os.unlink(fn) 626 627 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 628 def test_race(self): 629 # Issue #14632 refers. 630 def remove_loop(fname, tries): 631 for _ in range(tries): 632 try: 633 os.unlink(fname) 634 self.deletion_time = time.time() 635 except OSError: 636 pass 637 time.sleep(0.004 * random.randint(0, 4)) 638 639 del_count = 500 640 log_count = 500 641 642 self.handle_time = None 643 self.deletion_time = None 644 645 for delay in (False, True): 646 fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') 647 os.close(fd) 648 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 649 remover.daemon = True 650 remover.start() 651 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay) 652 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 653 h.setFormatter(f) 654 try: 655 for _ in range(log_count): 656 time.sleep(0.005) 657 r = logging.makeLogRecord({'msg': 'testing' }) 658 try: 659 self.handle_time = time.time() 660 h.handle(r) 661 except Exception: 662 print('Deleted at %s, ' 663 'opened at %s' % (self.deletion_time, 664 self.handle_time)) 665 raise 666 finally: 667 remover.join() 668 h.close() 669 if os.path.exists(fn): 670 os.unlink(fn) 671 672 # The implementation relies on os.register_at_fork existing, but we test 673 # based on os.fork existing because that is what users and this test use. 
674 # This helps ensure that when fork exists (the important concept) that the 675 # register_at_fork mechanism is also present and used. 676 @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().') 677 def test_post_fork_child_no_deadlock(self): 678 """Ensure child logging locks are not held; bpo-6721 & bpo-36533.""" 679 class _OurHandler(logging.Handler): 680 def __init__(self): 681 super().__init__() 682 self.sub_handler = logging.StreamHandler( 683 stream=open('/dev/null', 'wt', encoding='utf-8')) 684 685 def emit(self, record): 686 self.sub_handler.acquire() 687 try: 688 self.sub_handler.emit(record) 689 finally: 690 self.sub_handler.release() 691 692 self.assertEqual(len(logging._handlers), 0) 693 refed_h = _OurHandler() 694 self.addCleanup(refed_h.sub_handler.stream.close) 695 refed_h.name = 'because we need at least one for this test' 696 self.assertGreater(len(logging._handlers), 0) 697 self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1) 698 test_logger = logging.getLogger('test_post_fork_child_no_deadlock') 699 test_logger.addHandler(refed_h) 700 test_logger.setLevel(logging.DEBUG) 701 702 locks_held__ready_to_fork = threading.Event() 703 fork_happened__release_locks_and_end_thread = threading.Event() 704 705 def lock_holder_thread_fn(): 706 logging._acquireLock() 707 try: 708 refed_h.acquire() 709 try: 710 # Tell the main thread to do the fork. 711 locks_held__ready_to_fork.set() 712 713 # If the deadlock bug exists, the fork will happen 714 # without dealing with the locks we hold, deadlocking 715 # the child. 716 717 # Wait for a successful fork or an unreasonable amount of 718 # time before releasing our locks. To avoid a timing based 719 # test we'd need communication from os.fork() as to when it 720 # has actually happened. Given this is a regression test 721 # for a fixed issue, potentially less reliably detecting 722 # regression via timing is acceptable for simplicity. 723 # The test will always take at least this long. 
:( 724 fork_happened__release_locks_and_end_thread.wait(0.5) 725 finally: 726 refed_h.release() 727 finally: 728 logging._releaseLock() 729 730 lock_holder_thread = threading.Thread( 731 target=lock_holder_thread_fn, 732 name='test_post_fork_child_no_deadlock lock holder') 733 lock_holder_thread.start() 734 735 locks_held__ready_to_fork.wait() 736 pid = os.fork() 737 if pid == 0: 738 # Child process 739 try: 740 test_logger.info(r'Child process did not deadlock. \o/') 741 finally: 742 os._exit(0) 743 else: 744 # Parent process 745 test_logger.info(r'Parent process returned from fork. \o/') 746 fork_happened__release_locks_and_end_thread.set() 747 lock_holder_thread.join() 748 749 support.wait_process(pid, exitcode=0) 750 751 752class BadStream(object): 753 def write(self, data): 754 raise RuntimeError('deliberate mistake') 755 756class TestStreamHandler(logging.StreamHandler): 757 def handleError(self, record): 758 self.error_record = record 759 760class StreamWithIntName(object): 761 level = logging.NOTSET 762 name = 2 763 764class StreamHandlerTest(BaseTest): 765 def test_error_handling(self): 766 h = TestStreamHandler(BadStream()) 767 r = logging.makeLogRecord({}) 768 old_raise = logging.raiseExceptions 769 770 try: 771 h.handle(r) 772 self.assertIs(h.error_record, r) 773 774 h = logging.StreamHandler(BadStream()) 775 with support.captured_stderr() as stderr: 776 h.handle(r) 777 msg = '\nRuntimeError: deliberate mistake\n' 778 self.assertIn(msg, stderr.getvalue()) 779 780 logging.raiseExceptions = False 781 with support.captured_stderr() as stderr: 782 h.handle(r) 783 self.assertEqual('', stderr.getvalue()) 784 finally: 785 logging.raiseExceptions = old_raise 786 787 def test_stream_setting(self): 788 """ 789 Test setting the handler's stream 790 """ 791 h = logging.StreamHandler() 792 stream = io.StringIO() 793 old = h.setStream(stream) 794 self.assertIs(old, sys.stderr) 795 actual = h.setStream(old) 796 self.assertIs(actual, stream) 797 # test that setting to 
existing value returns None 798 actual = h.setStream(old) 799 self.assertIsNone(actual) 800 801 def test_can_represent_stream_with_int_name(self): 802 h = logging.StreamHandler(StreamWithIntName()) 803 self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>') 804 805# -- The following section could be moved into a server_helper.py module 806# -- if it proves to be of wider utility than just test_logging 807 808class TestSMTPServer(smtpd.SMTPServer): 809 """ 810 This class implements a test SMTP server. 811 812 :param addr: A (host, port) tuple which the server listens on. 813 You can specify a port value of zero: the server's 814 *port* attribute will hold the actual port number 815 used, which can be used in client connections. 816 :param handler: A callable which will be called to process 817 incoming messages. The handler will be passed 818 the client address tuple, who the message is from, 819 a list of recipients and the message data. 820 :param poll_interval: The interval, in seconds, used in the underlying 821 :func:`select` or :func:`poll` call by 822 :func:`asyncore.loop`. 823 :param sockmap: A dictionary which will be used to hold 824 :class:`asyncore.dispatcher` instances used by 825 :func:`asyncore.loop`. This avoids changing the 826 :mod:`asyncore` module's global state. 827 """ 828 829 def __init__(self, addr, handler, poll_interval, sockmap): 830 smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, 831 decode_data=True) 832 self.port = self.socket.getsockname()[1] 833 self._handler = handler 834 self._thread = None 835 self._quit = False 836 self.poll_interval = poll_interval 837 838 def process_message(self, peer, mailfrom, rcpttos, data): 839 """ 840 Delegates to the handler passed in to the server's constructor. 841 842 Typically, this will be a test case method. 843 :param peer: The client (host, port) tuple. 844 :param mailfrom: The address of the sender. 845 :param rcpttos: The addresses of the recipients. 846 :param data: The message. 
847 """ 848 self._handler(peer, mailfrom, rcpttos, data) 849 850 def start(self): 851 """ 852 Start the server running on a separate daemon thread. 853 """ 854 self._thread = t = threading.Thread(target=self.serve_forever, 855 args=(self.poll_interval,)) 856 t.daemon = True 857 t.start() 858 859 def serve_forever(self, poll_interval): 860 """ 861 Run the :mod:`asyncore` loop until normal termination 862 conditions arise. 863 :param poll_interval: The interval, in seconds, used in the underlying 864 :func:`select` or :func:`poll` call by 865 :func:`asyncore.loop`. 866 """ 867 while not self._quit: 868 asyncore.loop(poll_interval, map=self._map, count=1) 869 870 def stop(self): 871 """ 872 Stop the thread by closing the server instance. 873 Wait for the server thread to terminate. 874 """ 875 self._quit = True 876 threading_helper.join_thread(self._thread) 877 self._thread = None 878 self.close() 879 asyncore.close_all(map=self._map, ignore_all=True) 880 881 882class ControlMixin(object): 883 """ 884 This mixin is used to start a server on a separate thread, and 885 shut it down programmatically. Request handling is simplified - instead 886 of needing to derive a suitable RequestHandler subclass, you just 887 provide a callable which will be passed each received request to be 888 processed. 889 890 :param handler: A handler callable which will be called with a 891 single parameter - the request - in order to 892 process the request. This handler is called on the 893 server thread, effectively meaning that requests are 894 processed serially. While not quite web scale ;-), 895 this should be fine for testing applications. 896 :param poll_interval: The polling interval in seconds. 897 """ 898 def __init__(self, handler, poll_interval): 899 self._thread = None 900 self.poll_interval = poll_interval 901 self._handler = handler 902 self.ready = threading.Event() 903 904 def start(self): 905 """ 906 Create a daemon thread to run the server, and start it. 
907 """ 908 self._thread = t = threading.Thread(target=self.serve_forever, 909 args=(self.poll_interval,)) 910 t.daemon = True 911 t.start() 912 913 def serve_forever(self, poll_interval): 914 """ 915 Run the server. Set the ready flag before entering the 916 service loop. 917 """ 918 self.ready.set() 919 super(ControlMixin, self).serve_forever(poll_interval) 920 921 def stop(self): 922 """ 923 Tell the server thread to stop, and wait for it to do so. 924 """ 925 self.shutdown() 926 if self._thread is not None: 927 threading_helper.join_thread(self._thread) 928 self._thread = None 929 self.server_close() 930 self.ready.clear() 931 932class TestHTTPServer(ControlMixin, HTTPServer): 933 """ 934 An HTTP server which is controllable using :class:`ControlMixin`. 935 936 :param addr: A tuple with the IP address and port to listen on. 937 :param handler: A handler callable which will be called with a 938 single parameter - the request - in order to 939 process the request. 940 :param poll_interval: The polling interval in seconds. 941 :param log: Pass ``True`` to enable log messages. 
942 """ 943 def __init__(self, addr, handler, poll_interval=0.5, 944 log=False, sslctx=None): 945 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 946 def __getattr__(self, name, default=None): 947 if name.startswith('do_'): 948 return self.process_request 949 raise AttributeError(name) 950 951 def process_request(self): 952 self.server._handler(self) 953 954 def log_message(self, format, *args): 955 if log: 956 super(DelegatingHTTPRequestHandler, 957 self).log_message(format, *args) 958 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 959 ControlMixin.__init__(self, handler, poll_interval) 960 self.sslctx = sslctx 961 962 def get_request(self): 963 try: 964 sock, addr = self.socket.accept() 965 if self.sslctx: 966 sock = self.sslctx.wrap_socket(sock, server_side=True) 967 except OSError as e: 968 # socket errors are silenced by the caller, print them here 969 sys.stderr.write("Got an error:\n%s\n" % e) 970 raise 971 return sock, addr 972 973class TestTCPServer(ControlMixin, ThreadingTCPServer): 974 """ 975 A TCP server which is controllable using :class:`ControlMixin`. 976 977 :param addr: A tuple with the IP address and port to listen on. 978 :param handler: A handler callable which will be called with a single 979 parameter - the request - in order to process the request. 980 :param poll_interval: The polling interval in seconds. 981 :bind_and_activate: If True (the default), binds the server and starts it 982 listening. If False, you need to call 983 :meth:`server_bind` and :meth:`server_activate` at 984 some later time before calling :meth:`start`, so that 985 the server will set up the socket and listen on it. 
class TestUDPServer(ControlMixin, ThreadingUDPServer):
    """
    A UDP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds.
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen
                              on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingUDPRequestHandler(DatagramRequestHandler):

            def handle(self):
                # Hand the request object to the server's handler callable.
                self.server._handler(self)

            def finish(self):
                # Nothing was written to the reply buffer: skip the
                # base-class finish() entirely.
                if not self.wfile.getvalue():
                    return
                try:
                    super().finish()
                except OSError:
                    # An error while finishing is only ignored once the
                    # server has been closed.
                    if not self.server._closed:
                        raise

        ThreadingUDPServer.__init__(self, addr,
                                    DelegatingUDPRequestHandler,
                                    bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)
        self._closed = False

    def server_bind(self):
        """Bind the socket, then record element 1 of its name as ``port``."""
        super().server_bind()
        self.port = self.socket.getsockname()[1]

    def server_close(self):
        """Close the server and flag it as closed for in-flight handlers."""
        super().server_close()
        self._closed = True
def test_flush(self):
    # The memory handler flushes to its target handler based on specific
    # criteria (message count and message level).
    log = self.mem_logger

    # Two low-severity records stay buffered.
    log.debug(self.next_message())
    self.assert_log_lines([])
    log.info(self.next_message())
    self.assert_log_lines([])

    # This will flush because the level is >= logging.WARNING
    log.warning(self.next_message())
    expected = [
        ('DEBUG', '1'),
        ('INFO', '2'),
        ('WARNING', '3'),
    ]
    self.assert_log_lines(expected)

    for first in (4, 14):
        # Nine records are below the capacity of ten: nothing new appears.
        for _ in range(9):
            log.debug(self.next_message())
        self.assert_log_lines(expected)

        # This will flush because it's the 10th message since the last
        # flush.
        log.debug(self.next_message())
        expected = expected + [('DEBUG', str(num))
                               for num in range(first, first + 10)]
        self.assert_log_lines(expected)

        # One more record starts a new buffer and is not visible yet.
        log.debug(self.next_message())
        self.assert_log_lines(expected)
class ExceptionFormatter(logging.Formatter):
    """Formatter that renders exception info as just the exception type name."""

    def formatException(self, ei):
        """Return ``"Got a [<name>]"`` where <name> is ``ei[0].__name__``."""
        exc_type = ei[0]
        return "Got a [%s]" % exc_type.__name__
file.""" 1205 1206 check_no_resource_warning = warnings_helper.check_no_resource_warning 1207 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1208 1209 # config0 is a standard configuration. 1210 config0 = """ 1211 [loggers] 1212 keys=root 1213 1214 [handlers] 1215 keys=hand1 1216 1217 [formatters] 1218 keys=form1 1219 1220 [logger_root] 1221 level=WARNING 1222 handlers=hand1 1223 1224 [handler_hand1] 1225 class=StreamHandler 1226 level=NOTSET 1227 formatter=form1 1228 args=(sys.stdout,) 1229 1230 [formatter_form1] 1231 format=%(levelname)s ++ %(message)s 1232 datefmt= 1233 """ 1234 1235 # config1 adds a little to the standard configuration. 1236 config1 = """ 1237 [loggers] 1238 keys=root,parser 1239 1240 [handlers] 1241 keys=hand1 1242 1243 [formatters] 1244 keys=form1 1245 1246 [logger_root] 1247 level=WARNING 1248 handlers= 1249 1250 [logger_parser] 1251 level=DEBUG 1252 handlers=hand1 1253 propagate=1 1254 qualname=compiler.parser 1255 1256 [handler_hand1] 1257 class=StreamHandler 1258 level=NOTSET 1259 formatter=form1 1260 args=(sys.stdout,) 1261 1262 [formatter_form1] 1263 format=%(levelname)s ++ %(message)s 1264 datefmt= 1265 """ 1266 1267 # config1a moves the handler to the root. 
1268 config1a = """ 1269 [loggers] 1270 keys=root,parser 1271 1272 [handlers] 1273 keys=hand1 1274 1275 [formatters] 1276 keys=form1 1277 1278 [logger_root] 1279 level=WARNING 1280 handlers=hand1 1281 1282 [logger_parser] 1283 level=DEBUG 1284 handlers= 1285 propagate=1 1286 qualname=compiler.parser 1287 1288 [handler_hand1] 1289 class=StreamHandler 1290 level=NOTSET 1291 formatter=form1 1292 args=(sys.stdout,) 1293 1294 [formatter_form1] 1295 format=%(levelname)s ++ %(message)s 1296 datefmt= 1297 """ 1298 1299 # config2 has a subtle configuration error that should be reported 1300 config2 = config1.replace("sys.stdout", "sys.stbout") 1301 1302 # config3 has a less subtle configuration error 1303 config3 = config1.replace("formatter=form1", "formatter=misspelled_name") 1304 1305 # config4 specifies a custom formatter class to be loaded 1306 config4 = """ 1307 [loggers] 1308 keys=root 1309 1310 [handlers] 1311 keys=hand1 1312 1313 [formatters] 1314 keys=form1 1315 1316 [logger_root] 1317 level=NOTSET 1318 handlers=hand1 1319 1320 [handler_hand1] 1321 class=StreamHandler 1322 level=NOTSET 1323 formatter=form1 1324 args=(sys.stdout,) 1325 1326 [formatter_form1] 1327 class=""" + __name__ + """.ExceptionFormatter 1328 format=%(levelname)s:%(name)s:%(message)s 1329 datefmt= 1330 """ 1331 1332 # config5 specifies a custom handler class to be loaded 1333 config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') 1334 1335 # config6 uses ', ' delimiters in the handlers and formatters sections 1336 config6 = """ 1337 [loggers] 1338 keys=root,parser 1339 1340 [handlers] 1341 keys=hand1, hand2 1342 1343 [formatters] 1344 keys=form1, form2 1345 1346 [logger_root] 1347 level=WARNING 1348 handlers= 1349 1350 [logger_parser] 1351 level=DEBUG 1352 handlers=hand1 1353 propagate=1 1354 qualname=compiler.parser 1355 1356 [handler_hand1] 1357 class=StreamHandler 1358 level=NOTSET 1359 formatter=form1 1360 args=(sys.stdout,) 1361 1362 [handler_hand2] 1363 
class=StreamHandler 1364 level=NOTSET 1365 formatter=form1 1366 args=(sys.stderr,) 1367 1368 [formatter_form1] 1369 format=%(levelname)s ++ %(message)s 1370 datefmt= 1371 1372 [formatter_form2] 1373 format=%(message)s 1374 datefmt= 1375 """ 1376 1377 # config7 adds a compiler logger, and uses kwargs instead of args. 1378 config7 = """ 1379 [loggers] 1380 keys=root,parser,compiler 1381 1382 [handlers] 1383 keys=hand1 1384 1385 [formatters] 1386 keys=form1 1387 1388 [logger_root] 1389 level=WARNING 1390 handlers=hand1 1391 1392 [logger_compiler] 1393 level=DEBUG 1394 handlers= 1395 propagate=1 1396 qualname=compiler 1397 1398 [logger_parser] 1399 level=DEBUG 1400 handlers= 1401 propagate=1 1402 qualname=compiler.parser 1403 1404 [handler_hand1] 1405 class=StreamHandler 1406 level=NOTSET 1407 formatter=form1 1408 kwargs={'stream': sys.stdout,} 1409 1410 [formatter_form1] 1411 format=%(levelname)s ++ %(message)s 1412 datefmt= 1413 """ 1414 1415 # config 8, check for resource warning 1416 config8 = r""" 1417 [loggers] 1418 keys=root 1419 1420 [handlers] 1421 keys=file 1422 1423 [formatters] 1424 keys= 1425 1426 [logger_root] 1427 level=DEBUG 1428 handlers=file 1429 1430 [handler_file] 1431 class=FileHandler 1432 level=DEBUG 1433 args=("{tempfile}",) 1434 kwargs={{"encoding": "utf-8"}} 1435 """ 1436 1437 disable_test = """ 1438 [loggers] 1439 keys=root 1440 1441 [handlers] 1442 keys=screen 1443 1444 [formatters] 1445 keys= 1446 1447 [logger_root] 1448 level=DEBUG 1449 handlers=screen 1450 1451 [handler_screen] 1452 level=DEBUG 1453 class=StreamHandler 1454 args=(sys.stdout,) 1455 formatter= 1456 """ 1457 1458 def apply_config(self, conf, **kwargs): 1459 file = io.StringIO(textwrap.dedent(conf)) 1460 logging.config.fileConfig(file, encoding="utf-8", **kwargs) 1461 1462 def test_config0_ok(self): 1463 # A simple config file which overrides the default settings. 
def test_config2_failure(self):
    # config2 is config1 with "sys.stdout" replaced by "sys.stbout", so
    # applying it is expected to raise.
    apply = self.apply_config
    broken_config = self.config2
    with self.assertRaises(Exception):
        apply(broken_config)
def test_config7_ok(self):
    # Phase one: start from config1a.
    with support.captured_stdout() as output:
        self.apply_config(self.config1a)
        parser_log = logging.getLogger("compiler.parser")
        # See issue #11424. compiler-hyphenated sorts
        # between compiler and compiler.xyz and this
        # was preventing compiler.xyz from being included
        # in the child loggers of compiler because of an
        # overzealous loop termination condition.
        hyphenated = logging.getLogger('compiler-hyphenated')
        # All will output a message
        for emit in (parser_log.info, parser_log.error,
                     hyphenated.critical):
            emit(self.next_message())
        first_expected = [
            ('INFO', '1'),
            ('ERROR', '2'),
            ('CRITICAL', '3'),
        ]
        self.assert_log_lines(first_expected, stream=output)
        # Original logger output is empty.
        self.assert_log_lines([])

    # Phase two: re-configure with config7 on top of the loggers that
    # already exist from phase one.
    with support.captured_stdout() as output:
        self.apply_config(self.config7)
        parser_log = logging.getLogger("compiler.parser")
        self.assertFalse(parser_log.disabled)
        # Both will output a message
        parser_log.info(self.next_message())
        parser_log.error(self.next_message())
        lexer_log = logging.getLogger("compiler.lexer")
        # Both will output a message
        lexer_log.info(self.next_message())
        lexer_log.error(self.next_message())
        # Will not appear
        hyphenated.critical(self.next_message())
        second_expected = [
            ('INFO', '4'),
            ('ERROR', '5'),
            ('INFO', '6'),
            ('ERROR', '7'),
        ]
        self.assert_log_lines(second_expected, stream=output)
        # Original logger output is empty.
        self.assert_log_lines([])
def test_defaults_do_no_interpolation(self):
    """bpo-33802 defaults should not get interpolated"""
    ini = textwrap.dedent("""
        [formatters]
        keys=default

        [formatter_default]

        [handlers]
        keys=console

        [handler_console]
        class=logging.StreamHandler
        args=tuple()

        [loggers]
        keys=root

        [logger_root]
        formatter=default
        handlers=console
        """).strip()
    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
    try:
        # Wrapping the descriptor in a file object guarantees it is closed
        # even if the write fails; a bare os.write()/os.close() pair would
        # leak the descriptor in that case.
        with os.fdopen(fd, 'wb') as ini_file:
            ini_file.write(ini.encode('ascii'))
        # The defaults contain '%'-style format strings; the call must not
        # raise while handling them.
        logging.config.fileConfig(
            fn,
            encoding="utf-8",
            defaults=dict(
                version=1,
                disable_existing_loggers=False,
                formatters={
                    "generic": {
                        "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
                        "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
                        "class": "logging.Formatter"
                    },
                },
            )
        )
    finally:
        os.unlink(fn)
def handle_socket(self, request):
    """
    Read length-prefixed pickled log records from ``request.connection``.

    Each record is a 4-byte big-endian length followed by that many bytes
    of pickle data.  For every complete record, the record's ``msg`` plus
    a newline is appended to ``self.log_output`` and ``self.handled`` is
    released once.  Reading stops when the peer closes the connection,
    including in the middle of a header or a payload.
    """
    conn = request.connection

    def recv_exactly(size):
        # Return exactly *size* bytes, or None if the peer closed the
        # connection first.  recv() returning b'' means EOF; without this
        # check a truncated record would make the caller loop forever.
        data = b''
        while len(data) < size:
            part = conn.recv(size - len(data))
            if not part:
                return None
            data += part
        return data

    while True:
        header = recv_exactly(4)
        if header is None:
            break
        slen = struct.unpack(">L", header)[0]
        payload = recv_exactly(slen)
        if payload is None:
            break
        # NOTE: unpickling socket data is only acceptable here because the
        # peer is the SocketHandler created by this test itself.
        obj = pickle.loads(payload)
        record = logging.makeLogRecord(obj)
        self.log_output += record.msg + '\n'
        self.handled.release()
def _get_temp_domain_socket():
    """Return a unique ``test_logging_*.sock`` path at which no file exists."""
    descriptor, path = tempfile.mkstemp(prefix='test_logging_',
                                        suffix='.sock')
    os.close(descriptor)
    # just need a name - file can't be present, or we'll get an
    # 'address already in use' error.
    os.unlink(path)
    return path
def handle_datagram(self, request):
    """
    Decode one pickled log record from ``request.packet``.

    The leading bytes occupied by a ``'>L'`` prefix are dropped, the rest
    is unpickled into a record, the record's ``msg`` plus a newline is
    appended to ``self.log_output`` and ``self.handled`` is set.
    """
    prefix_size = struct.calcsize('>L')  # length of prefix
    pickled = request.packet[prefix_size:]
    record = logging.makeLogRecord(pickle.loads(pickled))
    self.log_output += record.msg + '\n'
    self.handled.set()
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixDatagramHandlerTest(DatagramHandlerTest):

    """Run the DatagramHandler tests over a Unix domain socket."""

    # TestUnixDatagramServer is only defined when AF_UNIX is available.
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # Replace the class-level (host, port) address with a fresh
        # socket path before the base class creates the server.
        self.address = _get_temp_domain_socket()
        super().setUp()

    def tearDown(self):
        super().tearDown()
        # Remove the socket file left at the address.
        os_helper.unlink(self.address)
def test_udp_reconnection(self):
    """A message logged after the handler was closed is still delivered."""
    # setUp() leaves sl_hdlr as None when the server could not be
    # started; skip in that case, as test_output() does.
    if self.server_exception:
        self.skipTest(self.server_exception)
    logger = logging.getLogger("slh")
    self.sl_hdlr.close()
    self.handled.clear()
    logger.error("sp\xe4m")
    # Use the suite's long timeout: wait() returns as soon as the datagram
    # is handled, and a short fixed delay makes the test fail spuriously
    # on slow or heavily loaded machines.
    self.handled.wait(support.LONG_TIMEOUT)
    self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
def handle_request(self, request):
    """
    Record the details of an incoming HTTP request and answer with 200.

    Stores the request method in ``self.command`` and the parsed request
    path in ``self.log_data``.  For POST requests, ``self.post_data`` is
    set to ``Content-Length`` bytes read from ``request.rfile``, or to
    None if the body could not be read.  Finally ``self.handled`` is set.
    """
    self.command = request.command
    self.log_data = urlparse(request.path)
    if self.command == 'POST':
        try:
            rlen = int(request.headers['Content-Length'])
            self.post_data = request.rfile.read(rlen)
        except Exception:
            # Best effort: a missing/invalid Content-Length or a failed
            # read leaves no body.  KeyboardInterrupt and SystemExit are
            # deliberately not caught, unlike with a bare "except:".
            self.post_data = None
    request.send_response(200)
    request.end_headers()
    self.handled.set()
class MemoryTest(BaseTest):

    """Test memory persistence of logger objects."""

    def setUp(self):
        """Start each test with an empty registry of watched objects."""
        super().setUp()
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Register weak references to the given objects so that their
        survival can be checked later."""
        registry = self._survivors
        for watched in args:
            registry[(id(watched), repr(watched))] = weakref.ref(watched)

    def _assertTruesurvival(self):
        """Fail unless every watched object is still alive."""
        # Trigger cycle breaking.
        gc.collect()
        dead = [text
                for (_, text), ref in self._survivors.items()
                if ref() is None]
        if not dead:
            return
        self.fail("%d objects should have survived "
            "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        # if visible references are destroyed.
        self.root_logger.setLevel(logging.INFO)
        foo = logging.getLogger("foo")
        self._watch_for_survival(foo)
        foo.setLevel(logging.DEBUG)
        # Only the record sent to foo is expected in the output.
        self.root_logger.debug(self.next_message())
        foo.debug(self.next_message())
        expected = [('foo', 'DEBUG', '2')]
        self.assert_log_lines(expected)
        del foo
        # foo has survived.
        self._assertTruesurvival()
        # foo has retained its settings.
        bar = logging.getLogger("foo")
        bar.debug(self.next_message())
        expected = expected + [('foo', 'DEBUG', '3')]
        self.assert_log_lines(expected)
def test_encoding_cyrillic_unicode(self):
    """A handler writing through a cp1251 stream writer emits cp1251 bytes."""
    # "Do svidanya" (goodbye) spelled in Cyrillic.
    message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
    raw = io.BytesIO()
    # Obtain the cp1251 stream-writer class and tag it with its encoding
    # name before wrapping the byte buffer with it.
    cp1251_writer = codecs.getwriter('cp1251')
    cp1251_writer.encoding = 'cp1251'
    handler = logging.StreamHandler(cp1251_writer(raw, 'strict'))
    log = logging.getLogger("test")
    log.addHandler(handler)
    try:
        log.warning(message)
    finally:
        # Detach and close the handler even if logging raised.
        log.removeHandler(handler)
        handler.close()
    # The buffer must hold the message's CP-1251 bytes plus the newline.
    self.assertEqual(raw.getvalue(),
                     b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
def formatFunc(format, datefmt=None):
    """Return a logging.Formatter built from *format* and *datefmt*."""
    formatter = logging.Formatter(format, datefmt)
    return formatter


class myCustomFormatter:
    """Formatter-like class that does not derive from logging.Formatter."""

    def __init__(self, fmt, datefmt=None):
        """Accept the formatter arguments and ignore them."""


def handlerFunc():
    """Return a new logging.StreamHandler created with default arguments."""
    handler = logging.StreamHandler()
    return handler


class CustomHandler(logging.StreamHandler):
    """logging.StreamHandler subclass that adds nothing of its own."""
2248 config1 = { 2249 'version': 1, 2250 'formatters': { 2251 'form1' : { 2252 'format' : '%(levelname)s ++ %(message)s', 2253 }, 2254 }, 2255 'handlers' : { 2256 'hand1' : { 2257 'class' : 'logging.StreamHandler', 2258 'formatter' : 'form1', 2259 'level' : 'NOTSET', 2260 'stream' : 'ext://sys.stdout', 2261 }, 2262 }, 2263 'loggers' : { 2264 'compiler.parser' : { 2265 'level' : 'DEBUG', 2266 'handlers' : ['hand1'], 2267 }, 2268 }, 2269 'root' : { 2270 'level' : 'WARNING', 2271 }, 2272 } 2273 2274 # config1a moves the handler to the root. Used with config8a 2275 config1a = { 2276 'version': 1, 2277 'formatters': { 2278 'form1' : { 2279 'format' : '%(levelname)s ++ %(message)s', 2280 }, 2281 }, 2282 'handlers' : { 2283 'hand1' : { 2284 'class' : 'logging.StreamHandler', 2285 'formatter' : 'form1', 2286 'level' : 'NOTSET', 2287 'stream' : 'ext://sys.stdout', 2288 }, 2289 }, 2290 'loggers' : { 2291 'compiler.parser' : { 2292 'level' : 'DEBUG', 2293 }, 2294 }, 2295 'root' : { 2296 'level' : 'WARNING', 2297 'handlers' : ['hand1'], 2298 }, 2299 } 2300 2301 # config2 has a subtle configuration error that should be reported 2302 config2 = { 2303 'version': 1, 2304 'formatters': { 2305 'form1' : { 2306 'format' : '%(levelname)s ++ %(message)s', 2307 }, 2308 }, 2309 'handlers' : { 2310 'hand1' : { 2311 'class' : 'logging.StreamHandler', 2312 'formatter' : 'form1', 2313 'level' : 'NOTSET', 2314 'stream' : 'ext://sys.stdbout', 2315 }, 2316 }, 2317 'loggers' : { 2318 'compiler.parser' : { 2319 'level' : 'DEBUG', 2320 'handlers' : ['hand1'], 2321 }, 2322 }, 2323 'root' : { 2324 'level' : 'WARNING', 2325 }, 2326 } 2327 2328 # As config1 but with a misspelt level on a handler 2329 config2a = { 2330 'version': 1, 2331 'formatters': { 2332 'form1' : { 2333 'format' : '%(levelname)s ++ %(message)s', 2334 }, 2335 }, 2336 'handlers' : { 2337 'hand1' : { 2338 'class' : 'logging.StreamHandler', 2339 'formatter' : 'form1', 2340 'level' : 'NTOSET', 2341 'stream' : 'ext://sys.stdout', 2342 
}, 2343 }, 2344 'loggers' : { 2345 'compiler.parser' : { 2346 'level' : 'DEBUG', 2347 'handlers' : ['hand1'], 2348 }, 2349 }, 2350 'root' : { 2351 'level' : 'WARNING', 2352 }, 2353 } 2354 2355 2356 # As config1 but with a misspelt level on a logger 2357 config2b = { 2358 'version': 1, 2359 'formatters': { 2360 'form1' : { 2361 'format' : '%(levelname)s ++ %(message)s', 2362 }, 2363 }, 2364 'handlers' : { 2365 'hand1' : { 2366 'class' : 'logging.StreamHandler', 2367 'formatter' : 'form1', 2368 'level' : 'NOTSET', 2369 'stream' : 'ext://sys.stdout', 2370 }, 2371 }, 2372 'loggers' : { 2373 'compiler.parser' : { 2374 'level' : 'DEBUG', 2375 'handlers' : ['hand1'], 2376 }, 2377 }, 2378 'root' : { 2379 'level' : 'WRANING', 2380 }, 2381 } 2382 2383 # config3 has a less subtle configuration error 2384 config3 = { 2385 'version': 1, 2386 'formatters': { 2387 'form1' : { 2388 'format' : '%(levelname)s ++ %(message)s', 2389 }, 2390 }, 2391 'handlers' : { 2392 'hand1' : { 2393 'class' : 'logging.StreamHandler', 2394 'formatter' : 'misspelled_name', 2395 'level' : 'NOTSET', 2396 'stream' : 'ext://sys.stdout', 2397 }, 2398 }, 2399 'loggers' : { 2400 'compiler.parser' : { 2401 'level' : 'DEBUG', 2402 'handlers' : ['hand1'], 2403 }, 2404 }, 2405 'root' : { 2406 'level' : 'WARNING', 2407 }, 2408 } 2409 2410 # config4 specifies a custom formatter class to be loaded 2411 config4 = { 2412 'version': 1, 2413 'formatters': { 2414 'form1' : { 2415 '()' : __name__ + '.ExceptionFormatter', 2416 'format' : '%(levelname)s:%(name)s:%(message)s', 2417 }, 2418 }, 2419 'handlers' : { 2420 'hand1' : { 2421 'class' : 'logging.StreamHandler', 2422 'formatter' : 'form1', 2423 'level' : 'NOTSET', 2424 'stream' : 'ext://sys.stdout', 2425 }, 2426 }, 2427 'root' : { 2428 'level' : 'NOTSET', 2429 'handlers' : ['hand1'], 2430 }, 2431 } 2432 2433 # As config4 but using an actual callable rather than a string 2434 config4a = { 2435 'version': 1, 2436 'formatters': { 2437 'form1' : { 2438 '()' : 
ExceptionFormatter, 2439 'format' : '%(levelname)s:%(name)s:%(message)s', 2440 }, 2441 'form2' : { 2442 '()' : __name__ + '.formatFunc', 2443 'format' : '%(levelname)s:%(name)s:%(message)s', 2444 }, 2445 'form3' : { 2446 '()' : formatFunc, 2447 'format' : '%(levelname)s:%(name)s:%(message)s', 2448 }, 2449 }, 2450 'handlers' : { 2451 'hand1' : { 2452 'class' : 'logging.StreamHandler', 2453 'formatter' : 'form1', 2454 'level' : 'NOTSET', 2455 'stream' : 'ext://sys.stdout', 2456 }, 2457 'hand2' : { 2458 '()' : handlerFunc, 2459 }, 2460 }, 2461 'root' : { 2462 'level' : 'NOTSET', 2463 'handlers' : ['hand1'], 2464 }, 2465 } 2466 2467 # config5 specifies a custom handler class to be loaded 2468 config5 = { 2469 'version': 1, 2470 'formatters': { 2471 'form1' : { 2472 'format' : '%(levelname)s ++ %(message)s', 2473 }, 2474 }, 2475 'handlers' : { 2476 'hand1' : { 2477 'class' : __name__ + '.CustomHandler', 2478 'formatter' : 'form1', 2479 'level' : 'NOTSET', 2480 'stream' : 'ext://sys.stdout', 2481 }, 2482 }, 2483 'loggers' : { 2484 'compiler.parser' : { 2485 'level' : 'DEBUG', 2486 'handlers' : ['hand1'], 2487 }, 2488 }, 2489 'root' : { 2490 'level' : 'WARNING', 2491 }, 2492 } 2493 2494 # config6 specifies a custom handler class to be loaded 2495 # but has bad arguments 2496 config6 = { 2497 'version': 1, 2498 'formatters': { 2499 'form1' : { 2500 'format' : '%(levelname)s ++ %(message)s', 2501 }, 2502 }, 2503 'handlers' : { 2504 'hand1' : { 2505 'class' : __name__ + '.CustomHandler', 2506 'formatter' : 'form1', 2507 'level' : 'NOTSET', 2508 'stream' : 'ext://sys.stdout', 2509 '9' : 'invalid parameter name', 2510 }, 2511 }, 2512 'loggers' : { 2513 'compiler.parser' : { 2514 'level' : 'DEBUG', 2515 'handlers' : ['hand1'], 2516 }, 2517 }, 2518 'root' : { 2519 'level' : 'WARNING', 2520 }, 2521 } 2522 2523 # config 7 does not define compiler.parser but defines compiler.lexer 2524 # so compiler.parser should be disabled after applying it 2525 config7 = { 2526 'version': 1, 
2527 'formatters': { 2528 'form1' : { 2529 'format' : '%(levelname)s ++ %(message)s', 2530 }, 2531 }, 2532 'handlers' : { 2533 'hand1' : { 2534 'class' : 'logging.StreamHandler', 2535 'formatter' : 'form1', 2536 'level' : 'NOTSET', 2537 'stream' : 'ext://sys.stdout', 2538 }, 2539 }, 2540 'loggers' : { 2541 'compiler.lexer' : { 2542 'level' : 'DEBUG', 2543 'handlers' : ['hand1'], 2544 }, 2545 }, 2546 'root' : { 2547 'level' : 'WARNING', 2548 }, 2549 } 2550 2551 # config8 defines both compiler and compiler.lexer 2552 # so compiler.parser should not be disabled (since 2553 # compiler is defined) 2554 config8 = { 2555 'version': 1, 2556 'disable_existing_loggers' : False, 2557 'formatters': { 2558 'form1' : { 2559 'format' : '%(levelname)s ++ %(message)s', 2560 }, 2561 }, 2562 'handlers' : { 2563 'hand1' : { 2564 'class' : 'logging.StreamHandler', 2565 'formatter' : 'form1', 2566 'level' : 'NOTSET', 2567 'stream' : 'ext://sys.stdout', 2568 }, 2569 }, 2570 'loggers' : { 2571 'compiler' : { 2572 'level' : 'DEBUG', 2573 'handlers' : ['hand1'], 2574 }, 2575 'compiler.lexer' : { 2576 }, 2577 }, 2578 'root' : { 2579 'level' : 'WARNING', 2580 }, 2581 } 2582 2583 # config8a disables existing loggers 2584 config8a = { 2585 'version': 1, 2586 'disable_existing_loggers' : True, 2587 'formatters': { 2588 'form1' : { 2589 'format' : '%(levelname)s ++ %(message)s', 2590 }, 2591 }, 2592 'handlers' : { 2593 'hand1' : { 2594 'class' : 'logging.StreamHandler', 2595 'formatter' : 'form1', 2596 'level' : 'NOTSET', 2597 'stream' : 'ext://sys.stdout', 2598 }, 2599 }, 2600 'loggers' : { 2601 'compiler' : { 2602 'level' : 'DEBUG', 2603 'handlers' : ['hand1'], 2604 }, 2605 'compiler.lexer' : { 2606 }, 2607 }, 2608 'root' : { 2609 'level' : 'WARNING', 2610 }, 2611 } 2612 2613 config9 = { 2614 'version': 1, 2615 'formatters': { 2616 'form1' : { 2617 'format' : '%(levelname)s ++ %(message)s', 2618 }, 2619 }, 2620 'handlers' : { 2621 'hand1' : { 2622 'class' : 'logging.StreamHandler', 2623 
'formatter' : 'form1', 2624 'level' : 'WARNING', 2625 'stream' : 'ext://sys.stdout', 2626 }, 2627 }, 2628 'loggers' : { 2629 'compiler.parser' : { 2630 'level' : 'WARNING', 2631 'handlers' : ['hand1'], 2632 }, 2633 }, 2634 'root' : { 2635 'level' : 'NOTSET', 2636 }, 2637 } 2638 2639 config9a = { 2640 'version': 1, 2641 'incremental' : True, 2642 'handlers' : { 2643 'hand1' : { 2644 'level' : 'WARNING', 2645 }, 2646 }, 2647 'loggers' : { 2648 'compiler.parser' : { 2649 'level' : 'INFO', 2650 }, 2651 }, 2652 } 2653 2654 config9b = { 2655 'version': 1, 2656 'incremental' : True, 2657 'handlers' : { 2658 'hand1' : { 2659 'level' : 'INFO', 2660 }, 2661 }, 2662 'loggers' : { 2663 'compiler.parser' : { 2664 'level' : 'INFO', 2665 }, 2666 }, 2667 } 2668 2669 # As config1 but with a filter added 2670 config10 = { 2671 'version': 1, 2672 'formatters': { 2673 'form1' : { 2674 'format' : '%(levelname)s ++ %(message)s', 2675 }, 2676 }, 2677 'filters' : { 2678 'filt1' : { 2679 'name' : 'compiler.parser', 2680 }, 2681 }, 2682 'handlers' : { 2683 'hand1' : { 2684 'class' : 'logging.StreamHandler', 2685 'formatter' : 'form1', 2686 'level' : 'NOTSET', 2687 'stream' : 'ext://sys.stdout', 2688 'filters' : ['filt1'], 2689 }, 2690 }, 2691 'loggers' : { 2692 'compiler.parser' : { 2693 'level' : 'DEBUG', 2694 'filters' : ['filt1'], 2695 }, 2696 }, 2697 'root' : { 2698 'level' : 'WARNING', 2699 'handlers' : ['hand1'], 2700 }, 2701 } 2702 2703 # As config1 but using cfg:// references 2704 config11 = { 2705 'version': 1, 2706 'true_formatters': { 2707 'form1' : { 2708 'format' : '%(levelname)s ++ %(message)s', 2709 }, 2710 }, 2711 'handler_configs': { 2712 'hand1' : { 2713 'class' : 'logging.StreamHandler', 2714 'formatter' : 'form1', 2715 'level' : 'NOTSET', 2716 'stream' : 'ext://sys.stdout', 2717 }, 2718 }, 2719 'formatters' : 'cfg://true_formatters', 2720 'handlers' : { 2721 'hand1' : 'cfg://handler_configs[hand1]', 2722 }, 2723 'loggers' : { 2724 'compiler.parser' : { 2725 'level' : 
'DEBUG', 2726 'handlers' : ['hand1'], 2727 }, 2728 }, 2729 'root' : { 2730 'level' : 'WARNING', 2731 }, 2732 } 2733 2734 # As config11 but missing the version key 2735 config12 = { 2736 'true_formatters': { 2737 'form1' : { 2738 'format' : '%(levelname)s ++ %(message)s', 2739 }, 2740 }, 2741 'handler_configs': { 2742 'hand1' : { 2743 'class' : 'logging.StreamHandler', 2744 'formatter' : 'form1', 2745 'level' : 'NOTSET', 2746 'stream' : 'ext://sys.stdout', 2747 }, 2748 }, 2749 'formatters' : 'cfg://true_formatters', 2750 'handlers' : { 2751 'hand1' : 'cfg://handler_configs[hand1]', 2752 }, 2753 'loggers' : { 2754 'compiler.parser' : { 2755 'level' : 'DEBUG', 2756 'handlers' : ['hand1'], 2757 }, 2758 }, 2759 'root' : { 2760 'level' : 'WARNING', 2761 }, 2762 } 2763 2764 # As config11 but using an unsupported version 2765 config13 = { 2766 'version': 2, 2767 'true_formatters': { 2768 'form1' : { 2769 'format' : '%(levelname)s ++ %(message)s', 2770 }, 2771 }, 2772 'handler_configs': { 2773 'hand1' : { 2774 'class' : 'logging.StreamHandler', 2775 'formatter' : 'form1', 2776 'level' : 'NOTSET', 2777 'stream' : 'ext://sys.stdout', 2778 }, 2779 }, 2780 'formatters' : 'cfg://true_formatters', 2781 'handlers' : { 2782 'hand1' : 'cfg://handler_configs[hand1]', 2783 }, 2784 'loggers' : { 2785 'compiler.parser' : { 2786 'level' : 'DEBUG', 2787 'handlers' : ['hand1'], 2788 }, 2789 }, 2790 'root' : { 2791 'level' : 'WARNING', 2792 }, 2793 } 2794 2795 # As config0, but with properties 2796 config14 = { 2797 'version': 1, 2798 'formatters': { 2799 'form1' : { 2800 'format' : '%(levelname)s ++ %(message)s', 2801 }, 2802 }, 2803 'handlers' : { 2804 'hand1' : { 2805 'class' : 'logging.StreamHandler', 2806 'formatter' : 'form1', 2807 'level' : 'NOTSET', 2808 'stream' : 'ext://sys.stdout', 2809 '.': { 2810 'foo': 'bar', 2811 'terminator': '!\n', 2812 } 2813 }, 2814 }, 2815 'root' : { 2816 'level' : 'WARNING', 2817 'handlers' : ['hand1'], 2818 }, 2819 } 2820 2821 out_of_order = { 2822 
"version": 1, 2823 "formatters": { 2824 "mySimpleFormatter": { 2825 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2826 "style": "$" 2827 } 2828 }, 2829 "handlers": { 2830 "fileGlobal": { 2831 "class": "logging.StreamHandler", 2832 "level": "DEBUG", 2833 "formatter": "mySimpleFormatter" 2834 }, 2835 "bufferGlobal": { 2836 "class": "logging.handlers.MemoryHandler", 2837 "capacity": 5, 2838 "formatter": "mySimpleFormatter", 2839 "target": "fileGlobal", 2840 "level": "DEBUG" 2841 } 2842 }, 2843 "loggers": { 2844 "mymodule": { 2845 "level": "DEBUG", 2846 "handlers": ["bufferGlobal"], 2847 "propagate": "true" 2848 } 2849 } 2850 } 2851 2852 # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False 2853 custom_formatter_class_validate = { 2854 'version': 1, 2855 'formatters': { 2856 'form1': { 2857 '()': __name__ + '.ExceptionFormatter', 2858 'format': '%(levelname)s:%(name)s:%(message)s', 2859 'validate': False, 2860 }, 2861 }, 2862 'handlers' : { 2863 'hand1' : { 2864 'class': 'logging.StreamHandler', 2865 'formatter': 'form1', 2866 'level': 'NOTSET', 2867 'stream': 'ext://sys.stdout', 2868 }, 2869 }, 2870 "loggers": { 2871 "my_test_logger_custom_formatter": { 2872 "level": "DEBUG", 2873 "handlers": ["hand1"], 2874 "propagate": "true" 2875 } 2876 } 2877 } 2878 2879 # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False 2880 custom_formatter_class_validate2 = { 2881 'version': 1, 2882 'formatters': { 2883 'form1': { 2884 'class': __name__ + '.ExceptionFormatter', 2885 'format': '%(levelname)s:%(name)s:%(message)s', 2886 'validate': False, 2887 }, 2888 }, 2889 'handlers' : { 2890 'hand1' : { 2891 'class': 'logging.StreamHandler', 2892 'formatter': 'form1', 2893 'level': 'NOTSET', 2894 'stream': 'ext://sys.stdout', 2895 }, 2896 }, 2897 "loggers": { 2898 "my_test_logger_custom_formatter": { 2899 "level": "DEBUG", 2900 "handlers": ["hand1"], 2901 "propagate": "true" 2902 } 2903 } 
def test_config0_ok(self):
    """config0 sends root-logger records at WARNING or above to stdout."""
    with support.captured_stdout() as captured:
        self.apply_config(self.config0)
        root = logging.getLogger()
        # Message 1 is INFO, below the root's WARNING level: not written.
        root.info(self.next_message())
        # Message 2 is ERROR: written.
        root.error(self.next_message())
        self.assert_log_lines([('ERROR', '2')], stream=captured)
        # The original logger output stays empty.
        self.assert_log_lines([])
def test_config2_failure(self):
    # config2 misspells the handler stream as 'ext://sys.stdbout'.
    with self.assertRaises(Exception):
        self.apply_config(self.config2)

def test_config2a_failure(self):
    # config2a misspells the handler level as 'NTOSET'.
    with self.assertRaises(Exception):
        self.apply_config(self.config2a)

def test_config2b_failure(self):
    # config2b misspells the root logger level as 'WRANING'.
    with self.assertRaises(Exception):
        self.apply_config(self.config2b)

def test_config3_failure(self):
    # config3 points the handler at a formatter name, 'misspelled_name',
    # that the 'formatters' section does not define.
    with self.assertRaises(Exception):
        self.apply_config(self.config3)
def test_config7_ok(self):
    """Applying config7 after config1 disables compiler.parser."""
    with support.captured_stdout() as first_out:
        self.apply_config(self.config1)
        parser = logging.getLogger("compiler.parser")
        # Both records are emitted.
        parser.info(self.next_message())
        parser.error(self.next_message())
        self.assert_log_lines([('INFO', '1'), ('ERROR', '2')],
                              stream=first_out)
        # The original logger output stays empty.
        self.assert_log_lines([])
    with support.captured_stdout() as second_out:
        self.apply_config(self.config7)
        # config7 defines compiler.lexer but not compiler.parser.
        self.assertTrue(logging.getLogger("compiler.parser").disabled)
        lexer = logging.getLogger("compiler.lexer")
        # Both records are emitted.
        lexer.info(self.next_message())
        lexer.error(self.next_message())
        self.assert_log_lines([('INFO', '3'), ('ERROR', '4')],
                              stream=second_out)
        # The original logger output stays empty.
        self.assert_log_lines([])
def test_config_8_ok(self):
    """Applying config8 after config1 leaves compiler.parser enabled."""
    with support.captured_stdout() as first_out:
        self.apply_config(self.config1)
        parser = logging.getLogger("compiler.parser")
        # Both records are emitted.
        parser.info(self.next_message())
        parser.error(self.next_message())
        self.assert_log_lines([('INFO', '1'), ('ERROR', '2')],
                              stream=first_out)
        # The original logger output stays empty.
        self.assert_log_lines([])
    with support.captured_stdout() as second_out:
        self.apply_config(self.config8)
        parser = logging.getLogger("compiler.parser")
        self.assertFalse(parser.disabled)
        # Messages 3 and 4 go through compiler.parser ...
        parser.info(self.next_message())
        parser.error(self.next_message())
        # ... and messages 5 and 6 through compiler.lexer.
        lexer = logging.getLogger("compiler.lexer")
        lexer.info(self.next_message())
        lexer.error(self.next_message())
        self.assert_log_lines([
            ('INFO', '3'),
            ('ERROR', '4'),
            ('INFO', '5'),
            ('ERROR', '6'),
        ], stream=second_out)
        # The original logger output stays empty.
        self.assert_log_lines([])
def test_config_9_ok(self):
    """Incremental configs change levels; output appears only after config9b."""
    # Each step applies a config, logs one INFO record through
    # compiler.parser, and checks everything captured so far.
    steps = (
        # Handler and logger are both at WARNING: nothing is output.
        (self.config9, []),
        # Logger drops to INFO but the handler is still at WARNING.
        (self.config9a, []),
        # Handler drops to INFO as well: the third message is output.
        (self.config9b, [('INFO', '3')]),
    )
    with support.captured_stdout() as captured:
        for conf, expected in steps:
            self.apply_config(conf)
            logging.getLogger("compiler.parser").info(self.next_message())
            self.assert_log_lines(expected, stream=captured)
def setup_via_listener(self, text, verify=None):
    """Apply the configuration *text* through a logging.config listener.

    A listener thread is started on an OS-assigned port. *text* is sent
    to it over a TCP connection to localhost as a 4-byte big-endian
    length followed by the UTF-8 encoded payload. The listener is then
    stopped and its thread joined before returning, whether or not the
    send succeeded.

    text -- the configuration to send, as a str.
    verify -- optional callable handed to logging.config.listen().
    """
    payload = text.encode("utf-8")
    # Ask for a randomly assigned port (by using port 0)
    t = logging.config.listen(0, verify)
    t.start()
    t.ready.wait()
    # Now get the port allocated
    port = t.port
    # Clear the flag so the bounded wait in the finally clause does not
    # return immediately on the already-set startup signal.
    t.ready.clear()
    try:
        # The context manager closes the socket even when connecting or
        # sending fails, so the error path no longer leaks it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2.0)
            sock.connect(('localhost', port))
            # sendall() keeps sending until the whole message is written.
            sock.sendall(struct.pack('>L', len(payload)) + payload)
    finally:
        t.ready.wait(2.0)
        logging.config.stopListening()
        threading_helper.join_thread(t)
3309 with support.captured_stdout() as output: 3310 self.setup_via_listener(to_send, verify_fail) 3311 # Both will output a message 3312 logger.info(self.next_message()) 3313 logger.error(self.next_message()) 3314 self.assert_log_lines([], stream=output) 3315 # Original logger output has the stuff we logged. 3316 self.assert_log_lines([ 3317 ('INFO', '1'), 3318 ('ERROR', '2'), 3319 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3320 3321 # Now, perform no verification. Our configuration 3322 # should take effect. 3323 3324 with support.captured_stdout() as output: 3325 self.setup_via_listener(to_send) # no verify callable specified 3326 logger = logging.getLogger("compiler.parser") 3327 # Both will output a message 3328 logger.info(self.next_message()) 3329 logger.error(self.next_message()) 3330 self.assert_log_lines([ 3331 ('INFO', '3'), 3332 ('ERROR', '4'), 3333 ], stream=output) 3334 # Original logger output still has the stuff we logged before. 3335 self.assert_log_lines([ 3336 ('INFO', '1'), 3337 ('ERROR', '2'), 3338 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3339 3340 # Now, perform verification which transforms the bytes. 3341 3342 with support.captured_stdout() as output: 3343 self.setup_via_listener(to_send[::-1], verify_reverse) 3344 logger = logging.getLogger("compiler.parser") 3345 # Both will output a message 3346 logger.info(self.next_message()) 3347 logger.error(self.next_message()) 3348 self.assert_log_lines([ 3349 ('INFO', '5'), 3350 ('ERROR', '6'), 3351 ], stream=output) 3352 # Original logger output still has the stuff we logged before. 
def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
    """Apply the custom-formatter config with its style overridden to '$'.

    The configuration is expected to be accepted and the handler of
    ``my_test_logger_custom_formatter`` must end up with an
    ``ExceptionFormatter``.
    """
    # Deep-copy before editing: a shallow copy shares the nested
    # 'formatters' dicts with self.custom_formatter_class_validate, so the
    # assignment below would otherwise modify that original config too.
    config = copy.deepcopy(self.custom_formatter_class_validate)
    config['formatters']['form1']['style'] = "$"

    # Exception should not be raised as we have configured 'validate' to False
    self.apply_config(config)
    handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
    self.assertIsInstance(handler.formatter, ExceptionFormatter)
test_baseconfig(self): 3397 d = { 3398 'atuple': (1, 2, 3), 3399 'alist': ['a', 'b', 'c'], 3400 'adict': {'d': 'e', 'f': 3 }, 3401 'nest1': ('g', ('h', 'i'), 'j'), 3402 'nest2': ['k', ['l', 'm'], 'n'], 3403 'nest3': ['o', 'cfg://alist', 'p'], 3404 } 3405 bc = logging.config.BaseConfigurator(d) 3406 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 3407 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 3408 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 3409 self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3410 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3411 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3412 v = bc.convert('cfg://nest3') 3413 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3414 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3415 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3416 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3417 3418 def test_namedtuple(self): 3419 # see bpo-39142 3420 from collections import namedtuple 3421 3422 class MyHandler(logging.StreamHandler): 3423 def __init__(self, resource, *args, **kwargs): 3424 super().__init__(*args, **kwargs) 3425 self.resource: namedtuple = resource 3426 3427 def emit(self, record): 3428 record.msg += f' {self.resource.type}' 3429 return super().emit(record) 3430 3431 Resource = namedtuple('Resource', ['type', 'labels']) 3432 resource = Resource(type='my_type', labels=['a']) 3433 3434 config = { 3435 'version': 1, 3436 'handlers': { 3437 'myhandler': { 3438 '()': MyHandler, 3439 'resource': resource 3440 } 3441 }, 3442 'root': {'level': 'INFO', 'handlers': ['myhandler']}, 3443 } 3444 with support.captured_stderr() as stderr: 3445 self.apply_config(config) 3446 logging.info('some log') 3447 self.assertEqual(stderr.getvalue(), 'some log my_type\n') 3448 3449class ManagerTest(BaseTest): 3450 def test_manager_loggerclass(self): 3451 logged = [] 3452 3453 class MyLogger(logging.Logger): 3454 def _log(self, level, msg, args, exc_info=None, extra=None): 
class ChildLoggerTest(BaseTest):
    """Checks that Logger.getChild() resolves to the same named loggers."""

    def test_child_loggers(self):
        root = logging.getLogger()
        parent = logging.getLogger('abc')
        # Created only for its side effect of registering 'def.ghi'.
        logging.getLogger('def.ghi')

        # Children of the root logger carry no prefix.
        for suffix in ('xyz', 'uvw.xyz'):
            self.assertIs(root.getChild(suffix), logging.getLogger(suffix))

        # Children of a named logger are prefixed with the parent's name,
        # whether reached in one step or two.
        child = parent.getChild('def')
        grandchild = child.getChild('ghi')
        direct = parent.getChild('def.ghi')
        self.assertIs(child, logging.getLogger('abc.def'))
        self.assertIs(grandchild, logging.getLogger('abc.def.ghi'))
        self.assertIs(grandchild, direct)


class DerivedLogRecord(logging.LogRecord):
    """LogRecord subclass with no additions; used as a distinct record type."""
class QueueHandlerTest(BaseTest):
    """Tests for logging.handlers.QueueHandler, alone and with QueueListener."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        """Attach a QueueHandler to a non-propagating 'que' logger at WARNING."""
        BaseTest.setUp(self)
        self.queue = queue.Queue(-1)
        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
        self.name = 'que'
        self.que_logger = logging.getLogger('que')
        self.que_logger.propagate = False
        self.que_logger.setLevel(logging.WARNING)
        self.que_logger.addHandler(self.que_hdlr)

    def tearDown(self):
        self.que_hdlr.close()
        BaseTest.tearDown(self)

    def test_queue_handler(self):
        """Only records at or above the logger's level reach the queue."""
        self.que_logger.debug(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        self.que_logger.info(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        msg = self.next_message()
        self.que_logger.warning(msg)
        data = self.queue.get_nowait()
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(data, logging.LogRecord)
        self.assertEqual(data.name, self.que_logger.name)
        self.assertEqual((data.msg, data.args), (msg, None))

    def test_formatting(self):
        """A formatter set on the QueueHandler is applied to the queued record."""
        msg = self.next_message()
        levelname = logging.getLevelName(logging.WARNING)
        log_format_str = '{name} -> {levelname}: {message}'
        formatted_msg = log_format_str.format(name=self.name,
                                              levelname=levelname, message=msg)
        formatter = logging.Formatter(self.log_format)
        self.que_hdlr.setFormatter(formatter)
        self.que_logger.warning(msg)
        log_record = self.queue.get_nowait()
        self.assertEqual(formatted_msg, log_record.msg)
        self.assertEqual(formatted_msg, log_record.message)

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener(self):
        """Listener forwards queued records, optionally honouring handler level."""
        handler = TestHandler(support.Matcher())
        listener = logging.handlers.QueueListener(self.queue, handler)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
        handler.close()

        # Now test with respect_handler_level set

        handler = TestHandler(support.Matcher())
        handler.setLevel(logging.CRITICAL)
        listener = logging.handlers.QueueListener(self.queue, handler,
                                                  respect_handler_level=True)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
        handler.close()

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener_with_StreamHandler(self):
        # Test that traceback only appends once (bpo-34334).
        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
        listener.start()
        # Stop the listener thread even if logging the exception fails,
        # matching the try/finally used in test_queue_listener.
        try:
            try:
                1 / 0
            except ZeroDivisionError as e:
                exc = e
                self.que_logger.exception(self.next_message(), exc_info=exc)
        finally:
            listener.stop()
        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener_with_multiple_handlers(self):
        # Test that queue handler format doesn't affect other handler formats (bpo-35726).
        self.que_hdlr.setFormatter(self.root_formatter)
        self.que_logger.addHandler(self.root_hdlr)

        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
        listener.start()
        # Guarantee the listener thread is stopped if error() raises.
        try:
            self.que_logger.error("error")
        finally:
            listener.stop()
        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
@staticmethod
def get_all_from_queue(log_queue):
    """Yield every item currently retrievable from *log_queue* without blocking.

    Iteration ends the first time ``get_nowait()`` raises ``queue.Empty``.
    """
    while True:
        # Keep the try narrow: only an empty queue should end the
        # iteration, not an exception raised while suspended at the yield.
        try:
            item = log_queue.get_nowait()
        except queue.Empty:
            # A bare return ends the generator; a return value here would
            # never be seen by callers that simply iterate.
            return
        yield item
3703 """ 3704 # bpo-28668: The multiprocessing (mp) module is not functional 3705 # when the mp.synchronize module cannot be imported. 3706 support.skip_if_broken_multiprocessing_synchronize() 3707 for i in range(self.repeat): 3708 queue = multiprocessing.Queue() 3709 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3710 # time.sleep(1) 3711 items = list(self.get_all_from_queue(queue)) 3712 queue.close() 3713 queue.join_thread() 3714 3715 expected = [[], [logging.handlers.QueueListener._sentinel]] 3716 self.assertIn(items, expected, 3717 'Found unexpected messages in queue: %s' % ( 3718 [m.msg if isinstance(m, logging.LogRecord) 3719 else m for m in items])) 3720 3721 def test_calls_task_done_after_stop(self): 3722 # Issue 36813: Make sure queue.join does not deadlock. 3723 log_queue = queue.Queue() 3724 listener = logging.handlers.QueueListener(log_queue) 3725 listener.start() 3726 listener.stop() 3727 with self.assertRaises(ValueError): 3728 # Make sure all tasks are done and .join won't block. 
ZERO = datetime.timedelta(0)


class UTC(datetime.tzinfo):
    """tzinfo with a zero UTC offset, zero DST and the name 'UTC'."""

    def utcoffset(self, dt):
        return ZERO

    # DST adjustment is the same zero offset.
    dst = utcoffset

    def tzname(self, dt):
        return 'UTC'


utc = UTC()


class AssertErrorMessage:
    """Mixin adding a check on the text of a raised exception."""

    def assert_error_message(self, exception, message, *args, **kwargs):
        """Call ``args[0](*args[1:], **kwargs)`` and compare the error text.

        If the call raises *exception*, ``str()`` of it must equal *message*.
        """
        raised = None
        try:
            # An empty tuple of expected types means assertRaises never
            # absorbs what the callable raises, so it propagates to here.
            self.assertRaises((), *args, **kwargs)
        except exception as err:
            raised = err
        if raised is not None:
            self.assertEqual(message, str(raised))
logging.Formatter("{message}", style='{') 3800 self.assertFalse(f.usesTime()) 3801 f = logging.Formatter('{asctime}', style='{') 3802 self.assertTrue(f.usesTime()) 3803 f = logging.Formatter('{asctime!s:15}', style='{') 3804 self.assertTrue(f.usesTime()) 3805 f = logging.Formatter('{asctime:15}', style='{') 3806 self.assertTrue(f.usesTime()) 3807 3808 def test_dollars(self): 3809 # Test $-formatting 3810 r = self.get_record() 3811 f = logging.Formatter('${message}', style='$') 3812 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3813 f = logging.Formatter('$message', style='$') 3814 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3815 f = logging.Formatter('$$%${message}%$$', style='$') 3816 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3817 f = logging.Formatter('${random}', style='$') 3818 self.assertRaises(ValueError, f.format, r) 3819 self.assertFalse(f.usesTime()) 3820 f = logging.Formatter('${asctime}', style='$') 3821 self.assertTrue(f.usesTime()) 3822 f = logging.Formatter('$asctime', style='$') 3823 self.assertTrue(f.usesTime()) 3824 f = logging.Formatter('${message}', style='$') 3825 self.assertFalse(f.usesTime()) 3826 f = logging.Formatter('${asctime}--', style='$') 3827 self.assertTrue(f.usesTime()) 3828 3829 def test_format_validate(self): 3830 # Check correct formatting 3831 # Percentage style 3832 f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3833 self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3834 f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3835 self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3836 f = logging.Formatter("%(process)#+027.23X") 3837 self.assertEqual(f._fmt, "%(process)#+027.23X") 3838 f = logging.Formatter("%(foo)#.*g") 3839 self.assertEqual(f._fmt, "%(foo)#.*g") 3840 3841 # StrFormat Style 3842 f = 
logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{") 3843 self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}") 3844 f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{") 3845 self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}") 3846 f = logging.Formatter("{customfield!s:#<30}", style="{") 3847 self.assertEqual(f._fmt, "{customfield!s:#<30}") 3848 f = logging.Formatter("{message!r}", style="{") 3849 self.assertEqual(f._fmt, "{message!r}") 3850 f = logging.Formatter("{message!s}", style="{") 3851 self.assertEqual(f._fmt, "{message!s}") 3852 f = logging.Formatter("{message!a}", style="{") 3853 self.assertEqual(f._fmt, "{message!a}") 3854 f = logging.Formatter("{process!r:4.2}", style="{") 3855 self.assertEqual(f._fmt, "{process!r:4.2}") 3856 f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{") 3857 self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}") 3858 f = logging.Formatter("{process!s:{w},.{p}}", style="{") 3859 self.assertEqual(f._fmt, "{process!s:{w},.{p}}") 3860 f = logging.Formatter("{foo:12.{p}}", style="{") 3861 self.assertEqual(f._fmt, "{foo:12.{p}}") 3862 f = logging.Formatter("{foo:{w}.6}", style="{") 3863 self.assertEqual(f._fmt, "{foo:{w}.6}") 3864 f = logging.Formatter("{foo[0].bar[1].baz}", style="{") 3865 self.assertEqual(f._fmt, "{foo[0].bar[1].baz}") 3866 f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{") 3867 self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}") 3868 f = logging.Formatter("{12[k1].bar[k2].baz}", style="{") 3869 self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}") 3870 3871 # Dollar style 3872 f = logging.Formatter("${asctime} - $message", style="$") 3873 self.assertEqual(f._fmt, "${asctime} - $message") 3874 f = logging.Formatter("$bar $$", style="$") 3875 self.assertEqual(f._fmt, "$bar $$") 3876 f = logging.Formatter("$bar $$$$", style="$") 3877 self.assertEqual(f._fmt, "$bar 
$$$$") # this would print two $($$) 3878 3879 # Testing when ValueError being raised from incorrect format 3880 # Percentage Style 3881 self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z") 3882 self.assertRaises(ValueError, logging.Formatter, "%(asctime)b") 3883 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*") 3884 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s") 3885 self.assertRaises(ValueError, logging.Formatter, "%(asctime)_") 3886 self.assertRaises(ValueError, logging.Formatter, '{asctime}') 3887 self.assertRaises(ValueError, logging.Formatter, '${message}') 3888 self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f') # with both * and decimal number as precision 3889 self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f') 3890 3891 # StrFormat Style 3892 # Testing failure for '-' in field name 3893 self.assert_error_message( 3894 ValueError, 3895 "invalid format: invalid field name/expression: 'name-thing'", 3896 logging.Formatter, "{name-thing}", style="{" 3897 ) 3898 # Testing failure for style mismatch 3899 self.assert_error_message( 3900 ValueError, 3901 "invalid format: no fields", 3902 logging.Formatter, '%(asctime)s', style='{' 3903 ) 3904 # Testing failure for invalid conversion 3905 self.assert_error_message( 3906 ValueError, 3907 "invalid conversion: 'Z'" 3908 ) 3909 self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{') 3910 self.assert_error_message( 3911 ValueError, 3912 "invalid format: expected ':' after conversion specifier", 3913 logging.Formatter, '{asctime!aa:15}', style='{' 3914 ) 3915 # Testing failure for invalid spec 3916 self.assert_error_message( 3917 ValueError, 3918 "invalid format: bad specifier: '.2ff'", 3919 logging.Formatter, '{process:.2ff}', style='{' 3920 ) 3921 self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{') 3922 self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{') 3923 
self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{') 3924 self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{') 3925 # Testing failure for mismatch braces 3926 self.assert_error_message( 3927 ValueError, 3928 "invalid format: expected '}' before end of string", 3929 logging.Formatter, '{process', style='{' 3930 ) 3931 self.assert_error_message( 3932 ValueError, 3933 "invalid format: Single '}' encountered in format string", 3934 logging.Formatter, 'process}', style='{' 3935 ) 3936 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{') 3937 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{') 3938 self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{') 3939 self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{') 3940 self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{') 3941 self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{') 3942 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{') 3943 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{') 3944 self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{') 3945 3946 # Dollar style 3947 # Testing failure for mismatch bare $ 3948 self.assert_error_message( 3949 ValueError, 3950 "invalid format: bare \'$\' not allowed", 3951 logging.Formatter, '$bar $$$', style='$' 3952 ) 3953 self.assert_error_message( 3954 ValueError, 3955 "invalid format: bare \'$\' not allowed", 3956 logging.Formatter, 'bar $', style='$' 3957 ) 3958 self.assert_error_message( 3959 ValueError, 3960 "invalid format: bare \'$\' not allowed", 3961 logging.Formatter, 'foo $.', style='$' 3962 ) 3963 # Testing failure for mismatch style 3964 self.assert_error_message( 3965 ValueError, 3966 "invalid format: no fields", 3967 logging.Formatter, '{asctime}', style='$' 3968 ) 3969 
self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$') 3970 3971 # Testing failure for incorrect fields 3972 self.assert_error_message( 3973 ValueError, 3974 "invalid format: no fields", 3975 logging.Formatter, 'foo', style='$' 3976 ) 3977 self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$') 3978 3979 def test_defaults_parameter(self): 3980 fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message'] 3981 styles = ['%', '{', '$'] 3982 for fmt, style in zip(fmts, styles): 3983 f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'}) 3984 r = self.get_record() 3985 self.assertEqual(f.format(r), 'Default Message with 2 placeholders') 3986 r = self.get_record("custom") 3987 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 3988 3989 # Without default 3990 f = logging.Formatter(fmt, style=style) 3991 r = self.get_record() 3992 self.assertRaises(ValueError, f.format, r) 3993 3994 # Non-existing default is ignored 3995 f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'}) 3996 r = self.get_record("custom") 3997 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 3998 3999 def test_invalid_style(self): 4000 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 4001 4002 def test_time(self): 4003 r = self.get_record() 4004 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 4005 # We use None to indicate we want the local timezone 4006 # We're essentially converting a UTC time to local time 4007 r.created = time.mktime(dt.astimezone(None).timetuple()) 4008 r.msecs = 123 4009 f = logging.Formatter('%(asctime)s %(message)s') 4010 f.converter = time.gmtime 4011 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 4012 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 4013 f.format(r) 4014 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 4015 4016 def test_default_msec_format_none(self): 4017 class 
class TestBufferingFormatter(logging.BufferingFormatter):
    """BufferingFormatter that brackets its output with the record count."""

    def formatHeader(self, records):
        count = len(records)
        return f'[({count:d})'

    def formatFooter(self, records):
        count = len(records)
        return f'({count:d})]'


class BufferingFormatterTest(unittest.TestCase):
    """Exercises BufferingFormatter with default and customised pieces."""

    def setUp(self):
        # Two minimal records whose messages are 'one' and 'two'.
        self.records = [logging.makeLogRecord({'msg': text})
                        for text in ('one', 'two')]

    def test_default(self):
        plain = logging.BufferingFormatter()
        self.assertEqual('', plain.format([]))
        self.assertEqual('onetwo', plain.format(self.records))

    def test_custom(self):
        # Custom header/footer with the default per-line formatter.
        bracketed = TestBufferingFormatter()
        self.assertEqual('[(2)onetwo(2)]', bracketed.format(self.records))
        # Custom header/footer combined with a custom per-line formatter.
        line_formatter = logging.Formatter('<%(message)s>')
        bracketed = TestBufferingFormatter(line_formatter)
        self.assertEqual('[(2)<one><two>(2)]', bracketed.format(self.records))
class FakeHandler:
    """Object exposing acquire/flush/close/release that log their own calls."""

    def __init__(self, identifier, called):
        # Install one recording closure per method name on this instance.
        hooks = {name: self.record_call(identifier, name, called)
                 for name in ('acquire', 'flush', 'close', 'release')}
        for name, hook in hooks.items():
            setattr(self, name, hook)

    def record_call(self, identifier, method_name, called):
        """Return a callable that appends '<identifier> - <method_name>' to *called*."""
        def inner():
            called.append(f'{identifier} - {method_name}')
        return inner


class RecordingHandler(logging.NullHandler):
    """NullHandler that stores each record passed to handle()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.records = []

    def handle(self, record):
        """Keep track of all the emitted records."""
        self.records.append(record)
test_with_valueerror_in_close(self): 4196 self._test_with_failure_in_method('close', ValueError) 4197 4198 def test_with_other_error_in_acquire_without_raise(self): 4199 logging.raiseExceptions = False 4200 self._test_with_failure_in_method('acquire', IndexError) 4201 4202 def test_with_other_error_in_flush_without_raise(self): 4203 logging.raiseExceptions = False 4204 self._test_with_failure_in_method('flush', IndexError) 4205 4206 def test_with_other_error_in_close_without_raise(self): 4207 logging.raiseExceptions = False 4208 self._test_with_failure_in_method('close', IndexError) 4209 4210 def test_with_other_error_in_acquire_with_raise(self): 4211 logging.raiseExceptions = True 4212 self.assertRaises(IndexError, self._test_with_failure_in_method, 4213 'acquire', IndexError) 4214 4215 def test_with_other_error_in_flush_with_raise(self): 4216 logging.raiseExceptions = True 4217 self.assertRaises(IndexError, self._test_with_failure_in_method, 4218 'flush', IndexError) 4219 4220 def test_with_other_error_in_close_with_raise(self): 4221 logging.raiseExceptions = True 4222 self.assertRaises(IndexError, self._test_with_failure_in_method, 4223 'close', IndexError) 4224 4225 4226class ModuleLevelMiscTest(BaseTest): 4227 4228 """Test suite for some module level methods.""" 4229 4230 def test_disable(self): 4231 old_disable = logging.root.manager.disable 4232 # confirm our assumptions are correct 4233 self.assertEqual(old_disable, 0) 4234 self.addCleanup(logging.disable, old_disable) 4235 4236 logging.disable(83) 4237 self.assertEqual(logging.root.manager.disable, 83) 4238 4239 self.assertRaises(ValueError, logging.disable, "doesnotexists") 4240 4241 class _NotAnIntOrString: 4242 pass 4243 4244 self.assertRaises(TypeError, logging.disable, _NotAnIntOrString()) 4245 4246 logging.disable("WARN") 4247 4248 # test the default value introduced in 3.7 4249 # (Issue #28524) 4250 logging.disable() 4251 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 4252 4253 def 
_test_log(self, method, level=None):
        # Shared driver for the module-level logging functions.
        #   method: name of the function on the logging module ('log', 'debug', ...)
        #   level:  explicit level passed as first argument (used for 'log' only)
        # basicConfig is replaced by a recorder so the test can prove it was
        # never invoked while the root logger already has a handler.
        called = []
        support.patch(self, logging, 'basicConfig',
                      lambda *a, **kw: called.append((a, kw)))

        recording = RecordingHandler()
        logging.root.addHandler(recording)

        log_method = getattr(logging, method)
        if level is not None:
            log_method(level, "test me: %r", recording)
        else:
            log_method("test me: %r", recording)

        # exactly one record, carrying the %-formatted message
        self.assertEqual(len(recording.records), 1)
        record = recording.records[0]
        self.assertEqual(record.getMessage(), "test me: %r" % recording)

        # without an explicit level, the level is the constant named after the
        # function (e.g. 'debug' -> logging.DEBUG)
        expected_level = level if level is not None else getattr(logging, method.upper())
        self.assertEqual(record.levelno, expected_level)

        # basicConfig was not called!
        self.assertEqual(called, [])

    def test_log(self):
        self._test_log('log', logging.ERROR)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')

    # setLoggerClass() rejects a class that is not a Logger subclass and
    # round-trips a valid one through getLoggerClass().
    def test_set_logger_class(self):
        self.assertRaises(TypeError, logging.setLoggerClass, object)

        class MyLogger(logging.Logger):
            pass

        logging.setLoggerClass(MyLogger)
        self.assertEqual(logging.getLoggerClass(), MyLogger)

        # put the default class back
        logging.setLoggerClass(logging.Logger)
        self.assertEqual(logging.getLoggerClass(), logging.Logger)

    def test_subclass_logger_cache(self):
        # bpo-37258
        message = []

        # Logger subclass whose __init__ records that it ran, after delegating
        # to the base class initializer.
        class MyLogger(logging.getLoggerClass()):
            def __init__(self, name='MyLogger', level=logging.NOTSET):
                super().__init__(name, level)
                message.append('initialized')

        logging.setLoggerClass(MyLogger)
        logger = logging.getLogger('just_some_logger')
        # the subclass initializer ran exactly once
        self.assertEqual(message, ['initialized'])
        stream = io.StringIO()
        h =
logging.StreamHandler(stream)
        logger.addHandler(h)
        try:
            # at DEBUG level a debug() call reaches the stream
            logger.setLevel(logging.DEBUG)
            logger.debug("hello")
            self.assertEqual(stream.getvalue().strip(), "hello")

            # empty the buffer before the second half of the check
            stream.truncate(0)
            stream.seek(0)

            # after raising the level to INFO the same debug() call must
            # produce no output
            logger.setLevel(logging.INFO)
            logger.debug("hello")
            self.assertEqual(stream.getvalue(), "")
        finally:
            logger.removeHandler(h)
            h.close()
            logging.setLoggerClass(logging.Logger)

    def test_logging_at_shutdown(self):
        # bpo-20037: Doing text I/O late at interpreter shutdown must not crash
        # The child script logs an exception from a __del__ method of an object
        # that is still alive when the script ends.
        code = textwrap.dedent("""
            import logging

            class A:
                def __del__(self):
                    try:
                        raise ValueError("some error")
                    except Exception:
                        logging.exception("exception in __del__")

            a = A()
        """)
        rc, out, err = assert_python_ok("-c", code)
        err = err.decode()
        # both the log message and the exception text must appear on stderr
        self.assertIn("exception in __del__", err)
        self.assertIn("ValueError: some error", err)

    def test_logging_at_shutdown_open(self):
        # bpo-26789: FileHandler keeps a reference to the builtin open()
        # function to be able to open or reopen the file during Python
        # finalization.
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        code = textwrap.dedent(f"""
            import builtins
            import logging

            class A:
                def __del__(self):
                    logging.error("log in __del__")

            # basicConfig() opens the file, but logging.shutdown() closes
            # it at Python exit. When A.__del__() is called,
            # FileHandler._open() must be called again to re-open the file.
            logging.basicConfig(filename={filename!r}, encoding="utf-8")

            a = A()

            # Simulate the Python finalization which removes the builtin
            # open() function.
del builtins.open
        """)
        assert_python_ok("-c", code)

        # the record logged from __del__ must have reached the file
        with open(filename, encoding="utf-8") as fp:
            self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__")

    def test_recursion_error(self):
        # Issue 36272
        # The child script logs on every level of an unbounded recursion.
        code = textwrap.dedent("""
            import logging

            def rec():
                logging.error("foo")
                rec()

            rec()
        """)
        rc, out, err = assert_python_failure("-c", code)
        err = err.decode()
        # the child must fail with exit status 1 and without the
        # "Cannot recover from stack overflow." message on stderr
        self.assertNotIn("Cannot recover from stack overflow.", err)
        self.assertEqual(rc, 1)

    def test_get_level_names_mapping(self):
        mapping = logging.getLevelNamesMapping()
        self.assertEqual(logging._nameToLevel, mapping)  # value is equivalent
        self.assertIsNot(logging._nameToLevel, mapping)  # but not the internal data
        new_mapping = logging.getLevelNamesMapping()     # another call -> another copy
        self.assertIsNot(mapping, new_mapping)           # verify not the same object as before
        self.assertEqual(mapping, new_mapping)           # but equivalent in value


class LogRecordTest(BaseTest):
    # str() of a record is wrapped as '<LogRecord: ...>'
    def test_str_rep(self):
        r = logging.makeLogRecord({})
        s = str(r)
        self.assertTrue(s.startswith('<LogRecord: '))
        self.assertTrue(s.endswith('>'))

    # A single dict argument is stored as record.args itself (same object) and
    # is used for %(name)s-style substitution.
    def test_dict_arg(self):
        h = RecordingHandler()
        r = logging.getLogger()
        r.addHandler(h)
        d = {'less' : 'more' }
        logging.warning('less is %(less)s', d)
        self.assertIs(h.records[0].args, d)
        self.assertEqual(h.records[0].message, 'less is more')
        r.removeHandler(h)
        h.close()

    # Builds two records with logging.logMultiprocessing temporarily set to the
    # given value and reports the process names seen.
    #   key:  suffix that makes the two record messages unique
    #   conn: optional connection object; when truthy the result dict is sent
    #         through conn.send() instead of being returned
    @staticmethod # pickled as target of child process in the following test
    def _extract_logrecord_process_name(key, logMultiprocessing, conn=None):
        prev_logMultiprocessing = logging.logMultiprocessing
        logging.logMultiprocessing = logMultiprocessing
        try:
            import multiprocessing as mp
            name = mp.current_process().name

            r1 = logging.makeLogRecord({'msg':
f'msg1_{key}'})

            # https://bugs.python.org/issue45128
            # r2 is created while sys.modules['multiprocessing'] is None
            with support.swap_item(sys.modules, 'multiprocessing', None):
                r2 = logging.makeLogRecord({'msg': f'msg2_{key}'})

            results = {'processName'   : name,
                       'r1.processName': r1.processName,
                       'r2.processName': r2.processName,
                      }
        finally:
            # always restore the module-level flag
            logging.logMultiprocessing = prev_logMultiprocessing
        if conn:
            conn.send(results)
        else:
            return results

    def test_multiprocessing(self):
        # remember whether multiprocessing was already imported before the test
        multiprocessing_imported = 'multiprocessing' in sys.modules
        try:
            # logMultiprocessing is True by default
            self.assertEqual(logging.logMultiprocessing, True)

            LOG_MULTI_PROCESSING = True
            # When logMultiprocessing == True:
            # In the main process processName = 'MainProcess'
            r = logging.makeLogRecord({})
            self.assertEqual(r.processName, 'MainProcess')

            # same check through the helper, called in-process (no connection)
            results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING)
            self.assertEqual('MainProcess', results['processName'])
            self.assertEqual('MainProcess', results['r1.processName'])
            self.assertEqual('MainProcess', results['r2.processName'])

            # In other processes, processName is correct when multiprocessing is imported,
            # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762).
# Continuation of test_multiprocessing: the child-process half of the check.
            import multiprocessing
            parent_conn, child_conn = multiprocessing.Pipe()
            p = multiprocessing.Process(
                target=self._extract_logrecord_process_name,
                args=(2, LOG_MULTI_PROCESSING, child_conn,)
            )
            p.start()
            # the helper sends its result dict through child_conn
            results = parent_conn.recv()
            self.assertNotEqual('MainProcess', results['processName'])
            self.assertEqual(results['processName'], results['r1.processName'])
            # r2 was created with the multiprocessing module hidden
            self.assertEqual('MainProcess', results['r2.processName'])
            p.join()

        finally:
            if multiprocessing_imported:
                import multiprocessing

    def test_optional(self):
        # with the default flags all four attributes are populated
        r = logging.makeLogRecord({})
        NOT_NONE = self.assertIsNotNone
        NOT_NONE(r.thread)
        NOT_NONE(r.threadName)
        NOT_NONE(r.process)
        NOT_NONE(r.processName)
        log_threads = logging.logThreads
        log_processes = logging.logProcesses
        log_multiprocessing = logging.logMultiprocessing
        try:
            # with all three flags off the same attributes must be None
            logging.logThreads = False
            logging.logProcesses = False
            logging.logMultiprocessing = False
            r = logging.makeLogRecord({})
            NONE = self.assertIsNone
            NONE(r.thread)
            NONE(r.threadName)
            NONE(r.process)
            NONE(r.processName)
        finally:
            # restore the module-level flags
            logging.logThreads = log_threads
            logging.logProcesses = log_processes
            logging.logMultiprocessing = log_multiprocessing

class BasicConfigTest(unittest.TestCase):

    """Test suite for logging.basicConfig."""

    def setUp(self):
        super(BasicConfigTest, self).setUp()
        # snapshot the root handlers, the module-level handler registries and
        # the root level so cleanup() can put them back
        self.handlers = logging.root.handlers
        self.saved_handlers = logging._handlers.copy()
        self.saved_handler_list = logging._handlerList[:]
        self.original_logging_level = logging.root.level
        self.addCleanup(self.cleanup)
        # every test starts with a root logger that has no handlers
        logging.root.handlers = []

    def tearDown(self):
        # iterate over a copy: removeHandler() mutates logging.root.handlers
        for h in logging.root.handlers[:]:
            logging.root.removeHandler(h)
            h.close()
        super(BasicConfigTest, self).tearDown()

    # Registered through addCleanup() in setUp(); restores the state saved there.
    def cleanup(self):
        setattr(logging.root,
'handlers', self.handlers)
        logging._handlers.clear()
        logging._handlers.update(self.saved_handlers)
        logging._handlerList[:] = self.saved_handler_list
        logging.root.setLevel(self.original_logging_level)

    def test_no_kwargs(self):
        logging.basicConfig()

        # handler defaults to a StreamHandler to sys.stderr
        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.StreamHandler)
        self.assertEqual(handler.stream, sys.stderr)

        formatter = handler.formatter
        # format defaults to logging.BASIC_FORMAT
        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
        # datefmt defaults to None
        self.assertIsNone(formatter.datefmt)
        # style defaults to %
        self.assertIsInstance(formatter._style, logging.PercentStyle)

        # level is not explicitly set
        self.assertEqual(logging.root.level, self.original_logging_level)

    # With style="{" and no explicit format, the output has the same
    # "LEVEL:name:message" shape.
    def test_strformatstyle(self):
        with support.captured_stdout() as output:
            logging.basicConfig(stream=sys.stdout, style="{")
            logging.error("Log an error")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue().strip(),
                "ERROR:root:Log an error")

    # Same check for style="$".
    def test_stringtemplatestyle(self):
        with support.captured_stdout() as output:
            logging.basicConfig(stream=sys.stdout, style="$")
            logging.error("Log an error")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue().strip(),
                "ERROR:root:Log an error")

    def test_filename(self):

        # closes both handlers before deleting the file they have open
        def cleanup(h1, h2, fn):
            h1.close()
            h2.close()
            os.remove(fn)

        logging.basicConfig(filename='test.log', encoding='utf-8')

        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.FileHandler)

        # reference handler built by hand in append mode, for comparison
        expected = logging.FileHandler('test.log', 'a', encoding='utf-8')
# Continuation of test_filename: compare the handler installed by
# basicConfig() with the reference handler.
        self.assertEqual(handler.stream.mode, expected.stream.mode)
        self.assertEqual(handler.stream.name, expected.stream.name)
        self.addCleanup(cleanup, handler, expected, 'test.log')

    def test_filemode(self):

        # closes both handlers before deleting the file they have open
        def cleanup(h1, h2, fn):
            h1.close()
            h2.close()
            os.remove(fn)

        logging.basicConfig(filename='test.log', filemode='wb')

        # the installed handler must use the same stream mode as a
        # FileHandler created directly with mode 'wb'
        handler = logging.root.handlers[0]
        expected = logging.FileHandler('test.log', 'wb')
        self.assertEqual(handler.stream.mode, expected.stream.mode)
        self.addCleanup(cleanup, handler, expected, 'test.log')

    # stream=... installs a single StreamHandler bound to that stream
    def test_stream(self):
        stream = io.StringIO()
        self.addCleanup(stream.close)
        logging.basicConfig(stream=stream)

        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.StreamHandler)
        self.assertEqual(handler.stream, stream)

    # format=... ends up as the formatter's format string
    def test_format(self):
        logging.basicConfig(format='%(asctime)s - %(message)s')

        formatter = logging.root.handlers[0].formatter
        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')

    # datefmt=... ends up on the formatter
    def test_datefmt(self):
        logging.basicConfig(datefmt='bar')

        formatter = logging.root.handlers[0].formatter
        self.assertEqual(formatter.datefmt, 'bar')

    # style='$' selects StringTemplateStyle
    def test_style(self):
        logging.basicConfig(style='$')

        formatter = logging.root.handlers[0].formatter
        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)

    def test_level(self):
        old_level = logging.root.level
        self.addCleanup(logging.root.setLevel, old_level)

        logging.basicConfig(level=57)
        self.assertEqual(logging.root.level, 57)
        # Test that second call has no effect
        logging.basicConfig(level=58)
        self.assertEqual(logging.root.level, 57)

    def test_incompatible(self):
        assertRaises = self.assertRaises
        handlers = [logging.StreamHandler()]
# Continuation of test_incompatible: every pairwise combination of
# filename / stream / handlers must be rejected with ValueError.
        stream = sys.stderr
        assertRaises(ValueError, logging.basicConfig, filename='test.log',
                                                      stream=stream)
        assertRaises(ValueError, logging.basicConfig, filename='test.log',
                                                      handlers=handlers)
        assertRaises(ValueError, logging.basicConfig, stream=stream,
                                                      handlers=handlers)
        # Issue 23207: test for invalid kwargs
        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
        # Should pop both filename and filemode even if filename is None
        logging.basicConfig(filename=None, filemode='a')

    def test_handlers(self):
        handlers = [
            logging.StreamHandler(),
            logging.StreamHandler(sys.stdout),
            logging.StreamHandler(),
        ]
        # only the third handler gets an explicit formatter up front
        f = logging.Formatter()
        handlers[2].setFormatter(f)
        logging.basicConfig(handlers=handlers)
        # the handlers are installed on the root logger in the given order
        self.assertIs(handlers[0], logging.root.handlers[0])
        self.assertIs(handlers[1], logging.root.handlers[1])
        self.assertIs(handlers[2], logging.root.handlers[2])
        # handlers without a formatter received one; the explicit one was kept
        self.assertIsNotNone(handlers[0].formatter)
        self.assertIsNotNone(handlers[1].formatter)
        self.assertIs(handlers[2].formatter, f)
        # ... and the two defaulted handlers share the same formatter object
        self.assertIs(handlers[0].formatter, handlers[1].formatter)

    def test_force(self):
        old_string_io = io.StringIO()
        new_string_io = io.StringIO()
        old_handlers = [logging.StreamHandler(old_string_io)]
        new_handlers = [logging.StreamHandler(new_string_io)]
        # first configuration: WARNING level, output to old_string_io
        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
        logging.warning('warn')
        logging.info('info')
        logging.debug('debug')
        self.assertEqual(len(logging.root.handlers), 1)
        # second configuration with force=True: INFO level, new_string_io
        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
                            force=True)
        logging.warning('warn')
        logging.info('info')
        logging.debug('debug')
        # still exactly one handler; each stream only saw the messages logged
        # while its own configuration was active
        self.assertEqual(len(logging.root.handlers), 1)
        self.assertEqual(old_string_io.getvalue().strip(),
                         'WARNING:root:warn')
        self.assertEqual(new_string_io.getvalue().strip(),
'WARNING:root:warn\nINFO:root:info') 4701 4702 def test_encoding(self): 4703 try: 4704 encoding = 'utf-8' 4705 logging.basicConfig(filename='test.log', encoding=encoding, 4706 errors='strict', 4707 format='%(message)s', level=logging.DEBUG) 4708 4709 self.assertEqual(len(logging.root.handlers), 1) 4710 handler = logging.root.handlers[0] 4711 self.assertIsInstance(handler, logging.FileHandler) 4712 self.assertEqual(handler.encoding, encoding) 4713 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4714 finally: 4715 handler.close() 4716 with open('test.log', encoding='utf-8') as f: 4717 data = f.read().strip() 4718 os.remove('test.log') 4719 self.assertEqual(data, 4720 'The Øresund Bridge joins Copenhagen to Malmö') 4721 4722 def test_encoding_errors(self): 4723 try: 4724 encoding = 'ascii' 4725 logging.basicConfig(filename='test.log', encoding=encoding, 4726 errors='ignore', 4727 format='%(message)s', level=logging.DEBUG) 4728 4729 self.assertEqual(len(logging.root.handlers), 1) 4730 handler = logging.root.handlers[0] 4731 self.assertIsInstance(handler, logging.FileHandler) 4732 self.assertEqual(handler.encoding, encoding) 4733 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4734 finally: 4735 handler.close() 4736 with open('test.log', encoding='utf-8') as f: 4737 data = f.read().strip() 4738 os.remove('test.log') 4739 self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm') 4740 4741 def test_encoding_errors_default(self): 4742 try: 4743 encoding = 'ascii' 4744 logging.basicConfig(filename='test.log', encoding=encoding, 4745 format='%(message)s', level=logging.DEBUG) 4746 4747 self.assertEqual(len(logging.root.handlers), 1) 4748 handler = logging.root.handlers[0] 4749 self.assertIsInstance(handler, logging.FileHandler) 4750 self.assertEqual(handler.encoding, encoding) 4751 self.assertEqual(handler.errors, 'backslashreplace') 4752 logging.debug(': ☃️: The Øresund Bridge joins Copenhagen to Malmö') 4753 finally: 4754 
# Continuation of test_encoding_errors_default (body of its "finally" clause):
# the log file is read back and compared with the backslash-escaped text.
            handler.close()
            with open('test.log', encoding='utf-8') as f:
                data = f.read().strip()
            os.remove('test.log')
            self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund '
                                   r'Bridge joins Copenhagen to Malm\xf6')

    def test_encoding_errors_none(self):
        # Specifying None should behave as 'strict'
        try:
            encoding = 'ascii'
            logging.basicConfig(filename='test.log', encoding=encoding,
                                errors=None,
                                format='%(message)s', level=logging.DEBUG)

            self.assertEqual(len(logging.root.handlers), 1)
            handler = logging.root.handlers[0]
            self.assertIsInstance(handler, logging.FileHandler)
            self.assertEqual(handler.encoding, encoding)
            self.assertIsNone(handler.errors)

            message = []

            # stand-in for handler.handleError: records the text of the
            # exception currently being handled
            def dummy_handle_error(record):
                _, v, _ = sys.exc_info()
                message.append(str(v))

            handler.handleError = dummy_handle_error
            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
            # the encoding failure must have been routed to handleError
            self.assertTrue(message)
            self.assertIn("'ascii' codec can't encode "
                          "character '\\xd8' in position 4:", message[0])
        finally:
            handler.close()
            with open('test.log', encoding='utf-8') as f:
                data = f.read().strip()
            os.remove('test.log')
            # didn't write anything due to the encoding error
            self.assertEqual(data, r'')


    # Shared driver for the module-level logging functions in this class.
    #   method: name of the function on the logging module ('log', 'debug', ...)
    #   level:  explicit level passed as first argument (used for 'log' only)
    def _test_log(self, method, level=None):
        # logging.root has no handlers so basicConfig should be called
        called = []

        old_basic_config = logging.basicConfig
        # wrapper that performs the real configuration, silences the root
        # logger, and records the arguments it was called with
        def my_basic_config(*a, **kw):
            old_basic_config()
            old_level = logging.root.level
            logging.root.setLevel(100)  # avoid having messages in stderr
            self.addCleanup(logging.root.setLevel, old_level)
            called.append((a, kw))

        support.patch(self, logging, 'basicConfig', my_basic_config)

        log_method = getattr(logging, method)
        if level is not None:
            log_method(level, "test me")
        else:
            log_method("test
me")

        # basicConfig was called with no arguments
        self.assertEqual(called, [((), {})])

    def test_log(self):
        self._test_log('log', logging.WARNING)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')


class LoggerAdapterTest(unittest.TestCase):
    def setUp(self):
        super(LoggerAdapterTest, self).setUp()
        # snapshot of the module-level handler list, restored by cleanup()
        old_handler_list = logging._handlerList[:]

        # the adapter under test wraps the root logger, which gets a
        # RecordingHandler for the duration of the test
        self.recording = RecordingHandler()
        self.logger = logging.root
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)

        def cleanup():
            logging._handlerList[:] = old_handler_list

        self.addCleanup(cleanup)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)

    # adapter.exception() inside an except block produces one ERROR record
    # whose exc_info describes the exception being handled.
    def test_exception(self):
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.adapter.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    # adapter.exception() called outside the except block, with the exception
    # instance passed explicitly as exc_info.
    def test_exception_excinfo(self):
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e

        self.adapter.exception('exc_info test', exc_info=exc)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc,
exc.__traceback__)) 4884 4885 def test_critical(self): 4886 msg = 'critical test! %r' 4887 self.adapter.critical(msg, self.recording) 4888 4889 self.assertEqual(len(self.recording.records), 1) 4890 record = self.recording.records[0] 4891 self.assertEqual(record.levelno, logging.CRITICAL) 4892 self.assertEqual(record.msg, msg) 4893 self.assertEqual(record.args, (self.recording,)) 4894 4895 def test_is_enabled_for(self): 4896 old_disable = self.adapter.logger.manager.disable 4897 self.adapter.logger.manager.disable = 33 4898 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 4899 old_disable) 4900 self.assertFalse(self.adapter.isEnabledFor(32)) 4901 4902 def test_has_handlers(self): 4903 self.assertTrue(self.adapter.hasHandlers()) 4904 4905 for handler in self.logger.handlers: 4906 self.logger.removeHandler(handler) 4907 4908 self.assertFalse(self.logger.hasHandlers()) 4909 self.assertFalse(self.adapter.hasHandlers()) 4910 4911 def test_nested(self): 4912 class Adapter(logging.LoggerAdapter): 4913 prefix = 'Adapter' 4914 4915 def process(self, msg, kwargs): 4916 return f"{self.prefix} {msg}", kwargs 4917 4918 msg = 'Adapters can be nested, yo.' 
# Continuation of test_nested: an Adapter wrapping another Adapter.
        adapter = Adapter(logger=self.logger, extra=None)
        adapter_adapter = Adapter(logger=adapter, extra=None)
        adapter_adapter.prefix = 'AdapterAdapter'
        self.assertEqual(repr(adapter), repr(adapter_adapter))
        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.CRITICAL)
        # the inner adapter's prefix ends up first in the final message
        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
        self.assertEqual(record.args, (self.recording,))
        # 'manager' is the same object at every nesting level ...
        orig_manager = adapter_adapter.manager
        self.assertIs(adapter.manager, orig_manager)
        self.assertIs(self.logger.manager, orig_manager)
        temp_manager = object()
        try:
            # ... and assigning it on the outermost adapter is visible on the
            # inner adapter and on the underlying logger
            adapter_adapter.manager = temp_manager
            self.assertIs(adapter_adapter.manager, temp_manager)
            self.assertIs(adapter.manager, temp_manager)
            self.assertIs(self.logger.manager, temp_manager)
        finally:
            adapter_adapter.manager = orig_manager
        self.assertIs(adapter_adapter.manager, orig_manager)
        self.assertIs(adapter.manager, orig_manager)
        self.assertIs(self.logger.manager, orig_manager)


class LoggerTest(BaseTest, AssertErrorMessage):

    def setUp(self):
        super(LoggerTest, self).setUp()
        # a standalone Logger instance (created directly, not via getLogger)
        # with a RecordingHandler attached
        self.recording = RecordingHandler()
        self.logger = logging.Logger(name='blah')
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)
        self.addCleanup(logging.shutdown)

    # setLevel() with a value that is neither an int nor a valid string raises
    # TypeError with the exact message checked here.
    def test_set_invalid_level(self):
        self.assert_error_message(
            TypeError, 'Level not an integer or a valid string: None',
            self.logger.setLevel, None)
        self.assert_error_message(
            TypeError, 'Level not an integer or a valid string: (0, 0)',
            self.logger.setLevel, (0, 0))

    # logger.exception() inside an except block produces one ERROR record
    # whose exc_info describes the exception being handled.
    def test_exception(self):
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except
ZeroDivisionError as e:
            exc = e
            self.logger.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    # With raiseExceptions on, a non-integer level passed to log() raises.
    def test_log_invalid_level_with_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', True):
            self.assertRaises(TypeError, self.logger.log, '10', 'test message')

    # With raiseExceptions off, the same call returns without raising.
    def test_log_invalid_level_no_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', False):
            self.logger.log('10', 'test message')  # no exception happens

    def test_find_caller_with_stack_info(self):
        called = []
        # replace traceback.print_stack (as seen from the logging module) with
        # a recorder that captures what is already in the output buffer
        support.patch(self, logging.traceback, 'print_stack',
                      lambda f, file: called.append(file.getvalue()))

        self.logger.findCaller(stack_info=True)

        # print_stack was called once, after the header line had been written
        self.assertEqual(len(called), 1)
        self.assertEqual('Stack (most recent call last):\n', called[0])

    def test_find_caller_with_stacklevel(self):
        # read by innermost() through the closure; incremented between calls
        the_level = 1

        def innermost():
            self.logger.warning('test', stacklevel=the_level)

        def inner():
            innermost()

        def outer():
            inner()

        # each increment of stacklevel attributes the record to the next frame
        # out: innermost -> inner -> outer -> this test method
        records = self.recording.records
        outer()
        self.assertEqual(records[-1].funcName, 'innermost')
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'inner')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'outer')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
self.assertGreater(records[-1].lineno, lineno) 5029 5030 def test_make_record_with_extra_overwrite(self): 5031 name = 'my record' 5032 level = 13 5033 fn = lno = msg = args = exc_info = func = sinfo = None 5034 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 5035 exc_info, func, sinfo) 5036 5037 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 5038 extra = {key: 'some value'} 5039 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 5040 fn, lno, msg, args, exc_info, 5041 extra=extra, sinfo=sinfo) 5042 5043 def test_make_record_with_extra_no_overwrite(self): 5044 name = 'my record' 5045 level = 13 5046 fn = lno = msg = args = exc_info = func = sinfo = None 5047 extra = {'valid_key': 'some value'} 5048 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 5049 exc_info, extra=extra, sinfo=sinfo) 5050 self.assertIn('valid_key', result.__dict__) 5051 5052 def test_has_handlers(self): 5053 self.assertTrue(self.logger.hasHandlers()) 5054 5055 for handler in self.logger.handlers: 5056 self.logger.removeHandler(handler) 5057 self.assertFalse(self.logger.hasHandlers()) 5058 5059 def test_has_handlers_no_propagate(self): 5060 child_logger = logging.getLogger('blah.child') 5061 child_logger.propagate = False 5062 self.assertFalse(child_logger.hasHandlers()) 5063 5064 def test_is_enabled_for(self): 5065 old_disable = self.logger.manager.disable 5066 self.logger.manager.disable = 23 5067 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 5068 self.assertFalse(self.logger.isEnabledFor(22)) 5069 5070 def test_is_enabled_for_disabled_logger(self): 5071 old_disabled = self.logger.disabled 5072 old_disable = self.logger.manager.disable 5073 5074 self.logger.disabled = True 5075 self.logger.manager.disable = 21 5076 5077 self.addCleanup(setattr, self.logger, 'disabled', old_disabled) 5078 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 5079 5080 
# Continuation of test_is_enabled_for_disabled_logger: the final assertion.
        self.assertFalse(self.logger.isEnabledFor(22))

    # Every spelling of "the root logger" yields the same object.
    def test_root_logger_aliases(self):
        root = logging.getLogger()
        self.assertIs(root, logging.root)
        self.assertIs(root, logging.getLogger(None))
        self.assertIs(root, logging.getLogger(''))
        self.assertIs(root, logging.getLogger('root'))
        self.assertIs(root, logging.getLogger('foo').root)
        self.assertIs(root, logging.getLogger('foo.bar').root)
        self.assertIs(root, logging.getLogger('foo').parent)

        # ... while these are distinct loggers
        self.assertIsNot(root, logging.getLogger('\0'))
        self.assertIsNot(root, logging.getLogger('foo.bar').parent)

    # getLogger() rejects names that are not str (a builtin function, bytes).
    def test_invalid_names(self):
        self.assertRaises(TypeError, logging.getLogger, any)
        self.assertRaises(TypeError, logging.getLogger, b'foo')

    # For every pickle protocol, unpickling a logger returns the very same
    # logger object.
    def test_pickling(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
                logger = logging.getLogger(name)
                s = pickle.dumps(logger, proto)
                unpickled = pickle.loads(s)
                self.assertIs(unpickled, logger)

    # Checks the per-logger isEnabledFor() cache (logger._cache): when it is
    # populated and which operations empty it.
    def test_caching(self):
        root = self.root_logger
        logger1 = logging.getLogger("abc")
        logger2 = logging.getLogger("abc.def")

        # Set root logger level and ensure cache is empty
        root.setLevel(logging.ERROR)
        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
        self.assertEqual(logger2._cache, {})

        # Ensure cache is populated and calls are consistent
        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
        self.assertEqual(root._cache, {})
        self.assertTrue(logger2.isEnabledFor(logging.ERROR))

        # Ensure root cache gets populated
        self.assertEqual(root._cache, {})
        self.assertTrue(root.isEnabledFor(logging.ERROR))
        self.assertEqual(root._cache, {logging.ERROR: True})

        # Set parent
# logger level and ensure caches are emptied
        logger1.setLevel(logging.CRITICAL)
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})

        # Ensure logger2 uses parent logger's effective level
        self.assertFalse(logger2.isEnabledFor(logging.ERROR))

        # Set level to NOTSET and ensure caches are empty
        logger2.setLevel(logging.NOTSET)
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})
        self.assertEqual(logger1._cache, {})
        self.assertEqual(root._cache, {})

        # Verify logger2 follows parent and not root
        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
        self.assertTrue(root.isEnabledFor(logging.ERROR))

        # Disable logging in manager and ensure caches are clear
        logging.disable()
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})
        self.assertEqual(logger1._cache, {})
        self.assertEqual(root._cache, {})

        # Ensure no loggers are enabled
        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
        self.assertFalse(root.isEnabledFor(logging.CRITICAL))


class BaseFileTest(BaseTest):
    "Base class for handler tests that write log files"

    def setUp(self):
        BaseTest.setUp(self)
        # create an empty temporary log file; only its path is kept, the
        # descriptor is closed immediately
        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
        os.close(fd)
        # extra files registered through assertLogFile(), removed in tearDown()
        self.rmfiles = []

    def tearDown(self):
        for fn in self.rmfiles:
            os.unlink(fn)
        # self.fn may already be gone (some tests unlink it, and it may also
        # have been registered in rmfiles)
        if os.path.exists(self.fn):
            os.unlink(self.fn)
        BaseTest.tearDown(self)

    def assertLogFile(self, filename):
        "Assert a log file is there and register it for deletion"
self.assertTrue(os.path.exists(filename), 5183 msg="Log file %r does not exist" % filename) 5184 self.rmfiles.append(filename) 5185 5186 def next_rec(self): 5187 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 5188 self.next_message(), None, None, None) 5189 5190class FileHandlerTest(BaseFileTest): 5191 def test_delay(self): 5192 os.unlink(self.fn) 5193 fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True) 5194 self.assertIsNone(fh.stream) 5195 self.assertFalse(os.path.exists(self.fn)) 5196 fh.handle(logging.makeLogRecord({})) 5197 self.assertIsNotNone(fh.stream) 5198 self.assertTrue(os.path.exists(self.fn)) 5199 fh.close() 5200 5201 def test_emit_after_closing_in_write_mode(self): 5202 # Issue #42378 5203 os.unlink(self.fn) 5204 fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w') 5205 fh.setFormatter(logging.Formatter('%(message)s')) 5206 fh.emit(self.next_rec()) # '1' 5207 fh.close() 5208 fh.emit(self.next_rec()) # '2' 5209 with open(self.fn) as fp: 5210 self.assertEqual(fp.read().strip(), '1') 5211 5212class RotatingFileHandlerTest(BaseFileTest): 5213 def test_should_not_rollover(self): 5214 # If maxbytes is zero rollover never occurs 5215 rh = logging.handlers.RotatingFileHandler( 5216 self.fn, encoding="utf-8", maxBytes=0) 5217 self.assertFalse(rh.shouldRollover(None)) 5218 rh.close() 5219 # bpo-45401 - test with special file 5220 # We set maxBytes to 1 so that rollover would normally happen, except 5221 # for the check for regular files 5222 rh = logging.handlers.RotatingFileHandler( 5223 os.devnull, encoding="utf-8", maxBytes=1) 5224 self.assertFalse(rh.shouldRollover(self.next_rec())) 5225 rh.close() 5226 5227 def test_should_rollover(self): 5228 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1) 5229 self.assertTrue(rh.shouldRollover(self.next_rec())) 5230 rh.close() 5231 5232 def test_file_created(self): 5233 # checks that the file is created and assumes it was created 5234 # by us 5235 rh = 
logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8") 5236 rh.emit(self.next_rec()) 5237 self.assertLogFile(self.fn) 5238 rh.close() 5239 5240 def test_rollover_filenames(self): 5241 def namer(name): 5242 return name + ".test" 5243 rh = logging.handlers.RotatingFileHandler( 5244 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5245 rh.namer = namer 5246 rh.emit(self.next_rec()) 5247 self.assertLogFile(self.fn) 5248 rh.emit(self.next_rec()) 5249 self.assertLogFile(namer(self.fn + ".1")) 5250 rh.emit(self.next_rec()) 5251 self.assertLogFile(namer(self.fn + ".2")) 5252 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 5253 rh.close() 5254 5255 def test_namer_rotator_inheritance(self): 5256 class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler): 5257 def namer(self, name): 5258 return name + ".test" 5259 5260 def rotator(self, source, dest): 5261 if os.path.exists(source): 5262 os.replace(source, dest + ".rotated") 5263 5264 rh = HandlerWithNamerAndRotator( 5265 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5266 self.assertEqual(rh.namer(self.fn), self.fn + ".test") 5267 rh.emit(self.next_rec()) 5268 self.assertLogFile(self.fn) 5269 rh.emit(self.next_rec()) 5270 self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated") 5271 self.assertFalse(os.path.exists(rh.namer(self.fn + ".1"))) 5272 rh.close() 5273 5274 @support.requires_zlib() 5275 def test_rotator(self): 5276 def namer(name): 5277 return name + ".gz" 5278 5279 def rotator(source, dest): 5280 with open(source, "rb") as sf: 5281 data = sf.read() 5282 compressed = zlib.compress(data, 9) 5283 with open(dest, "wb") as df: 5284 df.write(compressed) 5285 os.remove(source) 5286 5287 rh = logging.handlers.RotatingFileHandler( 5288 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5289 rh.rotator = rotator 5290 rh.namer = namer 5291 m1 = self.next_rec() 5292 rh.emit(m1) 5293 self.assertLogFile(self.fn) 5294 m2 = self.next_rec() 5295 rh.emit(m2) 5296 fn = namer(self.fn + 
".1") 5297 self.assertLogFile(fn) 5298 newline = os.linesep 5299 with open(fn, "rb") as f: 5300 compressed = f.read() 5301 data = zlib.decompress(compressed) 5302 self.assertEqual(data.decode("ascii"), m1.msg + newline) 5303 rh.emit(self.next_rec()) 5304 fn = namer(self.fn + ".2") 5305 self.assertLogFile(fn) 5306 with open(fn, "rb") as f: 5307 compressed = f.read() 5308 data = zlib.decompress(compressed) 5309 self.assertEqual(data.decode("ascii"), m1.msg + newline) 5310 rh.emit(self.next_rec()) 5311 fn = namer(self.fn + ".2") 5312 with open(fn, "rb") as f: 5313 compressed = f.read() 5314 data = zlib.decompress(compressed) 5315 self.assertEqual(data.decode("ascii"), m2.msg + newline) 5316 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 5317 rh.close() 5318 5319class TimedRotatingFileHandlerTest(BaseFileTest): 5320 def test_should_not_rollover(self): 5321 # See bpo-45401. Should only ever rollover regular files 5322 fh = logging.handlers.TimedRotatingFileHandler( 5323 os.devnull, 'S', encoding="utf-8", backupCount=1) 5324 time.sleep(1.1) # a little over a second ... 5325 r = logging.makeLogRecord({'msg': 'testing - device file'}) 5326 self.assertFalse(fh.shouldRollover(r)) 5327 fh.close() 5328 5329 # other test methods added below 5330 def test_rollover(self): 5331 fh = logging.handlers.TimedRotatingFileHandler( 5332 self.fn, 'S', encoding="utf-8", backupCount=1) 5333 fmt = logging.Formatter('%(asctime)s %(message)s') 5334 fh.setFormatter(fmt) 5335 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 5336 fh.emit(r1) 5337 self.assertLogFile(self.fn) 5338 time.sleep(1.1) # a little over a second ... 5339 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 5340 fh.emit(r2) 5341 fh.close() 5342 # At this point, we should have a recent rotated file which we 5343 # can test for the existence of. However, in practice, on some 5344 # machines which run really slowly, we don't know how far back 5345 # in time to go to look for the log file. 
So, we go back a fair 5346 # bit, and stop as soon as we see a rotated file. In theory this 5347 # could of course still fail, but the chances are lower. 5348 found = False 5349 now = datetime.datetime.now() 5350 GO_BACK = 5 * 60 # seconds 5351 for secs in range(GO_BACK): 5352 prev = now - datetime.timedelta(seconds=secs) 5353 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 5354 found = os.path.exists(fn) 5355 if found: 5356 self.rmfiles.append(fn) 5357 break 5358 msg = 'No rotated files found, went back %d seconds' % GO_BACK 5359 if not found: 5360 # print additional diagnostics 5361 dn, fn = os.path.split(self.fn) 5362 files = [f for f in os.listdir(dn) if f.startswith(fn)] 5363 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 5364 print('The only matching files are: %s' % files, file=sys.stderr) 5365 for f in files: 5366 print('Contents of %s:' % f) 5367 path = os.path.join(dn, f) 5368 with open(path, 'r') as tf: 5369 print(tf.read()) 5370 self.assertTrue(found, msg=msg) 5371 5372 def test_invalid(self): 5373 assertRaises = self.assertRaises 5374 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5375 self.fn, 'X', encoding="utf-8", delay=True) 5376 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5377 self.fn, 'W', encoding="utf-8", delay=True) 5378 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5379 self.fn, 'W7', encoding="utf-8", delay=True) 5380 5381 def test_compute_rollover_daily_attime(self): 5382 currentTime = 0 5383 atTime = datetime.time(12, 0, 0) 5384 rh = logging.handlers.TimedRotatingFileHandler( 5385 self.fn, encoding="utf-8", when='MIDNIGHT', interval=1, backupCount=0, 5386 utc=True, atTime=atTime) 5387 try: 5388 actual = rh.computeRollover(currentTime) 5389 self.assertEqual(actual, currentTime + 12 * 60 * 60) 5390 5391 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 5392 self.assertEqual(actual, currentTime + 36 * 60 * 60) 5393 finally: 5394 rh.close() 
5395 5396 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 5397 def test_compute_rollover_weekly_attime(self): 5398 currentTime = int(time.time()) 5399 today = currentTime - currentTime % 86400 5400 5401 atTime = datetime.time(12, 0, 0) 5402 5403 wday = time.gmtime(today).tm_wday 5404 for day in range(7): 5405 rh = logging.handlers.TimedRotatingFileHandler( 5406 self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0, 5407 utc=True, atTime=atTime) 5408 try: 5409 if wday > day: 5410 # The rollover day has already passed this week, so we 5411 # go over into next week 5412 expected = (7 - wday + day) 5413 else: 5414 expected = (day - wday) 5415 # At this point expected is in days from now, convert to seconds 5416 expected *= 24 * 60 * 60 5417 # Add in the rollover time 5418 expected += 12 * 60 * 60 5419 # Add in adjustment for today 5420 expected += today 5421 actual = rh.computeRollover(today) 5422 if actual != expected: 5423 print('failed in timezone: %d' % time.timezone) 5424 print('local vars: %s' % locals()) 5425 self.assertEqual(actual, expected) 5426 if day == wday: 5427 # goes into following week 5428 expected += 7 * 24 * 60 * 60 5429 actual = rh.computeRollover(today + 13 * 60 * 60) 5430 if actual != expected: 5431 print('failed in timezone: %d' % time.timezone) 5432 print('local vars: %s' % locals()) 5433 self.assertEqual(actual, expected) 5434 finally: 5435 rh.close() 5436 5437 5438def secs(**kw): 5439 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 5440 5441for when, exp in (('S', 1), 5442 ('M', 60), 5443 ('H', 60 * 60), 5444 ('D', 60 * 60 * 24), 5445 ('MIDNIGHT', 60 * 60 * 24), 5446 # current time (epoch start) is a Thursday, W0 means Monday 5447 ('W0', secs(days=4, hours=24)), 5448 ): 5449 def test_compute_rollover(self, when=when, exp=exp): 5450 rh = logging.handlers.TimedRotatingFileHandler( 5451 self.fn, encoding="utf-8", when=when, interval=1, backupCount=0, utc=True) 5452 currentTime = 0.0 
5453 actual = rh.computeRollover(currentTime) 5454 if exp != actual: 5455 # Failures occur on some systems for MIDNIGHT and W0. 5456 # Print detailed calculation for MIDNIGHT so we can try to see 5457 # what's going on 5458 if when == 'MIDNIGHT': 5459 try: 5460 if rh.utc: 5461 t = time.gmtime(currentTime) 5462 else: 5463 t = time.localtime(currentTime) 5464 currentHour = t[3] 5465 currentMinute = t[4] 5466 currentSecond = t[5] 5467 # r is the number of seconds left between now and midnight 5468 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 5469 currentMinute) * 60 + 5470 currentSecond) 5471 result = currentTime + r 5472 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 5473 print('currentHour: %s' % currentHour, file=sys.stderr) 5474 print('currentMinute: %s' % currentMinute, file=sys.stderr) 5475 print('currentSecond: %s' % currentSecond, file=sys.stderr) 5476 print('r: %s' % r, file=sys.stderr) 5477 print('result: %s' % result, file=sys.stderr) 5478 except Exception: 5479 print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) 5480 self.assertEqual(exp, actual) 5481 rh.close() 5482 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 5483 5484 5485@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 5486class NTEventLogHandlerTest(BaseTest): 5487 def test_basic(self): 5488 logtype = 'Application' 5489 elh = win32evtlog.OpenEventLog(None, logtype) 5490 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 5491 5492 try: 5493 h = logging.handlers.NTEventLogHandler('test_logging') 5494 except pywintypes.error as e: 5495 if e.winerror == 5: # access denied 5496 raise unittest.SkipTest('Insufficient privileges to run test') 5497 raise 5498 5499 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 5500 h.handle(r) 5501 h.close() 5502 # Now see if the event is recorded 5503 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 
5504 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 5505 win32evtlog.EVENTLOG_SEQUENTIAL_READ 5506 found = False 5507 GO_BACK = 100 5508 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 5509 for e in events: 5510 if e.SourceName != 'test_logging': 5511 continue 5512 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 5513 if msg != 'Test Log Message\r\n': 5514 continue 5515 found = True 5516 break 5517 msg = 'Record not found in event log, went back %d records' % GO_BACK 5518 self.assertTrue(found, msg=msg) 5519 5520 5521class MiscTestCase(unittest.TestCase): 5522 def test__all__(self): 5523 not_exported = { 5524 'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe', 5525 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle', 5526 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root', 5527 'threading'} 5528 support.check__all__(self, logging, not_exported=not_exported) 5529 5530 5531# Set the locale to the platform-dependent default. I have no idea 5532# why the test does this, but in any case we save the current locale 5533# first and restore it at the end. 5534def setUpModule(): 5535 cm = support.run_with_locale('LC_ALL', '') 5536 cm.__enter__() 5537 unittest.addModuleCleanup(cm.__exit__, None, None, None) 5538 5539 5540if __name__ == "__main__": 5541 unittest.main() 5542