1# Copyright (C) 2005-2013, 2015, 2016 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17"""Testing framework extensions""" 18 19# NOTE: Some classes in here use camelCaseNaming() rather than 20# underscore_naming(). That's for consistency with unittest; it's not the 21# general style of breezy. Please continue that consistency when adding e.g. 22# new assertFoo() methods. 23 24import atexit 25import codecs 26import copy 27import difflib 28import doctest 29import errno 30import functools 31from io import ( 32 BytesIO, 33 StringIO, 34 TextIOWrapper, 35 ) 36import itertools 37import logging 38import math 39import os 40import platform 41import pprint 42import random 43import re 44import shlex 45import site 46import stat 47import subprocess 48import sys 49import tempfile 50import threading 51import time 52import traceback 53import unittest 54import warnings 55 56import testtools 57# nb: check this before importing anything else from within it 58_testtools_version = getattr(testtools, '__version__', ()) 59if _testtools_version < (0, 9, 5): 60 raise ImportError("need at least testtools 0.9.5: %s is %r" 61 % (testtools.__file__, _testtools_version)) 62from testtools import content 63 64import breezy 65from .. import ( 66 branchbuilder, 67 controldir, 68 commands as _mod_commands, 69 config, 70 i18n, 71 debug, 72 errors, 73 hooks, 74 lock as _mod_lock, 75 lockdir, 76 osutils, 77 plugin as _mod_plugin, 78 pyutils, 79 ui, 80 urlutils, 81 registry, 82 symbol_versioning, 83 trace, 84 transport as _mod_transport, 85 workingtree, 86 ) 87from breezy.bzr import ( 88 chk_map, 89 ) 90try: 91 import breezy.lsprof 92except ImportError: 93 # lsprof not available 94 pass 95from ..bzr.smart import client, request 96from ..transport import ( 97 memory, 98 pathfilter, 99 ) 100from ..tests import ( 101 fixtures, 102 test_server, 103 TestUtil, 104 treeshape, 105 ui_testing, 106 ) 107 108# Mark this python module as being part of the implementation 109# of unittest: this gives us better tracebacks where the last 110# shown frame is the test code, not our assertXYZ. 111__unittest = 1 112 113default_transport = test_server.LocalURLServer 114 115 116_unitialized_attr = object() 117"""A sentinel needed to act as a default value in a method signature.""" 118 119 120# Subunit result codes, defined here to prevent a hard dependency on subunit. 121SUBUNIT_SEEK_SET = 0 122SUBUNIT_SEEK_CUR = 1 123 124# These are intentionally brought into this namespace. That way plugins, etc 125# can just "from breezy.tests import TestCase, TestLoader, etc" 126TestSuite = TestUtil.TestSuite 127TestLoader = TestUtil.TestLoader 128 129# Tests should run in a clean and clearly defined environment. The goal is to 130# keep them isolated from the running environment as mush as possible. The test 131# framework ensures the variables defined below are set (or deleted if the 132# value is None) before a test is run and reset to their original value after 133# the test is run. Generally if some code depends on an environment variable, 134# the tests should start without this variable in the environment. There are a 135# few exceptions but you shouldn't violate this rule lightly. 136isolated_environ = { 137 'BRZ_HOME': None, 138 'HOME': None, 139 'GNUPGHOME': None, 140 'XDG_CONFIG_HOME': None, 141 # brz now uses the Win32 API and doesn't rely on APPDATA, but the 142 # tests do check our impls match APPDATA 143 'BRZ_EDITOR': None, # test_msgeditor manipulates this variable 144 'VISUAL': None, 145 'EDITOR': None, 146 'BRZ_EMAIL': None, 147 'BZREMAIL': None, # may still be present in the environment 148 'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess 149 'BRZ_PROGRESS_BAR': None, 150 # Trap leaks to $XDG_CACHE_HOME/breezy/brz.log. This occurs when tests use 151 # TestCase as a base class instead of TestCaseInTempDir. Tests inheriting 152 # from TestCase should not use disk resources, BRZ_LOG is one. 153 'BRZ_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file', 154 'BRZ_PLUGIN_PATH': '-site', 155 'BRZ_DISABLE_PLUGINS': None, 156 'BRZ_PLUGINS_AT': None, 157 'BRZ_CONCURRENCY': None, 158 # Make sure that any text ui tests are consistent regardless of 159 # the environment the test case is run in; you may want tests that 160 # test other combinations. 'dumb' is a reasonable guess for tests 161 # going to a pipe or a BytesIO. 162 'TERM': 'dumb', 163 'LINES': '25', 164 'COLUMNS': '80', 165 'BRZ_COLUMNS': '80', 166 # Disable SSH Agent 167 'SSH_AUTH_SOCK': None, 168 # Proxies 169 'http_proxy': None, 170 'HTTP_PROXY': None, 171 'https_proxy': None, 172 'HTTPS_PROXY': None, 173 'no_proxy': None, 174 'NO_PROXY': None, 175 'all_proxy': None, 176 'ALL_PROXY': None, 177 'BZR_REMOTE_PATH': None, 178 # Generally speaking, we don't want apport reporting on crashes in 179 # the test envirnoment unless we're specifically testing apport, 180 # so that it doesn't leak into the real system environment. We 181 # use an env var so it propagates to subprocesses. 182 'APPORT_DISABLE': '1', 183 } 184 185 186def override_os_environ(test, env=None): 187 """Modify os.environ keeping a copy. 188 189 :param test: A test instance 190 191 :param env: A dict containing variable definitions to be installed 192 """ 193 if env is None: 194 env = isolated_environ 195 test._original_os_environ = dict(**os.environ) 196 for var in env: 197 osutils.set_or_unset_env(var, env[var]) 198 if var not in test._original_os_environ: 199 # The var is new, add it with a value of None, so 200 # restore_os_environ will delete it 201 test._original_os_environ[var] = None 202 203 204def restore_os_environ(test): 205 """Restore os.environ to its original state. 206 207 :param test: A test instance previously passed to override_os_environ. 208 """ 209 for var, value in test._original_os_environ.items(): 210 # Restore the original value (or delete it if the value has been set to 211 # None in override_os_environ). 212 osutils.set_or_unset_env(var, value) 213 214 215def _clear__type_equality_funcs(test): 216 """Cleanup bound methods stored on TestCase instances 217 218 Clear the dict breaking a few (mostly) harmless cycles in the affected 219 unittests released with Python 2.6 and initial Python 2.7 versions. 220 221 For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly 222 shipped in Oneiric, an object with no clear method was used, hence the 223 extra complications, see bug 809048 for details. 224 """ 225 type_equality_funcs = getattr(test, "_type_equality_funcs", None) 226 if type_equality_funcs is not None: 227 tef_clear = getattr(type_equality_funcs, "clear", None) 228 if tef_clear is None: 229 tef_instance_dict = getattr(type_equality_funcs, "__dict__", None) 230 if tef_instance_dict is not None: 231 tef_clear = tef_instance_dict.clear 232 if tef_clear is not None: 233 tef_clear() 234 235 236class ExtendedTestResult(testtools.TextTestResult): 237 """Accepts, reports and accumulates the results of running tests. 238 239 Compared to the unittest version this class adds support for 240 profiling, benchmarking, stopping as soon as a test fails, and 241 skipping tests. There are further-specialized subclasses for 242 different types of display. 243 244 When a test finishes, in whatever way, it calls one of the addSuccess, 245 addFailure or addError methods. These in turn may redirect to a more 246 specific case for the special test results supported by our extended 247 tests. 248 249 Note that just one of these objects is fed the results from many tests. 250 """ 251 252 stop_early = False 253 254 def __init__(self, stream, descriptions, verbosity, 255 bench_history=None, 256 strict=False, 257 ): 258 """Construct new TestResult. 259 260 :param bench_history: Optionally, a writable file object to accumulate 261 benchmark results. 262 """ 263 testtools.TextTestResult.__init__(self, stream) 264 if bench_history is not None: 265 from breezy.version import _get_bzr_source_tree 266 src_tree = _get_bzr_source_tree() 267 if src_tree: 268 try: 269 revision_id = src_tree.get_parent_ids()[0] 270 except IndexError: 271 # XXX: if this is a brand new tree, do the same as if there 272 # is no branch. 273 revision_id = b'' 274 else: 275 # XXX: If there's no branch, what should we do? 276 revision_id = b'' 277 bench_history.write("--date %s %s\n" % (time.time(), revision_id)) 278 self._bench_history = bench_history 279 self.ui = ui.ui_factory 280 self.num_tests = 0 281 self.error_count = 0 282 self.failure_count = 0 283 self.known_failure_count = 0 284 self.skip_count = 0 285 self.not_applicable_count = 0 286 self.unsupported = {} 287 self.count = 0 288 self._overall_start_time = time.time() 289 self._strict = strict 290 self._first_thread_leaker_id = None 291 self._tests_leaking_threads_count = 0 292 self._traceback_from_test = None 293 294 def stopTestRun(self): 295 run = self.testsRun 296 actionTaken = "Ran" 297 stopTime = time.time() 298 timeTaken = stopTime - self.startTime 299 # GZ 2010-07-19: Seems testtools has no printErrors method, and though 300 # the parent class method is similar have to duplicate 301 self._show_list('ERROR', self.errors) 302 self._show_list('FAIL', self.failures) 303 self.stream.write(self.sep2) 304 self.stream.write("%s %d test%s in %.3fs\n\n" % ( 305 actionTaken, run, run != 1 and "s" or "", timeTaken)) 306 if not self.wasSuccessful(): 307 self.stream.write("FAILED (") 308 failed, errored = map(len, (self.failures, self.errors)) 309 if failed: 310 self.stream.write("failures=%d" % failed) 311 if errored: 312 if failed: 313 self.stream.write(", ") 314 self.stream.write("errors=%d" % errored) 315 if self.known_failure_count: 316 if failed or errored: 317 self.stream.write(", ") 318 self.stream.write("known_failure_count=%d" % 319 self.known_failure_count) 320 self.stream.write(")\n") 321 else: 322 if self.known_failure_count: 323 self.stream.write("OK (known_failures=%d)\n" % 324 self.known_failure_count) 325 else: 326 self.stream.write("OK\n") 327 if self.skip_count > 0: 328 skipped = self.skip_count 329 self.stream.write('%d test%s skipped\n' % 330 (skipped, skipped != 1 and "s" or "")) 331 if self.unsupported: 332 for feature, count in sorted(self.unsupported.items()): 333 self.stream.write("Missing feature '%s' skipped %d tests.\n" % 334 (feature, count)) 335 if self._strict: 336 ok = self.wasStrictlySuccessful() 337 else: 338 ok = self.wasSuccessful() 339 if self._first_thread_leaker_id: 340 self.stream.write( 341 '%s is leaking threads among %d leaking tests.\n' % ( 342 self._first_thread_leaker_id, 343 self._tests_leaking_threads_count)) 344 # We don't report the main thread as an active one. 345 self.stream.write( 346 '%d non-main threads were left active in the end.\n' 347 % (len(self._active_threads) - 1)) 348 349 def getDescription(self, test): 350 return test.id() 351 352 def _extractBenchmarkTime(self, testCase, details=None): 353 """Add a benchmark time for the current test case.""" 354 if details and 'benchtime' in details: 355 return float(''.join(details['benchtime'].iter_bytes())) 356 return getattr(testCase, "_benchtime", None) 357 358 def _delta_to_float(self, a_timedelta, precision): 359 # This calls ceiling to ensure that the most pessimistic view of time 360 # taken is shown (rather than leaving it to the Python %f operator 361 # to decide whether to round/floor/ceiling. This was added when we 362 # had pyp3 test failures that suggest a floor was happening. 363 shift = 10 ** precision 364 return math.ceil( 365 (a_timedelta.days * 86400.0 + a_timedelta.seconds + 366 a_timedelta.microseconds / 1000000.0) * shift) / shift 367 368 def _elapsedTestTimeString(self): 369 """Return time string for overall time the current test has taken.""" 370 return self._formatTime(self._delta_to_float( 371 self._now() - self._start_datetime, 3)) 372 373 def _testTimeString(self, testCase): 374 benchmark_time = self._extractBenchmarkTime(testCase) 375 if benchmark_time is not None: 376 return self._formatTime(benchmark_time) + "*" 377 else: 378 return self._elapsedTestTimeString() 379 380 def _formatTime(self, seconds): 381 """Format seconds as milliseconds with leading spaces.""" 382 # some benchmarks can take thousands of seconds to run, so we need 8 383 # places 384 return "%8dms" % (1000 * seconds) 385 386 def _shortened_test_description(self, test): 387 what = test.id() 388 what = re.sub(r'^breezy\.tests\.', '', what) 389 return what 390 391 # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method 392 # multiple times in a row, because the handler is added for 393 # each test but the container list is shared between cases. 394 # See lp:498869 lp:625574 and lp:637725 for background. 395 def _record_traceback_from_test(self, exc_info): 396 """Store the traceback from passed exc_info tuple till""" 397 self._traceback_from_test = exc_info[2] 398 399 def startTest(self, test): 400 super(ExtendedTestResult, self).startTest(test) 401 if self.count == 0: 402 self.startTests() 403 self.count += 1 404 self.report_test_start(test) 405 test.number = self.count 406 self._recordTestStartTime() 407 # Make testtools cases give us the real traceback on failure 408 addOnException = getattr(test, "addOnException", None) 409 if addOnException is not None: 410 addOnException(self._record_traceback_from_test) 411 # Only check for thread leaks on breezy derived test cases 412 if isinstance(test, TestCase): 413 test.addCleanup(self._check_leaked_threads, test) 414 415 def stopTest(self, test): 416 super(ExtendedTestResult, self).stopTest(test) 417 # Manually break cycles, means touching various private things but hey 418 getDetails = getattr(test, "getDetails", None) 419 if getDetails is not None: 420 getDetails().clear() 421 _clear__type_equality_funcs(test) 422 self._traceback_from_test = None 423 424 def startTests(self): 425 self.report_tests_starting() 426 self._active_threads = threading.enumerate() 427 428 def _check_leaked_threads(self, test): 429 """See if any threads have leaked since last call 430 431 A sample of live threads is stored in the _active_threads attribute, 432 when this method runs it compares the current live threads and any not 433 in the previous sample are treated as having leaked. 434 """ 435 now_active_threads = set(threading.enumerate()) 436 threads_leaked = now_active_threads.difference(self._active_threads) 437 if threads_leaked: 438 self._report_thread_leak(test, threads_leaked, now_active_threads) 439 self._tests_leaking_threads_count += 1 440 if self._first_thread_leaker_id is None: 441 self._first_thread_leaker_id = test.id() 442 self._active_threads = now_active_threads 443 444 def _recordTestStartTime(self): 445 """Record that a test has started.""" 446 self._start_datetime = self._now() 447 448 def addError(self, test, err): 449 """Tell result that test finished with an error. 450 451 Called from the TestCase run() method when the test 452 fails with an unexpected error. 453 """ 454 self._post_mortem(self._traceback_from_test or err[2]) 455 super(ExtendedTestResult, self).addError(test, err) 456 self.error_count += 1 457 self.report_error(test, err) 458 if self.stop_early: 459 self.stop() 460 461 def addFailure(self, test, err): 462 """Tell result that test failed. 463 464 Called from the TestCase run() method when the test 465 fails because e.g. an assert() method failed. 466 """ 467 self._post_mortem(self._traceback_from_test or err[2]) 468 super(ExtendedTestResult, self).addFailure(test, err) 469 self.failure_count += 1 470 self.report_failure(test, err) 471 if self.stop_early: 472 self.stop() 473 474 def addSuccess(self, test, details=None): 475 """Tell result that test completed successfully. 476 477 Called from the TestCase run() 478 """ 479 if self._bench_history is not None: 480 benchmark_time = self._extractBenchmarkTime(test, details) 481 if benchmark_time is not None: 482 self._bench_history.write("%s %s\n" % ( 483 self._formatTime(benchmark_time), 484 test.id())) 485 self.report_success(test) 486 super(ExtendedTestResult, self).addSuccess(test) 487 test._log_contents = '' 488 489 def addExpectedFailure(self, test, err): 490 self.known_failure_count += 1 491 self.report_known_failure(test, err) 492 493 def addUnexpectedSuccess(self, test, details=None): 494 """Tell result the test unexpectedly passed, counting as a failure 495 496 When the minimum version of testtools required becomes 0.9.8 this 497 can be updated to use the new handling there. 498 """ 499 super(ExtendedTestResult, self).addFailure(test, details=details) 500 self.failure_count += 1 501 self.report_unexpected_success(test, 502 "".join(details["reason"].iter_text())) 503 if self.stop_early: 504 self.stop() 505 506 def addNotSupported(self, test, feature): 507 """The test will not be run because of a missing feature. 508 """ 509 # this can be called in two different ways: it may be that the 510 # test started running, and then raised (through requireFeature) 511 # UnavailableFeature. Alternatively this method can be called 512 # while probing for features before running the test code proper; in 513 # that case we will see startTest and stopTest, but the test will 514 # never actually run. 515 self.unsupported.setdefault(str(feature), 0) 516 self.unsupported[str(feature)] += 1 517 self.report_unsupported(test, feature) 518 519 def addSkip(self, test, reason): 520 """A test has not run for 'reason'.""" 521 self.skip_count += 1 522 self.report_skip(test, reason) 523 524 def addNotApplicable(self, test, reason): 525 self.not_applicable_count += 1 526 self.report_not_applicable(test, reason) 527 528 def _count_stored_tests(self): 529 """Count of tests instances kept alive due to not succeeding""" 530 return self.error_count + self.failure_count + self.known_failure_count 531 532 def _post_mortem(self, tb=None): 533 """Start a PDB post mortem session.""" 534 if os.environ.get('BRZ_TEST_PDB', None): 535 import pdb 536 pdb.post_mortem(tb) 537 538 def progress(self, offset, whence): 539 """The test is adjusting the count of tests to run.""" 540 if whence == SUBUNIT_SEEK_SET: 541 self.num_tests = offset 542 elif whence == SUBUNIT_SEEK_CUR: 543 self.num_tests += offset 544 else: 545 raise errors.BzrError("Unknown whence %r" % whence) 546 547 def report_tests_starting(self): 548 """Display information before the test run begins""" 549 if getattr(sys, 'frozen', None) is None: 550 bzr_path = osutils.realpath(sys.argv[0]) 551 else: 552 bzr_path = sys.executable 553 self.stream.write( 554 'brz selftest: %s\n' % (bzr_path,)) 555 self.stream.write( 556 ' %s\n' % ( 557 breezy.__path__[0],)) 558 self.stream.write( 559 ' bzr-%s python-%s %s\n' % ( 560 breezy.version_string, 561 breezy._format_version_tuple(sys.version_info), 562 platform.platform(aliased=1), 563 )) 564 self.stream.write('\n') 565 566 def report_test_start(self, test): 567 """Display information on the test just about to be run""" 568 569 def _report_thread_leak(self, test, leaked_threads, active_threads): 570 """Display information on a test that leaked one or more threads""" 571 # GZ 2010-09-09: A leak summary reported separately from the general 572 # thread debugging would be nice. Tests under subunit 573 # need something not using stream, perhaps adding a 574 # testtools details object would be fitting. 575 if 'threads' in selftest_debug_flags: 576 self.stream.write('%s is leaking, active is now %d\n' % 577 (test.id(), len(active_threads))) 578 579 def startTestRun(self): 580 self.startTime = time.time() 581 582 def report_success(self, test): 583 pass 584 585 def wasStrictlySuccessful(self): 586 if self.unsupported or self.known_failure_count: 587 return False 588 return self.wasSuccessful() 589 590 591class TextTestResult(ExtendedTestResult): 592 """Displays progress and results of tests in text form""" 593 594 def __init__(self, stream, descriptions, verbosity, 595 bench_history=None, 596 strict=None, 597 ): 598 ExtendedTestResult.__init__(self, stream, descriptions, verbosity, 599 bench_history, strict) 600 self.pb = self.ui.nested_progress_bar() 601 self.pb.show_pct = False 602 self.pb.show_spinner = False 603 self.pb.show_eta = False, 604 self.pb.show_count = False 605 self.pb.show_bar = False 606 self.pb.update_latency = 0 607 self.pb.show_transport_activity = False 608 609 def stopTestRun(self): 610 # called when the tests that are going to run have run 611 self.pb.clear() 612 self.pb.finished() 613 super(TextTestResult, self).stopTestRun() 614 615 def report_tests_starting(self): 616 super(TextTestResult, self).report_tests_starting() 617 self.pb.update('[test 0/%d] Starting' % (self.num_tests)) 618 619 def _progress_prefix_text(self): 620 # the longer this text, the less space we have to show the test 621 # name... 622 a = '[%d' % self.count # total that have been run 623 # tests skipped as known not to be relevant are not important enough 624 # to show here 625 # if self.skip_count: 626 # a += ', %d skip' % self.skip_count 627 # if self.known_failure_count: 628 # a += '+%dX' % self.known_failure_count 629 if self.num_tests: 630 a += '/%d' % self.num_tests 631 a += ' in ' 632 runtime = time.time() - self._overall_start_time 633 if runtime >= 60: 634 a += '%dm%ds' % (runtime / 60, runtime % 60) 635 else: 636 a += '%ds' % runtime 637 total_fail_count = self.error_count + self.failure_count 638 if total_fail_count: 639 a += ', %d failed' % total_fail_count 640 # if self.unsupported: 641 # a += ', %d missing' % len(self.unsupported) 642 a += ']' 643 return a 644 645 def report_test_start(self, test): 646 self.pb.update( 647 self._progress_prefix_text() + 648 ' ' + 649 self._shortened_test_description(test)) 650 651 def _test_description(self, test): 652 return self._shortened_test_description(test) 653 654 def report_error(self, test, err): 655 self.stream.write('ERROR: %s\n %s\n' % ( 656 self._test_description(test), 657 err[1], 658 )) 659 660 def report_failure(self, test, err): 661 self.stream.write('FAIL: %s\n %s\n' % ( 662 self._test_description(test), 663 err[1], 664 )) 665 666 def report_known_failure(self, test, err): 667 pass 668 669 def report_unexpected_success(self, test, reason): 670 self.stream.write('FAIL: %s\n %s: %s\n' % ( 671 self._test_description(test), 672 "Unexpected success. Should have failed", 673 reason, 674 )) 675 676 def report_skip(self, test, reason): 677 pass 678 679 def report_not_applicable(self, test, reason): 680 pass 681 682 def report_unsupported(self, test, feature): 683 """test cannot be run because feature is missing.""" 684 685 686class VerboseTestResult(ExtendedTestResult): 687 """Produce long output, with one line per test run plus times""" 688 689 def _ellipsize_to_right(self, a_string, final_width): 690 """Truncate and pad a string, keeping the right hand side""" 691 if len(a_string) > final_width: 692 result = '...' + a_string[3 - final_width:] 693 else: 694 result = a_string 695 return result.ljust(final_width) 696 697 def report_tests_starting(self): 698 self.stream.write('running %d tests...\n' % self.num_tests) 699 super(VerboseTestResult, self).report_tests_starting() 700 701 def report_test_start(self, test): 702 name = self._shortened_test_description(test) 703 width = osutils.terminal_width() 704 if width is not None: 705 # width needs space for 6 char status, plus 1 for slash, plus an 706 # 11-char time string, plus a trailing blank 707 # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on 708 # space 709 self.stream.write(self._ellipsize_to_right(name, width - 18)) 710 else: 711 self.stream.write(name) 712 self.stream.flush() 713 714 def _error_summary(self, err): 715 indent = ' ' * 4 716 return '%s%s' % (indent, err[1]) 717 718 def report_error(self, test, err): 719 self.stream.write('ERROR %s\n%s\n' 720 % (self._testTimeString(test), 721 self._error_summary(err))) 722 723 def report_failure(self, test, err): 724 self.stream.write(' FAIL %s\n%s\n' 725 % (self._testTimeString(test), 726 self._error_summary(err))) 727 728 def report_known_failure(self, test, err): 729 self.stream.write('XFAIL %s\n%s\n' 730 % (self._testTimeString(test), 731 self._error_summary(err))) 732 733 def report_unexpected_success(self, test, reason): 734 self.stream.write(' FAIL %s\n%s: %s\n' 735 % (self._testTimeString(test), 736 "Unexpected success. Should have failed", 737 reason)) 738 739 def report_success(self, test): 740 self.stream.write(' OK %s\n' % self._testTimeString(test)) 741 for bench_called, stats in getattr(test, '_benchcalls', []): 742 self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called) 743 stats.pprint(file=self.stream) 744 # flush the stream so that we get smooth output. This verbose mode is 745 # used to show the output in PQM. 746 self.stream.flush() 747 748 def report_skip(self, test, reason): 749 self.stream.write(' SKIP %s\n%s\n' 750 % (self._testTimeString(test), reason)) 751 752 def report_not_applicable(self, test, reason): 753 self.stream.write(' N/A %s\n %s\n' 754 % (self._testTimeString(test), reason)) 755 756 def report_unsupported(self, test, feature): 757 """test cannot be run because feature is missing.""" 758 self.stream.write("NODEP %s\n The feature '%s' is not available.\n" 759 % (self._testTimeString(test), feature)) 760 761 762class TextTestRunner(object): 763 stop_on_failure = False 764 765 def __init__(self, 766 stream=sys.stderr, 767 descriptions=0, 768 verbosity=1, 769 bench_history=None, 770 strict=False, 771 result_decorators=None, 772 ): 773 """Create a TextTestRunner. 774 775 :param result_decorators: An optional list of decorators to apply 776 to the result object being used by the runner. Decorators are 777 applied left to right - the first element in the list is the 778 innermost decorator. 779 """ 780 # stream may know claim to know to write unicode strings, but in older 781 # pythons this goes sufficiently wrong that it is a bad idea. ( 782 # specifically a built in file with encoding 'UTF-8' will still try 783 # to encode using ascii. 784 new_encoding = osutils.get_terminal_encoding() 785 codec = codecs.lookup(new_encoding) 786 encode = codec.encode 787 # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes, 788 # so should swap to the plain codecs.StreamWriter 789 stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream, 790 "backslashreplace") 791 stream.encoding = new_encoding 792 self.stream = stream 793 self.descriptions = descriptions 794 self.verbosity = verbosity 795 self._bench_history = bench_history 796 self._strict = strict 797 self._result_decorators = result_decorators or [] 798 799 def run(self, test): 800 "Run the given test case or test suite." 801 if self.verbosity == 1: 802 result_class = TextTestResult 803 elif self.verbosity >= 2: 804 result_class = VerboseTestResult 805 original_result = result_class(self.stream, 806 self.descriptions, 807 self.verbosity, 808 bench_history=self._bench_history, 809 strict=self._strict, 810 ) 811 # Signal to result objects that look at stop early policy to stop, 812 original_result.stop_early = self.stop_on_failure 813 result = original_result 814 for decorator in self._result_decorators: 815 result = decorator(result) 816 result.stop_early = self.stop_on_failure 817 result.startTestRun() 818 try: 819 test.run(result) 820 finally: 821 result.stopTestRun() 822 # higher level code uses our extended protocol to determine 823 # what exit code to give. 824 return original_result 825 826 827def iter_suite_tests(suite): 828 """Return all tests in a suite, recursing through nested suites""" 829 if isinstance(suite, unittest.TestCase): 830 yield suite 831 elif isinstance(suite, unittest.TestSuite): 832 for item in suite: 833 for r in iter_suite_tests(item): 834 yield r 835 else: 836 raise Exception('unknown type %r for object %r' 837 % (type(suite), suite)) 838 839 840TestSkipped = testtools.testcase.TestSkipped 841 842 843class TestNotApplicable(TestSkipped): 844 """A test is not applicable to the situation where it was run. 845 846 This is only normally raised by parameterized tests, if they find that 847 the instance they're constructed upon does not support one aspect 848 of its interface. 849 """ 850 851 852# traceback._some_str fails to format exceptions that have the default 853# __str__ which does an implicit ascii conversion. However, repr() on those 854# objects works, for all that its not quite what the doctor may have ordered. 855def _clever_some_str(value): 856 try: 857 return str(value) 858 except BaseException: 859 try: 860 return repr(value).replace('\\n', '\n') 861 except BaseException: 862 return '<unprintable %s object>' % type(value).__name__ 863 864 865traceback._some_str = _clever_some_str 866 867 868# deprecated - use self.knownFailure(), or self.expectFailure. 869KnownFailure = testtools.testcase._ExpectedFailure 870 871 872class UnavailableFeature(Exception): 873 """A feature required for this test was not available. 874 875 This can be considered a specialised form of SkippedTest. 876 877 The feature should be used to construct the exception. 878 """ 879 880 881class StringIOWrapper(ui_testing.BytesIOWithEncoding): 882 883 @symbol_versioning.deprecated_method( 884 symbol_versioning.deprecated_in((3, 0))) 885 def __init__(self, s=None): 886 super(StringIOWrapper, self).__init__(s) 887 888 889TestUIFactory = ui_testing.TestUIFactory 890 891 892def isolated_doctest_setUp(test): 893 override_os_environ(test) 894 osutils.set_or_unset_env('BRZ_HOME', '/nonexistent') 895 test._orig_ui_factory = ui.ui_factory 896 ui.ui_factory = ui.SilentUIFactory() 897 898 899def isolated_doctest_tearDown(test): 900 restore_os_environ(test) 901 ui.ui_factory = test._orig_ui_factory 902 903 904def IsolatedDocTestSuite(*args, **kwargs): 905 """Overrides doctest.DocTestSuite to handle isolation. 906 907 The method is really a factory and users are expected to use it as such. 908 """ 909 910 kwargs['setUp'] = isolated_doctest_setUp 911 kwargs['tearDown'] = isolated_doctest_tearDown 912 return doctest.DocTestSuite(*args, **kwargs) 913 914 915class TestCase(testtools.TestCase): 916 """Base class for brz unit tests. 917 918 Tests that need access to disk resources should subclass 919 TestCaseInTempDir not TestCase. 920 921 Error and debug log messages are redirected from their usual 922 location into a temporary file, the contents of which can be 923 retrieved by _get_log(). We use a real OS file, not an in-memory object, 924 so that it can also capture file IO. When the test completes this file 925 is read into memory and removed from disk. 926 927 There are also convenience functions to invoke bzr's command-line 928 routine, and to build and check brz trees. 929 930 In addition to the usual method of overriding tearDown(), this class also 931 allows subclasses to register cleanup functions via addCleanup, which are 932 run in order as the object is torn down. It's less likely this will be 933 accidentally overlooked. 934 """ 935 936 _log_file = None 937 # record lsprof data when performing benchmark calls. 938 _gather_lsprof_in_benchmarks = False 939 940 def __init__(self, methodName='testMethod'): 941 super(TestCase, self).__init__(methodName) 942 self._directory_isolation = True 943 self.exception_handlers.insert( 944 0, (UnavailableFeature, self._do_unsupported_or_skip)) 945 self.exception_handlers.insert( 946 0, (TestNotApplicable, self._do_not_applicable)) 947 948 def setUp(self): 949 super(TestCase, self).setUp() 950 951 # At this point we're still accessing the config files in $BRZ_HOME (as 952 # set by the user running selftest). 953 timeout = config.GlobalStack().get('selftest.timeout') 954 if timeout: 955 timeout_fixture = fixtures.TimeoutFixture(timeout) 956 timeout_fixture.setUp() 957 self.addCleanup(timeout_fixture.cleanUp) 958 959 for feature in getattr(self, '_test_needs_features', []): 960 self.requireFeature(feature) 961 self._cleanEnvironment() 962 963 self.overrideAttr(breezy.get_global_state(), 'cmdline_overrides', 964 config.CommandLineStore()) 965 966 self._silenceUI() 967 self._startLogFile() 968 self._benchcalls = [] 969 self._benchtime = None 970 self._clear_hooks() 971 self._track_transports() 972 self._track_locks() 973 self._clear_debug_flags() 974 # Isolate global verbosity level, to make sure it's reproducible 975 # between tests. We should get rid of this altogether: bug 656694. -- 976 # mbp 20101008 977 self.overrideAttr(breezy.trace, '_verbosity_level', 0) 978 self._log_files = set() 979 # Each key in the ``_counters`` dict holds a value for a different 980 # counter. When the test ends, addDetail() should be used to output the 981 # counter values. This happens in install_counter_hook(). 982 self._counters = {} 983 if 'config_stats' in selftest_debug_flags: 984 self._install_config_stats_hooks() 985 # Do not use i18n for tests (unless the test reverses this) 986 i18n.disable_i18n() 987 988 def debug(self): 989 # debug a frame up. 990 import pdb 991 # The sys preserved stdin/stdout should allow blackbox tests debugging 992 pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__ 993 ).set_trace(sys._getframe().f_back) 994 995 def discardDetail(self, name): 996 """Extend the addDetail, getDetails api so we can remove a detail. 997 998 eg. brz always adds the 'log' detail at startup, but we don't want to 999 include it for skipped, xfail, etc tests. 1000 1001 It is safe to call this for a detail that doesn't exist, in case this 1002 gets called multiple times. 1003 """ 1004 # We cheat. details is stored in __details which means we shouldn't 1005 # touch it. but getDetails() returns the dict directly, so we can 1006 # mutate it. 1007 details = self.getDetails() 1008 if name in details: 1009 del details[name] 1010 1011 def install_counter_hook(self, hooks, name, counter_name=None): 1012 """Install a counting hook. 1013 1014 Any hook can be counted as long as it doesn't need to return a value. 1015 1016 :param hooks: Where the hook should be installed. 1017 1018 :param name: The hook name that will be counted. 1019 1020 :param counter_name: The counter identifier in ``_counters``, defaults 1021 to ``name``. 1022 """ 1023 _counters = self._counters # Avoid closing over self 1024 if counter_name is None: 1025 counter_name = name 1026 if counter_name in _counters: 1027 raise AssertionError('%s is already used as a counter name' 1028 % (counter_name,)) 1029 _counters[counter_name] = 0 1030 self.addDetail(counter_name, content.Content( 1031 content.UTF8_TEXT, 1032 lambda: [b'%d' % (_counters[counter_name],)])) 1033 1034 def increment_counter(*args, **kwargs): 1035 _counters[counter_name] += 1 1036 label = 'count %s calls' % (counter_name,) 1037 hooks.install_named_hook(name, increment_counter, label) 1038 self.addCleanup(hooks.uninstall_named_hook, name, label) 1039 1040 def _install_config_stats_hooks(self): 1041 """Install config hooks to count hook calls. 1042 1043 """ 1044 for hook_name in ('get', 'set', 'remove', 'load', 'save'): 1045 self.install_counter_hook(config.ConfigHooks, hook_name, 1046 'config.%s' % (hook_name,)) 1047 1048 # The OldConfigHooks are private and need special handling to protect 1049 # against recursive tests (tests that run other tests), so we just do 1050 # manually what registering them into _builtin_known_hooks will provide 1051 # us. 1052 self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks()) 1053 for hook_name in ('get', 'set', 'remove', 'load', 'save'): 1054 self.install_counter_hook(config.OldConfigHooks, hook_name, 1055 'old_config.%s' % (hook_name,)) 1056 1057 def _clear_debug_flags(self): 1058 """Prevent externally set debug flags affecting tests. 1059 1060 Tests that want to use debug flags can just set them in the 1061 debug_flags set during setup/teardown. 1062 """ 1063 # Start with a copy of the current debug flags we can safely modify. 1064 self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags)) 1065 if 'allow_debug' not in selftest_debug_flags: 1066 debug.debug_flags.clear() 1067 if 'disable_lock_checks' not in selftest_debug_flags: 1068 debug.debug_flags.add('strict_locks') 1069 1070 def _clear_hooks(self): 1071 # prevent hooks affecting tests 1072 known_hooks = hooks.known_hooks 1073 self._preserved_hooks = {} 1074 for key, (parent, name) in known_hooks.iter_parent_objects(): 1075 current_hooks = getattr(parent, name) 1076 self._preserved_hooks[parent] = (name, current_hooks) 1077 self._preserved_lazy_hooks = hooks._lazy_hooks 1078 hooks._lazy_hooks = {} 1079 self.addCleanup(self._restoreHooks) 1080 for key, (parent, name) in known_hooks.iter_parent_objects(): 1081 factory = known_hooks.get(key) 1082 setattr(parent, name, factory()) 1083 # this hook should always be installed 1084 request._install_hook() 1085 1086 def disable_directory_isolation(self): 1087 """Turn off directory isolation checks.""" 1088 self._directory_isolation = False 1089 1090 def enable_directory_isolation(self): 1091 """Enable directory isolation checks.""" 1092 self._directory_isolation = True 1093 1094 def _silenceUI(self): 1095 """Turn off UI for duration of test""" 1096 # by default the UI is off; tests can turn it on if they want it. 1097 self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory()) 1098 1099 def _check_locks(self): 1100 """Check that all lock take/release actions have been paired.""" 1101 # We always check for mismatched locks. If a mismatch is found, we 1102 # fail unless -Edisable_lock_checks is supplied to selftest, in which 1103 # case we just print a warning. 1104 # unhook: 1105 acquired_locks = [lock for action, lock in self._lock_actions 1106 if action == 'acquired'] 1107 released_locks = [lock for action, lock in self._lock_actions 1108 if action == 'released'] 1109 broken_locks = [lock for action, lock in self._lock_actions 1110 if action == 'broken'] 1111 # trivially, given the tests for lock acquistion and release, if we 1112 # have as many in each list, it should be ok. Some lock tests also 1113 # break some locks on purpose and should be taken into account by 1114 # considering that breaking a lock is just a dirty way of releasing it. 1115 if len(acquired_locks) != (len(released_locks) + len(broken_locks)): 1116 message = ( 1117 'Different number of acquired and ' 1118 'released or broken locks.\n' 1119 'acquired=%s\n' 1120 'released=%s\n' 1121 'broken=%s\n' % 1122 (acquired_locks, released_locks, broken_locks)) 1123 if not self._lock_check_thorough: 1124 # Rather than fail, just warn 1125 print("Broken test %s: %s" % (self, message)) 1126 return 1127 self.fail(message) 1128 1129 def _track_locks(self): 1130 """Track lock activity during tests.""" 1131 self._lock_actions = [] 1132 if 'disable_lock_checks' in selftest_debug_flags: 1133 self._lock_check_thorough = False 1134 else: 1135 self._lock_check_thorough = True 1136 1137 self.addCleanup(self._check_locks) 1138 _mod_lock.Lock.hooks.install_named_hook('lock_acquired', 1139 self._lock_acquired, None) 1140 _mod_lock.Lock.hooks.install_named_hook('lock_released', 1141 self._lock_released, None) 1142 _mod_lock.Lock.hooks.install_named_hook('lock_broken', 1143 self._lock_broken, None) 1144 1145 def _lock_acquired(self, result): 1146 self._lock_actions.append(('acquired', result)) 1147 1148 def _lock_released(self, result): 1149 self._lock_actions.append(('released', result)) 1150 1151 def _lock_broken(self, result): 1152 self._lock_actions.append(('broken', result)) 1153 1154 def permit_dir(self, name): 1155 """Permit a directory to be used by this test. See permit_url.""" 1156 name_transport = _mod_transport.get_transport_from_path(name) 1157 self.permit_url(name) 1158 self.permit_url(name_transport.base) 1159 1160 def permit_url(self, url): 1161 """Declare that url is an ok url to use in this test. 1162 1163 Do this for memory transports, temporary test directory etc. 1164 1165 Do not do this for the current working directory, /tmp, or any other 1166 preexisting non isolated url. 1167 """ 1168 if not url.endswith('/'): 1169 url += '/' 1170 self._bzr_selftest_roots.append(url) 1171 1172 def permit_source_tree_branch_repo(self): 1173 """Permit the source tree brz is running from to be opened. 1174 1175 Some code such as breezy.version attempts to read from the brz branch 1176 that brz is executing from (if any). This method permits that directory 1177 to be used in the test suite. 1178 """ 1179 path = self.get_source_path() 1180 self.record_directory_isolation() 1181 try: 1182 try: 1183 workingtree.WorkingTree.open(path) 1184 except (errors.NotBranchError, errors.NoWorkingTree): 1185 raise TestSkipped('Needs a working tree of brz sources') 1186 finally: 1187 self.enable_directory_isolation() 1188 1189 def _preopen_isolate_transport(self, transport): 1190 """Check that all transport openings are done in the test work area.""" 1191 while isinstance(transport, pathfilter.PathFilteringTransport): 1192 # Unwrap pathfiltered transports 1193 transport = transport.server.backing_transport.clone( 1194 transport._filter('.')) 1195 url = transport.base 1196 # ReadonlySmartTCPServer_for_testing decorates the backing transport 1197 # urls it is given by prepending readonly+. This is appropriate as the 1198 # client shouldn't know that the server is readonly (or not readonly). 1199 # We could register all servers twice, with readonly+ prepending, but 1200 # that makes for a long list; this is about the same but easier to 1201 # read. 1202 if url.startswith('readonly+'): 1203 url = url[len('readonly+'):] 1204 self._preopen_isolate_url(url) 1205 1206 def _preopen_isolate_url(self, url): 1207 if not self._directory_isolation: 1208 return 1209 if self._directory_isolation == 'record': 1210 self._bzr_selftest_roots.append(url) 1211 return 1212 # This prevents all transports, including e.g. sftp ones backed on disk 1213 # from working unless they are explicitly granted permission. We then 1214 # depend on the code that sets up test transports to check that they 1215 # are appropriately isolated and enable their use by calling 1216 # self.permit_transport() 1217 if not osutils.is_inside_any(self._bzr_selftest_roots, url): 1218 raise errors.BzrError("Attempt to escape test isolation: %r %r" 1219 % (url, self._bzr_selftest_roots)) 1220 1221 def record_directory_isolation(self): 1222 """Gather accessed directories to permit later access. 1223 1224 This is used for tests that access the branch brz is running from. 1225 """ 1226 self._directory_isolation = "record" 1227 1228 def start_server(self, transport_server, backing_server=None): 1229 """Start transport_server for this test. 1230 1231 This starts the server, registers a cleanup for it and permits the 1232 server's urls to be used. 1233 """ 1234 if backing_server is None: 1235 transport_server.start_server() 1236 else: 1237 transport_server.start_server(backing_server) 1238 self.addCleanup(transport_server.stop_server) 1239 # Obtain a real transport because if the server supplies a password, it 1240 # will be hidden from the base on the client side. 1241 t = _mod_transport.get_transport_from_url(transport_server.get_url()) 1242 # Some transport servers effectively chroot the backing transport; 1243 # others like SFTPServer don't - users of the transport can walk up the 1244 # transport to read the entire backing transport. This wouldn't matter 1245 # except that the workdir tests are given - and that they expect the 1246 # server's url to point at - is one directory under the safety net. So 1247 # Branch operations into the transport will attempt to walk up one 1248 # directory. Chrooting all servers would avoid this but also mean that 1249 # we wouldn't be testing directly against non-root urls. Alternatively 1250 # getting the test framework to start the server with a backing server 1251 # at the actual safety net directory would work too, but this then 1252 # means that the self.get_url/self.get_transport methods would need 1253 # to transform all their results. On balance its cleaner to handle it 1254 # here, and permit a higher url when we have one of these transports. 1255 if t.base.endswith('/work/'): 1256 # we have safety net/test root/work 1257 t = t.clone('../..') 1258 elif isinstance(transport_server, 1259 test_server.SmartTCPServer_for_testing): 1260 # The smart server adds a path similar to work, which is traversed 1261 # up from by the client. But the server is chrooted - the actual 1262 # backing transport is not escaped from, and VFS requests to the 1263 # root will error (because they try to escape the chroot). 1264 t2 = t.clone('..') 1265 while t2.base != t.base: 1266 t = t2 1267 t2 = t.clone('..') 1268 self.permit_url(t.base) 1269 1270 def _track_transports(self): 1271 """Install checks for transport usage.""" 1272 # TestCase has no safe place it can write to. 1273 self._bzr_selftest_roots = [] 1274 # Currently the easiest way to be sure that nothing is going on is to 1275 # hook into brz dir opening. This leaves a small window of error for 1276 # transport tests, but they are well known, and we can improve on this 1277 # step. 1278 controldir.ControlDir.hooks.install_named_hook( 1279 "pre_open", self._preopen_isolate_transport, 1280 "Check brz directories are safe.") 1281 1282 def _ndiff_strings(self, a, b): 1283 """Return ndiff between two strings containing lines. 1284 1285 A trailing newline is added if missing to make the strings 1286 print properly.""" 1287 if b and not b.endswith('\n'): 1288 b += '\n' 1289 if a and not a.endswith('\n'): 1290 a += '\n' 1291 difflines = difflib.ndiff(a.splitlines(True), 1292 b.splitlines(True), 1293 linejunk=lambda x: False, 1294 charjunk=lambda x: False) 1295 return ''.join(difflines) 1296 1297 def assertEqual(self, a, b, message=''): 1298 try: 1299 if a == b: 1300 return 1301 except UnicodeError as e: 1302 # If we can't compare without getting a UnicodeError, then 1303 # obviously they are different 1304 trace.mutter('UnicodeError: %s', e) 1305 if message: 1306 message += '\n' 1307 raise AssertionError("%snot equal:\na = %s\nb = %s\n" 1308 % (message, 1309 pprint.pformat(a), pprint.pformat(b))) 1310 1311 # FIXME: This is deprecated in unittest2 but plugins may still use it so we 1312 # need a deprecation period for them -- vila 2016-02-01 1313 assertEquals = assertEqual 1314 1315 def assertEqualDiff(self, a, b, message=None): 1316 """Assert two texts are equal, if not raise an exception. 1317 1318 This is intended for use with multi-line strings where it can 1319 be hard to find the differences by eye. 1320 """ 1321 # TODO: perhaps override assertEqual to call this for strings? 1322 if a == b: 1323 return 1324 if message is None: 1325 message = "texts not equal:\n" 1326 if a + ('\n' if isinstance(a, str) else b'\n') == b: 1327 message = 'first string is missing a final newline.\n' 1328 if a == b + ('\n' if isinstance(b, str) else b'\n'): 1329 message = 'second string is missing a final newline.\n' 1330 raise AssertionError(message 1331 + self._ndiff_strings( 1332 a if isinstance(a, str) else a.decode(), 1333 b if isinstance(b, str) else b.decode())) 1334 1335 def assertEqualMode(self, mode, mode_test): 1336 self.assertEqual(mode, mode_test, 1337 'mode mismatch %o != %o' % (mode, mode_test)) 1338 1339 def assertEqualStat(self, expected, actual): 1340 """assert that expected and actual are the same stat result. 1341 1342 :param expected: A stat result. 1343 :param actual: A stat result. 1344 :raises AssertionError: If the expected and actual stat values differ 1345 other than by atime. 1346 """ 1347 self.assertEqual(expected.st_size, actual.st_size, 1348 'st_size did not match') 1349 self.assertEqual(expected.st_mtime, actual.st_mtime, 1350 'st_mtime did not match') 1351 self.assertEqual(expected.st_ctime, actual.st_ctime, 1352 'st_ctime did not match') 1353 if sys.platform == 'win32': 1354 # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it 1355 # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is 1356 # odd. We just force it to always be 0 to avoid any problems. 1357 self.assertEqual(0, expected.st_dev) 1358 self.assertEqual(0, actual.st_dev) 1359 self.assertEqual(0, expected.st_ino) 1360 self.assertEqual(0, actual.st_ino) 1361 else: 1362 self.assertEqual(expected.st_dev, actual.st_dev, 1363 'st_dev did not match') 1364 self.assertEqual(expected.st_ino, actual.st_ino, 1365 'st_ino did not match') 1366 self.assertEqual(expected.st_mode, actual.st_mode, 1367 'st_mode did not match') 1368 1369 def assertLength(self, length, obj_with_len): 1370 """Assert that obj_with_len is of length length.""" 1371 if len(obj_with_len) != length: 1372 self.fail("Incorrect length: wanted %d, got %d for %r" % ( 1373 length, len(obj_with_len), obj_with_len)) 1374 1375 def assertLogsError(self, exception_class, func, *args, **kwargs): 1376 """Assert that `func(*args, **kwargs)` quietly logs a specific error. 1377 """ 1378 captured = [] 1379 orig_log_exception_quietly = trace.log_exception_quietly 1380 try: 1381 def capture(): 1382 orig_log_exception_quietly() 1383 captured.append(sys.exc_info()[1]) 1384 trace.log_exception_quietly = capture 1385 func(*args, **kwargs) 1386 finally: 1387 trace.log_exception_quietly = orig_log_exception_quietly 1388 self.assertLength(1, captured) 1389 err = captured[0] 1390 self.assertIsInstance(err, exception_class) 1391 return err 1392 1393 def assertPositive(self, val): 1394 """Assert that val is greater than 0.""" 1395 self.assertTrue(val > 0, 'expected a positive value, but got %s' % val) 1396 1397 def assertNegative(self, val): 1398 """Assert that val is less than 0.""" 1399 self.assertTrue(val < 0, 'expected a negative value, but got %s' % val) 1400 1401 def assertStartsWith(self, s, prefix): 1402 if not s.startswith(prefix): 1403 raise AssertionError( 1404 'string %r does not start with %r' % (s, prefix)) 1405 1406 def assertEndsWith(self, s, suffix): 1407 """Asserts that s ends with suffix.""" 1408 if not s.endswith(suffix): 1409 raise AssertionError( 1410 'string %r does not end with %r' % (s, suffix)) 1411 1412 def assertContainsRe(self, haystack, needle_re, flags=0): 1413 """Assert that a contains something matching a regular expression.""" 1414 if not re.search(needle_re, haystack, flags): 1415 if (('\n' if isinstance(haystack, str) else b'\n') in haystack or 1416 len(haystack) > 60): 1417 # a long string, format it in a more readable way 1418 raise AssertionError( 1419 'pattern "%s" not found in\n"""\\\n%s"""\n' 1420 % (needle_re, haystack)) 1421 else: 1422 raise AssertionError('pattern "%s" not found in "%s"' 1423 % (needle_re, haystack)) 1424 1425 def assertNotContainsRe(self, haystack, needle_re, flags=0): 1426 """Assert that a does not match a regular expression""" 1427 if re.search(needle_re, haystack, flags): 1428 raise AssertionError('pattern "%s" found in "%s"' 1429 % (needle_re, haystack)) 1430 1431 def assertContainsString(self, haystack, needle): 1432 if haystack.find(needle) == -1: 1433 self.fail("string %r not found in '''%s'''" % (needle, haystack)) 1434 1435 def assertNotContainsString(self, haystack, needle): 1436 if haystack.find(needle) != -1: 1437 self.fail("string %r found in '''%s'''" % (needle, haystack)) 1438 1439 def assertSubset(self, sublist, superlist): 1440 """Assert that every entry in sublist is present in superlist.""" 1441 missing = set(sublist) - set(superlist) 1442 if len(missing) > 0: 1443 raise AssertionError("value(s) %r not present in container %r" % 1444 (missing, superlist)) 1445 1446 def assertListRaises(self, excClass, func, *args, **kwargs): 1447 """Fail unless excClass is raised when the iterator from func is used. 1448 1449 Many functions can return generators this makes sure 1450 to wrap them in a list() call to make sure the whole generator 1451 is run, and that the proper exception is raised. 1452 """ 1453 try: 1454 list(func(*args, **kwargs)) 1455 except excClass as e: 1456 return e 1457 else: 1458 if getattr(excClass, '__name__', None) is not None: 1459 excName = excClass.__name__ 1460 else: 1461 excName = str(excClass) 1462 raise self.failureException("%s not raised" % excName) 1463 1464 def assertRaises(self, excClass, callableObj, *args, **kwargs): 1465 """Assert that a callable raises a particular exception. 1466 1467 :param excClass: As for the except statement, this may be either an 1468 exception class, or a tuple of classes. 1469 :param callableObj: A callable, will be passed ``*args`` and 1470 ``**kwargs``. 1471 1472 Returns the exception so that you can examine it. 1473 """ 1474 try: 1475 callableObj(*args, **kwargs) 1476 except excClass as e: 1477 return e 1478 else: 1479 if getattr(excClass, '__name__', None) is not None: 1480 excName = excClass.__name__ 1481 else: 1482 # probably a tuple 1483 excName = str(excClass) 1484 raise self.failureException("%s not raised" % excName) 1485 1486 def assertIs(self, left, right, message=None): 1487 if not (left is right): 1488 if message is not None: 1489 raise AssertionError(message) 1490 else: 1491 raise AssertionError("%r is not %r." % (left, right)) 1492 1493 def assertIsNot(self, left, right, message=None): 1494 if (left is right): 1495 if message is not None: 1496 raise AssertionError(message) 1497 else: 1498 raise AssertionError("%r is %r." % (left, right)) 1499 1500 def assertTransportMode(self, transport, path, mode): 1501 """Fail if a path does not have mode "mode". 1502 1503 If modes are not supported on this transport, the assertion is ignored. 1504 """ 1505 if not transport._can_roundtrip_unix_modebits(): 1506 return 1507 path_stat = transport.stat(path) 1508 actual_mode = stat.S_IMODE(path_stat.st_mode) 1509 self.assertEqual(mode, actual_mode, 1510 'mode of %r incorrect (%s != %s)' 1511 % (path, oct(mode), oct(actual_mode))) 1512 1513 def assertIsSameRealPath(self, path1, path2): 1514 """Fail if path1 and path2 points to different files""" 1515 self.assertEqual(osutils.realpath(path1), 1516 osutils.realpath(path2), 1517 "apparent paths:\na = %s\nb = %s\n," % (path1, path2)) 1518 1519 def assertIsInstance(self, obj, kls, msg=None): 1520 """Fail if obj is not an instance of kls 1521 1522 :param msg: Supplementary message to show if the assertion fails. 1523 """ 1524 if not isinstance(obj, kls): 1525 m = "%r is an instance of %s rather than %s" % ( 1526 obj, obj.__class__, kls) 1527 if msg: 1528 m += ": " + msg 1529 self.fail(m) 1530 1531 def assertFileEqual(self, content, path): 1532 """Fail if path does not contain 'content'.""" 1533 self.assertPathExists(path) 1534 1535 mode = 'r' + ('b' if isinstance(content, bytes) else '') 1536 with open(path, mode) as f: 1537 s = f.read() 1538 self.assertEqualDiff(content, s) 1539 1540 def assertDocstring(self, expected_docstring, obj): 1541 """Fail if obj does not have expected_docstring""" 1542 if __doc__ is None: 1543 # With -OO the docstring should be None instead 1544 self.assertIs(obj.__doc__, None) 1545 else: 1546 self.assertEqual(expected_docstring, obj.__doc__) 1547 1548 def assertPathExists(self, path): 1549 """Fail unless path or paths, which may be abs or relative, exist.""" 1550 # TODO(jelmer): Clean this up for pad.lv/1696545 1551 if not isinstance(path, (bytes, str)): 1552 for p in path: 1553 self.assertPathExists(p) 1554 else: 1555 self.assertTrue(osutils.lexists(path), 1556 path + " does not exist") 1557 1558 def assertPathDoesNotExist(self, path): 1559 """Fail if path or paths, which may be abs or relative, exist.""" 1560 if not isinstance(path, (str, str)): 1561 for p in path: 1562 self.assertPathDoesNotExist(p) 1563 else: 1564 self.assertFalse(osutils.lexists(path), 1565 path + " exists") 1566 1567 def _capture_deprecation_warnings(self, a_callable, *args, **kwargs): 1568 """A helper for callDeprecated and applyDeprecated. 1569 1570 :param a_callable: A callable to call. 1571 :param args: The positional arguments for the callable 1572 :param kwargs: The keyword arguments for the callable 1573 :return: A tuple (warnings, result). result is the result of calling 1574 a_callable(``*args``, ``**kwargs``). 1575 """ 1576 local_warnings = [] 1577 1578 def capture_warnings(msg, cls=None, stacklevel=None): 1579 # we've hooked into a deprecation specific callpath, 1580 # only deprecations should getting sent via it. 1581 self.assertEqual(cls, DeprecationWarning) 1582 local_warnings.append(msg) 1583 original_warning_method = symbol_versioning.warn 1584 symbol_versioning.set_warning_method(capture_warnings) 1585 try: 1586 result = a_callable(*args, **kwargs) 1587 finally: 1588 symbol_versioning.set_warning_method(original_warning_method) 1589 return (local_warnings, result) 1590 1591 def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs): 1592 """Call a deprecated callable without warning the user. 1593 1594 Note that this only captures warnings raised by symbol_versioning.warn, 1595 not other callers that go direct to the warning module. 1596 1597 To test that a deprecated method raises an error, do something like 1598 this (remember that both assertRaises and applyDeprecated delays *args 1599 and **kwargs passing):: 1600 1601 self.assertRaises(errors.ReservedId, 1602 self.applyDeprecated, 1603 deprecated_in((1, 5, 0)), 1604 br.append_revision, 1605 'current:') 1606 1607 :param deprecation_format: The deprecation format that the callable 1608 should have been deprecated with. This is the same type as the 1609 parameter to deprecated_method/deprecated_function. If the 1610 callable is not deprecated with this format, an assertion error 1611 will be raised. 1612 :param a_callable: A callable to call. This may be a bound method or 1613 a regular function. It will be called with ``*args`` and 1614 ``**kwargs``. 1615 :param args: The positional arguments for the callable 1616 :param kwargs: The keyword arguments for the callable 1617 :return: The result of a_callable(``*args``, ``**kwargs``) 1618 """ 1619 call_warnings, result = self._capture_deprecation_warnings( 1620 a_callable, *args, **kwargs) 1621 expected_first_warning = symbol_versioning.deprecation_string( 1622 a_callable, deprecation_format) 1623 if len(call_warnings) == 0: 1624 self.fail("No deprecation warning generated by call to %s" % 1625 a_callable) 1626 self.assertEqual(expected_first_warning, call_warnings[0]) 1627 return result 1628 1629 def callCatchWarnings(self, fn, *args, **kw): 1630 """Call a callable that raises python warnings. 1631 1632 The caller's responsible for examining the returned warnings. 1633 1634 If the callable raises an exception, the exception is not 1635 caught and propagates up to the caller. In that case, the list 1636 of warnings is not available. 1637 1638 :returns: ([warning_object, ...], fn_result) 1639 """ 1640 # XXX: This is not perfect, because it completely overrides the 1641 # warnings filters, and some code may depend on suppressing particular 1642 # warnings. It's the easiest way to insulate ourselves from -Werror, 1643 # though. -- Andrew, 20071062 1644 wlist = [] 1645 1646 def _catcher(message, category, filename, lineno, file=None, line=None): 1647 # despite the name, 'message' is normally(?) a Warning subclass 1648 # instance 1649 wlist.append(message) 1650 saved_showwarning = warnings.showwarning 1651 saved_filters = warnings.filters 1652 try: 1653 warnings.showwarning = _catcher 1654 warnings.filters = [] 1655 result = fn(*args, **kw) 1656 finally: 1657 warnings.showwarning = saved_showwarning 1658 warnings.filters = saved_filters 1659 return wlist, result 1660 1661 def callDeprecated(self, expected, callable, *args, **kwargs): 1662 """Assert that a callable is deprecated in a particular way. 1663 1664 This is a very precise test for unusual requirements. The 1665 applyDeprecated helper function is probably more suited for most tests 1666 as it allows you to simply specify the deprecation format being used 1667 and will ensure that that is issued for the function being called. 1668 1669 Note that this only captures warnings raised by symbol_versioning.warn, 1670 not other callers that go direct to the warning module. To catch 1671 general warnings, use callCatchWarnings. 1672 1673 :param expected: a list of the deprecation warnings expected, in order 1674 :param callable: The callable to call 1675 :param args: The positional arguments for the callable 1676 :param kwargs: The keyword arguments for the callable 1677 """ 1678 call_warnings, result = self._capture_deprecation_warnings( 1679 callable, *args, **kwargs) 1680 self.assertEqual(expected, call_warnings) 1681 return result 1682 1683 def _startLogFile(self): 1684 """Setup a in-memory target for bzr and testcase log messages""" 1685 pseudo_log_file = BytesIO() 1686 1687 def _get_log_contents_for_weird_testtools_api(): 1688 return [pseudo_log_file.getvalue().decode( 1689 "utf-8", "replace").encode("utf-8")] 1690 self.addDetail( 1691 "log", content.Content( 1692 content.ContentType("text", "plain", {"charset": "utf8"}), 1693 _get_log_contents_for_weird_testtools_api)) 1694 self._log_file = pseudo_log_file 1695 self._log_memento = trace.push_log_file(self._log_file) 1696 self.addCleanup(self._finishLogFile) 1697 1698 def _finishLogFile(self): 1699 """Flush and dereference the in-memory log for this testcase""" 1700 if trace._trace_file: 1701 # flush the log file, to get all content 1702 trace._trace_file.flush() 1703 trace.pop_log_file(self._log_memento) 1704 # The logging module now tracks references for cleanup so discard ours 1705 del self._log_memento 1706 1707 def thisFailsStrictLockCheck(self): 1708 """It is known that this test would fail with -Dstrict_locks. 1709 1710 By default, all tests are run with strict lock checking unless 1711 -Edisable_lock_checks is supplied. However there are some tests which 1712 we know fail strict locks at this point that have not been fixed. 1713 They should call this function to disable the strict checking. 1714 1715 This should be used sparingly, it is much better to fix the locking 1716 issues rather than papering over the problem by calling this function. 1717 """ 1718 debug.debug_flags.discard('strict_locks') 1719 1720 def overrideAttr(self, obj, attr_name, new=_unitialized_attr): 1721 """Overrides an object attribute restoring it after the test. 1722 1723 :note: This should be used with discretion; you should think about 1724 whether it's better to make the code testable without monkey-patching. 1725 1726 :param obj: The object that will be mutated. 1727 1728 :param attr_name: The attribute name we want to preserve/override in 1729 the object. 1730 1731 :param new: The optional value we want to set the attribute to. 1732 1733 :returns: The actual attr value. 1734 """ 1735 # The actual value is captured by the call below 1736 value = getattr(obj, attr_name, _unitialized_attr) 1737 if value is _unitialized_attr: 1738 # When the test completes, the attribute should not exist, but if 1739 # we aren't setting a value, we don't need to do anything. 1740 if new is not _unitialized_attr: 1741 self.addCleanup(delattr, obj, attr_name) 1742 else: 1743 self.addCleanup(setattr, obj, attr_name, value) 1744 if new is not _unitialized_attr: 1745 setattr(obj, attr_name, new) 1746 return value 1747 1748 def overrideEnv(self, name, new): 1749 """Set an environment variable, and reset it after the test. 1750 1751 :param name: The environment variable name. 1752 1753 :param new: The value to set the variable to. If None, the 1754 variable is deleted from the environment. 1755 1756 :returns: The actual variable value. 1757 """ 1758 value = osutils.set_or_unset_env(name, new) 1759 self.addCleanup(osutils.set_or_unset_env, name, value) 1760 return value 1761 1762 def recordCalls(self, obj, attr_name): 1763 """Monkeypatch in a wrapper that will record calls. 1764 1765 The monkeypatch is automatically removed when the test concludes. 1766 1767 :param obj: The namespace holding the reference to be replaced; 1768 typically a module, class, or object. 1769 :param attr_name: A string for the name of the attribute to patch. 1770 :returns: A list that will be extended with one item every time the 1771 function is called, with a tuple of (args, kwargs). 1772 """ 1773 calls = [] 1774 1775 def decorator(*args, **kwargs): 1776 calls.append((args, kwargs)) 1777 return orig(*args, **kwargs) 1778 orig = self.overrideAttr(obj, attr_name, decorator) 1779 return calls 1780 1781 def _cleanEnvironment(self): 1782 for name, value in isolated_environ.items(): 1783 self.overrideEnv(name, value) 1784 1785 def _restoreHooks(self): 1786 for klass, (name, hooks) in self._preserved_hooks.items(): 1787 setattr(klass, name, hooks) 1788 self._preserved_hooks.clear() 1789 breezy.hooks._lazy_hooks = self._preserved_lazy_hooks 1790 self._preserved_lazy_hooks.clear() 1791 1792 def knownFailure(self, reason): 1793 """Declare that this test fails for a known reason 1794 1795 Tests that are known to fail should generally be using expectedFailure 1796 with an appropriate reverse assertion if a change could cause the test 1797 to start passing. Conversely if the test has no immediate prospect of 1798 succeeding then using skip is more suitable. 1799 1800 When this method is called while an exception is being handled, that 1801 traceback will be used, otherwise a new exception will be thrown to 1802 provide one but won't be reported. 1803 """ 1804 self._add_reason(reason) 1805 try: 1806 exc_info = sys.exc_info() 1807 if exc_info != (None, None, None): 1808 self._report_traceback(exc_info) 1809 else: 1810 try: 1811 raise self.failureException(reason) 1812 except self.failureException: 1813 exc_info = sys.exc_info() 1814 # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too? 1815 raise testtools.testcase._ExpectedFailure(exc_info) 1816 finally: 1817 del exc_info 1818 1819 def _suppress_log(self): 1820 """Remove the log info from details.""" 1821 self.discardDetail('log') 1822 1823 def _do_skip(self, result, reason): 1824 self._suppress_log() 1825 addSkip = getattr(result, 'addSkip', None) 1826 if not callable(addSkip): 1827 result.addSuccess(result) 1828 else: 1829 addSkip(self, str(reason)) 1830 1831 @staticmethod 1832 def _do_known_failure(self, result, e): 1833 self._suppress_log() 1834 err = sys.exc_info() 1835 addExpectedFailure = getattr(result, 'addExpectedFailure', None) 1836 if addExpectedFailure is not None: 1837 addExpectedFailure(self, err) 1838 else: 1839 result.addSuccess(self) 1840 1841 @staticmethod 1842 def _do_not_applicable(self, result, e): 1843 if not e.args: 1844 reason = 'No reason given' 1845 else: 1846 reason = e.args[0] 1847 self._suppress_log() 1848 addNotApplicable = getattr(result, 'addNotApplicable', None) 1849 if addNotApplicable is not None: 1850 result.addNotApplicable(self, reason) 1851 else: 1852 self._do_skip(result, reason) 1853 1854 @staticmethod 1855 def _report_skip(self, result, err): 1856 """Override the default _report_skip. 1857 1858 We want to strip the 'log' detail. If we waint until _do_skip, it has 1859 already been formatted into the 'reason' string, and we can't pull it 1860 out again. 1861 """ 1862 self._suppress_log() 1863 super(TestCase, self)._report_skip(self, result, err) 1864 1865 @staticmethod 1866 def _report_expected_failure(self, result, err): 1867 """Strip the log. 1868 1869 See _report_skip for motivation. 1870 """ 1871 self._suppress_log() 1872 super(TestCase, self)._report_expected_failure(self, result, err) 1873 1874 @staticmethod 1875 def _do_unsupported_or_skip(self, result, e): 1876 reason = e.args[0] 1877 self._suppress_log() 1878 addNotSupported = getattr(result, 'addNotSupported', None) 1879 if addNotSupported is not None: 1880 result.addNotSupported(self, reason) 1881 else: 1882 self._do_skip(result, reason) 1883 1884 def time(self, callable, *args, **kwargs): 1885 """Run callable and accrue the time it takes to the benchmark time. 1886 1887 If lsprofiling is enabled (i.e. by --lsprof-time to brz selftest) then 1888 this will cause lsprofile statistics to be gathered and stored in 1889 self._benchcalls. 1890 """ 1891 if self._benchtime is None: 1892 self.addDetail('benchtime', content.Content( 1893 content.UTF8_TEXT, 1894 lambda: [str(self._benchtime).encode('utf-8')])) 1895 self._benchtime = 0 1896 start = time.time() 1897 try: 1898 if not self._gather_lsprof_in_benchmarks: 1899 return callable(*args, **kwargs) 1900 else: 1901 # record this benchmark 1902 ret, stats = breezy.lsprof.profile(callable, *args, **kwargs) 1903 stats.sort() 1904 self._benchcalls.append(((callable, args, kwargs), stats)) 1905 return ret 1906 finally: 1907 self._benchtime += time.time() - start 1908 1909 def log(self, *args): 1910 trace.mutter(*args) 1911 1912 def get_log(self): 1913 """Get a unicode string containing the log from breezy.trace. 1914 1915 Undecodable characters are replaced. 1916 """ 1917 return u"".join(self.getDetails()['log'].iter_text()) 1918 1919 def requireFeature(self, feature): 1920 """This test requires a specific feature is available. 1921 1922 :raises UnavailableFeature: When feature is not available. 1923 """ 1924 if not feature.available(): 1925 raise UnavailableFeature(feature) 1926 1927 def _run_bzr_core(self, args, encoding, stdin, stdout, stderr, 1928 working_dir): 1929 # Clear chk_map page cache, because the contents are likely to mask 1930 # locking errors. 1931 chk_map.clear_cache() 1932 1933 self.log('run brz: %r', args) 1934 1935 self._last_cmd_stdout = stdout 1936 self._last_cmd_stderr = stderr 1937 1938 old_ui_factory = ui.ui_factory 1939 ui.ui_factory = ui_testing.TestUIFactory( 1940 stdin=stdin, 1941 stdout=self._last_cmd_stdout, 1942 stderr=self._last_cmd_stderr) 1943 1944 cwd = None 1945 if working_dir is not None: 1946 cwd = osutils.getcwd() 1947 os.chdir(working_dir) 1948 1949 try: 1950 with ui.ui_factory: 1951 result = self.apply_redirected( 1952 ui.ui_factory.stdin, 1953 stdout, stderr, 1954 _mod_commands.run_bzr_catch_user_errors, 1955 args) 1956 finally: 1957 ui.ui_factory = old_ui_factory 1958 if cwd is not None: 1959 os.chdir(cwd) 1960 1961 return result 1962 1963 def run_bzr_raw(self, args, retcode=0, stdin=None, encoding=None, 1964 working_dir=None, error_regexes=[]): 1965 """Invoke brz, as if it were run from the command line. 1966 1967 The argument list should not include the brz program name - the 1968 first argument is normally the brz command. Arguments may be 1969 passed in three ways: 1970 1971 1- A list of strings, eg ["commit", "a"]. This is recommended 1972 when the command contains whitespace or metacharacters, or 1973 is built up at run time. 1974 1975 2- A single string, eg "add a". This is the most convenient 1976 for hardcoded commands. 1977 1978 This runs brz through the interface that catches and reports 1979 errors, and with logging set to something approximating the 1980 default, so that error reporting can be checked. 1981 1982 This should be the main method for tests that want to exercise the 1983 overall behavior of the brz application (rather than a unit test 1984 or a functional test of the library.) 1985 1986 This sends the stdout/stderr results into the test's log, 1987 where it may be useful for debugging. See also run_captured. 1988 1989 :keyword stdin: A string to be used as stdin for the command. 1990 :keyword retcode: The status code the command should return; 1991 default 0. 1992 :keyword working_dir: The directory to run the command in 1993 :keyword error_regexes: A list of expected error messages. If 1994 specified they must be seen in the error output of the command. 1995 """ 1996 if isinstance(args, str): 1997 args = shlex.split(args) 1998 1999 if encoding is None: 2000 encoding = osutils.get_user_encoding() 2001 2002 stdout = BytesIO() 2003 stderr = BytesIO() 2004 wrapped_stdout = TextIOWrapper(stdout, encoding) 2005 wrapped_stderr = TextIOWrapper(stderr, encoding) 2006 handler = logging.StreamHandler(wrapped_stderr) 2007 handler.setLevel(logging.INFO) 2008 2009 logger = logging.getLogger('') 2010 logger.addHandler(handler) 2011 try: 2012 result = self._run_bzr_core( 2013 args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout, 2014 stderr=wrapped_stderr, working_dir=working_dir, 2015 ) 2016 finally: 2017 logger.removeHandler(handler) 2018 2019 wrapped_stdout.flush() 2020 wrapped_stderr.flush() 2021 2022 out = stdout.getvalue() 2023 err = stderr.getvalue() 2024 if out: 2025 self.log('output:\n%r', out) 2026 if err: 2027 self.log('errors:\n%r', err) 2028 if retcode is not None: 2029 self.assertEqual(retcode, result, 2030 message='Unexpected return code') 2031 self.assertIsInstance(error_regexes, (list, tuple)) 2032 for regex in error_regexes: 2033 self.assertContainsRe(err, regex) 2034 return out, err 2035 2036 def run_bzr(self, args, retcode=0, stdin=None, encoding=None, 2037 working_dir=None, error_regexes=[]): 2038 """Invoke brz, as if it were run from the command line. 2039 2040 The argument list should not include the brz program name - the 2041 first argument is normally the brz command. Arguments may be 2042 passed in three ways: 2043 2044 1- A list of strings, eg ["commit", "a"]. This is recommended 2045 when the command contains whitespace or metacharacters, or 2046 is built up at run time. 2047 2048 2- A single string, eg "add a". This is the most convenient 2049 for hardcoded commands. 2050 2051 This runs brz through the interface that catches and reports 2052 errors, and with logging set to something approximating the 2053 default, so that error reporting can be checked. 2054 2055 This should be the main method for tests that want to exercise the 2056 overall behavior of the brz application (rather than a unit test 2057 or a functional test of the library.) 2058 2059 This sends the stdout/stderr results into the test's log, 2060 where it may be useful for debugging. See also run_captured. 2061 2062 :keyword stdin: A string to be used as stdin for the command. 2063 :keyword retcode: The status code the command should return; 2064 default 0. 2065 :keyword working_dir: The directory to run the command in 2066 :keyword error_regexes: A list of expected error messages. If 2067 specified they must be seen in the error output of the command. 2068 """ 2069 if isinstance(args, str): 2070 args = shlex.split(args) 2071 2072 if encoding is None: 2073 encoding = osutils.get_user_encoding() 2074 2075 stdout = ui_testing.StringIOWithEncoding() 2076 stderr = ui_testing.StringIOWithEncoding() 2077 stdout.encoding = stderr.encoding = encoding 2078 handler = logging.StreamHandler(stream=stderr) 2079 handler.setLevel(logging.INFO) 2080 2081 logger = logging.getLogger('') 2082 logger.addHandler(handler) 2083 2084 try: 2085 result = self._run_bzr_core( 2086 args, encoding=encoding, stdin=stdin, stdout=stdout, 2087 stderr=stderr, working_dir=working_dir) 2088 finally: 2089 logger.removeHandler(handler) 2090 2091 out = stdout.getvalue() 2092 err = stderr.getvalue() 2093 if out: 2094 self.log('output:\n%r', out) 2095 if err: 2096 self.log('errors:\n%r', err) 2097 if retcode is not None: 2098 self.assertEqual(retcode, result, 2099 message='Unexpected return code') 2100 self.assertIsInstance(error_regexes, (list, tuple)) 2101 for regex in error_regexes: 2102 self.assertContainsRe(err, regex) 2103 return out, err 2104 2105 def run_bzr_error(self, error_regexes, *args, **kwargs): 2106 """Run brz, and check that stderr contains the supplied regexes 2107 2108 :param error_regexes: Sequence of regular expressions which 2109 must each be found in the error output. The relative ordering 2110 is not enforced. 2111 :param args: command-line arguments for brz 2112 :param kwargs: Keyword arguments which are interpreted by run_brz 2113 This function changes the default value of retcode to be 3, 2114 since in most cases this is run when you expect brz to fail. 2115 2116 :return: (out, err) The actual output of running the command (in case 2117 you want to do more inspection) 2118 2119 Examples of use:: 2120 2121 # Make sure that commit is failing because there is nothing to do 2122 self.run_bzr_error(['no changes to commit'], 2123 ['commit', '-m', 'my commit comment']) 2124 # Make sure --strict is handling an unknown file, rather than 2125 # giving us the 'nothing to do' error 2126 self.build_tree(['unknown']) 2127 self.run_bzr_error( 2128 ['Commit refused because there are unknown files'], 2129 ['commit', --strict', '-m', 'my commit comment']) 2130 """ 2131 kwargs.setdefault('retcode', 3) 2132 kwargs['error_regexes'] = error_regexes 2133 out, err = self.run_bzr(*args, **kwargs) 2134 return out, err 2135 2136 def run_bzr_subprocess(self, *args, **kwargs): 2137 """Run brz in a subprocess for testing. 2138 2139 This starts a new Python interpreter and runs brz in there. 2140 This should only be used for tests that have a justifiable need for 2141 this isolation: e.g. they are testing startup time, or signal 2142 handling, or early startup code, etc. Subprocess code can't be 2143 profiled or debugged so easily. 2144 2145 :keyword retcode: The status code that is expected. Defaults to 0. If 2146 None is supplied, the status code is not checked. 2147 :keyword env_changes: A dictionary which lists changes to environment 2148 variables. A value of None will unset the env variable. 2149 The values must be strings. The change will only occur in the 2150 child, so you don't need to fix the environment after running. 2151 :keyword universal_newlines: Convert CRLF => LF 2152 :keyword allow_plugins: By default the subprocess is run with 2153 --no-plugins to ensure test reproducibility. Also, it is possible 2154 for system-wide plugins to create unexpected output on stderr, 2155 which can cause unnecessary test failures. 2156 """ 2157 env_changes = kwargs.get('env_changes', {}) 2158 working_dir = kwargs.get('working_dir', None) 2159 allow_plugins = kwargs.get('allow_plugins', False) 2160 if len(args) == 1: 2161 if isinstance(args[0], list): 2162 args = args[0] 2163 elif isinstance(args[0], str): 2164 args = list(shlex.split(args[0])) 2165 else: 2166 raise ValueError("passing varargs to run_bzr_subprocess") 2167 process = self.start_bzr_subprocess(args, env_changes=env_changes, 2168 working_dir=working_dir, 2169 allow_plugins=allow_plugins) 2170 # We distinguish between retcode=None and retcode not passed. 2171 supplied_retcode = kwargs.get('retcode', 0) 2172 return self.finish_bzr_subprocess(process, retcode=supplied_retcode, 2173 universal_newlines=kwargs.get( 2174 'universal_newlines', False), 2175 process_args=args) 2176 2177 def start_bzr_subprocess(self, process_args, env_changes=None, 2178 skip_if_plan_to_signal=False, 2179 working_dir=None, 2180 allow_plugins=False, stderr=subprocess.PIPE): 2181 """Start brz in a subprocess for testing. 2182 2183 This starts a new Python interpreter and runs brz in there. 2184 This should only be used for tests that have a justifiable need for 2185 this isolation: e.g. they are testing startup time, or signal 2186 handling, or early startup code, etc. Subprocess code can't be 2187 profiled or debugged so easily. 2188 2189 :param process_args: a list of arguments to pass to the brz executable, 2190 for example ``['--version']``. 2191 :param env_changes: A dictionary which lists changes to environment 2192 variables. A value of None will unset the env variable. 2193 The values must be strings. The change will only occur in the 2194 child, so you don't need to fix the environment after running. 2195 :param skip_if_plan_to_signal: raise TestSkipped when true and system 2196 doesn't support signalling subprocesses. 2197 :param allow_plugins: If False (default) pass --no-plugins to brz. 2198 :param stderr: file to use for the subprocess's stderr. Valid values 2199 are those valid for the stderr argument of `subprocess.Popen`. 2200 Default value is ``subprocess.PIPE``. 2201 2202 :returns: Popen object for the started process. 2203 """ 2204 if skip_if_plan_to_signal: 2205 if os.name != "posix": 2206 raise TestSkipped("Sending signals not supported") 2207 2208 if env_changes is None: 2209 env_changes = {} 2210 # Because $HOME is set to a tempdir for the context of a test, modules 2211 # installed in the user dir will not be found unless $PYTHONUSERBASE 2212 # gets set to the computed directory of this parent process. 2213 if site.USER_BASE is not None: 2214 env_changes["PYTHONUSERBASE"] = site.USER_BASE 2215 old_env = {} 2216 2217 def cleanup_environment(): 2218 for env_var, value in env_changes.items(): 2219 old_env[env_var] = osutils.set_or_unset_env(env_var, value) 2220 2221 def restore_environment(): 2222 for env_var, value in old_env.items(): 2223 osutils.set_or_unset_env(env_var, value) 2224 2225 bzr_path = self.get_brz_path() 2226 2227 cwd = None 2228 if working_dir is not None: 2229 cwd = osutils.getcwd() 2230 os.chdir(working_dir) 2231 2232 try: 2233 # win32 subprocess doesn't support preexec_fn 2234 # so we will avoid using it on all platforms, just to 2235 # make sure the code path is used, and we don't break on win32 2236 cleanup_environment() 2237 # Include the subprocess's log file in the test details, in case 2238 # the test fails due to an error in the subprocess. 2239 self._add_subprocess_log(trace._get_brz_log_filename()) 2240 command = [sys.executable] 2241 # frozen executables don't need the path to bzr 2242 if getattr(sys, "frozen", None) is None: 2243 command.append(bzr_path) 2244 if not allow_plugins: 2245 command.append('--no-plugins') 2246 command.extend(process_args) 2247 process = self._popen(command, stdin=subprocess.PIPE, 2248 stdout=subprocess.PIPE, 2249 stderr=stderr, bufsize=0) 2250 finally: 2251 restore_environment() 2252 if cwd is not None: 2253 os.chdir(cwd) 2254 2255 return process 2256 2257 def _add_subprocess_log(self, log_file_path): 2258 if len(self._log_files) == 0: 2259 # Register an addCleanup func. We do this on the first call to 2260 # _add_subprocess_log rather than in TestCase.setUp so that this 2261 # addCleanup is registered after any cleanups for tempdirs that 2262 # subclasses might create, which will probably remove the log file 2263 # we want to read. 2264 self.addCleanup(self._subprocess_log_cleanup) 2265 # self._log_files is a set, so if a log file is reused we won't grab it 2266 # twice. 2267 self._log_files.add(log_file_path) 2268 2269 def _subprocess_log_cleanup(self): 2270 for count, log_file_path in enumerate(self._log_files): 2271 # We use buffer_now=True to avoid holding the file open beyond 2272 # the life of this function, which might interfere with e.g. 2273 # cleaning tempdirs on Windows. 2274 # XXX: Testtools 0.9.5 doesn't have the content_from_file helper 2275 # detail_content = content.content_from_file( 2276 # log_file_path, buffer_now=True) 2277 with open(log_file_path, 'rb') as log_file: 2278 log_file_bytes = log_file.read() 2279 detail_content = content.Content( 2280 content.ContentType("text", "plain", {"charset": "utf8"}), 2281 lambda: [log_file_bytes]) 2282 self.addDetail("start_bzr_subprocess-log-%d" % (count,), 2283 detail_content) 2284 2285 def _popen(self, *args, **kwargs): 2286 """Place a call to Popen. 2287 2288 Allows tests to override this method to intercept the calls made to 2289 Popen for introspection. 2290 """ 2291 return subprocess.Popen(*args, **kwargs) 2292 2293 def get_source_path(self): 2294 """Return the path of the directory containing breezy.""" 2295 return os.path.dirname(os.path.dirname(breezy.__file__)) 2296 2297 def get_brz_path(self): 2298 """Return the path of the 'brz' executable for this test suite.""" 2299 brz_path = os.path.join(self.get_source_path(), "brz") 2300 if not os.path.isfile(brz_path): 2301 # We are probably installed. Assume sys.argv is the right file 2302 brz_path = sys.argv[0] 2303 return brz_path 2304 2305 def finish_bzr_subprocess(self, process, retcode=0, send_signal=None, 2306 universal_newlines=False, process_args=None): 2307 """Finish the execution of process. 2308 2309 :param process: the Popen object returned from start_bzr_subprocess. 2310 :param retcode: The status code that is expected. Defaults to 0. If 2311 None is supplied, the status code is not checked. 2312 :param send_signal: an optional signal to send to the process. 2313 :param universal_newlines: Convert CRLF => LF 2314 :returns: (stdout, stderr) 2315 """ 2316 if send_signal is not None: 2317 os.kill(process.pid, send_signal) 2318 out, err = process.communicate() 2319 2320 if universal_newlines: 2321 out = out.replace(b'\r\n', b'\n') 2322 err = err.replace(b'\r\n', b'\n') 2323 2324 if retcode is not None and retcode != process.returncode: 2325 if process_args is None: 2326 process_args = "(unknown args)" 2327 trace.mutter('Output of brz %r:\n%s', process_args, out) 2328 trace.mutter('Error for brz %r:\n%s', process_args, err) 2329 self.fail('Command brz %r failed with retcode %d != %d' 2330 % (process_args, retcode, process.returncode)) 2331 return [out, err] 2332 2333 def check_tree_shape(self, tree, shape): 2334 """Compare a tree to a list of expected names. 2335 2336 Fail if they are not precisely equal. 2337 """ 2338 extras = [] 2339 shape = list(shape) # copy 2340 for path, ie in tree.iter_entries_by_dir(): 2341 name = path.replace('\\', '/') 2342 if ie.kind == 'directory': 2343 name = name + '/' 2344 if name == "/": 2345 pass # ignore root entry 2346 elif name in shape: 2347 shape.remove(name) 2348 else: 2349 extras.append(name) 2350 if shape: 2351 self.fail("expected paths not found in inventory: %r" % shape) 2352 if extras: 2353 self.fail("unexpected paths found in inventory: %r" % extras) 2354 2355 def apply_redirected(self, stdin=None, stdout=None, stderr=None, 2356 a_callable=None, *args, **kwargs): 2357 """Call callable with redirected std io pipes. 2358 2359 Returns the return code.""" 2360 if not callable(a_callable): 2361 raise ValueError("a_callable must be callable.") 2362 if stdin is None: 2363 stdin = BytesIO(b"") 2364 if stdout is None: 2365 if getattr(self, "_log_file", None) is not None: 2366 stdout = self._log_file 2367 else: 2368 stdout = StringIO() 2369 if stderr is None: 2370 if getattr(self, "_log_file", None is not None): 2371 stderr = self._log_file 2372 else: 2373 stderr = StringIO() 2374 real_stdin = sys.stdin 2375 real_stdout = sys.stdout 2376 real_stderr = sys.stderr 2377 try: 2378 sys.stdout = stdout 2379 sys.stderr = stderr 2380 sys.stdin = stdin 2381 return a_callable(*args, **kwargs) 2382 finally: 2383 sys.stdout = real_stdout 2384 sys.stderr = real_stderr 2385 sys.stdin = real_stdin 2386 2387 def reduceLockdirTimeout(self): 2388 """Reduce the default lock timeout for the duration of the test, so that 2389 if LockContention occurs during a test, it does so quickly. 2390 2391 Tests that expect to provoke LockContention errors should call this. 2392 """ 2393 self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0) 2394 2395 def make_utf8_encoded_stringio(self, encoding_type=None): 2396 """Return a wrapped BytesIO, that will encode text input to UTF-8.""" 2397 if encoding_type is None: 2398 encoding_type = 'strict' 2399 bio = BytesIO() 2400 output_encoding = 'utf-8' 2401 sio = codecs.getwriter(output_encoding)(bio, errors=encoding_type) 2402 sio.encoding = output_encoding 2403 return sio 2404 2405 def disable_verb(self, verb): 2406 """Disable a smart server verb for one test.""" 2407 from breezy.bzr.smart import request 2408 request_handlers = request.request_handlers 2409 orig_method = request_handlers.get(verb) 2410 orig_info = request_handlers.get_info(verb) 2411 request_handlers.remove(verb) 2412 self.addCleanup(request_handlers.register, verb, orig_method, 2413 info=orig_info) 2414 2415 def __hash__(self): 2416 return id(self) 2417 2418 2419class CapturedCall(object): 2420 """A helper for capturing smart server calls for easy debug analysis.""" 2421 2422 def __init__(self, params, prefix_length): 2423 """Capture the call with params and skip prefix_length stack frames.""" 2424 self.call = params 2425 import traceback 2426 # The last 5 frames are the __init__, the hook frame, and 3 smart 2427 # client frames. Beyond this we could get more clever, but this is good 2428 # enough for now. 2429 stack = traceback.extract_stack()[prefix_length:-5] 2430 self.stack = ''.join(traceback.format_list(stack)) 2431 2432 def __str__(self): 2433 return self.call.method.decode('utf-8') 2434 2435 def __repr__(self): 2436 return self.call.method.decode('utf-8') 2437 2438 def stack(self): 2439 return self.stack 2440 2441 2442class TestCaseWithMemoryTransport(TestCase): 2443 """Common test class for tests that do not need disk resources. 2444 2445 Tests that need disk resources should derive from TestCaseInTempDir 2446 orTestCaseWithTransport. 2447 2448 TestCaseWithMemoryTransport sets the TEST_ROOT variable for all brz tests. 2449 2450 For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of 2451 a directory which does not exist. This serves to help ensure test isolation 2452 is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they 2453 must exist. However, TestCaseWithMemoryTransport does not offer local file 2454 defaults for the transport in tests, nor does it obey the command line 2455 override, so tests that accidentally write to the common directory should 2456 be rare. 2457 2458 :cvar TEST_ROOT: Directory containing all temporary directories, plus a 2459 ``.bzr`` directory that stops us ascending higher into the filesystem. 2460 """ 2461 2462 TEST_ROOT = None 2463 _TEST_NAME = 'test' 2464 2465 def __init__(self, methodName='runTest'): 2466 # allow test parameterization after test construction and before test 2467 # execution. Variables that the parameterizer sets need to be 2468 # ones that are not set by setUp, or setUp will trash them. 2469 super(TestCaseWithMemoryTransport, self).__init__(methodName) 2470 self.vfs_transport_factory = default_transport 2471 self.transport_server = None 2472 self.transport_readonly_server = None 2473 self.__vfs_server = None 2474 2475 def setUp(self): 2476 super(TestCaseWithMemoryTransport, self).setUp() 2477 2478 def _add_disconnect_cleanup(transport): 2479 """Schedule disconnection of given transport at test cleanup 2480 2481 This needs to happen for all connected transports or leaks occur. 2482 2483 Note reconnections may mean we call disconnect multiple times per 2484 transport which is suboptimal but seems harmless. 2485 """ 2486 self.addCleanup(transport.disconnect) 2487 2488 _mod_transport.Transport.hooks.install_named_hook( 2489 'post_connect', _add_disconnect_cleanup, None) 2490 2491 self._make_test_root() 2492 self.addCleanup(os.chdir, osutils.getcwd()) 2493 self.makeAndChdirToTestDir() 2494 self.overrideEnvironmentForTesting() 2495 self.__readonly_server = None 2496 self.__server = None 2497 self.reduceLockdirTimeout() 2498 # Each test may use its own config files even if the local config files 2499 # don't actually exist. They'll rightly fail if they try to create them 2500 # though. 2501 self.overrideAttr(config, '_shared_stores', {}) 2502 2503 def get_transport(self, relpath=None): 2504 """Return a writeable transport. 2505 2506 This transport is for the test scratch space relative to 2507 "self._test_root" 2508 2509 :param relpath: a path relative to the base url. 2510 """ 2511 t = _mod_transport.get_transport_from_url(self.get_url(relpath)) 2512 self.assertFalse(t.is_readonly()) 2513 return t 2514 2515 def get_readonly_transport(self, relpath=None): 2516 """Return a readonly transport for the test scratch space 2517 2518 This can be used to test that operations which should only need 2519 readonly access in fact do not try to write. 2520 2521 :param relpath: a path relative to the base url. 2522 """ 2523 t = _mod_transport.get_transport_from_url( 2524 self.get_readonly_url(relpath)) 2525 self.assertTrue(t.is_readonly()) 2526 return t 2527 2528 def create_transport_readonly_server(self): 2529 """Create a transport server from class defined at init. 2530 2531 This is mostly a hook for daughter classes. 2532 """ 2533 return self.transport_readonly_server() 2534 2535 def get_readonly_server(self): 2536 """Get the server instance for the readonly transport 2537 2538 This is useful for some tests with specific servers to do diagnostics. 2539 """ 2540 if self.__readonly_server is None: 2541 if self.transport_readonly_server is None: 2542 # readonly decorator requested 2543 self.__readonly_server = test_server.ReadonlyServer() 2544 else: 2545 # explicit readonly transport. 2546 self.__readonly_server = ( 2547 self.create_transport_readonly_server()) 2548 self.start_server(self.__readonly_server, 2549 self.get_vfs_only_server()) 2550 return self.__readonly_server 2551 2552 def get_readonly_url(self, relpath=None): 2553 """Get a URL for the readonly transport. 2554 2555 This will either be backed by '.' or a decorator to the transport 2556 used by self.get_url() 2557 relpath provides for clients to get a path relative to the base url. 2558 These should only be downwards relative, not upwards. 2559 """ 2560 base = self.get_readonly_server().get_url() 2561 return self._adjust_url(base, relpath) 2562 2563 def get_vfs_only_server(self): 2564 """Get the vfs only read/write server instance. 2565 2566 This is useful for some tests with specific servers that need 2567 diagnostics. 2568 2569 For TestCaseWithMemoryTransport this is always a MemoryServer, and there 2570 is no means to override it. 2571 """ 2572 if self.__vfs_server is None: 2573 self.__vfs_server = memory.MemoryServer() 2574 self.start_server(self.__vfs_server) 2575 return self.__vfs_server 2576 2577 def get_server(self): 2578 """Get the read/write server instance. 2579 2580 This is useful for some tests with specific servers that need 2581 diagnostics. 2582 2583 This is built from the self.transport_server factory. If that is None, 2584 then the self.get_vfs_server is returned. 2585 """ 2586 if self.__server is None: 2587 if (self.transport_server is None or self.transport_server is 2588 self.vfs_transport_factory): 2589 self.__server = self.get_vfs_only_server() 2590 else: 2591 # bring up a decorated means of access to the vfs only server. 2592 self.__server = self.transport_server() 2593 self.start_server(self.__server, self.get_vfs_only_server()) 2594 return self.__server 2595 2596 def _adjust_url(self, base, relpath): 2597 """Get a URL (or maybe a path) for the readwrite transport. 2598 2599 This will either be backed by '.' or to an equivalent non-file based 2600 facility. 2601 relpath provides for clients to get a path relative to the base url. 2602 These should only be downwards relative, not upwards. 2603 """ 2604 if relpath is not None and relpath != '.': 2605 if not base.endswith('/'): 2606 base = base + '/' 2607 # XXX: Really base should be a url; we did after all call 2608 # get_url()! But sometimes it's just a path (from 2609 # LocalAbspathServer), and it'd be wrong to append urlescaped data 2610 # to a non-escaped local path. 2611 if base.startswith('./') or base.startswith('/'): 2612 base += relpath 2613 else: 2614 base += urlutils.escape(relpath) 2615 return base 2616 2617 def get_url(self, relpath=None): 2618 """Get a URL (or maybe a path) for the readwrite transport. 2619 2620 This will either be backed by '.' or to an equivalent non-file based 2621 facility. 2622 relpath provides for clients to get a path relative to the base url. 2623 These should only be downwards relative, not upwards. 2624 """ 2625 base = self.get_server().get_url() 2626 return self._adjust_url(base, relpath) 2627 2628 def get_vfs_only_url(self, relpath=None): 2629 """Get a URL (or maybe a path for the plain old vfs transport. 2630 2631 This will never be a smart protocol. It always has all the 2632 capabilities of the local filesystem, but it might actually be a 2633 MemoryTransport or some other similar virtual filesystem. 2634 2635 This is the backing transport (if any) of the server returned by 2636 get_url and get_readonly_url. 2637 2638 :param relpath: provides for clients to get a path relative to the base 2639 url. These should only be downwards relative, not upwards. 2640 :return: A URL 2641 """ 2642 base = self.get_vfs_only_server().get_url() 2643 return self._adjust_url(base, relpath) 2644 2645 def _create_safety_net(self): 2646 """Make a fake bzr directory. 2647 2648 This prevents any tests propagating up onto the TEST_ROOT directory's 2649 real branch. 2650 """ 2651 root = TestCaseWithMemoryTransport.TEST_ROOT 2652 try: 2653 # Make sure we get a readable and accessible home for brz.log 2654 # and/or config files, and not fallback to weird defaults (see 2655 # http://pad.lv/825027). 2656 self.assertIs(None, os.environ.get('BRZ_HOME', None)) 2657 os.environ['BRZ_HOME'] = root 2658 from breezy.bzr.bzrdir import BzrDirMetaFormat1 2659 wt = controldir.ControlDir.create_standalone_workingtree( 2660 root, format=BzrDirMetaFormat1()) 2661 del os.environ['BRZ_HOME'] 2662 except Exception as e: 2663 self.fail("Fail to initialize the safety net: %r\n" % (e,)) 2664 # Hack for speed: remember the raw bytes of the dirstate file so that 2665 # we don't need to re-open the wt to check it hasn't changed. 2666 TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = ( 2667 wt.control_transport.get_bytes('dirstate')) 2668 2669 def _check_safety_net(self): 2670 """Check that the safety .bzr directory have not been touched. 2671 2672 _make_test_root have created a .bzr directory to prevent tests from 2673 propagating. This method ensures than a test did not leaked. 2674 """ 2675 root = TestCaseWithMemoryTransport.TEST_ROOT 2676 t = _mod_transport.get_transport_from_path(root) 2677 self.permit_url(t.base) 2678 if (t.get_bytes('.bzr/checkout/dirstate') != 2679 TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE): 2680 # The current test have modified the /bzr directory, we need to 2681 # recreate a new one or all the followng tests will fail. 2682 # If you need to inspect its content uncomment the following line 2683 # import pdb; pdb.set_trace() 2684 _rmtree_temp_dir(root + '/.bzr', test_id=self.id()) 2685 self._create_safety_net() 2686 raise AssertionError('%s/.bzr should not be modified' % root) 2687 2688 def _make_test_root(self): 2689 if TestCaseWithMemoryTransport.TEST_ROOT is None: 2690 # Watch out for tricky test dir (on OSX /tmp -> /private/tmp) 2691 root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-', 2692 suffix='.tmp')) 2693 TestCaseWithMemoryTransport.TEST_ROOT = root 2694 2695 self._create_safety_net() 2696 2697 # The same directory is used by all tests, and we're not 2698 # specifically told when all tests are finished. This will do. 2699 atexit.register(_rmtree_temp_dir, root) 2700 2701 self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT) 2702 self.addCleanup(self._check_safety_net) 2703 2704 def makeAndChdirToTestDir(self): 2705 """Create a temporary directories for this one test. 2706 2707 This must set self.test_home_dir and self.test_dir and chdir to 2708 self.test_dir. 2709 2710 For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this 2711 test. 2712 """ 2713 os.chdir(TestCaseWithMemoryTransport.TEST_ROOT) 2714 self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT 2715 self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir" 2716 self.permit_dir(self.test_dir) 2717 2718 def make_branch(self, relpath, format=None, name=None): 2719 """Create a branch on the transport at relpath.""" 2720 repo = self.make_repository(relpath, format=format) 2721 return repo.controldir.create_branch(append_revisions_only=False, name=name) 2722 2723 def get_default_format(self): 2724 return 'default' 2725 2726 def resolve_format(self, format): 2727 """Resolve an object to a ControlDir format object. 2728 2729 The initial format object can either already be 2730 a ControlDirFormat, None (for the default format), 2731 or a string with the name of the control dir format. 2732 2733 :param format: Object to resolve 2734 :return A ControlDirFormat instance 2735 """ 2736 if format is None: 2737 format = self.get_default_format() 2738 if isinstance(format, str): 2739 format = controldir.format_registry.make_controldir(format) 2740 return format 2741 2742 def make_controldir(self, relpath, format=None): 2743 try: 2744 # might be a relative or absolute path 2745 maybe_a_url = self.get_url(relpath) 2746 segments = maybe_a_url.rsplit('/', 1) 2747 t = _mod_transport.get_transport(maybe_a_url) 2748 if len(segments) > 1 and segments[-1] not in ('', '.'): 2749 t.ensure_base() 2750 format = self.resolve_format(format) 2751 return format.initialize_on_transport(t) 2752 except errors.UninitializableFormat: 2753 raise TestSkipped("Format %s is not initializable." % format) 2754 2755 def make_repository(self, relpath, shared=None, format=None): 2756 """Create a repository on our default transport at relpath. 2757 2758 Note that relpath must be a relative path, not a full url. 2759 """ 2760 # FIXME: If you create a remoterepository this returns the underlying 2761 # real format, which is incorrect. Actually we should make sure that 2762 # RemoteBzrDir returns a RemoteRepository. 2763 # maybe mbp 20070410 2764 made_control = self.make_controldir(relpath, format=format) 2765 return made_control.create_repository(shared=shared) 2766 2767 def make_smart_server(self, path, backing_server=None): 2768 if backing_server is None: 2769 backing_server = self.get_server() 2770 smart_server = test_server.SmartTCPServer_for_testing() 2771 self.start_server(smart_server, backing_server) 2772 remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url() 2773 ).clone(path) 2774 return remote_transport 2775 2776 def make_branch_and_memory_tree(self, relpath, format=None): 2777 """Create a branch on the default transport and a MemoryTree for it.""" 2778 b = self.make_branch(relpath, format=format) 2779 return b.create_memorytree() 2780 2781 def make_branch_builder(self, relpath, format=None): 2782 branch = self.make_branch(relpath, format=format) 2783 return branchbuilder.BranchBuilder(branch=branch) 2784 2785 def overrideEnvironmentForTesting(self): 2786 test_home_dir = self.test_home_dir 2787 self.overrideEnv('HOME', test_home_dir) 2788 self.overrideEnv('BRZ_HOME', test_home_dir) 2789 self.overrideEnv('GNUPGHOME', os.path.join(test_home_dir, '.gnupg')) 2790 2791 def setup_smart_server_with_call_log(self): 2792 """Sets up a smart server as the transport server with a call log.""" 2793 self.transport_server = test_server.SmartTCPServer_for_testing 2794 self.hpss_connections = [] 2795 self.hpss_calls = [] 2796 import traceback 2797 # Skip the current stack down to the caller of 2798 # setup_smart_server_with_call_log 2799 prefix_length = len(traceback.extract_stack()) - 2 2800 2801 def capture_hpss_call(params): 2802 self.hpss_calls.append( 2803 CapturedCall(params, prefix_length)) 2804 2805 def capture_connect(transport): 2806 self.hpss_connections.append(transport) 2807 client._SmartClient.hooks.install_named_hook( 2808 'call', capture_hpss_call, None) 2809 _mod_transport.Transport.hooks.install_named_hook( 2810 'post_connect', capture_connect, None) 2811 2812 def reset_smart_call_log(self): 2813 self.hpss_calls = [] 2814 self.hpss_connections = [] 2815 2816 2817class TestCaseInTempDir(TestCaseWithMemoryTransport): 2818 """Derived class that runs a test within a temporary directory. 2819 2820 This is useful for tests that need to create a branch, etc. 2821 2822 The directory is created in a slightly complex way: for each 2823 Python invocation, a new temporary top-level directory is created. 2824 All test cases create their own directory within that. If the 2825 tests complete successfully, the directory is removed. 2826 2827 :ivar test_base_dir: The path of the top-level directory for this 2828 test, which contains a home directory and a work directory. 2829 2830 :ivar test_home_dir: An initially empty directory under test_base_dir 2831 which is used as $HOME for this test. 2832 2833 :ivar test_dir: A directory under test_base_dir used as the current 2834 directory when the test proper is run. 2835 """ 2836 2837 OVERRIDE_PYTHON = 'python' 2838 2839 def setUp(self): 2840 super(TestCaseInTempDir, self).setUp() 2841 # Remove the protection set in isolated_environ, we have a proper 2842 # access to disk resources now. 2843 self.overrideEnv('BRZ_LOG', None) 2844 2845 def check_file_contents(self, filename, expect): 2846 self.log("check contents of file %s" % filename) 2847 with open(filename, 'rb') as f: 2848 contents = f.read() 2849 if contents != expect: 2850 self.log("expected: %r" % expect) 2851 self.log("actually: %r" % contents) 2852 self.fail("contents of %s not as expected" % filename) 2853 2854 def _getTestDirPrefix(self): 2855 # create a directory within the top level test directory 2856 if sys.platform in ('win32', 'cygwin'): 2857 name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id()) 2858 # windows is likely to have path-length limits so use a short name 2859 name_prefix = name_prefix[-30:] 2860 else: 2861 name_prefix = re.sub('[/]', '_', self.id()) 2862 return name_prefix 2863 2864 def makeAndChdirToTestDir(self): 2865 """See TestCaseWithMemoryTransport.makeAndChdirToTestDir(). 2866 2867 For TestCaseInTempDir we create a temporary directory based on the test 2868 name and then create two subdirs - test and home under it. 2869 """ 2870 name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT, 2871 self._getTestDirPrefix()) 2872 name = name_prefix 2873 for i in range(100): 2874 if os.path.exists(name): 2875 name = name_prefix + '_' + str(i) 2876 else: 2877 # now create test and home directories within this dir 2878 self.test_base_dir = name 2879 self.addCleanup(self.deleteTestDir) 2880 os.mkdir(self.test_base_dir) 2881 break 2882 self.permit_dir(self.test_base_dir) 2883 # 'sprouting' and 'init' of a branch both walk up the tree to find 2884 # stacking policy to honour; create a bzr dir with an unshared 2885 # repository (but not a branch - our code would be trying to escape 2886 # then!) to stop them, and permit it to be read. 2887 # control = controldir.ControlDir.create(self.test_base_dir) 2888 # control.create_repository() 2889 self.test_home_dir = self.test_base_dir + '/home' 2890 os.mkdir(self.test_home_dir) 2891 self.test_dir = self.test_base_dir + '/work' 2892 os.mkdir(self.test_dir) 2893 os.chdir(self.test_dir) 2894 # put name of test inside 2895 with open(self.test_base_dir + '/name', 'w') as f: 2896 f.write(self.id()) 2897 2898 def deleteTestDir(self): 2899 os.chdir(TestCaseWithMemoryTransport.TEST_ROOT) 2900 _rmtree_temp_dir(self.test_base_dir, test_id=self.id()) 2901 2902 def build_tree(self, shape, line_endings='binary', transport=None): 2903 """Build a test tree according to a pattern. 2904 2905 shape is a sequence of file specifications. If the final 2906 character is '/', a directory is created. 2907 2908 This assumes that all the elements in the tree being built are new. 2909 2910 This doesn't add anything to a branch. 2911 2912 :type shape: list or tuple. 2913 :param line_endings: Either 'binary' or 'native' 2914 in binary mode, exact contents are written in native mode, the 2915 line endings match the default platform endings. 2916 :param transport: A transport to write to, for building trees on VFS's. 2917 If the transport is readonly or None, "." is opened automatically. 2918 :return: None 2919 """ 2920 if type(shape) not in (list, tuple): 2921 raise AssertionError("Parameter 'shape' should be " 2922 "a list or a tuple. Got %r instead" % (shape,)) 2923 # It's OK to just create them using forward slashes on windows. 2924 if transport is None or transport.is_readonly(): 2925 transport = _mod_transport.get_transport_from_path(".") 2926 for name in shape: 2927 self.assertIsInstance(name, str) 2928 if name[-1] == '/': 2929 transport.mkdir(urlutils.escape(name[:-1])) 2930 else: 2931 if line_endings == 'binary': 2932 end = b'\n' 2933 elif line_endings == 'native': 2934 end = os.linesep.encode('ascii') 2935 else: 2936 raise errors.BzrError( 2937 'Invalid line ending request %r' % line_endings) 2938 content = b"contents of %s%s" % (name.encode('utf-8'), end) 2939 transport.put_bytes_non_atomic(urlutils.escape(name), content) 2940 2941 build_tree_contents = staticmethod(treeshape.build_tree_contents) 2942 2943 def assertInWorkingTree(self, path, root_path='.', tree=None): 2944 """Assert whether path or paths are in the WorkingTree""" 2945 if tree is None: 2946 tree = workingtree.WorkingTree.open(root_path) 2947 if not isinstance(path, str): 2948 for p in path: 2949 self.assertInWorkingTree(p, tree=tree) 2950 else: 2951 self.assertTrue(tree.is_versioned(path), 2952 path + ' not in working tree.') 2953 2954 def assertNotInWorkingTree(self, path, root_path='.', tree=None): 2955 """Assert whether path or paths are not in the WorkingTree""" 2956 if tree is None: 2957 tree = workingtree.WorkingTree.open(root_path) 2958 if not isinstance(path, str): 2959 for p in path: 2960 self.assertNotInWorkingTree(p, tree=tree) 2961 else: 2962 self.assertFalse(tree.is_versioned( 2963 path), path + ' in working tree.') 2964 2965 2966class TestCaseWithTransport(TestCaseInTempDir): 2967 """A test case that provides get_url and get_readonly_url facilities. 2968 2969 These back onto two transport servers, one for readonly access and one for 2970 read write access. 2971 2972 If no explicit class is provided for readonly access, a 2973 ReadonlyTransportDecorator is used instead which allows the use of non disk 2974 based read write transports. 2975 2976 If an explicit class is provided for readonly access, that server and the 2977 readwrite one must both define get_url() as resolving to os.getcwd(). 2978 """ 2979 2980 def setUp(self): 2981 super(TestCaseWithTransport, self).setUp() 2982 self.__vfs_server = None 2983 2984 def get_vfs_only_server(self): 2985 """See TestCaseWithMemoryTransport. 2986 2987 This is useful for some tests with specific servers that need 2988 diagnostics. 2989 """ 2990 if self.__vfs_server is None: 2991 self.__vfs_server = self.vfs_transport_factory() 2992 self.start_server(self.__vfs_server) 2993 return self.__vfs_server 2994 2995 def make_branch_and_tree(self, relpath, format=None): 2996 """Create a branch on the transport and a tree locally. 2997 2998 If the transport is not a LocalTransport, the Tree can't be created on 2999 the transport. In that case if the vfs_transport_factory is 3000 LocalURLServer the working tree is created in the local 3001 directory backing the transport, and the returned tree's branch and 3002 repository will also be accessed locally. Otherwise a lightweight 3003 checkout is created and returned. 3004 3005 We do this because we can't physically create a tree in the local 3006 path, with a branch reference to the transport_factory url, and 3007 a branch + repository in the vfs_transport, unless the vfs_transport 3008 namespace is distinct from the local disk - the two branch objects 3009 would collide. While we could construct a tree with its branch object 3010 pointing at the transport_factory transport in memory, reopening it 3011 would behaving unexpectedly, and has in the past caused testing bugs 3012 when we tried to do it that way. 3013 3014 :param format: The BzrDirFormat. 3015 :returns: the WorkingTree. 3016 """ 3017 # TODO: always use the local disk path for the working tree, 3018 # this obviously requires a format that supports branch references 3019 # so check for that by checking bzrdir.BzrDirFormat.get_default_format() 3020 # RBC 20060208 3021 format = self.resolve_format(format=format) 3022 if not format.supports_workingtrees: 3023 b = self.make_branch(relpath + '.branch', format=format) 3024 return b.create_checkout(relpath, lightweight=True) 3025 b = self.make_branch(relpath, format=format) 3026 try: 3027 return b.controldir.create_workingtree() 3028 except errors.NotLocalUrl: 3029 # We can only make working trees locally at the moment. If the 3030 # transport can't support them, then we keep the non-disk-backed 3031 # branch and create a local checkout. 3032 if self.vfs_transport_factory is test_server.LocalURLServer: 3033 # the branch is colocated on disk, we cannot create a checkout. 3034 # hopefully callers will expect this. 3035 local_controldir = controldir.ControlDir.open( 3036 self.get_vfs_only_url(relpath)) 3037 wt = local_controldir.create_workingtree() 3038 if wt.branch._format != b._format: 3039 wt._branch = b 3040 # Make sure that assigning to wt._branch fixes wt.branch, 3041 # in case the implementation details of workingtree objects 3042 # change. 3043 self.assertIs(b, wt.branch) 3044 return wt 3045 else: 3046 return b.create_checkout(relpath, lightweight=True) 3047 3048 def assertIsDirectory(self, relpath, transport): 3049 """Assert that relpath within transport is a directory. 3050 3051 This may not be possible on all transports; in that case it propagates 3052 a TransportNotPossible. 3053 """ 3054 try: 3055 mode = transport.stat(relpath).st_mode 3056 except errors.NoSuchFile: 3057 self.fail("path %s is not a directory; no such file" 3058 % (relpath)) 3059 if not stat.S_ISDIR(mode): 3060 self.fail("path %s is not a directory; has mode %#o" 3061 % (relpath, mode)) 3062 3063 def assertTreesEqual(self, left, right): 3064 """Check that left and right have the same content and properties.""" 3065 # we use a tree delta to check for equality of the content, and we 3066 # manually check for equality of other things such as the parents list. 3067 self.assertEqual(left.get_parent_ids(), right.get_parent_ids()) 3068 differences = left.changes_from(right) 3069 self.assertFalse(differences.has_changed(), 3070 "Trees %r and %r are different: %r" % (left, right, differences)) 3071 3072 def disable_missing_extensions_warning(self): 3073 """Some tests expect a precise stderr content. 3074 3075 There is no point in forcing them to duplicate the extension related 3076 warning. 3077 """ 3078 config.GlobalConfig().set_user_option( 3079 'suppress_warnings', 'missing_extensions') 3080 3081 3082class ChrootedTestCase(TestCaseWithTransport): 3083 """A support class that provides readonly urls outside the local namespace. 3084 3085 This is done by checking if self.transport_server is a MemoryServer. if it 3086 is then we are chrooted already, if it is not then an HttpServer is used 3087 for readonly urls. 3088 3089 TODO RBC 20060127: make this an option to TestCaseWithTransport so it can 3090 be used without needed to redo it when a different 3091 subclass is in use ? 3092 """ 3093 3094 def setUp(self): 3095 from breezy.tests import http_server 3096 super(ChrootedTestCase, self).setUp() 3097 if not self.vfs_transport_factory == memory.MemoryServer: 3098 self.transport_readonly_server = http_server.HttpServer 3099 3100 3101def condition_id_re(pattern): 3102 """Create a condition filter which performs a re check on a test's id. 3103 3104 :param pattern: A regular expression string. 3105 :return: A callable that returns True if the re matches. 3106 """ 3107 filter_re = re.compile(pattern, 0) 3108 3109 def condition(test): 3110 test_id = test.id() 3111 return filter_re.search(test_id) 3112 return condition 3113 3114 3115def condition_isinstance(klass_or_klass_list): 3116 """Create a condition filter which returns isinstance(param, klass). 3117 3118 :return: A callable which when called with one parameter obj return the 3119 result of isinstance(obj, klass_or_klass_list). 3120 """ 3121 def condition(obj): 3122 return isinstance(obj, klass_or_klass_list) 3123 return condition 3124 3125 3126def condition_id_in_list(id_list): 3127 """Create a condition filter which verify that test's id in a list. 3128 3129 :param id_list: A TestIdList object. 3130 :return: A callable that returns True if the test's id appears in the list. 3131 """ 3132 def condition(test): 3133 return id_list.includes(test.id()) 3134 return condition 3135 3136 3137def condition_id_startswith(starts): 3138 """Create a condition filter verifying that test's id starts with a string. 3139 3140 :param starts: A list of string. 3141 :return: A callable that returns True if the test's id starts with one of 3142 the given strings. 3143 """ 3144 def condition(test): 3145 for start in starts: 3146 if test.id().startswith(start): 3147 return True 3148 return False 3149 return condition 3150 3151 3152def exclude_tests_by_condition(suite, condition): 3153 """Create a test suite which excludes some tests from suite. 3154 3155 :param suite: The suite to get tests from. 3156 :param condition: A callable whose result evaluates True when called with a 3157 test case which should be excluded from the result. 3158 :return: A suite which contains the tests found in suite that fail 3159 condition. 3160 """ 3161 result = [] 3162 for test in iter_suite_tests(suite): 3163 if not condition(test): 3164 result.append(test) 3165 return TestUtil.TestSuite(result) 3166 3167 3168def filter_suite_by_condition(suite, condition): 3169 """Create a test suite by filtering another one. 3170 3171 :param suite: The source suite. 3172 :param condition: A callable whose result evaluates True when called with a 3173 test case which should be included in the result. 3174 :return: A suite which contains the tests found in suite that pass 3175 condition. 3176 """ 3177 result = [] 3178 for test in iter_suite_tests(suite): 3179 if condition(test): 3180 result.append(test) 3181 return TestUtil.TestSuite(result) 3182 3183 3184def filter_suite_by_re(suite, pattern): 3185 """Create a test suite by filtering another one. 3186 3187 :param suite: the source suite 3188 :param pattern: pattern that names must match 3189 :returns: the newly created suite 3190 """ 3191 condition = condition_id_re(pattern) 3192 result_suite = filter_suite_by_condition(suite, condition) 3193 return result_suite 3194 3195 3196def filter_suite_by_id_list(suite, test_id_list): 3197 """Create a test suite by filtering another one. 3198 3199 :param suite: The source suite. 3200 :param test_id_list: A list of the test ids to keep as strings. 3201 :returns: the newly created suite 3202 """ 3203 condition = condition_id_in_list(test_id_list) 3204 result_suite = filter_suite_by_condition(suite, condition) 3205 return result_suite 3206 3207 3208def filter_suite_by_id_startswith(suite, start): 3209 """Create a test suite by filtering another one. 3210 3211 :param suite: The source suite. 3212 :param start: A list of string the test id must start with one of. 3213 :returns: the newly created suite 3214 """ 3215 condition = condition_id_startswith(start) 3216 result_suite = filter_suite_by_condition(suite, condition) 3217 return result_suite 3218 3219 3220def exclude_tests_by_re(suite, pattern): 3221 """Create a test suite which excludes some tests from suite. 3222 3223 :param suite: The suite to get tests from. 3224 :param pattern: A regular expression string. Test ids that match this 3225 pattern will be excluded from the result. 3226 :return: A TestSuite that contains all the tests from suite without the 3227 tests that matched pattern. The order of tests is the same as it was in 3228 suite. 3229 """ 3230 return exclude_tests_by_condition(suite, condition_id_re(pattern)) 3231 3232 3233def preserve_input(something): 3234 """A helper for performing test suite transformation chains. 3235 3236 :param something: Anything you want to preserve. 3237 :return: Something. 3238 """ 3239 return something 3240 3241 3242def randomize_suite(suite): 3243 """Return a new TestSuite with suite's tests in random order. 3244 3245 The tests in the input suite are flattened into a single suite in order to 3246 accomplish this. Any nested TestSuites are removed to provide global 3247 randomness. 3248 """ 3249 tests = list(iter_suite_tests(suite)) 3250 random.shuffle(tests) 3251 return TestUtil.TestSuite(tests) 3252 3253 3254def split_suite_by_condition(suite, condition): 3255 """Split a test suite into two by a condition. 3256 3257 :param suite: The suite to split. 3258 :param condition: The condition to match on. Tests that match this 3259 condition are returned in the first test suite, ones that do not match 3260 are in the second suite. 3261 :return: A tuple of two test suites, where the first contains tests from 3262 suite matching the condition, and the second contains the remainder 3263 from suite. The order within each output suite is the same as it was in 3264 suite. 3265 """ 3266 matched = [] 3267 did_not_match = [] 3268 for test in iter_suite_tests(suite): 3269 if condition(test): 3270 matched.append(test) 3271 else: 3272 did_not_match.append(test) 3273 return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match) 3274 3275 3276def split_suite_by_re(suite, pattern): 3277 """Split a test suite into two by a regular expression. 3278 3279 :param suite: The suite to split. 3280 :param pattern: A regular expression string. Test ids that match this 3281 pattern will be in the first test suite returned, and the others in the 3282 second test suite returned. 3283 :return: A tuple of two test suites, where the first contains tests from 3284 suite matching pattern, and the second contains the remainder from 3285 suite. The order within each output suite is the same as it was in 3286 suite. 3287 """ 3288 return split_suite_by_condition(suite, condition_id_re(pattern)) 3289 3290 3291def run_suite(suite, name='test', verbose=False, pattern=".*", 3292 stop_on_failure=False, 3293 transport=None, lsprof_timed=None, bench_history=None, 3294 matching_tests_first=None, 3295 list_only=False, 3296 random_seed=None, 3297 exclude_pattern=None, 3298 strict=False, 3299 runner_class=None, 3300 suite_decorators=None, 3301 stream=None, 3302 result_decorators=None, 3303 ): 3304 """Run a test suite for brz selftest. 3305 3306 :param runner_class: The class of runner to use. Must support the 3307 constructor arguments passed by run_suite which are more than standard 3308 python uses. 3309 :return: A boolean indicating success. 3310 """ 3311 TestCase._gather_lsprof_in_benchmarks = lsprof_timed 3312 if verbose: 3313 verbosity = 2 3314 else: 3315 verbosity = 1 3316 if runner_class is None: 3317 runner_class = TextTestRunner 3318 if stream is None: 3319 stream = sys.stdout 3320 runner = runner_class(stream=stream, 3321 descriptions=0, 3322 verbosity=verbosity, 3323 bench_history=bench_history, 3324 strict=strict, 3325 result_decorators=result_decorators, 3326 ) 3327 runner.stop_on_failure = stop_on_failure 3328 if isinstance(suite, unittest.TestSuite): 3329 # Empty out _tests list of passed suite and populate new TestSuite 3330 suite._tests[:], suite = [], TestSuite(suite) 3331 # built in decorator factories: 3332 decorators = [ 3333 random_order(random_seed, runner), 3334 exclude_tests(exclude_pattern), 3335 ] 3336 if matching_tests_first: 3337 decorators.append(tests_first(pattern)) 3338 else: 3339 decorators.append(filter_tests(pattern)) 3340 if suite_decorators: 3341 decorators.extend(suite_decorators) 3342 # tell the result object how many tests will be running: (except if 3343 # --parallel=fork is being used. Robert said he will provide a better 3344 # progress design later -- vila 20090817) 3345 if fork_decorator not in decorators: 3346 decorators.append(CountingDecorator) 3347 for decorator in decorators: 3348 suite = decorator(suite) 3349 if list_only: 3350 # Done after test suite decoration to allow randomisation etc 3351 # to take effect, though that is of marginal benefit. 3352 if verbosity >= 2: 3353 stream.write("Listing tests only ...\n") 3354 if getattr(runner, 'list', None) is not None: 3355 runner.list(suite) 3356 else: 3357 for t in iter_suite_tests(suite): 3358 stream.write("%s\n" % (t.id())) 3359 return True 3360 result = runner.run(suite) 3361 if strict and getattr(result, 'wasStrictlySuccessful', False): 3362 return result.wasStrictlySuccessful() 3363 else: 3364 return result.wasSuccessful() 3365 3366 3367# A registry where get() returns a suite decorator. 3368parallel_registry = registry.Registry() 3369 3370 3371def fork_decorator(suite): 3372 if getattr(os, "fork", None) is None: 3373 raise errors.CommandError("platform does not support fork," 3374 " try --parallel=subprocess instead.") 3375 concurrency = osutils.local_concurrency() 3376 if concurrency == 1: 3377 return suite 3378 from testtools import ConcurrentTestSuite 3379 return ConcurrentTestSuite(suite, fork_for_tests) 3380 3381 3382parallel_registry.register('fork', fork_decorator) 3383 3384 3385def subprocess_decorator(suite): 3386 concurrency = osutils.local_concurrency() 3387 if concurrency == 1: 3388 return suite 3389 from testtools import ConcurrentTestSuite 3390 return ConcurrentTestSuite(suite, reinvoke_for_tests) 3391 3392 3393parallel_registry.register('subprocess', subprocess_decorator) 3394 3395 3396def exclude_tests(exclude_pattern): 3397 """Return a test suite decorator that excludes tests.""" 3398 if exclude_pattern is None: 3399 return identity_decorator 3400 3401 def decorator(suite): 3402 return ExcludeDecorator(suite, exclude_pattern) 3403 return decorator 3404 3405 3406def filter_tests(pattern): 3407 if pattern == '.*': 3408 return identity_decorator 3409 3410 def decorator(suite): 3411 return FilterTestsDecorator(suite, pattern) 3412 return decorator 3413 3414 3415def random_order(random_seed, runner): 3416 """Return a test suite decorator factory for randomising tests order. 3417 3418 :param random_seed: now, a string which casts to an integer, or an integer. 3419 :param runner: A test runner with a stream attribute to report on. 3420 """ 3421 if random_seed is None: 3422 return identity_decorator 3423 3424 def decorator(suite): 3425 return RandomDecorator(suite, random_seed, runner.stream) 3426 return decorator 3427 3428 3429def tests_first(pattern): 3430 if pattern == '.*': 3431 return identity_decorator 3432 3433 def decorator(suite): 3434 return TestFirstDecorator(suite, pattern) 3435 return decorator 3436 3437 3438def identity_decorator(suite): 3439 """Return suite.""" 3440 return suite 3441 3442 3443class TestDecorator(TestUtil.TestSuite): 3444 """A decorator for TestCase/TestSuite objects. 3445 3446 Contains rather than flattening suite passed on construction 3447 """ 3448 3449 def __init__(self, suite=None): 3450 super(TestDecorator, self).__init__() 3451 if suite is not None: 3452 self.addTest(suite) 3453 3454 # Don't need subclass run method with suite emptying 3455 run = unittest.TestSuite.run 3456 3457 3458class CountingDecorator(TestDecorator): 3459 """A decorator which calls result.progress(self.countTestCases).""" 3460 3461 def run(self, result): 3462 progress_method = getattr(result, 'progress', None) 3463 if callable(progress_method): 3464 progress_method(self.countTestCases(), SUBUNIT_SEEK_SET) 3465 return super(CountingDecorator, self).run(result) 3466 3467 3468class ExcludeDecorator(TestDecorator): 3469 """A decorator which excludes test matching an exclude pattern.""" 3470 3471 def __init__(self, suite, exclude_pattern): 3472 super(ExcludeDecorator, self).__init__( 3473 exclude_tests_by_re(suite, exclude_pattern)) 3474 3475 3476class FilterTestsDecorator(TestDecorator): 3477 """A decorator which filters tests to those matching a pattern.""" 3478 3479 def __init__(self, suite, pattern): 3480 super(FilterTestsDecorator, self).__init__( 3481 filter_suite_by_re(suite, pattern)) 3482 3483 3484class RandomDecorator(TestDecorator): 3485 """A decorator which randomises the order of its tests.""" 3486 3487 def __init__(self, suite, random_seed, stream): 3488 random_seed = self.actual_seed(random_seed) 3489 stream.write("Randomizing test order using seed %s\n\n" % 3490 (random_seed,)) 3491 # Initialise the random number generator. 3492 random.seed(random_seed) 3493 super(RandomDecorator, self).__init__(randomize_suite(suite)) 3494 3495 @staticmethod 3496 def actual_seed(seed): 3497 if seed == "now": 3498 # We convert the seed to an integer to make it reuseable across 3499 # invocations (because the user can reenter it). 3500 return int(time.time()) 3501 else: 3502 # Convert the seed to an integer if we can 3503 try: 3504 return int(seed) 3505 except (TypeError, ValueError): 3506 pass 3507 return seed 3508 3509 3510class TestFirstDecorator(TestDecorator): 3511 """A decorator which moves named tests to the front.""" 3512 3513 def __init__(self, suite, pattern): 3514 super(TestFirstDecorator, self).__init__() 3515 self.addTests(split_suite_by_re(suite, pattern)) 3516 3517 3518def partition_tests(suite, count): 3519 """Partition suite into count lists of tests.""" 3520 # This just assigns tests in a round-robin fashion. On one hand this 3521 # splits up blocks of related tests that might run faster if they shared 3522 # resources, but on the other it avoids assigning blocks of slow tests to 3523 # just one partition. So the slowest partition shouldn't be much slower 3524 # than the fastest. 3525 partitions = [list() for i in range(count)] 3526 tests = iter_suite_tests(suite) 3527 for partition, test in zip(itertools.cycle(partitions), tests): 3528 partition.append(test) 3529 return partitions 3530 3531 3532def workaround_zealous_crypto_random(): 3533 """Crypto.Random want to help us being secure, but we don't care here. 3534 3535 This workaround some test failure related to the sftp server. Once paramiko 3536 stop using the controversial API in Crypto.Random, we may get rid of it. 3537 """ 3538 try: 3539 from Crypto.Random import atfork 3540 atfork() 3541 except ImportError: 3542 pass 3543 3544 3545def fork_for_tests(suite): 3546 """Take suite and start up one runner per CPU by forking() 3547 3548 :return: An iterable of TestCase-like objects which can each have 3549 run(result) called on them to feed tests to result. 3550 """ 3551 concurrency = osutils.local_concurrency() 3552 result = [] 3553 from subunit import ProtocolTestCase 3554 from subunit.test_results import AutoTimingTestResultDecorator 3555 3556 class TestInOtherProcess(ProtocolTestCase): 3557 # Should be in subunit, I think. RBC. 3558 def __init__(self, stream, pid): 3559 ProtocolTestCase.__init__(self, stream) 3560 self.pid = pid 3561 3562 def run(self, result): 3563 try: 3564 ProtocolTestCase.run(self, result) 3565 finally: 3566 pid, status = os.waitpid(self.pid, 0) 3567 # GZ 2011-10-18: If status is nonzero, should report to the result 3568 # that something went wrong. 3569 3570 test_blocks = partition_tests(suite, concurrency) 3571 # Clear the tests from the original suite so it doesn't keep them alive 3572 suite._tests[:] = [] 3573 for process_tests in test_blocks: 3574 process_suite = TestUtil.TestSuite(process_tests) 3575 # Also clear each split list so new suite has only reference 3576 process_tests[:] = [] 3577 c2pread, c2pwrite = os.pipe() 3578 pid = os.fork() 3579 if pid == 0: 3580 try: 3581 stream = os.fdopen(c2pwrite, 'wb', 0) 3582 workaround_zealous_crypto_random() 3583 try: 3584 import coverage 3585 except ImportError: 3586 pass 3587 else: 3588 coverage.process_startup() 3589 os.close(c2pread) 3590 # Leave stderr and stdout open so we can see test noise 3591 # Close stdin so that the child goes away if it decides to 3592 # read from stdin (otherwise its a roulette to see what 3593 # child actually gets keystrokes for pdb etc). 3594 sys.stdin.close() 3595 subunit_result = AutoTimingTestResultDecorator( 3596 SubUnitBzrProtocolClientv1(stream)) 3597 process_suite.run(subunit_result) 3598 except: 3599 # Try and report traceback on stream, but exit with error even 3600 # if stream couldn't be created or something else goes wrong. 3601 # The traceback is formatted to a string and written in one go 3602 # to avoid interleaving lines from multiple failing children. 3603 tb = traceback.format_exc() 3604 if isinstance(tb, str): 3605 tb = tb.encode('utf-8') 3606 try: 3607 stream.write(tb) 3608 finally: 3609 stream.flush() 3610 os._exit(1) 3611 os._exit(0) 3612 else: 3613 os.close(c2pwrite) 3614 stream = os.fdopen(c2pread, 'rb', 0) 3615 test = TestInOtherProcess(stream, pid) 3616 result.append(test) 3617 return result 3618 3619 3620def reinvoke_for_tests(suite): 3621 """Take suite and start up one runner per CPU using subprocess(). 3622 3623 :return: An iterable of TestCase-like objects which can each have 3624 run(result) called on them to feed tests to result. 3625 """ 3626 concurrency = osutils.local_concurrency() 3627 result = [] 3628 from subunit import ProtocolTestCase 3629 3630 class TestInSubprocess(ProtocolTestCase): 3631 def __init__(self, process, name): 3632 ProtocolTestCase.__init__(self, process.stdout) 3633 self.process = process 3634 self.process.stdin.close() 3635 self.name = name 3636 3637 def run(self, result): 3638 try: 3639 ProtocolTestCase.run(self, result) 3640 finally: 3641 self.process.wait() 3642 os.unlink(self.name) 3643 # print "pid %d finished" % finished_process 3644 test_blocks = partition_tests(suite, concurrency) 3645 for process_tests in test_blocks: 3646 # ugly; currently reimplement rather than reuses TestCase methods. 3647 bzr_path = os.path.dirname(os.path.dirname(breezy.__file__)) + '/bzr' 3648 if not os.path.isfile(bzr_path): 3649 # We are probably installed. Assume sys.argv is the right file 3650 bzr_path = sys.argv[0] 3651 bzr_path = [bzr_path] 3652 if sys.platform == "win32": 3653 # if we're on windows, we can't execute the bzr script directly 3654 bzr_path = [sys.executable] + bzr_path 3655 fd, test_list_file_name = tempfile.mkstemp() 3656 test_list_file = os.fdopen(fd, 'wb', 1) 3657 for test in process_tests: 3658 test_list_file.write(test.id() + '\n') 3659 test_list_file.close() 3660 try: 3661 argv = bzr_path + ['selftest', '--load-list', test_list_file_name, 3662 '--subunit'] 3663 if '--no-plugins' in sys.argv: 3664 argv.append('--no-plugins') 3665 # stderr=subprocess.STDOUT would be ideal, but until we prevent 3666 # noise on stderr it can interrupt the subunit protocol. 3667 process = subprocess.Popen(argv, stdin=subprocess.PIPE, 3668 stdout=subprocess.PIPE, 3669 stderr=subprocess.PIPE, 3670 bufsize=1) 3671 test = TestInSubprocess(process, test_list_file_name) 3672 result.append(test) 3673 except: 3674 os.unlink(test_list_file_name) 3675 raise 3676 return result 3677 3678 3679class ProfileResult(testtools.ExtendedToOriginalDecorator): 3680 """Generate profiling data for all activity between start and success. 3681 3682 The profile data is appended to the test's _benchcalls attribute and can 3683 be accessed by the forwarded-to TestResult. 3684 3685 While it might be cleaner do accumulate this in stopTest, addSuccess is 3686 where our existing output support for lsprof is, and this class aims to 3687 fit in with that: while it could be moved it's not necessary to accomplish 3688 test profiling, nor would it be dramatically cleaner. 3689 """ 3690 3691 def startTest(self, test): 3692 self.profiler = breezy.lsprof.BzrProfiler() 3693 # Prevent deadlocks in tests that use lsprof: those tests will 3694 # unavoidably fail. 3695 breezy.lsprof.BzrProfiler.profiler_block = 0 3696 self.profiler.start() 3697 testtools.ExtendedToOriginalDecorator.startTest(self, test) 3698 3699 def addSuccess(self, test): 3700 stats = self.profiler.stop() 3701 try: 3702 calls = test._benchcalls 3703 except AttributeError: 3704 test._benchcalls = [] 3705 calls = test._benchcalls 3706 calls.append(((test.id(), "", ""), stats)) 3707 testtools.ExtendedToOriginalDecorator.addSuccess(self, test) 3708 3709 def stopTest(self, test): 3710 testtools.ExtendedToOriginalDecorator.stopTest(self, test) 3711 self.profiler = None 3712 3713 3714# Controlled by "brz selftest -E=..." option 3715# Currently supported: 3716# -Eallow_debug Will no longer clear debug.debug_flags() so it 3717# preserves any flags supplied at the command line. 3718# -Edisable_lock_checks Turns errors in mismatched locks into simple prints 3719# rather than failing tests. And no longer raise 3720# LockContention when fctnl locks are not being used 3721# with proper exclusion rules. 3722# -Ethreads Will display thread ident at creation/join time to 3723# help track thread leaks 3724# -Euncollected_cases Display the identity of any test cases that weren't 3725# deallocated after being completed. 3726# -Econfig_stats Will collect statistics using addDetail 3727selftest_debug_flags = set() 3728 3729 3730def selftest(verbose=False, pattern=".*", stop_on_failure=True, 3731 transport=None, 3732 test_suite_factory=None, 3733 lsprof_timed=None, 3734 bench_history=None, 3735 matching_tests_first=None, 3736 list_only=False, 3737 random_seed=None, 3738 exclude_pattern=None, 3739 strict=False, 3740 load_list=None, 3741 debug_flags=None, 3742 starting_with=None, 3743 runner_class=None, 3744 suite_decorators=None, 3745 stream=None, 3746 lsprof_tests=False, 3747 ): 3748 """Run the whole test suite under the enhanced runner""" 3749 # XXX: Very ugly way to do this... 3750 # Disable warning about old formats because we don't want it to disturb 3751 # any blackbox tests. 3752 from breezy import repository 3753 repository._deprecation_warning_done = True 3754 3755 global default_transport 3756 if transport is None: 3757 transport = default_transport 3758 old_transport = default_transport 3759 default_transport = transport 3760 global selftest_debug_flags 3761 old_debug_flags = selftest_debug_flags 3762 if debug_flags is not None: 3763 selftest_debug_flags = set(debug_flags) 3764 try: 3765 if load_list is None: 3766 keep_only = None 3767 else: 3768 keep_only = load_test_id_list(load_list) 3769 if starting_with: 3770 starting_with = [test_prefix_alias_registry.resolve_alias(start) 3771 for start in starting_with] 3772 # Always consider 'unittest' an interesting name so that failed 3773 # suites wrapped as test cases appear in the output. 3774 starting_with.append('unittest') 3775 if test_suite_factory is None: 3776 # Reduce loading time by loading modules based on the starting_with 3777 # patterns. 3778 suite = test_suite(keep_only, starting_with) 3779 else: 3780 suite = test_suite_factory() 3781 if starting_with: 3782 # But always filter as requested. 3783 suite = filter_suite_by_id_startswith(suite, starting_with) 3784 result_decorators = [] 3785 if lsprof_tests: 3786 result_decorators.append(ProfileResult) 3787 return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern, 3788 stop_on_failure=stop_on_failure, 3789 transport=transport, 3790 lsprof_timed=lsprof_timed, 3791 bench_history=bench_history, 3792 matching_tests_first=matching_tests_first, 3793 list_only=list_only, 3794 random_seed=random_seed, 3795 exclude_pattern=exclude_pattern, 3796 strict=strict, 3797 runner_class=runner_class, 3798 suite_decorators=suite_decorators, 3799 stream=stream, 3800 result_decorators=result_decorators, 3801 ) 3802 finally: 3803 default_transport = old_transport 3804 selftest_debug_flags = old_debug_flags 3805 3806 3807def load_test_id_list(file_name): 3808 """Load a test id list from a text file. 3809 3810 The format is one test id by line. No special care is taken to impose 3811 strict rules, these test ids are used to filter the test suite so a test id 3812 that do not match an existing test will do no harm. This allows user to add 3813 comments, leave blank lines, etc. 3814 """ 3815 test_list = [] 3816 try: 3817 ftest = open(file_name, 'rt') 3818 except IOError as e: 3819 if e.errno != errno.ENOENT: 3820 raise 3821 else: 3822 raise errors.NoSuchFile(file_name) 3823 3824 for test_name in ftest.readlines(): 3825 test_list.append(test_name.strip()) 3826 ftest.close() 3827 return test_list 3828 3829 3830def suite_matches_id_list(test_suite, id_list): 3831 """Warns about tests not appearing or appearing more than once. 3832 3833 :param test_suite: A TestSuite object. 3834 :param test_id_list: The list of test ids that should be found in 3835 test_suite. 3836 3837 :return: (absents, duplicates) absents is a list containing the test found 3838 in id_list but not in test_suite, duplicates is a list containing the 3839 tests found multiple times in test_suite. 3840 3841 When using a prefined test id list, it may occurs that some tests do not 3842 exist anymore or that some tests use the same id. This function warns the 3843 tester about potential problems in his workflow (test lists are volatile) 3844 or in the test suite itself (using the same id for several tests does not 3845 help to localize defects). 3846 """ 3847 # Build a dict counting id occurrences 3848 tests = dict() 3849 for test in iter_suite_tests(test_suite): 3850 id = test.id() 3851 tests[id] = tests.get(id, 0) + 1 3852 3853 not_found = [] 3854 duplicates = [] 3855 for id in id_list: 3856 occurs = tests.get(id, 0) 3857 if not occurs: 3858 not_found.append(id) 3859 elif occurs > 1: 3860 duplicates.append(id) 3861 3862 return not_found, duplicates 3863 3864 3865class TestIdList(object): 3866 """Test id list to filter a test suite. 3867 3868 Relying on the assumption that test ids are built as: 3869 <module>[.<class>.<method>][(<param>+)], <module> being in python dotted 3870 notation, this class offers methods to : 3871 - avoid building a test suite for modules not refered to in the test list, 3872 - keep only the tests listed from the module test suite. 3873 """ 3874 3875 def __init__(self, test_id_list): 3876 # When a test suite needs to be filtered against us we compare test ids 3877 # for equality, so a simple dict offers a quick and simple solution. 3878 self.tests = dict().fromkeys(test_id_list, True) 3879 3880 # While unittest.TestCase have ids like: 3881 # <module>.<class>.<method>[(<param+)], 3882 # doctest.DocTestCase can have ids like: 3883 # <module> 3884 # <module>.<class> 3885 # <module>.<function> 3886 # <module>.<class>.<method> 3887 3888 # Since we can't predict a test class from its name only, we settle on 3889 # a simple constraint: a test id always begins with its module name. 3890 3891 modules = {} 3892 for test_id in test_id_list: 3893 parts = test_id.split('.') 3894 mod_name = parts.pop(0) 3895 modules[mod_name] = True 3896 for part in parts: 3897 mod_name += '.' + part 3898 modules[mod_name] = True 3899 self.modules = modules 3900 3901 def refers_to(self, module_name): 3902 """Is there tests for the module or one of its sub modules.""" 3903 return module_name in self.modules 3904 3905 def includes(self, test_id): 3906 return test_id in self.tests 3907 3908 3909class TestPrefixAliasRegistry(registry.Registry): 3910 """A registry for test prefix aliases. 3911 3912 This helps implement shorcuts for the --starting-with selftest 3913 option. Overriding existing prefixes is not allowed but not fatal (a 3914 warning will be emitted). 3915 """ 3916 3917 def register(self, key, obj, help=None, info=None, 3918 override_existing=False): 3919 """See Registry.register. 3920 3921 Trying to override an existing alias causes a warning to be emitted, 3922 not a fatal execption. 3923 """ 3924 try: 3925 super(TestPrefixAliasRegistry, self).register( 3926 key, obj, help=help, info=info, override_existing=False) 3927 except KeyError: 3928 actual = self.get(key) 3929 trace.note( 3930 'Test prefix alias %s is already used for %s, ignoring %s' 3931 % (key, actual, obj)) 3932 3933 def resolve_alias(self, id_start): 3934 """Replace the alias by the prefix in the given string. 3935 3936 Using an unknown prefix is an error to help catching typos. 3937 """ 3938 parts = id_start.split('.') 3939 try: 3940 parts[0] = self.get(parts[0]) 3941 except KeyError: 3942 raise errors.CommandError( 3943 '%s is not a known test prefix alias' % parts[0]) 3944 return '.'.join(parts) 3945 3946 3947test_prefix_alias_registry = TestPrefixAliasRegistry() 3948"""Registry of test prefix aliases.""" 3949 3950 3951# This alias allows to detect typos ('bzrlin.') by making all valid test ids 3952# appear prefixed ('breezy.' is "replaced" by 'breezy.'). 3953test_prefix_alias_registry.register('breezy', 'breezy') 3954 3955# Obvious highest levels prefixes, feel free to add your own via a plugin 3956test_prefix_alias_registry.register('bd', 'breezy.doc') 3957test_prefix_alias_registry.register('bu', 'breezy.utils') 3958test_prefix_alias_registry.register('bt', 'breezy.tests') 3959test_prefix_alias_registry.register('bgt', 'breezy.git.tests') 3960test_prefix_alias_registry.register('bbt', 'breezy.bzr.tests') 3961test_prefix_alias_registry.register('bb', 'breezy.tests.blackbox') 3962test_prefix_alias_registry.register('bp', 'breezy.plugins') 3963 3964 3965def _test_suite_testmod_names(): 3966 """Return the standard list of test module names to test.""" 3967 return [ 3968 'breezy.bzr.tests', 3969 'breezy.git.tests', 3970 'breezy.tests.blackbox', 3971 'breezy.tests.commands', 3972 'breezy.tests.per_branch', 3973 'breezy.tests.per_controldir', 3974 'breezy.tests.per_controldir_colo', 3975 'breezy.tests.per_foreign_vcs', 3976 'breezy.tests.per_interrepository', 3977 'breezy.tests.per_intertree', 3978 'breezy.tests.per_interbranch', 3979 'breezy.tests.per_lock', 3980 'breezy.tests.per_merger', 3981 'breezy.tests.per_transport', 3982 'breezy.tests.per_tree', 3983 'breezy.tests.per_repository', 3984 'breezy.tests.per_repository_reference', 3985 'breezy.tests.per_uifactory', 3986 'breezy.tests.per_workingtree', 3987 'breezy.tests.test__annotator', 3988 'breezy.tests.test__bencode', 3989 'breezy.tests.test__known_graph', 3990 'breezy.tests.test__simple_set', 3991 'breezy.tests.test__static_tuple', 3992 'breezy.tests.test__walkdirs_win32', 3993 'breezy.tests.test_ancestry', 3994 'breezy.tests.test_annotate', 3995 'breezy.tests.test_atomicfile', 3996 'breezy.tests.test_bad_files', 3997 'breezy.tests.test_bisect', 3998 'breezy.tests.test_bisect_multi', 3999 'breezy.tests.test_branch', 4000 'breezy.tests.test_branchbuilder', 4001 'breezy.tests.test_bugtracker', 4002 'breezy.tests.test__chunks_to_lines', 4003 'breezy.tests.test_cache_utf8', 4004 'breezy.tests.test_chunk_writer', 4005 'breezy.tests.test_clean_tree', 4006 'breezy.tests.test_cmdline', 4007 'breezy.tests.test_commands', 4008 'breezy.tests.test_commit', 4009 'breezy.tests.test_commit_merge', 4010 'breezy.tests.test_config', 4011 'breezy.tests.test_bedding', 4012 'breezy.tests.test_conflicts', 4013 'breezy.tests.test_controldir', 4014 'breezy.tests.test_counted_lock', 4015 'breezy.tests.test_crash', 4016 'breezy.tests.test_decorators', 4017 'breezy.tests.test_delta', 4018 'breezy.tests.test_debug', 4019 'breezy.tests.test_diff', 4020 'breezy.tests.test_directory_service', 4021 'breezy.tests.test_dirty_tracker', 4022 'breezy.tests.test_email_message', 4023 'breezy.tests.test_eol_filters', 4024 'breezy.tests.test_errors', 4025 'breezy.tests.test_estimate_compressed_size', 4026 'breezy.tests.test_export', 4027 'breezy.tests.test_export_pot', 4028 'breezy.tests.test_extract', 4029 'breezy.tests.test_features', 4030 'breezy.tests.test_fetch', 4031 'breezy.tests.test_fetch_ghosts', 4032 'breezy.tests.test_fixtures', 4033 'breezy.tests.test_fifo_cache', 4034 'breezy.tests.test_filters', 4035 'breezy.tests.test_filter_tree', 4036 'breezy.tests.test_foreign', 4037 'breezy.tests.test_generate_docs', 4038 'breezy.tests.test_globbing', 4039 'breezy.tests.test_gpg', 4040 'breezy.tests.test_graph', 4041 'breezy.tests.test_grep', 4042 'breezy.tests.test_hashcache', 4043 'breezy.tests.test_help', 4044 'breezy.tests.test_hooks', 4045 'breezy.tests.test_http', 4046 'breezy.tests.test_http_response', 4047 'breezy.tests.test_https_ca_bundle', 4048 'breezy.tests.test_https_urllib', 4049 'breezy.tests.test_i18n', 4050 'breezy.tests.test_identitymap', 4051 'breezy.tests.test_ignores', 4052 'breezy.tests.test_import_tariff', 4053 'breezy.tests.test_info', 4054 'breezy.tests.test_lazy_import', 4055 'breezy.tests.test_lazy_regex', 4056 'breezy.tests.test_library_state', 4057 'breezy.tests.test_location', 4058 'breezy.tests.test_lock', 4059 'breezy.tests.test_lockable_files', 4060 'breezy.tests.test_lockdir', 4061 'breezy.tests.test_log', 4062 'breezy.tests.test_lru_cache', 4063 'breezy.tests.test_lsprof', 4064 'breezy.tests.test_mail_client', 4065 'breezy.tests.test_matchers', 4066 'breezy.tests.test_memorybranch', 4067 'breezy.tests.test_memorytree', 4068 'breezy.tests.test_merge', 4069 'breezy.tests.test_merge3', 4070 'breezy.tests.test_mergeable', 4071 'breezy.tests.test_merge_core', 4072 'breezy.tests.test_merge_directive', 4073 'breezy.tests.test_mergetools', 4074 'breezy.tests.test_missing', 4075 'breezy.tests.test_msgeditor', 4076 'breezy.tests.test_multiparent', 4077 'breezy.tests.test_multiwalker', 4078 'breezy.tests.test_mutabletree', 4079 'breezy.tests.test_nonascii', 4080 'breezy.tests.test_options', 4081 'breezy.tests.test_osutils', 4082 'breezy.tests.test_osutils_encodings', 4083 'breezy.tests.test_patch', 4084 'breezy.tests.test_patches', 4085 'breezy.tests.test_permissions', 4086 'breezy.tests.test_plugins', 4087 'breezy.tests.test_progress', 4088 'breezy.tests.test_propose', 4089 'breezy.tests.test_pyutils', 4090 'breezy.tests.test_reconcile', 4091 'breezy.tests.test_reconfigure', 4092 'breezy.tests.test_registry', 4093 'breezy.tests.test_rename_map', 4094 'breezy.tests.test_revert', 4095 'breezy.tests.test_revision', 4096 'breezy.tests.test_revisionspec', 4097 'breezy.tests.test_revisiontree', 4098 'breezy.tests.test_rio', 4099 'breezy.tests.test__rio', 4100 'breezy.tests.test_rules', 4101 'breezy.tests.test_url_policy_open', 4102 'breezy.tests.test_sampler', 4103 'breezy.tests.test_scenarios', 4104 'breezy.tests.test_script', 4105 'breezy.tests.test_selftest', 4106 'breezy.tests.test_setup', 4107 'breezy.tests.test_sftp_transport', 4108 'breezy.tests.test_shelf', 4109 'breezy.tests.test_shelf_ui', 4110 'breezy.tests.test_smart_add', 4111 'breezy.tests.test_smtp_connection', 4112 'breezy.tests.test_source', 4113 'breezy.tests.test_ssh_transport', 4114 'breezy.tests.test_status', 4115 'breezy.tests.test_strace', 4116 'breezy.tests.test_subsume', 4117 'breezy.tests.test_switch', 4118 'breezy.tests.test_symbol_versioning', 4119 'breezy.tests.test_tag', 4120 'breezy.tests.test_test_server', 4121 'breezy.tests.test_textfile', 4122 'breezy.tests.test_textmerge', 4123 'breezy.tests.test_cethread', 4124 'breezy.tests.test_timestamp', 4125 'breezy.tests.test_trace', 4126 'breezy.tests.test_transactions', 4127 'breezy.tests.test_transform', 4128 'breezy.tests.test_transport', 4129 'breezy.tests.test_transport_log', 4130 'breezy.tests.test_tree', 4131 'breezy.tests.test_treebuilder', 4132 'breezy.tests.test_treeshape', 4133 'breezy.tests.test_tsort', 4134 'breezy.tests.test_tuned_gzip', 4135 'breezy.tests.test_ui', 4136 'breezy.tests.test_uncommit', 4137 'breezy.tests.test_upgrade', 4138 'breezy.tests.test_upgrade_stacked', 4139 'breezy.tests.test_upstream_import', 4140 'breezy.tests.test_urlutils', 4141 'breezy.tests.test_utextwrap', 4142 'breezy.tests.test_version', 4143 'breezy.tests.test_version_info', 4144 'breezy.tests.test_views', 4145 'breezy.tests.test_whitebox', 4146 'breezy.tests.test_win32utils', 4147 'breezy.tests.test_workspace', 4148 'breezy.tests.test_workingtree', 4149 'breezy.tests.test_wsgi', 4150 ] 4151 4152 4153def _test_suite_modules_to_doctest(): 4154 """Return the list of modules to doctest.""" 4155 if __doc__ is None: 4156 # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest 4157 return [] 4158 return [ 4159 'breezy', 4160 'breezy.branchbuilder', 4161 'breezy.bzr.inventory', 4162 'breezy.decorators', 4163 'breezy.iterablefile', 4164 'breezy.lockdir', 4165 'breezy.merge3', 4166 'breezy.option', 4167 'breezy.pyutils', 4168 'breezy.symbol_versioning', 4169 'breezy.tests', 4170 'breezy.tests.fixtures', 4171 'breezy.timestamp', 4172 'breezy.transport.http.urllib', 4173 'breezy.version_info_formats.format_custom', 4174 ] 4175 4176 4177def test_suite(keep_only=None, starting_with=None): 4178 """Build and return TestSuite for the whole of breezy. 4179 4180 :param keep_only: A list of test ids limiting the suite returned. 4181 4182 :param starting_with: An id limiting the suite returned to the tests 4183 starting with it. 4184 4185 This function can be replaced if you need to change the default test 4186 suite on a global basis, but it is not encouraged. 4187 """ 4188 4189 loader = TestUtil.TestLoader() 4190 4191 if keep_only is not None: 4192 id_filter = TestIdList(keep_only) 4193 if starting_with: 4194 # We take precedence over keep_only because *at loading time* using 4195 # both options means we will load less tests for the same final result. 4196 def interesting_module(name): 4197 for start in starting_with: 4198 # Either the module name starts with the specified string 4199 # or it may contain tests starting with the specified string 4200 if name.startswith(start) or start.startswith(name): 4201 return True 4202 return False 4203 loader = TestUtil.FilteredByModuleTestLoader(interesting_module) 4204 4205 elif keep_only is not None: 4206 loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to) 4207 4208 def interesting_module(name): 4209 return id_filter.refers_to(name) 4210 4211 else: 4212 loader = TestUtil.TestLoader() 4213 4214 def interesting_module(name): 4215 # No filtering, all modules are interesting 4216 return True 4217 4218 suite = loader.suiteClass() 4219 4220 # modules building their suite with loadTestsFromModuleNames 4221 suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names())) 4222 4223 suite.addTest(loader.loadTestsFromModuleNames(['breezy.doc'])) 4224 4225 for mod in _test_suite_modules_to_doctest(): 4226 if not interesting_module(mod): 4227 # No tests to keep here, move along 4228 continue 4229 try: 4230 # note that this really does mean "report only" -- doctest 4231 # still runs the rest of the examples 4232 doc_suite = IsolatedDocTestSuite( 4233 mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE) 4234 except ValueError as e: 4235 print('**failed to get doctest for: %s\n%s' % (mod, e)) 4236 raise 4237 if len(doc_suite._tests) == 0: 4238 raise errors.BzrError("no doctests found in %s" % (mod,)) 4239 suite.addTest(doc_suite) 4240 4241 default_encoding = sys.getdefaultencoding() 4242 for name, plugin in _mod_plugin.plugins().items(): 4243 if not interesting_module(plugin.module.__name__): 4244 continue 4245 plugin_suite = plugin.test_suite() 4246 # We used to catch ImportError here and turn it into just a warning, 4247 # but really if you don't have --no-plugins this should be a failure. 4248 # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771 4249 if plugin_suite is None: 4250 plugin_suite = plugin.load_plugin_tests(loader) 4251 if plugin_suite is not None: 4252 suite.addTest(plugin_suite) 4253 if default_encoding != sys.getdefaultencoding(): 4254 trace.warning( 4255 'Plugin "%s" tried to reset default encoding to: %s', name, 4256 sys.getdefaultencoding()) 4257 reload(sys) 4258 sys.setdefaultencoding(default_encoding) 4259 4260 if keep_only is not None: 4261 # Now that the referred modules have loaded their tests, keep only the 4262 # requested ones. 4263 suite = filter_suite_by_id_list(suite, id_filter) 4264 # Do some sanity checks on the id_list filtering 4265 not_found, duplicates = suite_matches_id_list(suite, keep_only) 4266 if starting_with: 4267 # The tester has used both keep_only and starting_with, so he is 4268 # already aware that some tests are excluded from the list, there 4269 # is no need to tell him which. 4270 pass 4271 else: 4272 # Some tests mentioned in the list are not in the test suite. The 4273 # list may be out of date, report to the tester. 4274 for id in not_found: 4275 trace.warning('"%s" not found in the test suite', id) 4276 for id in duplicates: 4277 trace.warning('"%s" is used as an id by several tests', id) 4278 4279 return suite 4280 4281 4282def multiply_scenarios(*scenarios): 4283 """Multiply two or more iterables of scenarios. 4284 4285 It is safe to pass scenario generators or iterators. 4286 4287 :returns: A list of compound scenarios: the cross-product of all 4288 scenarios, with the names concatenated and the parameters 4289 merged together. 4290 """ 4291 return functools.reduce(_multiply_two_scenarios, map(list, scenarios)) 4292 4293 4294def _multiply_two_scenarios(scenarios_left, scenarios_right): 4295 """Multiply two sets of scenarios. 4296 4297 :returns: the cartesian product of the two sets of scenarios, that is 4298 a scenario for every possible combination of a left scenario and a 4299 right scenario. 4300 """ 4301 return [ 4302 ('%s,%s' % (left_name, right_name), 4303 dict(left_dict, **right_dict)) 4304 for left_name, left_dict in scenarios_left 4305 for right_name, right_dict in scenarios_right] 4306 4307 4308def multiply_tests(tests, scenarios, result): 4309 """Multiply tests_list by scenarios into result. 4310 4311 This is the core workhorse for test parameterisation. 4312 4313 Typically the load_tests() method for a per-implementation test suite will 4314 call multiply_tests and return the result. 4315 4316 :param tests: The tests to parameterise. 4317 :param scenarios: The scenarios to apply: pairs of (scenario_name, 4318 scenario_param_dict). 4319 :param result: A TestSuite to add created tests to. 4320 4321 This returns the passed in result TestSuite with the cross product of all 4322 the tests repeated once for each scenario. Each test is adapted by adding 4323 the scenario name at the end of its id(), and updating the test object's 4324 __dict__ with the scenario_param_dict. 4325 4326 >>> import breezy.tests.test_sampler 4327 >>> r = multiply_tests( 4328 ... breezy.tests.test_sampler.DemoTest('test_nothing'), 4329 ... [('one', dict(param=1)), 4330 ... ('two', dict(param=2))], 4331 ... TestUtil.TestSuite()) 4332 >>> tests = list(iter_suite_tests(r)) 4333 >>> len(tests) 4334 2 4335 >>> tests[0].id() 4336 'breezy.tests.test_sampler.DemoTest.test_nothing(one)' 4337 >>> tests[0].param 4338 1 4339 >>> tests[1].param 4340 2 4341 """ 4342 for test in iter_suite_tests(tests): 4343 apply_scenarios(test, scenarios, result) 4344 return result 4345 4346 4347def apply_scenarios(test, scenarios, result): 4348 """Apply the scenarios in scenarios to test and add to result. 4349 4350 :param test: The test to apply scenarios to. 4351 :param scenarios: An iterable of scenarios to apply to test. 4352 :return: result 4353 :seealso: apply_scenario 4354 """ 4355 for scenario in scenarios: 4356 result.addTest(apply_scenario(test, scenario)) 4357 return result 4358 4359 4360def apply_scenario(test, scenario): 4361 """Copy test and apply scenario to it. 4362 4363 :param test: A test to adapt. 4364 :param scenario: A tuple describing the scenario. 4365 The first element of the tuple is the new test id. 4366 The second element is a dict containing attributes to set on the 4367 test. 4368 :return: The adapted test. 4369 """ 4370 new_id = "%s(%s)" % (test.id(), scenario[0]) 4371 new_test = clone_test(test, new_id) 4372 for name, value in scenario[1].items(): 4373 setattr(new_test, name, value) 4374 return new_test 4375 4376 4377def clone_test(test, new_id): 4378 """Clone a test giving it a new id. 4379 4380 :param test: The test to clone. 4381 :param new_id: The id to assign to it. 4382 :return: The new test. 4383 """ 4384 new_test = copy.copy(test) 4385 new_test.id = lambda: new_id 4386 # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which 4387 # causes cloned tests to share the 'details' dict. This makes it hard to 4388 # read the test output for parameterized tests, because tracebacks will be 4389 # associated with irrelevant tests. 4390 try: 4391 details = new_test._TestCase__details 4392 except AttributeError: 4393 # must be a different version of testtools than expected. Do nothing. 4394 pass 4395 else: 4396 # Reset the '__details' dict. 4397 new_test._TestCase__details = {} 4398 return new_test 4399 4400 4401 4402def permute_tests_for_extension(standard_tests, loader, py_module_name, 4403 ext_module_name): 4404 """Helper for permutating tests against an extension module. 4405 4406 This is meant to be used inside a modules 'load_tests()' function. It will 4407 create 2 scenarios, and cause all tests in the 'standard_tests' to be run 4408 against both implementations. Setting 'test.module' to the appropriate 4409 module. See breezy.tests.test__chk_map.load_tests as an example. 4410 4411 :param standard_tests: A test suite to permute 4412 :param loader: A TestLoader 4413 :param py_module_name: The python path to a python module that can always 4414 be loaded, and will be considered the 'python' implementation. (eg 4415 'breezy._chk_map_py') 4416 :param ext_module_name: The python path to an extension module. If the 4417 module cannot be loaded, a single test will be added, which notes that 4418 the module is not available. If it can be loaded, all standard_tests 4419 will be run against that module. 4420 :return: (suite, feature) suite is a test-suite that has all the permuted 4421 tests. feature is the Feature object that can be used to determine if 4422 the module is available. 4423 """ 4424 4425 from .features import ModuleAvailableFeature 4426 py_module = pyutils.get_named_object(py_module_name) 4427 scenarios = [ 4428 ('python', {'module': py_module}), 4429 ] 4430 suite = loader.suiteClass() 4431 feature = ModuleAvailableFeature(ext_module_name) 4432 if feature.available(): 4433 scenarios.append(('C', {'module': feature.module})) 4434 else: 4435 class FailWithoutFeature(TestCase): 4436 def id(self): 4437 return ext_module_name + '.' + super(FailWithoutFeature, self).id() 4438 def test_fail(self): 4439 self.requireFeature(feature) 4440 # the compiled module isn't available, so we add a failing test 4441 suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature)) 4442 result = multiply_tests(standard_tests, scenarios, suite) 4443 return result, feature 4444 4445 4446def _rmtree_temp_dir(dirname, test_id=None): 4447 # If LANG=C we probably have created some bogus paths 4448 # which rmtree(unicode) will fail to delete 4449 # so make sure we are using rmtree(str) to delete everything 4450 # except on win32, where rmtree(str) will fail 4451 # since it doesn't have the property of byte-stream paths 4452 # (they are either ascii or mbcs) 4453 if sys.platform == 'win32' and isinstance(dirname, bytes): 4454 # make sure we are using the unicode win32 api 4455 dirname = dirname.decode('mbcs') 4456 else: 4457 dirname = dirname.encode(sys.getfilesystemencoding()) 4458 try: 4459 osutils.rmtree(dirname) 4460 except OSError as e: 4461 # We don't want to fail here because some useful display will be lost 4462 # otherwise. Polluting the tmp dir is bad, but not giving all the 4463 # possible info to the test runner is even worse. 4464 if test_id is not None: 4465 ui.ui_factory.clear_term() 4466 sys.stderr.write('\nWhile running: %s\n' % (test_id,)) 4467 # Ugly, but the last thing we want here is fail, so bear with it. 4468 printable_e = str(e).decode(osutils.get_user_encoding(), 'replace' 4469 ).encode('ascii', 'replace') 4470 sys.stderr.write('Unable to remove testing dir %s\n%s' 4471 % (os.path.basename(dirname), printable_e)) 4472 4473 4474def probe_unicode_in_user_encoding(): 4475 """Try to encode several unicode strings to use in unicode-aware tests. 4476 Return first successfull match. 4477 4478 :return: (unicode value, encoded plain string value) or (None, None) 4479 """ 4480 possible_vals = [u'm\xb5', u'\xe1', u'\u0410'] 4481 for uni_val in possible_vals: 4482 try: 4483 str_val = uni_val.encode(osutils.get_user_encoding()) 4484 except UnicodeEncodeError: 4485 # Try a different character 4486 pass 4487 else: 4488 return uni_val, str_val 4489 return None, None 4490 4491 4492def probe_bad_non_ascii(encoding): 4493 """Try to find [bad] character with code [128..255] 4494 that cannot be decoded to unicode in some encoding. 4495 Return None if all non-ascii characters is valid 4496 for given encoding. 4497 """ 4498 for i in range(128, 256): 4499 char = bytes([i]) 4500 try: 4501 char.decode(encoding) 4502 except UnicodeDecodeError: 4503 return char 4504 return None 4505 4506 4507# Only define SubUnitBzrRunner if subunit is available. 4508try: 4509 from subunit import TestProtocolClient 4510 from subunit.test_results import AutoTimingTestResultDecorator 4511 4512 class SubUnitBzrProtocolClientv1(TestProtocolClient): 4513 4514 def stopTest(self, test): 4515 super(SubUnitBzrProtocolClientv1, self).stopTest(test) 4516 _clear__type_equality_funcs(test) 4517 4518 def addSuccess(self, test, details=None): 4519 # The subunit client always includes the details in the subunit 4520 # stream, but we don't want to include it in ours. 4521 if details is not None and 'log' in details: 4522 del details['log'] 4523 return super(SubUnitBzrProtocolClientv1, self).addSuccess( 4524 test, details) 4525 4526 class SubUnitBzrRunnerv1(TextTestRunner): 4527 4528 def run(self, test): 4529 result = AutoTimingTestResultDecorator( 4530 SubUnitBzrProtocolClientv1(self.stream)) 4531 test.run(result) 4532 return result 4533except ImportError: 4534 pass 4535 4536 4537try: 4538 from subunit.run import SubunitTestRunner 4539 4540 class SubUnitBzrRunnerv2(TextTestRunner, SubunitTestRunner): 4541 4542 def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1, 4543 bench_history=None, strict=False, result_decorators=None): 4544 TextTestRunner.__init__( 4545 self, stream=stream, 4546 descriptions=descriptions, verbosity=verbosity, 4547 bench_history=bench_history, strict=strict, 4548 result_decorators=result_decorators) 4549 SubunitTestRunner.__init__(self, verbosity=verbosity, 4550 stream=stream) 4551 4552 run = SubunitTestRunner.run 4553except ImportError: 4554 pass 4555