1# Copyright (C) 2005-2013, 2016 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17"""Tests for the test framework.""" 18 19import gc 20import doctest 21from functools import reduce 22from io import BytesIO, StringIO, TextIOWrapper 23import os 24import signal 25import sys 26import threading 27import time 28import unittest 29import warnings 30 31from testtools import ( 32 ExtendedToOriginalDecorator, 33 MultiTestResult, 34 ) 35from testtools.content import Content 36from testtools.content_type import ContentType 37from testtools.matchers import ( 38 DocTestMatches, 39 Equals, 40 ) 41import testtools.testresult.doubles 42 43import breezy 44from .. import ( 45 branchbuilder, 46 controldir, 47 errors, 48 hooks, 49 lockdir, 50 memorytree, 51 osutils, 52 repository, 53 symbol_versioning, 54 tests, 55 transport, 56 workingtree, 57 ) 58from ..bzr import ( 59 bzrdir, 60 remote, 61 workingtree_3, 62 workingtree_4, 63 ) 64from ..bzr import ( 65 groupcompress_repo, 66 ) 67from ..git import ( 68 workingtree as git_workingtree, 69 ) 70from ..symbol_versioning import ( 71 deprecated_function, 72 deprecated_in, 73 deprecated_method, 74 ) 75from . import ( 76 features, 77 test_lsprof, 78 test_server, 79 TestUtil, 80 ) 81from ..trace import note, mutter 82from ..transport import memory 83 84 85def _test_ids(test_suite): 86 """Get the ids for the tests in a test suite.""" 87 return [t.id() for t in tests.iter_suite_tests(test_suite)] 88 89 90class MetaTestLog(tests.TestCase): 91 92 def test_logging(self): 93 """Test logs are captured when a test fails.""" 94 self.log('a test message') 95 details = self.getDetails() 96 log = details['log'] 97 self.assertThat(log.content_type, Equals(ContentType( 98 "text", "plain", {"charset": "utf8"}))) 99 self.assertThat(u"".join(log.iter_text()), Equals(self.get_log())) 100 self.assertThat(self.get_log(), 101 DocTestMatches(u"...a test message\n", doctest.ELLIPSIS)) 102 103 104class TestTreeShape(tests.TestCaseInTempDir): 105 106 def test_unicode_paths(self): 107 self.requireFeature(features.UnicodeFilenameFeature) 108 109 filename = u'hell\u00d8' 110 self.build_tree_contents([(filename, b'contents of hello')]) 111 self.assertPathExists(filename) 112 113 114class TestClassesAvailable(tests.TestCase): 115 """As a convenience we expose Test* classes from breezy.tests""" 116 117 def test_test_case(self): 118 from . import TestCase 119 120 def test_test_loader(self): 121 from . import TestLoader 122 123 def test_test_suite(self): 124 from . import TestSuite 125 126 127class TestTransportScenarios(tests.TestCase): 128 """A group of tests that test the transport implementation adaption core. 129 130 This is a meta test that the tests are applied to all available 131 transports. 132 133 This will be generalised in the future which is why it is in this 134 test file even though it is specific to transport tests at the moment. 135 """ 136 137 def test_get_transport_permutations(self): 138 # this checks that get_test_permutations defined by the module is 139 # called by the get_transport_test_permutations function. 140 class MockModule(object): 141 def get_test_permutations(self): 142 return sample_permutation 143 sample_permutation = [(1, 2), (3, 4)] 144 from .per_transport import get_transport_test_permutations 145 self.assertEqual(sample_permutation, 146 get_transport_test_permutations(MockModule())) 147 148 def test_scenarios_include_all_modules(self): 149 # this checks that the scenario generator returns as many permutations 150 # as there are in all the registered transport modules - we assume if 151 # this matches its probably doing the right thing especially in 152 # combination with the tests for setting the right classes below. 153 from .per_transport import transport_test_permutations 154 from ..transport import _get_transport_modules 155 modules = _get_transport_modules() 156 permutation_count = 0 157 for module in modules: 158 try: 159 permutation_count += len(reduce(getattr, 160 (module 161 + ".get_test_permutations").split('.')[1:], 162 __import__(module))()) 163 except errors.DependencyNotPresent: 164 pass 165 scenarios = transport_test_permutations() 166 self.assertEqual(permutation_count, len(scenarios)) 167 168 def test_scenarios_include_transport_class(self): 169 # This test used to know about all the possible transports and the 170 # order they were returned but that seems overly brittle (mbp 171 # 20060307) 172 from .per_transport import transport_test_permutations 173 scenarios = transport_test_permutations() 174 # there are at least that many builtin transports 175 self.assertTrue(len(scenarios) > 6) 176 one_scenario = scenarios[0] 177 self.assertIsInstance(one_scenario[0], str) 178 self.assertTrue(issubclass(one_scenario[1]["transport_class"], 179 breezy.transport.Transport)) 180 self.assertTrue(issubclass(one_scenario[1]["transport_server"], 181 breezy.transport.Server)) 182 183 184class TestBranchScenarios(tests.TestCase): 185 186 def test_scenarios(self): 187 # check that constructor parameters are passed through to the adapted 188 # test. 189 from .per_branch import make_scenarios 190 server1 = "a" 191 server2 = "b" 192 formats = [("c", "C"), ("d", "D")] 193 scenarios = make_scenarios(server1, server2, formats) 194 self.assertEqual(2, len(scenarios)) 195 self.assertEqual([ 196 ('str', 197 {'branch_format': 'c', 198 'bzrdir_format': 'C', 199 'transport_readonly_server': 'b', 200 'transport_server': 'a'}), 201 ('str', 202 {'branch_format': 'd', 203 'bzrdir_format': 'D', 204 'transport_readonly_server': 'b', 205 'transport_server': 'a'})], 206 scenarios) 207 208 209class TestBzrDirScenarios(tests.TestCase): 210 211 def test_scenarios(self): 212 # check that constructor parameters are passed through to the adapted 213 # test. 214 from .per_controldir import make_scenarios 215 vfs_factory = "v" 216 server1 = "a" 217 server2 = "b" 218 formats = ["c", "d"] 219 scenarios = make_scenarios(vfs_factory, server1, server2, formats) 220 self.assertEqual([ 221 ('str', 222 {'bzrdir_format': 'c', 223 'transport_readonly_server': 'b', 224 'transport_server': 'a', 225 'vfs_transport_factory': 'v'}), 226 ('str', 227 {'bzrdir_format': 'd', 228 'transport_readonly_server': 'b', 229 'transport_server': 'a', 230 'vfs_transport_factory': 'v'})], 231 scenarios) 232 233 234class TestRepositoryScenarios(tests.TestCase): 235 236 def test_formats_to_scenarios(self): 237 from .per_repository import formats_to_scenarios 238 formats = [("(c)", remote.RemoteRepositoryFormat()), 239 ("(d)", repository.format_registry.get( 240 b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))] 241 no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly", 242 None) 243 vfs_scenarios = formats_to_scenarios(formats, "server", "readonly", 244 vfs_transport_factory="vfs") 245 # no_vfs generate scenarios without vfs_transport_factory 246 expected = [ 247 ('RemoteRepositoryFormat(c)', 248 {'bzrdir_format': remote.RemoteBzrDirFormat(), 249 'repository_format': remote.RemoteRepositoryFormat(), 250 'transport_readonly_server': 'readonly', 251 'transport_server': 'server'}), 252 ('RepositoryFormat2a(d)', 253 {'bzrdir_format': bzrdir.BzrDirMetaFormat1(), 254 'repository_format': groupcompress_repo.RepositoryFormat2a(), 255 'transport_readonly_server': 'readonly', 256 'transport_server': 'server'})] 257 self.assertEqual(expected, no_vfs_scenarios) 258 self.assertEqual([ 259 ('RemoteRepositoryFormat(c)', 260 {'bzrdir_format': remote.RemoteBzrDirFormat(), 261 'repository_format': remote.RemoteRepositoryFormat(), 262 'transport_readonly_server': 'readonly', 263 'transport_server': 'server', 264 'vfs_transport_factory': 'vfs'}), 265 ('RepositoryFormat2a(d)', 266 {'bzrdir_format': bzrdir.BzrDirMetaFormat1(), 267 'repository_format': groupcompress_repo.RepositoryFormat2a(), 268 'transport_readonly_server': 'readonly', 269 'transport_server': 'server', 270 'vfs_transport_factory': 'vfs'})], 271 vfs_scenarios) 272 273 274class TestTestScenarioApplication(tests.TestCase): 275 """Tests for the test adaption facilities.""" 276 277 def test_apply_scenario(self): 278 from breezy.tests import apply_scenario 279 input_test = TestTestScenarioApplication("test_apply_scenario") 280 # setup two adapted tests 281 adapted_test1 = apply_scenario(input_test, 282 ("new id", 283 {"bzrdir_format": "bzr_format", 284 "repository_format": "repo_fmt", 285 "transport_server": "transport_server", 286 "transport_readonly_server": "readonly-server"})) 287 adapted_test2 = apply_scenario(input_test, 288 ("new id 2", {"bzrdir_format": None})) 289 # input_test should have been altered. 290 self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format") 291 # the new tests are mutually incompatible, ensuring it has 292 # made new ones, and unspecified elements in the scenario 293 # should not have been altered. 294 self.assertEqual("bzr_format", adapted_test1.bzrdir_format) 295 self.assertEqual("repo_fmt", adapted_test1.repository_format) 296 self.assertEqual("transport_server", adapted_test1.transport_server) 297 self.assertEqual("readonly-server", 298 adapted_test1.transport_readonly_server) 299 self.assertEqual( 300 "breezy.tests.test_selftest.TestTestScenarioApplication." 301 "test_apply_scenario(new id)", 302 adapted_test1.id()) 303 self.assertEqual(None, adapted_test2.bzrdir_format) 304 self.assertEqual( 305 "breezy.tests.test_selftest.TestTestScenarioApplication." 306 "test_apply_scenario(new id 2)", 307 adapted_test2.id()) 308 309 310class TestInterRepositoryScenarios(tests.TestCase): 311 312 def test_scenarios(self): 313 # check that constructor parameters are passed through to the adapted 314 # test. 315 from .per_interrepository import make_scenarios 316 server1 = "a" 317 server2 = "b" 318 formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")] 319 scenarios = make_scenarios(server1, server2, formats) 320 self.assertEqual([ 321 ('C0,str,str', 322 {'repository_format': 'C1', 323 'repository_format_to': 'C2', 324 'transport_readonly_server': 'b', 325 'transport_server': 'a', 326 'extra_setup': 'C3'}), 327 ('D0,str,str', 328 {'repository_format': 'D1', 329 'repository_format_to': 'D2', 330 'transport_readonly_server': 'b', 331 'transport_server': 'a', 332 'extra_setup': 'D3'})], 333 scenarios) 334 335 336class TestWorkingTreeScenarios(tests.TestCase): 337 338 def test_scenarios(self): 339 # check that constructor parameters are passed through to the adapted 340 # test. 341 from .per_workingtree import make_scenarios 342 server1 = "a" 343 server2 = "b" 344 formats = [workingtree_4.WorkingTreeFormat4(), 345 workingtree_3.WorkingTreeFormat3(), 346 workingtree_4.WorkingTreeFormat6()] 347 scenarios = make_scenarios(server1, server2, formats, 348 remote_server='c', remote_readonly_server='d', 349 remote_backing_server='e') 350 self.assertEqual([ 351 ('WorkingTreeFormat4', 352 {'bzrdir_format': formats[0]._matchingcontroldir, 353 'transport_readonly_server': 'b', 354 'transport_server': 'a', 355 'workingtree_format': formats[0]}), 356 ('WorkingTreeFormat3', 357 {'bzrdir_format': formats[1]._matchingcontroldir, 358 'transport_readonly_server': 'b', 359 'transport_server': 'a', 360 'workingtree_format': formats[1]}), 361 ('WorkingTreeFormat6', 362 {'bzrdir_format': formats[2]._matchingcontroldir, 363 'transport_readonly_server': 'b', 364 'transport_server': 'a', 365 'workingtree_format': formats[2]}), 366 ('WorkingTreeFormat6,remote', 367 {'bzrdir_format': formats[2]._matchingcontroldir, 368 'repo_is_remote': True, 369 'transport_readonly_server': 'd', 370 'transport_server': 'c', 371 'vfs_transport_factory': 'e', 372 'workingtree_format': formats[2]}), 373 ], scenarios) 374 375 376class TestTreeScenarios(tests.TestCase): 377 378 def test_scenarios(self): 379 # the tree implementation scenario generator is meant to setup one 380 # instance for each working tree format, one additional instance 381 # that will use the default wt format, but create a revision tree for 382 # the tests, and one more that uses the default wt format as a 383 # lightweight checkout of a remote repository. This means that the wt 384 # ones should have the workingtree_to_test_tree attribute set to 385 # 'return_parameter' and the revision one set to 386 # revision_tree_from_workingtree. 387 388 from .per_tree import ( 389 _dirstate_tree_from_workingtree, 390 make_scenarios, 391 preview_tree_pre, 392 preview_tree_post, 393 return_parameter, 394 revision_tree_from_workingtree 395 ) 396 server1 = "a" 397 server2 = "b" 398 smart_server = test_server.SmartTCPServer_for_testing 399 smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing 400 mem_server = memory.MemoryServer 401 formats = [workingtree_4.WorkingTreeFormat4(), 402 workingtree_3.WorkingTreeFormat3(), ] 403 scenarios = make_scenarios(server1, server2, formats) 404 self.assertEqual(9, len(scenarios)) 405 default_wt_format = workingtree.format_registry.get_default() 406 wt4_format = workingtree_4.WorkingTreeFormat4() 407 wt5_format = workingtree_4.WorkingTreeFormat5() 408 wt6_format = workingtree_4.WorkingTreeFormat6() 409 git_wt_format = git_workingtree.GitWorkingTreeFormat() 410 expected_scenarios = [ 411 ('WorkingTreeFormat4', 412 {'bzrdir_format': formats[0]._matchingcontroldir, 413 'transport_readonly_server': 'b', 414 'transport_server': 'a', 415 'workingtree_format': formats[0], 416 '_workingtree_to_test_tree': return_parameter, 417 }), 418 ('WorkingTreeFormat3', 419 {'bzrdir_format': formats[1]._matchingcontroldir, 420 'transport_readonly_server': 'b', 421 'transport_server': 'a', 422 'workingtree_format': formats[1], 423 '_workingtree_to_test_tree': return_parameter, 424 }), 425 ('WorkingTreeFormat6,remote', 426 {'bzrdir_format': wt6_format._matchingcontroldir, 427 'repo_is_remote': True, 428 'transport_readonly_server': smart_readonly_server, 429 'transport_server': smart_server, 430 'vfs_transport_factory': mem_server, 431 'workingtree_format': wt6_format, 432 '_workingtree_to_test_tree': return_parameter, 433 }), 434 ('RevisionTree', 435 {'_workingtree_to_test_tree': revision_tree_from_workingtree, 436 'bzrdir_format': default_wt_format._matchingcontroldir, 437 'transport_readonly_server': 'b', 438 'transport_server': 'a', 439 'workingtree_format': default_wt_format, 440 }), 441 ('GitRevisionTree', 442 {'_workingtree_to_test_tree': revision_tree_from_workingtree, 443 'bzrdir_format': git_wt_format._matchingcontroldir, 444 'transport_readonly_server': 'b', 445 'transport_server': 'a', 446 'workingtree_format': git_wt_format, 447 } 448 ), 449 ('DirStateRevisionTree,WT4', 450 {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree, 451 'bzrdir_format': wt4_format._matchingcontroldir, 452 'transport_readonly_server': 'b', 453 'transport_server': 'a', 454 'workingtree_format': wt4_format, 455 }), 456 ('DirStateRevisionTree,WT5', 457 {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree, 458 'bzrdir_format': wt5_format._matchingcontroldir, 459 'transport_readonly_server': 'b', 460 'transport_server': 'a', 461 'workingtree_format': wt5_format, 462 }), 463 ('PreviewTree', 464 {'_workingtree_to_test_tree': preview_tree_pre, 465 'bzrdir_format': default_wt_format._matchingcontroldir, 466 'transport_readonly_server': 'b', 467 'transport_server': 'a', 468 'workingtree_format': default_wt_format}), 469 ('PreviewTreePost', 470 {'_workingtree_to_test_tree': preview_tree_post, 471 'bzrdir_format': default_wt_format._matchingcontroldir, 472 'transport_readonly_server': 'b', 473 'transport_server': 'a', 474 'workingtree_format': default_wt_format}), 475 ] 476 self.assertEqual(expected_scenarios, scenarios) 477 478 479class TestInterTreeScenarios(tests.TestCase): 480 """A group of tests that test the InterTreeTestAdapter.""" 481 482 def test_scenarios(self): 483 # check that constructor parameters are passed through to the adapted 484 # test. 485 # for InterTree tests we want the machinery to bring up two trees in 486 # each instance: the base one, and the one we are interacting with. 487 # because each optimiser can be direction specific, we need to test 488 # each optimiser in its chosen direction. 489 # unlike the TestProviderAdapter we dont want to automatically add a 490 # parameterized one for WorkingTree - the optimisers will tell us what 491 # ones to add. 492 from .per_tree import ( 493 return_parameter, 494 ) 495 from .per_intertree import ( 496 make_scenarios, 497 ) 498 from ..bzr.workingtree_3 import WorkingTreeFormat3 499 from ..bzr.workingtree_4 import WorkingTreeFormat4 500 input_test = TestInterTreeScenarios( 501 "test_scenarios") 502 server1 = "a" 503 server2 = "b" 504 format1 = WorkingTreeFormat4() 505 format2 = WorkingTreeFormat3() 506 formats = [("1", str, format1, format2, "converter1"), 507 ("2", int, format2, format1, "converter2")] 508 scenarios = make_scenarios(server1, server2, formats) 509 self.assertEqual(2, len(scenarios)) 510 expected_scenarios = [ 511 ("1", { 512 "bzrdir_format": format1._matchingcontroldir, 513 "intertree_class": formats[0][1], 514 "workingtree_format": formats[0][2], 515 "workingtree_format_to": formats[0][3], 516 "mutable_trees_to_test_trees": formats[0][4], 517 "_workingtree_to_test_tree": return_parameter, 518 "transport_server": server1, 519 "transport_readonly_server": server2, 520 }), 521 ("2", { 522 "bzrdir_format": format2._matchingcontroldir, 523 "intertree_class": formats[1][1], 524 "workingtree_format": formats[1][2], 525 "workingtree_format_to": formats[1][3], 526 "mutable_trees_to_test_trees": formats[1][4], 527 "_workingtree_to_test_tree": return_parameter, 528 "transport_server": server1, 529 "transport_readonly_server": server2, 530 }), 531 ] 532 self.assertEqual(scenarios, expected_scenarios) 533 534 535class TestTestCaseInTempDir(tests.TestCaseInTempDir): 536 537 def test_home_is_not_working(self): 538 self.assertNotEqual(self.test_dir, self.test_home_dir) 539 cwd = osutils.getcwd() 540 self.assertIsSameRealPath(self.test_dir, cwd) 541 self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME']) 542 543 def test_assertEqualStat_equal(self): 544 from ..bzr.tests.test_dirstate import _FakeStat 545 self.build_tree(["foo"]) 546 real = os.lstat("foo") 547 fake = _FakeStat(real.st_size, real.st_mtime, real.st_ctime, 548 real.st_dev, real.st_ino, real.st_mode) 549 self.assertEqualStat(real, fake) 550 551 def test_assertEqualStat_notequal(self): 552 self.build_tree(["foo", "longname"]) 553 self.assertRaises(AssertionError, self.assertEqualStat, 554 os.lstat("foo"), os.lstat("longname")) 555 556 def test_assertPathExists(self): 557 self.assertPathExists('.') 558 self.build_tree(['foo/', 'foo/bar']) 559 self.assertPathExists('foo/bar') 560 self.assertPathDoesNotExist('foo/foo') 561 562 563class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport): 564 565 def test_home_is_non_existant_dir_under_root(self): 566 """The test_home_dir for TestCaseWithMemoryTransport is missing. 567 568 This is because TestCaseWithMemoryTransport is for tests that do not 569 need any disk resources: they should be hooked into breezy in such a 570 way that no global settings are being changed by the test (only a 571 few tests should need to do that), and having a missing dir as home is 572 an effective way to ensure that this is the case. 573 """ 574 self.assertIsSameRealPath( 575 self.TEST_ROOT + "/MemoryTransportMissingHomeDir", 576 self.test_home_dir) 577 self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME']) 578 579 def test_cwd_is_TEST_ROOT(self): 580 self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT) 581 cwd = osutils.getcwd() 582 self.assertIsSameRealPath(self.test_dir, cwd) 583 584 def test_BRZ_HOME_and_HOME_are_bytestrings(self): 585 """The $BRZ_HOME and $HOME environment variables should not be unicode. 586 587 See https://bugs.launchpad.net/bzr/+bug/464174 588 """ 589 self.assertIsInstance(os.environ['BRZ_HOME'], str) 590 self.assertIsInstance(os.environ['HOME'], str) 591 592 def test_make_branch_and_memory_tree(self): 593 """In TestCaseWithMemoryTransport we should not make the branch on disk. 594 595 This is hard to comprehensively robustly test, so we settle for making 596 a branch and checking no directory was created at its relpath. 597 """ 598 tree = self.make_branch_and_memory_tree('dir') 599 # Guard against regression into MemoryTransport leaking 600 # files to disk instead of keeping them in memory. 601 self.assertFalse(osutils.lexists('dir')) 602 self.assertIsInstance(tree, memorytree.MemoryTree) 603 604 def test_make_branch_and_memory_tree_with_format(self): 605 """make_branch_and_memory_tree should accept a format option.""" 606 format = bzrdir.BzrDirMetaFormat1() 607 format.repository_format = repository.format_registry.get_default() 608 tree = self.make_branch_and_memory_tree('dir', format=format) 609 # Guard against regression into MemoryTransport leaking 610 # files to disk instead of keeping them in memory. 611 self.assertFalse(osutils.lexists('dir')) 612 self.assertIsInstance(tree, memorytree.MemoryTree) 613 self.assertEqual(format.repository_format.__class__, 614 tree.branch.repository._format.__class__) 615 616 def test_make_branch_builder(self): 617 builder = self.make_branch_builder('dir') 618 self.assertIsInstance(builder, branchbuilder.BranchBuilder) 619 # Guard against regression into MemoryTransport leaking 620 # files to disk instead of keeping them in memory. 621 self.assertFalse(osutils.lexists('dir')) 622 623 def test_make_branch_builder_with_format(self): 624 # Use a repo layout that doesn't conform to a 'named' layout, to ensure 625 # that the format objects are used. 626 format = bzrdir.BzrDirMetaFormat1() 627 repo_format = repository.format_registry.get_default() 628 format.repository_format = repo_format 629 builder = self.make_branch_builder('dir', format=format) 630 the_branch = builder.get_branch() 631 # Guard against regression into MemoryTransport leaking 632 # files to disk instead of keeping them in memory. 633 self.assertFalse(osutils.lexists('dir')) 634 self.assertEqual(format.repository_format.__class__, 635 the_branch.repository._format.__class__) 636 self.assertEqual(repo_format.get_format_string(), 637 self.get_transport().get_bytes( 638 'dir/.bzr/repository/format')) 639 640 def test_make_branch_builder_with_format_name(self): 641 builder = self.make_branch_builder('dir', format='knit') 642 the_branch = builder.get_branch() 643 # Guard against regression into MemoryTransport leaking 644 # files to disk instead of keeping them in memory. 645 self.assertFalse(osutils.lexists('dir')) 646 dir_format = controldir.format_registry.make_controldir('knit') 647 self.assertEqual(dir_format.repository_format.__class__, 648 the_branch.repository._format.__class__) 649 self.assertEqual(b'Bazaar-NG Knit Repository Format 1', 650 self.get_transport().get_bytes( 651 'dir/.bzr/repository/format')) 652 653 def test_dangling_locks_cause_failures(self): 654 class TestDanglingLock(tests.TestCaseWithMemoryTransport): 655 def test_function(self): 656 t = self.get_transport_from_path('.') 657 l = lockdir.LockDir(t, 'lock') 658 l.create() 659 l.attempt_lock() 660 test = TestDanglingLock('test_function') 661 result = test.run() 662 total_failures = result.errors + result.failures 663 if self._lock_check_thorough: 664 self.assertEqual(1, len(total_failures)) 665 else: 666 # When _lock_check_thorough is disabled, then we don't trigger a 667 # failure 668 self.assertEqual(0, len(total_failures)) 669 670 671class TestTestCaseWithTransport(tests.TestCaseWithTransport): 672 """Tests for the convenience functions TestCaseWithTransport introduces.""" 673 674 def test_get_readonly_url_none(self): 675 from ..transport.readonly import ReadonlyTransportDecorator 676 self.vfs_transport_factory = memory.MemoryServer 677 self.transport_readonly_server = None 678 # calling get_readonly_transport() constructs a decorator on the url 679 # for the server 680 url = self.get_readonly_url() 681 url2 = self.get_readonly_url('foo/bar') 682 t = transport.get_transport_from_url(url) 683 t2 = transport.get_transport_from_url(url2) 684 self.assertIsInstance(t, ReadonlyTransportDecorator) 685 self.assertIsInstance(t2, ReadonlyTransportDecorator) 686 self.assertEqual(t2.base[:-1], t.abspath('foo/bar')) 687 688 def test_get_readonly_url_http(self): 689 from .http_server import HttpServer 690 from ..transport.http.urllib import HttpTransport 691 self.transport_server = test_server.LocalURLServer 692 self.transport_readonly_server = HttpServer 693 # calling get_readonly_transport() gives us a HTTP server instance. 694 url = self.get_readonly_url() 695 url2 = self.get_readonly_url('foo/bar') 696 # the transport returned may be any HttpTransportBase subclass 697 t = transport.get_transport_from_url(url) 698 t2 = transport.get_transport_from_url(url2) 699 self.assertIsInstance(t, HttpTransport) 700 self.assertIsInstance(t2, HttpTransport) 701 self.assertEqual(t2.base[:-1], t.abspath('foo/bar')) 702 703 def test_is_directory(self): 704 """Test assertIsDirectory assertion""" 705 t = self.get_transport() 706 self.build_tree(['a_dir/', 'a_file'], transport=t) 707 self.assertIsDirectory('a_dir', t) 708 self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t) 709 self.assertRaises( 710 AssertionError, self.assertIsDirectory, 'not_here', t) 711 712 def test_make_branch_builder(self): 713 builder = self.make_branch_builder('dir') 714 rev_id = builder.build_commit() 715 self.assertPathExists('dir') 716 a_dir = controldir.ControlDir.open('dir') 717 self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree) 718 a_branch = a_dir.open_branch() 719 builder_branch = builder.get_branch() 720 self.assertEqual(a_branch.base, builder_branch.base) 721 self.assertEqual((1, rev_id), builder_branch.last_revision_info()) 722 self.assertEqual((1, rev_id), a_branch.last_revision_info()) 723 724 725class TestTestCaseTransports(tests.TestCaseWithTransport): 726 727 def setUp(self): 728 super(TestTestCaseTransports, self).setUp() 729 self.vfs_transport_factory = memory.MemoryServer 730 731 def test_make_controldir_preserves_transport(self): 732 t = self.get_transport() 733 result_bzrdir = self.make_controldir('subdir') 734 self.assertIsInstance(result_bzrdir.transport, 735 memory.MemoryTransport) 736 # should not be on disk, should only be in memory 737 self.assertPathDoesNotExist('subdir') 738 739 740class TestChrootedTest(tests.ChrootedTestCase): 741 742 def test_root_is_root(self): 743 t = transport.get_transport_from_url(self.get_readonly_url()) 744 url = t.base 745 self.assertEqual(url, t.clone('..').base) 746 747 748class TestProfileResult(tests.TestCase): 749 750 def test_profiles_tests(self): 751 self.requireFeature(features.lsprof_feature) 752 terminal = testtools.testresult.doubles.ExtendedTestResult() 753 result = tests.ProfileResult(terminal) 754 755 class Sample(tests.TestCase): 756 def a(self): 757 self.sample_function() 758 759 def sample_function(self): 760 pass 761 test = Sample("a") 762 test.run(result) 763 case = terminal._events[0][1] 764 self.assertLength(1, case._benchcalls) 765 # We must be able to unpack it as the test reporting code wants 766 (_, _, _), stats = case._benchcalls[0] 767 self.assertTrue(callable(stats.pprint)) 768 769 770class TestTestResult(tests.TestCase): 771 772 def check_timing(self, test_case, expected_re): 773 result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1) 774 capture = testtools.testresult.doubles.ExtendedTestResult() 775 test_case.run(MultiTestResult(result, capture)) 776 run_case = capture._events[0][1] 777 timed_string = result._testTimeString(run_case) 778 self.assertContainsRe(timed_string, expected_re) 779 780 def test_test_reporting(self): 781 class ShortDelayTestCase(tests.TestCase): 782 def test_short_delay(self): 783 time.sleep(0.003) 784 785 def test_short_benchmark(self): 786 self.time(time.sleep, 0.003) 787 self.check_timing(ShortDelayTestCase('test_short_delay'), 788 r"^ +[0-9]+ms$") 789 # if a benchmark time is given, we now show just that time followed by 790 # a star 791 self.check_timing(ShortDelayTestCase('test_short_benchmark'), 792 r"^ +[0-9]+ms\*$") 793 794 def test_unittest_reporting_unittest_class(self): 795 # getting the time from a non-breezy test works ok 796 class ShortDelayTestCase(unittest.TestCase): 797 def test_short_delay(self): 798 time.sleep(0.003) 799 self.check_timing(ShortDelayTestCase('test_short_delay'), 800 r"^ +[0-9]+ms$") 801 802 def _time_hello_world_encoding(self): 803 """Profile two sleep calls 804 805 This is used to exercise the test framework. 806 """ 807 self.time(str, b'hello', errors='replace') 808 self.time(str, b'world', errors='replace') 809 810 def test_lsprofiling(self): 811 """Verbose test result prints lsprof statistics from test cases.""" 812 self.requireFeature(features.lsprof_feature) 813 result_stream = StringIO() 814 result = breezy.tests.VerboseTestResult( 815 result_stream, 816 descriptions=0, 817 verbosity=2, 818 ) 819 # we want profile a call of some sort and check it is output by 820 # addSuccess. We dont care about addError or addFailure as they 821 # are not that interesting for performance tuning. 822 # make a new test instance that when run will generate a profile 823 example_test_case = TestTestResult("_time_hello_world_encoding") 824 example_test_case._gather_lsprof_in_benchmarks = True 825 # execute the test, which should succeed and record profiles 826 example_test_case.run(result) 827 # lsprofile_something() 828 # if this worked we want 829 # LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'}) 830 # CallCount Recursive Total(ms) Inline(ms) module:lineno(function) 831 # (the lsprof header) 832 # ... an arbitrary number of lines 833 # and the function call which is time.sleep. 834 # 1 0 ??? ??? ???(sleep) 835 # and then repeated but with 'world', rather than 'hello'. 836 # this should appear in the output stream of our test result. 837 output = result_stream.getvalue() 838 self.assertContainsRe(output, 839 r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)") 840 self.assertContainsRe(output, 841 r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)") 842 self.assertContainsRe(output, 843 r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n") 844 self.assertContainsRe(output, 845 r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?") 846 847 def test_uses_time_from_testtools(self): 848 """Test case timings in verbose results should use testtools times""" 849 import datetime 850 851 class TimeAddedVerboseTestResult(tests.VerboseTestResult): 852 def startTest(self, test): 853 self.time(datetime.datetime.utcfromtimestamp(1.145)) 854 super(TimeAddedVerboseTestResult, self).startTest(test) 855 856 def addSuccess(self, test): 857 self.time(datetime.datetime.utcfromtimestamp(51.147)) 858 super(TimeAddedVerboseTestResult, self).addSuccess(test) 859 860 def report_tests_starting(self): pass 861 sio = StringIO() 862 self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2)) 863 self.assertEndsWith(sio.getvalue(), "OK 50002ms\n") 864 865 def test_known_failure(self): 866 """Using knownFailure should trigger several result actions.""" 867 class InstrumentedTestResult(tests.ExtendedTestResult): 868 def stopTestRun(self): pass 869 870 def report_tests_starting(self): pass 871 872 def report_known_failure(self, test, err=None, details=None): 873 self._call = test, 'known failure' 874 result = InstrumentedTestResult(None, None, None, None) 875 876 class Test(tests.TestCase): 877 def test_function(self): 878 self.knownFailure('failed!') 879 test = Test("test_function") 880 test.run(result) 881 # it should invoke 'report_known_failure'. 882 self.assertEqual(2, len(result._call)) 883 self.assertEqual(test.id(), result._call[0].id()) 884 self.assertEqual('known failure', result._call[1]) 885 # we dont introspec the traceback, if the rest is ok, it would be 886 # exceptional for it not to be. 887 # it should update the known_failure_count on the object. 888 self.assertEqual(1, result.known_failure_count) 889 # the result should be successful. 890 self.assertTrue(result.wasSuccessful()) 891 892 def test_verbose_report_known_failure(self): 893 # verbose test output formatting 894 result_stream = StringIO() 895 result = breezy.tests.VerboseTestResult( 896 result_stream, 897 descriptions=0, 898 verbosity=2, 899 ) 900 _get_test("test_xfail").run(result) 901 self.assertContainsRe(result_stream.getvalue(), 902 "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n" 903 "\\s*(?:Text attachment: )?reason" 904 "(?:\n-+\n|: {{{)" 905 "this_fails" 906 "(?:\n-+\n|}}}\n)") 907 908 def get_passing_test(self): 909 """Return a test object that can't be run usefully.""" 910 def passing_test(): 911 pass 912 return unittest.FunctionTestCase(passing_test) 913 914 def test_add_not_supported(self): 915 """Test the behaviour of invoking addNotSupported.""" 916 class InstrumentedTestResult(tests.ExtendedTestResult): 917 def stopTestRun(self): pass 918 919 def report_tests_starting(self): pass 920 921 def report_unsupported(self, test, feature): 922 self._call = test, feature 923 result = InstrumentedTestResult(None, None, None, None) 924 test = SampleTestCase('_test_pass') 925 feature = features.Feature() 926 result.startTest(test) 927 result.addNotSupported(test, feature) 928 # it should invoke 'report_unsupported'. 929 self.assertEqual(2, len(result._call)) 930 self.assertEqual(test, result._call[0]) 931 self.assertEqual(feature, result._call[1]) 932 # the result should be successful. 933 self.assertTrue(result.wasSuccessful()) 934 # it should record the test against a count of tests not run due to 935 # this feature. 936 self.assertEqual(1, result.unsupported['Feature']) 937 # and invoking it again should increment that counter 938 result.addNotSupported(test, feature) 939 self.assertEqual(2, result.unsupported['Feature']) 940 941 def test_verbose_report_unsupported(self): 942 # verbose test output formatting 943 result_stream = StringIO() 944 result = breezy.tests.VerboseTestResult( 945 result_stream, 946 descriptions=0, 947 verbosity=2, 948 ) 949 test = self.get_passing_test() 950 feature = features.Feature() 951 result.startTest(test) 952 prefix = len(result_stream.getvalue()) 953 result.report_unsupported(test, feature) 954 output = result_stream.getvalue()[prefix:] 955 lines = output.splitlines() 956 # We don't check for the final '0ms' since it may fail on slow hosts 957 self.assertStartsWith(lines[0], 'NODEP') 958 self.assertEqual(lines[1], 959 " The feature 'Feature' is not available.") 960 961 def test_unavailable_exception(self): 962 """An UnavailableFeature being raised should invoke addNotSupported.""" 963 class InstrumentedTestResult(tests.ExtendedTestResult): 964 def stopTestRun(self): 965 pass 966 967 def report_tests_starting(self): 968 pass 969 970 def addNotSupported(self, test, feature): 971 self._call = test, feature 972 result = InstrumentedTestResult(None, None, None, None) 973 feature = features.Feature() 974 975 class Test(tests.TestCase): 976 def test_function(self): 977 raise tests.UnavailableFeature(feature) 978 test = Test("test_function") 979 test.run(result) 980 # it should invoke 'addNotSupported'. 981 self.assertEqual(2, len(result._call)) 982 self.assertEqual(test.id(), result._call[0].id()) 983 self.assertEqual(feature, result._call[1]) 984 # and not count as an error 985 self.assertEqual(0, result.error_count) 986 987 def test_strict_with_unsupported_feature(self): 988 result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1) 989 test = self.get_passing_test() 990 feature = "Unsupported Feature" 991 result.addNotSupported(test, feature) 992 self.assertFalse(result.wasStrictlySuccessful()) 993 self.assertEqual(None, result._extractBenchmarkTime(test)) 994 995 def test_strict_with_known_failure(self): 996 result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1) 997 test = _get_test("test_xfail") 998 test.run(result) 999 self.assertFalse(result.wasStrictlySuccessful()) 1000 self.assertEqual(None, result._extractBenchmarkTime(test)) 1001 1002 def test_strict_with_success(self): 1003 result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1) 1004 test = self.get_passing_test() 1005 result.addSuccess(test) 1006 self.assertTrue(result.wasStrictlySuccessful()) 1007 self.assertEqual(None, result._extractBenchmarkTime(test)) 1008 1009 def test_startTests(self): 1010 """Starting the first test should trigger startTests.""" 1011 class InstrumentedTestResult(tests.ExtendedTestResult): 1012 calls = 0 1013 1014 def startTests(self): 1015 self.calls += 1 1016 result = InstrumentedTestResult(None, None, None, None) 1017 1018 def test_function(): 1019 pass 1020 test = unittest.FunctionTestCase(test_function) 1021 test.run(result) 1022 self.assertEqual(1, result.calls) 1023 1024 def test_startTests_only_once(self): 1025 """With multiple tests startTests should still only be called once""" 1026 class InstrumentedTestResult(tests.ExtendedTestResult): 1027 calls = 0 1028 1029 def startTests(self): 1030 self.calls += 1 1031 result = InstrumentedTestResult(None, None, None, None) 1032 suite = unittest.TestSuite([ 1033 unittest.FunctionTestCase(lambda: None), 1034 unittest.FunctionTestCase(lambda: None)]) 1035 suite.run(result) 1036 self.assertEqual(1, result.calls) 1037 self.assertEqual(2, result.count) 1038 1039 1040class TestRunner(tests.TestCase): 1041 1042 def dummy_test(self): 1043 pass 1044 1045 def run_test_runner(self, testrunner, test): 1046 """Run suite in testrunner, saving global state and restoring it. 1047 1048 This current saves and restores: 1049 TestCaseInTempDir.TEST_ROOT 1050 1051 There should be no tests in this file that use 1052 breezy.tests.TextTestRunner without using this convenience method, 1053 because of our use of global state. 1054 """ 1055 old_root = tests.TestCaseInTempDir.TEST_ROOT 1056 try: 1057 tests.TestCaseInTempDir.TEST_ROOT = None 1058 return testrunner.run(test) 1059 finally: 1060 tests.TestCaseInTempDir.TEST_ROOT = old_root 1061 1062 def test_known_failure_failed_run(self): 1063 # run a test that generates a known failure which should be printed in 1064 # the final output when real failures occur. 1065 class Test(tests.TestCase): 1066 def known_failure_test(self): 1067 self.expectFailure('failed', self.assertTrue, False) 1068 test = unittest.TestSuite() 1069 test.addTest(Test("known_failure_test")) 1070 1071 def failing_test(): 1072 raise AssertionError('foo') 1073 test.addTest(unittest.FunctionTestCase(failing_test)) 1074 stream = StringIO() 1075 runner = tests.TextTestRunner(stream=stream) 1076 self.run_test_runner(runner, test) 1077 self.assertContainsRe( 1078 stream.getvalue(), 1079 '(?sm)^brz selftest.*$' 1080 '.*' 1081 '^======================================================================\n' 1082 '^FAIL: failing_test\n' 1083 '^----------------------------------------------------------------------\n' 1084 'Traceback \\(most recent call last\\):\n' 1085 ' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc 1086 ' raise AssertionError\\(\'foo\'\\)\n' 1087 '.*' 1088 '^----------------------------------------------------------------------\n' 1089 '.*' 1090 'FAILED \\(failures=1, known_failure_count=1\\)' 1091 ) 1092 1093 def test_known_failure_ok_run(self): 1094 # run a test that generates a known failure which should be printed in 1095 # the final output. 1096 class Test(tests.TestCase): 1097 def known_failure_test(self): 1098 self.knownFailure("Never works...") 1099 test = Test("known_failure_test") 1100 stream = StringIO() 1101 runner = tests.TextTestRunner(stream=stream) 1102 self.run_test_runner(runner, test) 1103 self.assertContainsRe(stream.getvalue(), 1104 '\n' 1105 '-*\n' 1106 'Ran 1 test in .*\n' 1107 '\n' 1108 'OK \\(known_failures=1\\)\n') 1109 1110 def test_unexpected_success_bad(self): 1111 class Test(tests.TestCase): 1112 def test_truth(self): 1113 self.expectFailure("No absolute truth", self.assertTrue, True) 1114 runner = tests.TextTestRunner(stream=StringIO()) 1115 self.run_test_runner(runner, Test("test_truth")) 1116 self.assertContainsRe(runner.stream.getvalue(), 1117 "=+\n" 1118 "FAIL: \\S+\\.test_truth\n" 1119 "-+\n" 1120 "(?:.*\n)*" 1121 "\\s*(?:Text attachment: )?reason" 1122 "(?:\n-+\n|: {{{)" 1123 "No absolute truth" 1124 "(?:\n-+\n|}}}\n)" 1125 "(?:.*\n)*" 1126 "-+\n" 1127 "Ran 1 test in .*\n" 1128 "\n" 1129 "FAILED \\(failures=1\\)\n\\Z") 1130 1131 def test_result_decorator(self): 1132 # decorate results 1133 calls = [] 1134 1135 class LoggingDecorator(ExtendedToOriginalDecorator): 1136 def startTest(self, test): 1137 ExtendedToOriginalDecorator.startTest(self, test) 1138 calls.append('start') 1139 test = unittest.FunctionTestCase(lambda: None) 1140 stream = StringIO() 1141 runner = tests.TextTestRunner(stream=stream, 1142 result_decorators=[LoggingDecorator]) 1143 self.run_test_runner(runner, test) 1144 self.assertLength(1, calls) 1145 1146 def test_skipped_test(self): 1147 # run a test that is skipped, and check the suite as a whole still 1148 # succeeds. 1149 # skipping_test must be hidden in here so it's not run as a real test 1150 class SkippingTest(tests.TestCase): 1151 def skipping_test(self): 1152 raise tests.TestSkipped('test intentionally skipped') 1153 runner = tests.TextTestRunner(stream=StringIO()) 1154 test = SkippingTest("skipping_test") 1155 result = self.run_test_runner(runner, test) 1156 self.assertTrue(result.wasSuccessful()) 1157 1158 def test_skipped_from_setup(self): 1159 calls = [] 1160 1161 class SkippedSetupTest(tests.TestCase): 1162 1163 def setUp(self): 1164 calls.append('setUp') 1165 self.addCleanup(self.cleanup) 1166 raise tests.TestSkipped('skipped setup') 1167 1168 def test_skip(self): 1169 self.fail('test reached') 1170 1171 def cleanup(self): 1172 calls.append('cleanup') 1173 1174 runner = tests.TextTestRunner(stream=StringIO()) 1175 test = SkippedSetupTest('test_skip') 1176 result = self.run_test_runner(runner, test) 1177 self.assertTrue(result.wasSuccessful()) 1178 # Check if cleanup was called the right number of times. 1179 self.assertEqual(['setUp', 'cleanup'], calls) 1180 1181 def test_skipped_from_test(self): 1182 calls = [] 1183 1184 class SkippedTest(tests.TestCase): 1185 1186 def setUp(self): 1187 super(SkippedTest, self).setUp() 1188 calls.append('setUp') 1189 self.addCleanup(self.cleanup) 1190 1191 def test_skip(self): 1192 raise tests.TestSkipped('skipped test') 1193 1194 def cleanup(self): 1195 calls.append('cleanup') 1196 1197 runner = tests.TextTestRunner(stream=StringIO()) 1198 test = SkippedTest('test_skip') 1199 result = self.run_test_runner(runner, test) 1200 self.assertTrue(result.wasSuccessful()) 1201 # Check if cleanup was called the right number of times. 1202 self.assertEqual(['setUp', 'cleanup'], calls) 1203 1204 def test_not_applicable(self): 1205 # run a test that is skipped because it's not applicable 1206 class Test(tests.TestCase): 1207 def not_applicable_test(self): 1208 raise tests.TestNotApplicable('this test never runs') 1209 out = StringIO() 1210 runner = tests.TextTestRunner(stream=out, verbosity=2) 1211 test = Test("not_applicable_test") 1212 result = self.run_test_runner(runner, test) 1213 self.log(out.getvalue()) 1214 self.assertTrue(result.wasSuccessful()) 1215 self.assertTrue(result.wasStrictlySuccessful()) 1216 self.assertContainsRe(out.getvalue(), 1217 r'(?m)not_applicable_test * N/A') 1218 self.assertContainsRe(out.getvalue(), 1219 r'(?m)^ this test never runs') 1220 1221 def test_unsupported_features_listed(self): 1222 """When unsupported features are encountered they are detailed.""" 1223 class Feature1(features.Feature): 1224 def _probe(self): 1225 return False 1226 1227 class Feature2(features.Feature): 1228 def _probe(self): 1229 return False 1230 # create sample tests 1231 test1 = SampleTestCase('_test_pass') 1232 test1._test_needs_features = [Feature1()] 1233 test2 = SampleTestCase('_test_pass') 1234 test2._test_needs_features = [Feature2()] 1235 test = unittest.TestSuite() 1236 test.addTest(test1) 1237 test.addTest(test2) 1238 stream = StringIO() 1239 runner = tests.TextTestRunner(stream=stream) 1240 self.run_test_runner(runner, test) 1241 lines = stream.getvalue().splitlines() 1242 self.assertEqual([ 1243 'OK', 1244 "Missing feature 'Feature1' skipped 1 tests.", 1245 "Missing feature 'Feature2' skipped 1 tests.", 1246 ], 1247 lines[-3:]) 1248 1249 def test_verbose_test_count(self): 1250 """A verbose test run reports the right test count at the start""" 1251 suite = TestUtil.TestSuite([ 1252 unittest.FunctionTestCase(lambda:None), 1253 unittest.FunctionTestCase(lambda:None)]) 1254 self.assertEqual(suite.countTestCases(), 2) 1255 stream = StringIO() 1256 runner = tests.TextTestRunner(stream=stream, verbosity=2) 1257 # Need to use the CountingDecorator as that's what sets num_tests 1258 self.run_test_runner(runner, tests.CountingDecorator(suite)) 1259 self.assertStartsWith(stream.getvalue(), "running 2 tests") 1260 1261 def test_startTestRun(self): 1262 """run should call result.startTestRun()""" 1263 calls = [] 1264 1265 class LoggingDecorator(ExtendedToOriginalDecorator): 1266 def startTestRun(self): 1267 ExtendedToOriginalDecorator.startTestRun(self) 1268 calls.append('startTestRun') 1269 test = unittest.FunctionTestCase(lambda: None) 1270 stream = StringIO() 1271 runner = tests.TextTestRunner(stream=stream, 1272 result_decorators=[LoggingDecorator]) 1273 self.run_test_runner(runner, test) 1274 self.assertLength(1, calls) 1275 1276 def test_stopTestRun(self): 1277 """run should call result.stopTestRun()""" 1278 calls = [] 1279 1280 class LoggingDecorator(ExtendedToOriginalDecorator): 1281 def stopTestRun(self): 1282 ExtendedToOriginalDecorator.stopTestRun(self) 1283 calls.append('stopTestRun') 1284 test = unittest.FunctionTestCase(lambda: None) 1285 stream = StringIO() 1286 runner = tests.TextTestRunner(stream=stream, 1287 result_decorators=[LoggingDecorator]) 1288 self.run_test_runner(runner, test) 1289 self.assertLength(1, calls) 1290 1291 def test_unicode_test_output_on_ascii_stream(self): 1292 """Showing results should always succeed even on an ascii console""" 1293 class FailureWithUnicode(tests.TestCase): 1294 def test_log_unicode(self): 1295 self.log(u"\u2606") 1296 self.fail("Now print that log!") 1297 bio = BytesIO() 1298 out = TextIOWrapper(bio, 'ascii', 'backslashreplace') 1299 self.overrideAttr(osutils, "get_terminal_encoding", 1300 lambda trace=False: "ascii") 1301 self.run_test_runner( 1302 tests.TextTestRunner(stream=out), 1303 FailureWithUnicode("test_log_unicode")) 1304 out.flush() 1305 self.assertContainsRe(bio.getvalue(), 1306 b"(?:Text attachment: )?log" 1307 b"(?:\n-+\n|: {{{)" 1308 b"\\d+\\.\\d+ \\\\u2606" 1309 b"(?:\n-+\n|}}}\n)") 1310 1311 1312class SampleTestCase(tests.TestCase): 1313 1314 def _test_pass(self): 1315 pass 1316 1317 1318class _TestException(Exception): 1319 pass 1320 1321 1322class TestTestCase(tests.TestCase): 1323 """Tests that test the core breezy TestCase.""" 1324 1325 def test_assertLength_matches_empty(self): 1326 a_list = [] 1327 self.assertLength(0, a_list) 1328 1329 def test_assertLength_matches_nonempty(self): 1330 a_list = [1, 2, 3] 1331 self.assertLength(3, a_list) 1332 1333 def test_assertLength_fails_different(self): 1334 a_list = [] 1335 self.assertRaises(AssertionError, self.assertLength, 1, a_list) 1336 1337 def test_assertLength_shows_sequence_in_failure(self): 1338 a_list = [1, 2, 3] 1339 exception = self.assertRaises(AssertionError, self.assertLength, 2, 1340 a_list) 1341 self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]', 1342 exception.args[0]) 1343 1344 def test_base_setUp_not_called_causes_failure(self): 1345 class TestCaseWithBrokenSetUp(tests.TestCase): 1346 def setUp(self): 1347 pass # does not call TestCase.setUp 1348 1349 def test_foo(self): 1350 pass 1351 test = TestCaseWithBrokenSetUp('test_foo') 1352 result = unittest.TestResult() 1353 test.run(result) 1354 self.assertFalse(result.wasSuccessful()) 1355 self.assertEqual(1, result.testsRun) 1356 1357 def test_base_tearDown_not_called_causes_failure(self): 1358 class TestCaseWithBrokenTearDown(tests.TestCase): 1359 def tearDown(self): 1360 pass # does not call TestCase.tearDown 1361 1362 def test_foo(self): 1363 pass 1364 test = TestCaseWithBrokenTearDown('test_foo') 1365 result = unittest.TestResult() 1366 test.run(result) 1367 self.assertFalse(result.wasSuccessful()) 1368 self.assertEqual(1, result.testsRun) 1369 1370 def test_debug_flags_sanitised(self): 1371 """The breezy debug flags should be sanitised by setUp.""" 1372 if 'allow_debug' in tests.selftest_debug_flags: 1373 raise tests.TestNotApplicable( 1374 '-Eallow_debug option prevents debug flag sanitisation') 1375 # we could set something and run a test that will check 1376 # it gets santised, but this is probably sufficient for now: 1377 # if someone runs the test with -Dsomething it will error. 1378 flags = set() 1379 if self._lock_check_thorough: 1380 flags.add('strict_locks') 1381 self.assertEqual(flags, breezy.debug.debug_flags) 1382 1383 def change_selftest_debug_flags(self, new_flags): 1384 self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags)) 1385 1386 def test_allow_debug_flag(self): 1387 """The -Eallow_debug flag prevents breezy.debug.debug_flags from being 1388 sanitised (i.e. cleared) before running a test. 1389 """ 1390 self.change_selftest_debug_flags({'allow_debug'}) 1391 breezy.debug.debug_flags = {'a-flag'} 1392 1393 class TestThatRecordsFlags(tests.TestCase): 1394 def test_foo(nested_self): 1395 self.flags = set(breezy.debug.debug_flags) 1396 test = TestThatRecordsFlags('test_foo') 1397 test.run(self.make_test_result()) 1398 flags = {'a-flag'} 1399 if 'disable_lock_checks' not in tests.selftest_debug_flags: 1400 flags.add('strict_locks') 1401 self.assertEqual(flags, self.flags) 1402 1403 def test_disable_lock_checks(self): 1404 """The -Edisable_lock_checks flag disables thorough checks.""" 1405 class TestThatRecordsFlags(tests.TestCase): 1406 def test_foo(nested_self): 1407 self.flags = set(breezy.debug.debug_flags) 1408 self.test_lock_check_thorough = nested_self._lock_check_thorough 1409 self.change_selftest_debug_flags(set()) 1410 test = TestThatRecordsFlags('test_foo') 1411 test.run(self.make_test_result()) 1412 # By default we do strict lock checking and thorough lock/unlock 1413 # tracking. 1414 self.assertTrue(self.test_lock_check_thorough) 1415 self.assertEqual({'strict_locks'}, self.flags) 1416 # Now set the disable_lock_checks flag, and show that this changed. 1417 self.change_selftest_debug_flags({'disable_lock_checks'}) 1418 test = TestThatRecordsFlags('test_foo') 1419 test.run(self.make_test_result()) 1420 self.assertFalse(self.test_lock_check_thorough) 1421 self.assertEqual(set(), self.flags) 1422 1423 def test_this_fails_strict_lock_check(self): 1424 class TestThatRecordsFlags(tests.TestCase): 1425 def test_foo(nested_self): 1426 self.flags1 = set(breezy.debug.debug_flags) 1427 self.thisFailsStrictLockCheck() 1428 self.flags2 = set(breezy.debug.debug_flags) 1429 # Make sure lock checking is active 1430 self.change_selftest_debug_flags(set()) 1431 test = TestThatRecordsFlags('test_foo') 1432 test.run(self.make_test_result()) 1433 self.assertEqual({'strict_locks'}, self.flags1) 1434 self.assertEqual(set(), self.flags2) 1435 1436 def test_debug_flags_restored(self): 1437 """The breezy debug flags should be restored to their original state 1438 after the test was run, even if allow_debug is set. 1439 """ 1440 self.change_selftest_debug_flags({'allow_debug'}) 1441 # Now run a test that modifies debug.debug_flags. 1442 breezy.debug.debug_flags = {'original-state'} 1443 1444 class TestThatModifiesFlags(tests.TestCase): 1445 def test_foo(self): 1446 breezy.debug.debug_flags = {'modified'} 1447 test = TestThatModifiesFlags('test_foo') 1448 test.run(self.make_test_result()) 1449 self.assertEqual({'original-state'}, breezy.debug.debug_flags) 1450 1451 def make_test_result(self): 1452 """Get a test result that writes to a StringIO.""" 1453 return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1) 1454 1455 def inner_test(self): 1456 # the inner child test 1457 note("inner_test") 1458 1459 def outer_child(self): 1460 # the outer child test 1461 note("outer_start") 1462 self.inner_test = TestTestCase("inner_child") 1463 result = self.make_test_result() 1464 self.inner_test.run(result) 1465 note("outer finish") 1466 self.addCleanup(osutils.delete_any, self._log_file_name) 1467 1468 def test_trace_nesting(self): 1469 # this tests that each test case nests its trace facility correctly. 1470 # we do this by running a test case manually. That test case (A) 1471 # should setup a new log, log content to it, setup a child case (B), 1472 # which should log independently, then case (A) should log a trailer 1473 # and return. 1474 # we do two nested children so that we can verify the state of the 1475 # logs after the outer child finishes is correct, which a bad clean 1476 # up routine in tearDown might trigger a fault in our test with only 1477 # one child, we should instead see the bad result inside our test with 1478 # the two children. 1479 # the outer child test 1480 original_trace = breezy.trace._trace_file 1481 outer_test = TestTestCase("outer_child") 1482 result = self.make_test_result() 1483 outer_test.run(result) 1484 self.assertEqual(original_trace, breezy.trace._trace_file) 1485 1486 def method_that_times_a_bit_twice(self): 1487 # call self.time twice to ensure it aggregates 1488 self.time(time.sleep, 0.007) 1489 self.time(time.sleep, 0.007) 1490 1491 def test_time_creates_benchmark_in_result(self): 1492 """The TestCase.time() method accumulates a benchmark time.""" 1493 sample_test = TestTestCase("method_that_times_a_bit_twice") 1494 output_stream = StringIO() 1495 result = breezy.tests.VerboseTestResult( 1496 output_stream, 1497 descriptions=0, 1498 verbosity=2) 1499 sample_test.run(result) 1500 self.assertContainsRe( 1501 output_stream.getvalue(), 1502 r"\d+ms\*\n$") 1503 1504 def test_hooks_sanitised(self): 1505 """The breezy hooks should be sanitised by setUp.""" 1506 # Note this test won't fail with hooks that the core library doesn't 1507 # use - but it trigger with a plugin that adds hooks, so its still a 1508 # useful warning in that case. 1509 self.assertEqual(breezy.branch.BranchHooks(), 1510 breezy.branch.Branch.hooks) 1511 self.assertEqual( 1512 breezy.bzr.smart.server.SmartServerHooks(), 1513 breezy.bzr.smart.server.SmartTCPServer.hooks) 1514 self.assertEqual( 1515 breezy.commands.CommandHooks(), breezy.commands.Command.hooks) 1516 1517 def test__gather_lsprof_in_benchmarks(self): 1518 """When _gather_lsprof_in_benchmarks is on, accumulate profile data. 1519 1520 Each self.time() call is individually and separately profiled. 1521 """ 1522 self.requireFeature(features.lsprof_feature) 1523 # overrides the class member with an instance member so no cleanup 1524 # needed. 1525 self._gather_lsprof_in_benchmarks = True 1526 self.time(time.sleep, 0.000) 1527 self.time(time.sleep, 0.003) 1528 self.assertEqual(2, len(self._benchcalls)) 1529 self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0]) 1530 self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0]) 1531 self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats) 1532 self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats) 1533 del self._benchcalls[:] 1534 1535 def test_knownFailure(self): 1536 """Self.knownFailure() should raise a KnownFailure exception.""" 1537 self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure") 1538 1539 def test_open_bzrdir_safe_roots(self): 1540 # even a memory transport should fail to open when its url isn't 1541 # permitted. 1542 # Manually set one up (TestCase doesn't and shouldn't provide magic 1543 # machinery) 1544 transport_server = memory.MemoryServer() 1545 transport_server.start_server() 1546 self.addCleanup(transport_server.stop_server) 1547 t = transport.get_transport_from_url(transport_server.get_url()) 1548 controldir.ControlDir.create(t.base) 1549 self.assertRaises(errors.BzrError, 1550 controldir.ControlDir.open_from_transport, t) 1551 # But if we declare this as safe, we can open the bzrdir. 1552 self.permit_url(t.base) 1553 self._bzr_selftest_roots.append(t.base) 1554 controldir.ControlDir.open_from_transport(t) 1555 1556 def test_requireFeature_available(self): 1557 """self.requireFeature(available) is a no-op.""" 1558 class Available(features.Feature): 1559 def _probe(self): 1560 return True 1561 feature = Available() 1562 self.requireFeature(feature) 1563 1564 def test_requireFeature_unavailable(self): 1565 """self.requireFeature(unavailable) raises UnavailableFeature.""" 1566 class Unavailable(features.Feature): 1567 def _probe(self): 1568 return False 1569 feature = Unavailable() 1570 self.assertRaises(tests.UnavailableFeature, 1571 self.requireFeature, feature) 1572 1573 def test_run_no_parameters(self): 1574 test = SampleTestCase('_test_pass') 1575 test.run() 1576 1577 def test_run_enabled_unittest_result(self): 1578 """Test we revert to regular behaviour when the test is enabled.""" 1579 test = SampleTestCase('_test_pass') 1580 1581 class EnabledFeature(object): 1582 def available(self): 1583 return True 1584 test._test_needs_features = [EnabledFeature()] 1585 result = unittest.TestResult() 1586 test.run(result) 1587 self.assertEqual(1, result.testsRun) 1588 self.assertEqual([], result.errors) 1589 self.assertEqual([], result.failures) 1590 1591 def test_run_disabled_unittest_result(self): 1592 """Test our compatibility for disabled tests with unittest results.""" 1593 test = SampleTestCase('_test_pass') 1594 1595 class DisabledFeature(object): 1596 def available(self): 1597 return False 1598 test._test_needs_features = [DisabledFeature()] 1599 result = unittest.TestResult() 1600 test.run(result) 1601 self.assertEqual(1, result.testsRun) 1602 self.assertEqual([], result.errors) 1603 self.assertEqual([], result.failures) 1604 1605 def test_run_disabled_supporting_result(self): 1606 """Test disabled tests behaviour with support aware results.""" 1607 test = SampleTestCase('_test_pass') 1608 1609 class DisabledFeature(object): 1610 def __eq__(self, other): 1611 return isinstance(other, DisabledFeature) 1612 1613 def available(self): 1614 return False 1615 the_feature = DisabledFeature() 1616 test._test_needs_features = [the_feature] 1617 1618 class InstrumentedTestResult(unittest.TestResult): 1619 def __init__(self): 1620 unittest.TestResult.__init__(self) 1621 self.calls = [] 1622 1623 def startTest(self, test): 1624 self.calls.append(('startTest', test)) 1625 1626 def stopTest(self, test): 1627 self.calls.append(('stopTest', test)) 1628 1629 def addNotSupported(self, test, feature): 1630 self.calls.append(('addNotSupported', test, feature)) 1631 result = InstrumentedTestResult() 1632 test.run(result) 1633 case = result.calls[0][1] 1634 self.assertEqual([ 1635 ('startTest', case), 1636 ('addNotSupported', case, the_feature), 1637 ('stopTest', case), 1638 ], 1639 result.calls) 1640 1641 def test_start_server_registers_url(self): 1642 transport_server = memory.MemoryServer() 1643 # A little strict, but unlikely to be changed soon. 1644 self.assertEqual([], self._bzr_selftest_roots) 1645 self.start_server(transport_server) 1646 self.assertSubset([transport_server.get_url()], 1647 self._bzr_selftest_roots) 1648 1649 def test_assert_list_raises_on_generator(self): 1650 def generator_which_will_raise(): 1651 # This will not raise until after the first yield 1652 yield 1 1653 raise _TestException() 1654 1655 e = self.assertListRaises(_TestException, generator_which_will_raise) 1656 self.assertIsInstance(e, _TestException) 1657 1658 e = self.assertListRaises(Exception, generator_which_will_raise) 1659 self.assertIsInstance(e, _TestException) 1660 1661 def test_assert_list_raises_on_plain(self): 1662 def plain_exception(): 1663 raise _TestException() 1664 return [] 1665 1666 e = self.assertListRaises(_TestException, plain_exception) 1667 self.assertIsInstance(e, _TestException) 1668 1669 e = self.assertListRaises(Exception, plain_exception) 1670 self.assertIsInstance(e, _TestException) 1671 1672 def test_assert_list_raises_assert_wrong_exception(self): 1673 class _NotTestException(Exception): 1674 pass 1675 1676 def wrong_exception(): 1677 raise _NotTestException() 1678 1679 def wrong_exception_generator(): 1680 yield 1 1681 yield 2 1682 raise _NotTestException() 1683 1684 # Wrong exceptions are not intercepted 1685 self.assertRaises( 1686 _NotTestException, 1687 self.assertListRaises, _TestException, wrong_exception) 1688 self.assertRaises( 1689 _NotTestException, 1690 self.assertListRaises, _TestException, wrong_exception_generator) 1691 1692 def test_assert_list_raises_no_exception(self): 1693 def success(): 1694 return [] 1695 1696 def success_generator(): 1697 yield 1 1698 yield 2 1699 1700 self.assertRaises(AssertionError, 1701 self.assertListRaises, _TestException, success) 1702 1703 self.assertRaises( 1704 AssertionError, 1705 self.assertListRaises, _TestException, success_generator) 1706 1707 def _run_successful_test(self, test): 1708 result = testtools.TestResult() 1709 test.run(result) 1710 self.assertTrue(result.wasSuccessful()) 1711 return result 1712 1713 def test_overrideAttr_without_value(self): 1714 self.test_attr = 'original' # Define a test attribute 1715 obj = self # Make 'obj' visible to the embedded test 1716 1717 class Test(tests.TestCase): 1718 1719 def setUp(self): 1720 super(Test, self).setUp() 1721 self.orig = self.overrideAttr(obj, 'test_attr') 1722 1723 def test_value(self): 1724 self.assertEqual('original', self.orig) 1725 self.assertEqual('original', obj.test_attr) 1726 obj.test_attr = 'modified' 1727 self.assertEqual('modified', obj.test_attr) 1728 1729 self._run_successful_test(Test('test_value')) 1730 self.assertEqual('original', obj.test_attr) 1731 1732 def test_overrideAttr_with_value(self): 1733 self.test_attr = 'original' # Define a test attribute 1734 obj = self # Make 'obj' visible to the embedded test 1735 1736 class Test(tests.TestCase): 1737 1738 def setUp(self): 1739 super(Test, self).setUp() 1740 self.orig = self.overrideAttr(obj, 'test_attr', new='modified') 1741 1742 def test_value(self): 1743 self.assertEqual('original', self.orig) 1744 self.assertEqual('modified', obj.test_attr) 1745 1746 self._run_successful_test(Test('test_value')) 1747 self.assertEqual('original', obj.test_attr) 1748 1749 def test_overrideAttr_with_no_existing_value_and_value(self): 1750 # Do not define the test_attribute 1751 obj = self # Make 'obj' visible to the embedded test 1752 1753 class Test(tests.TestCase): 1754 1755 def setUp(self): 1756 tests.TestCase.setUp(self) 1757 self.orig = self.overrideAttr(obj, 'test_attr', new='modified') 1758 1759 def test_value(self): 1760 self.assertEqual(tests._unitialized_attr, self.orig) 1761 self.assertEqual('modified', obj.test_attr) 1762 1763 self._run_successful_test(Test('test_value')) 1764 self.assertRaises(AttributeError, getattr, obj, 'test_attr') 1765 1766 def test_overrideAttr_with_no_existing_value_and_no_value(self): 1767 # Do not define the test_attribute 1768 obj = self # Make 'obj' visible to the embedded test 1769 1770 class Test(tests.TestCase): 1771 1772 def setUp(self): 1773 tests.TestCase.setUp(self) 1774 self.orig = self.overrideAttr(obj, 'test_attr') 1775 1776 def test_value(self): 1777 self.assertEqual(tests._unitialized_attr, self.orig) 1778 self.assertRaises(AttributeError, getattr, obj, 'test_attr') 1779 1780 self._run_successful_test(Test('test_value')) 1781 self.assertRaises(AttributeError, getattr, obj, 'test_attr') 1782 1783 def test_recordCalls(self): 1784 from breezy.tests import test_selftest 1785 calls = self.recordCalls( 1786 test_selftest, '_add_numbers') 1787 self.assertEqual(test_selftest._add_numbers(2, 10), 1788 12) 1789 self.assertEqual(calls, [((2, 10), {})]) 1790 1791 1792def _add_numbers(a, b): 1793 return a + b 1794 1795 1796class _MissingFeature(features.Feature): 1797 def _probe(self): 1798 return False 1799 1800 1801missing_feature = _MissingFeature() 1802 1803 1804def _get_test(name): 1805 """Get an instance of a specific example test. 1806 1807 We protect this in a function so that they don't auto-run in the test 1808 suite. 1809 """ 1810 1811 class ExampleTests(tests.TestCase): 1812 1813 def test_fail(self): 1814 mutter('this was a failing test') 1815 self.fail('this test will fail') 1816 1817 def test_error(self): 1818 mutter('this test errored') 1819 raise RuntimeError('gotcha') 1820 1821 def test_missing_feature(self): 1822 mutter('missing the feature') 1823 self.requireFeature(missing_feature) 1824 1825 def test_skip(self): 1826 mutter('this test will be skipped') 1827 raise tests.TestSkipped('reason') 1828 1829 def test_success(self): 1830 mutter('this test succeeds') 1831 1832 def test_xfail(self): 1833 mutter('test with expected failure') 1834 self.knownFailure('this_fails') 1835 1836 def test_unexpected_success(self): 1837 mutter('test with unexpected success') 1838 self.expectFailure('should_fail', lambda: None) 1839 1840 return ExampleTests(name) 1841 1842 1843class TestTestCaseLogDetails(tests.TestCase): 1844 1845 def _run_test(self, test_name): 1846 test = _get_test(test_name) 1847 result = testtools.TestResult() 1848 test.run(result) 1849 return result 1850 1851 def test_fail_has_log(self): 1852 result = self._run_test('test_fail') 1853 self.assertEqual(1, len(result.failures)) 1854 result_content = result.failures[0][1] 1855 self.assertContainsRe(result_content, 1856 '(?m)^(?:Text attachment: )?log(?:$|: )') 1857 self.assertContainsRe(result_content, 'this was a failing test') 1858 1859 def test_error_has_log(self): 1860 result = self._run_test('test_error') 1861 self.assertEqual(1, len(result.errors)) 1862 result_content = result.errors[0][1] 1863 self.assertContainsRe(result_content, 1864 '(?m)^(?:Text attachment: )?log(?:$|: )') 1865 self.assertContainsRe(result_content, 'this test errored') 1866 1867 def test_skip_has_no_log(self): 1868 result = self._run_test('test_skip') 1869 reasons = result.skip_reasons 1870 self.assertEqual({'reason'}, set(reasons)) 1871 skips = reasons['reason'] 1872 self.assertEqual(1, len(skips)) 1873 test = skips[0] 1874 self.assertFalse('log' in test.getDetails()) 1875 1876 def test_missing_feature_has_no_log(self): 1877 # testtools doesn't know about addNotSupported, so it just gets 1878 # considered as a skip 1879 result = self._run_test('test_missing_feature') 1880 reasons = result.skip_reasons 1881 self.assertEqual({str(missing_feature)}, set(reasons)) 1882 skips = reasons[str(missing_feature)] 1883 self.assertEqual(1, len(skips)) 1884 test = skips[0] 1885 self.assertFalse('log' in test.getDetails()) 1886 1887 def test_xfail_has_no_log(self): 1888 result = self._run_test('test_xfail') 1889 self.assertEqual(1, len(result.expectedFailures)) 1890 result_content = result.expectedFailures[0][1] 1891 self.assertNotContainsRe(result_content, 1892 '(?m)^(?:Text attachment: )?log(?:$|: )') 1893 self.assertNotContainsRe(result_content, 'test with expected failure') 1894 1895 def test_unexpected_success_has_log(self): 1896 result = self._run_test('test_unexpected_success') 1897 self.assertEqual(1, len(result.unexpectedSuccesses)) 1898 # Inconsistency, unexpectedSuccesses is a list of tests, 1899 # expectedFailures is a list of reasons? 1900 test = result.unexpectedSuccesses[0] 1901 details = test.getDetails() 1902 self.assertTrue('log' in details) 1903 1904 1905class TestTestCloning(tests.TestCase): 1906 """Tests that test cloning of TestCases (as used by multiply_tests).""" 1907 1908 def test_cloned_testcase_does_not_share_details(self): 1909 """A TestCase cloned with clone_test does not share mutable attributes 1910 such as details or cleanups. 1911 """ 1912 class Test(tests.TestCase): 1913 def test_foo(self): 1914 self.addDetail('foo', Content('text/plain', lambda: 'foo')) 1915 orig_test = Test('test_foo') 1916 cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)') 1917 orig_test.run(unittest.TestResult()) 1918 self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes()) 1919 self.assertEqual(None, cloned_test.getDetails().get('foo')) 1920 1921 def test_double_apply_scenario_preserves_first_scenario(self): 1922 """Applying two levels of scenarios to a test preserves the attributes 1923 added by both scenarios. 1924 """ 1925 class Test(tests.TestCase): 1926 def test_foo(self): 1927 pass 1928 test = Test('test_foo') 1929 scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})] 1930 scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})] 1931 suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite()) 1932 suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite()) 1933 all_tests = list(tests.iter_suite_tests(suite)) 1934 self.assertLength(4, all_tests) 1935 all_xys = sorted((t.x, t.y) for t in all_tests) 1936 self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys) 1937 1938 1939# NB: Don't delete this; it's not actually from 0.11! 1940@deprecated_function(deprecated_in((0, 11, 0))) 1941def sample_deprecated_function(): 1942 """A deprecated function to test applyDeprecated with.""" 1943 return 2 1944 1945 1946def sample_undeprecated_function(a_param): 1947 """A undeprecated function to test applyDeprecated with.""" 1948 1949 1950class ApplyDeprecatedHelper(object): 1951 """A helper class for ApplyDeprecated tests.""" 1952 1953 @deprecated_method(deprecated_in((0, 11, 0))) 1954 def sample_deprecated_method(self, param_one): 1955 """A deprecated method for testing with.""" 1956 return param_one 1957 1958 def sample_normal_method(self): 1959 """A undeprecated method.""" 1960 1961 @deprecated_method(deprecated_in((0, 10, 0))) 1962 def sample_nested_deprecation(self): 1963 return sample_deprecated_function() 1964 1965 1966class TestExtraAssertions(tests.TestCase): 1967 """Tests for new test assertions in breezy test suite""" 1968 1969 def test_assert_isinstance(self): 1970 self.assertIsInstance(2, int) 1971 self.assertIsInstance(u'', str) 1972 e = self.assertRaises(AssertionError, self.assertIsInstance, None, int) 1973 self.assertIn( 1974 str(e), 1975 ["None is an instance of <type 'NoneType'> rather than " 1976 "<type 'int'>", 1977 "None is an instance of <class 'NoneType'> rather than " 1978 "<class 'int'>"]) 1979 self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int) 1980 e = self.assertRaises(AssertionError, 1981 self.assertIsInstance, None, int, 1982 "it's just not") 1983 self.assertEqual( 1984 str(e), 1985 "None is an instance of <class 'NoneType'> rather " 1986 "than <class 'int'>: it's just not") 1987 1988 def test_assertEndsWith(self): 1989 self.assertEndsWith('foo', 'oo') 1990 self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo') 1991 1992 def test_assertEqualDiff(self): 1993 e = self.assertRaises(AssertionError, 1994 self.assertEqualDiff, '', '\n') 1995 self.assertEqual(str(e), 1996 # Don't blink ! The '+' applies to the second string 1997 'first string is missing a final newline.\n+ \n') 1998 e = self.assertRaises(AssertionError, 1999 self.assertEqualDiff, '\n', '') 2000 self.assertEqual(str(e), 2001 # Don't blink ! The '-' applies to the second string 2002 'second string is missing a final newline.\n- \n') 2003 2004 2005class TestDeprecations(tests.TestCase): 2006 2007 def test_applyDeprecated_not_deprecated(self): 2008 sample_object = ApplyDeprecatedHelper() 2009 # calling an undeprecated callable raises an assertion 2010 self.assertRaises(AssertionError, self.applyDeprecated, 2011 deprecated_in((0, 11, 0)), 2012 sample_object.sample_normal_method) 2013 self.assertRaises(AssertionError, self.applyDeprecated, 2014 deprecated_in((0, 11, 0)), 2015 sample_undeprecated_function, "a param value") 2016 # calling a deprecated callable (function or method) with the wrong 2017 # expected deprecation fails. 2018 self.assertRaises(AssertionError, self.applyDeprecated, 2019 deprecated_in((0, 10, 0)), 2020 sample_object.sample_deprecated_method, 2021 "a param value") 2022 self.assertRaises(AssertionError, self.applyDeprecated, 2023 deprecated_in((0, 10, 0)), 2024 sample_deprecated_function) 2025 # calling a deprecated callable (function or method) with the right 2026 # expected deprecation returns the functions result. 2027 self.assertEqual( 2028 "a param value", 2029 self.applyDeprecated( 2030 deprecated_in((0, 11, 0)), 2031 sample_object.sample_deprecated_method, "a param value")) 2032 self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)), 2033 sample_deprecated_function)) 2034 # calling a nested deprecation with the wrong deprecation version 2035 # fails even if a deeper nested function was deprecated with the 2036 # supplied version. 2037 self.assertRaises( 2038 AssertionError, self.applyDeprecated, 2039 deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation) 2040 # calling a nested deprecation with the right deprecation value 2041 # returns the calls result. 2042 self.assertEqual( 2043 2, self.applyDeprecated( 2044 deprecated_in((0, 10, 0)), 2045 sample_object.sample_nested_deprecation)) 2046 2047 def test_callDeprecated(self): 2048 def testfunc(be_deprecated, result=None): 2049 if be_deprecated is True: 2050 symbol_versioning.warn('i am deprecated', DeprecationWarning, 2051 stacklevel=1) 2052 return result 2053 result = self.callDeprecated(['i am deprecated'], testfunc, True) 2054 self.assertIs(None, result) 2055 result = self.callDeprecated([], testfunc, False, 'result') 2056 self.assertEqual('result', result) 2057 self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True) 2058 self.callDeprecated([], testfunc, be_deprecated=False) 2059 2060 2061class TestWarningTests(tests.TestCase): 2062 """Tests for calling methods that raise warnings.""" 2063 2064 def test_callCatchWarnings(self): 2065 def meth(a, b): 2066 warnings.warn("this is your last warning") 2067 return a + b 2068 wlist, result = self.callCatchWarnings(meth, 1, 2) 2069 self.assertEqual(3, result) 2070 # would like just to compare them, but UserWarning doesn't implement 2071 # eq well 2072 w0, = wlist 2073 self.assertIsInstance(w0, UserWarning) 2074 self.assertEqual("this is your last warning", str(w0)) 2075 2076 2077class TestConvenienceMakers(tests.TestCaseWithTransport): 2078 """Test for the make_* convenience functions.""" 2079 2080 def test_make_branch_and_tree_with_format(self): 2081 # we should be able to supply a format to make_branch_and_tree 2082 self.make_branch_and_tree( 2083 'a', format=breezy.bzr.bzrdir.BzrDirMetaFormat1()) 2084 self.assertIsInstance(breezy.controldir.ControlDir.open('a')._format, 2085 breezy.bzr.bzrdir.BzrDirMetaFormat1) 2086 2087 def test_make_branch_and_memory_tree(self): 2088 # we should be able to get a new branch and a mutable tree from 2089 # TestCaseWithTransport 2090 tree = self.make_branch_and_memory_tree('a') 2091 self.assertIsInstance(tree, breezy.memorytree.MemoryTree) 2092 2093 def test_make_tree_for_local_vfs_backed_transport(self): 2094 # make_branch_and_tree has to use local branch and repositories 2095 # when the vfs transport and local disk are colocated, even if 2096 # a different transport is in use for url generation. 2097 self.transport_server = test_server.FakeVFATServer 2098 self.assertFalse(self.get_url('t1').startswith('file://')) 2099 tree = self.make_branch_and_tree('t1') 2100 base = tree.controldir.root_transport.base 2101 self.assertStartsWith(base, 'file://') 2102 self.assertEqual(tree.controldir.root_transport, 2103 tree.branch.controldir.root_transport) 2104 self.assertEqual(tree.controldir.root_transport, 2105 tree.branch.repository.controldir.root_transport) 2106 2107 2108class SelfTestHelper(object): 2109 2110 def run_selftest(self, **kwargs): 2111 """Run selftest returning its output.""" 2112 bio = BytesIO() 2113 output = TextIOWrapper(bio, 'utf-8') 2114 old_transport = breezy.tests.default_transport 2115 old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT 2116 tests.TestCaseWithMemoryTransport.TEST_ROOT = None 2117 try: 2118 self.assertEqual(True, tests.selftest(stream=output, **kwargs)) 2119 finally: 2120 breezy.tests.default_transport = old_transport 2121 tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root 2122 output.flush() 2123 output.detach() 2124 bio.seek(0) 2125 return bio 2126 2127 2128class TestSelftest(tests.TestCase, SelfTestHelper): 2129 """Tests of breezy.tests.selftest.""" 2130 2131 def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__( 2132 self): 2133 factory_called = [] 2134 2135 def factory(): 2136 factory_called.append(True) 2137 return TestUtil.TestSuite() 2138 out = StringIO() 2139 err = StringIO() 2140 self.apply_redirected(out, err, None, breezy.tests.selftest, 2141 test_suite_factory=factory) 2142 self.assertEqual([True], factory_called) 2143 2144 def factory(self): 2145 """A test suite factory.""" 2146 class Test(tests.TestCase): 2147 def id(self): 2148 return __name__ + ".Test." + self._testMethodName 2149 2150 def a(self): 2151 pass 2152 2153 def b(self): 2154 pass 2155 2156 def c(telf): 2157 pass 2158 return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")]) 2159 2160 def test_list_only(self): 2161 output = self.run_selftest(test_suite_factory=self.factory, 2162 list_only=True) 2163 self.assertEqual(3, len(output.readlines())) 2164 2165 def test_list_only_filtered(self): 2166 output = self.run_selftest(test_suite_factory=self.factory, 2167 list_only=True, pattern="Test.b") 2168 self.assertEndsWith(output.getvalue(), b"Test.b\n") 2169 self.assertLength(1, output.readlines()) 2170 2171 def test_list_only_excludes(self): 2172 output = self.run_selftest(test_suite_factory=self.factory, 2173 list_only=True, exclude_pattern="Test.b") 2174 self.assertNotContainsRe(b"Test.b", output.getvalue()) 2175 self.assertLength(2, output.readlines()) 2176 2177 def test_lsprof_tests(self): 2178 self.requireFeature(features.lsprof_feature) 2179 results = [] 2180 2181 class Test(object): 2182 def __call__(test, result): 2183 test.run(result) 2184 2185 def run(test, result): 2186 results.append(result) 2187 2188 def countTestCases(self): 2189 return 1 2190 self.run_selftest(test_suite_factory=Test, lsprof_tests=True) 2191 self.assertLength(1, results) 2192 self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator) 2193 2194 def test_random(self): 2195 # test randomising by listing a number of tests. 2196 output_123 = self.run_selftest(test_suite_factory=self.factory, 2197 list_only=True, random_seed="123") 2198 output_234 = self.run_selftest(test_suite_factory=self.factory, 2199 list_only=True, random_seed="234") 2200 self.assertNotEqual(output_123, output_234) 2201 # "Randominzing test order..\n\n 2202 self.assertLength(5, output_123.readlines()) 2203 self.assertLength(5, output_234.readlines()) 2204 2205 def test_random_reuse_is_same_order(self): 2206 # test randomising by listing a number of tests. 2207 expected = self.run_selftest(test_suite_factory=self.factory, 2208 list_only=True, random_seed="123") 2209 repeated = self.run_selftest(test_suite_factory=self.factory, 2210 list_only=True, random_seed="123") 2211 self.assertEqual(expected.getvalue(), repeated.getvalue()) 2212 2213 def test_runner_class(self): 2214 self.requireFeature(features.subunit) 2215 from subunit import ProtocolTestCase 2216 stream = self.run_selftest( 2217 runner_class=tests.SubUnitBzrRunnerv1, 2218 test_suite_factory=self.factory) 2219 test = ProtocolTestCase(stream) 2220 result = unittest.TestResult() 2221 test.run(result) 2222 self.assertEqual(3, result.testsRun) 2223 2224 def test_starting_with_single_argument(self): 2225 output = self.run_selftest(test_suite_factory=self.factory, 2226 starting_with=[ 2227 'breezy.tests.test_selftest.Test.a'], 2228 list_only=True) 2229 self.assertEqual(b'breezy.tests.test_selftest.Test.a\n', 2230 output.getvalue()) 2231 2232 def test_starting_with_multiple_argument(self): 2233 output = self.run_selftest( 2234 test_suite_factory=self.factory, 2235 starting_with=['breezy.tests.test_selftest.Test.a', 2236 'breezy.tests.test_selftest.Test.b'], 2237 list_only=True) 2238 self.assertEqual(b'breezy.tests.test_selftest.Test.a\n' 2239 b'breezy.tests.test_selftest.Test.b\n', 2240 output.getvalue()) 2241 2242 def check_transport_set(self, transport_server): 2243 captured_transport = [] 2244 2245 def seen_transport(a_transport): 2246 captured_transport.append(a_transport) 2247 2248 class Capture(tests.TestCase): 2249 def a(self): 2250 seen_transport(breezy.tests.default_transport) 2251 2252 def factory(): 2253 return TestUtil.TestSuite([Capture("a")]) 2254 self.run_selftest(transport=transport_server, 2255 test_suite_factory=factory) 2256 self.assertEqual(transport_server, captured_transport[0]) 2257 2258 def test_transport_sftp(self): 2259 self.requireFeature(features.paramiko) 2260 from breezy.tests import stub_sftp 2261 self.check_transport_set(stub_sftp.SFTPAbsoluteServer) 2262 2263 def test_transport_memory(self): 2264 self.check_transport_set(memory.MemoryServer) 2265 2266 2267class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper): 2268 # Does IO: reads test.list 2269 2270 def test_load_list(self): 2271 # Provide a list with one test - this test. 2272 test_id_line = b'%s\n' % self.id().encode('ascii') 2273 self.build_tree_contents([('test.list', test_id_line)]) 2274 # And generate a list of the tests in the suite. 2275 stream = self.run_selftest(load_list='test.list', list_only=True) 2276 self.assertEqual(test_id_line, stream.getvalue()) 2277 2278 def test_load_unknown(self): 2279 # Provide a list with one test - this test. 2280 # And generate a list of the tests in the suite. 2281 self.assertRaises(errors.NoSuchFile, self.run_selftest, 2282 load_list='missing file name', list_only=True) 2283 2284 2285class TestSubunitLogDetails(tests.TestCase, SelfTestHelper): 2286 2287 _test_needs_features = [features.subunit] 2288 2289 def run_subunit_stream(self, test_name): 2290 from subunit import ProtocolTestCase 2291 2292 def factory(): 2293 return TestUtil.TestSuite([_get_test(test_name)]) 2294 stream = self.run_selftest( 2295 runner_class=tests.SubUnitBzrRunnerv1, 2296 test_suite_factory=factory) 2297 test = ProtocolTestCase(stream) 2298 result = testtools.TestResult() 2299 test.run(result) 2300 content = stream.getvalue() 2301 return content, result 2302 2303 def test_fail_has_log(self): 2304 content, result = self.run_subunit_stream('test_fail') 2305 self.assertEqual(1, len(result.failures)) 2306 self.assertContainsRe(content, b'(?m)^log$') 2307 self.assertContainsRe(content, b'this test will fail') 2308 2309 def test_error_has_log(self): 2310 content, result = self.run_subunit_stream('test_error') 2311 self.assertContainsRe(content, b'(?m)^log$') 2312 self.assertContainsRe(content, b'this test errored') 2313 2314 def test_skip_has_no_log(self): 2315 content, result = self.run_subunit_stream('test_skip') 2316 self.assertNotContainsRe(content, b'(?m)^log$') 2317 self.assertNotContainsRe(content, b'this test will be skipped') 2318 reasons = result.skip_reasons 2319 self.assertEqual({'reason'}, set(reasons)) 2320 skips = reasons['reason'] 2321 self.assertEqual(1, len(skips)) 2322 # test = skips[0] 2323 # RemotedTestCase doesn't preserve the "details" 2324 # self.assertFalse('log' in test.getDetails()) 2325 2326 def test_missing_feature_has_no_log(self): 2327 content, result = self.run_subunit_stream('test_missing_feature') 2328 self.assertNotContainsRe(content, b'(?m)^log$') 2329 self.assertNotContainsRe(content, b'missing the feature') 2330 reasons = result.skip_reasons 2331 self.assertEqual({'_MissingFeature\n'}, set(reasons)) 2332 skips = reasons['_MissingFeature\n'] 2333 self.assertEqual(1, len(skips)) 2334 # test = skips[0] 2335 # RemotedTestCase doesn't preserve the "details" 2336 # self.assertFalse('log' in test.getDetails()) 2337 2338 def test_xfail_has_no_log(self): 2339 content, result = self.run_subunit_stream('test_xfail') 2340 self.assertNotContainsRe(content, b'(?m)^log$') 2341 self.assertNotContainsRe(content, b'test with expected failure') 2342 self.assertEqual(1, len(result.expectedFailures)) 2343 result_content = result.expectedFailures[0][1] 2344 self.assertNotContainsRe(result_content, 2345 '(?m)^(?:Text attachment: )?log(?:$|: )') 2346 self.assertNotContainsRe(result_content, 'test with expected failure') 2347 2348 def test_unexpected_success_has_log(self): 2349 content, result = self.run_subunit_stream('test_unexpected_success') 2350 self.assertContainsRe(content, b'(?m)^log$') 2351 self.assertContainsRe(content, b'test with unexpected success') 2352 self.assertEqual(1, len(result.unexpectedSuccesses)) 2353 # test = result.unexpectedSuccesses[0] 2354 # RemotedTestCase doesn't preserve the "details" 2355 # self.assertTrue('log' in test.getDetails()) 2356 2357 def test_success_has_no_log(self): 2358 content, result = self.run_subunit_stream('test_success') 2359 self.assertEqual(1, result.testsRun) 2360 self.assertNotContainsRe(content, b'(?m)^log$') 2361 self.assertNotContainsRe(content, b'this test succeeds') 2362 2363 2364class TestRunBzr(tests.TestCase): 2365 2366 result = 0 2367 out = '' 2368 err = '' 2369 2370 def _run_bzr_core(self, argv, encoding=None, stdin=None, 2371 stdout=None, stderr=None, working_dir=None): 2372 """Override _run_bzr_core to test how it is invoked by run_bzr. 2373 2374 Attempts to run bzr from inside this class don't actually run it. 2375 2376 We test how run_bzr actually invokes bzr in another location. Here we 2377 only need to test that it passes the right parameters to run_bzr. 2378 """ 2379 self.argv = list(argv) 2380 self.encoding = encoding 2381 self.stdin = stdin 2382 self.working_dir = working_dir 2383 stdout.write(self.out) 2384 stderr.write(self.err) 2385 return self.result 2386 2387 def test_run_bzr_error(self): 2388 self.out = "It sure does!\n" 2389 self.result = 34 2390 out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34) 2391 self.assertEqual(['rocks'], self.argv) 2392 self.assertEqual('It sure does!\n', out) 2393 self.assertEqual(out, self.out) 2394 self.assertEqual('', err) 2395 self.assertEqual(err, self.err) 2396 2397 def test_run_bzr_error_regexes(self): 2398 self.out = '' 2399 self.err = "bzr: ERROR: foobarbaz is not versioned" 2400 self.result = 3 2401 out, err = self.run_bzr_error( 2402 ["bzr: ERROR: foobarbaz is not versioned"], 2403 ['file-id', 'foobarbaz']) 2404 2405 def test_encoding(self): 2406 """Test that run_bzr passes encoding to _run_bzr_core""" 2407 self.run_bzr('foo bar') 2408 self.assertEqual(osutils.get_user_encoding(), self.encoding) 2409 self.assertEqual(['foo', 'bar'], self.argv) 2410 2411 self.run_bzr('foo bar', encoding='baz') 2412 self.assertEqual('baz', self.encoding) 2413 self.assertEqual(['foo', 'bar'], self.argv) 2414 2415 def test_stdin(self): 2416 # test that the stdin keyword to run_bzr is passed through to 2417 # _run_bzr_core as-is. We do this by overriding 2418 # _run_bzr_core in this class, and then calling run_bzr, 2419 # which is a convenience function for _run_bzr_core, so 2420 # should invoke it. 2421 self.run_bzr('foo bar', stdin='gam') 2422 self.assertEqual('gam', self.stdin) 2423 self.assertEqual(['foo', 'bar'], self.argv) 2424 2425 self.run_bzr('foo bar', stdin='zippy') 2426 self.assertEqual('zippy', self.stdin) 2427 self.assertEqual(['foo', 'bar'], self.argv) 2428 2429 def test_working_dir(self): 2430 """Test that run_bzr passes working_dir to _run_bzr_core""" 2431 self.run_bzr('foo bar') 2432 self.assertEqual(None, self.working_dir) 2433 self.assertEqual(['foo', 'bar'], self.argv) 2434 2435 self.run_bzr('foo bar', working_dir='baz') 2436 self.assertEqual('baz', self.working_dir) 2437 self.assertEqual(['foo', 'bar'], self.argv) 2438 2439 def test_reject_extra_keyword_arguments(self): 2440 self.assertRaises(TypeError, self.run_bzr, "foo bar", 2441 error_regex=['error message']) 2442 2443 2444class TestRunBzrCaptured(tests.TestCaseWithTransport): 2445 # Does IO when testing the working_dir parameter. 2446 2447 def apply_redirected(self, stdin=None, stdout=None, stderr=None, 2448 a_callable=None, *args, **kwargs): 2449 self.stdin = stdin 2450 self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None) 2451 self.factory = breezy.ui.ui_factory 2452 self.working_dir = osutils.getcwd() 2453 stdout.write('foo\n') 2454 stderr.write('bar\n') 2455 return 0 2456 2457 def test_stdin(self): 2458 # test that the stdin keyword to _run_bzr_core is passed through to 2459 # apply_redirected as a StringIO. We do this by overriding 2460 # apply_redirected in this class, and then calling _run_bzr_core, 2461 # which calls apply_redirected. 2462 self.run_bzr(['foo', 'bar'], stdin='gam') 2463 self.assertEqual('gam', self.stdin.read()) 2464 self.assertTrue(self.stdin is self.factory_stdin) 2465 self.run_bzr(['foo', 'bar'], stdin='zippy') 2466 self.assertEqual('zippy', self.stdin.read()) 2467 self.assertTrue(self.stdin is self.factory_stdin) 2468 2469 def test_ui_factory(self): 2470 # each invocation of self.run_bzr should get its 2471 # own UI factory, which is an instance of TestUIFactory, 2472 # with stdin, stdout and stderr attached to the stdin, 2473 # stdout and stderr of the invoked run_bzr 2474 current_factory = breezy.ui.ui_factory 2475 self.run_bzr(['foo']) 2476 self.assertFalse(current_factory is self.factory) 2477 self.assertNotEqual(sys.stdout, self.factory.stdout) 2478 self.assertNotEqual(sys.stderr, self.factory.stderr) 2479 self.assertEqual('foo\n', self.factory.stdout.getvalue()) 2480 self.assertEqual('bar\n', self.factory.stderr.getvalue()) 2481 self.assertIsInstance(self.factory, tests.TestUIFactory) 2482 2483 def test_working_dir(self): 2484 self.build_tree(['one/', 'two/']) 2485 cwd = osutils.getcwd() 2486 2487 # Default is to work in the current directory 2488 self.run_bzr(['foo', 'bar']) 2489 self.assertEqual(cwd, self.working_dir) 2490 2491 self.run_bzr(['foo', 'bar'], working_dir=None) 2492 self.assertEqual(cwd, self.working_dir) 2493 2494 # The function should be run in the alternative directory 2495 # but afterwards the current working dir shouldn't be changed 2496 self.run_bzr(['foo', 'bar'], working_dir='one') 2497 self.assertNotEqual(cwd, self.working_dir) 2498 self.assertEndsWith(self.working_dir, 'one') 2499 self.assertEqual(cwd, osutils.getcwd()) 2500 2501 self.run_bzr(['foo', 'bar'], working_dir='two') 2502 self.assertNotEqual(cwd, self.working_dir) 2503 self.assertEndsWith(self.working_dir, 'two') 2504 self.assertEqual(cwd, osutils.getcwd()) 2505 2506 2507class StubProcess(object): 2508 """A stub process for testing run_bzr_subprocess.""" 2509 2510 def __init__(self, out="", err="", retcode=0): 2511 self.out = out 2512 self.err = err 2513 self.returncode = retcode 2514 2515 def communicate(self): 2516 return self.out, self.err 2517 2518 2519class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport): 2520 """Base class for tests testing how we might run bzr.""" 2521 2522 def setUp(self): 2523 super(TestWithFakedStartBzrSubprocess, self).setUp() 2524 self.subprocess_calls = [] 2525 2526 def start_bzr_subprocess(self, process_args, env_changes=None, 2527 skip_if_plan_to_signal=False, 2528 working_dir=None, 2529 allow_plugins=False): 2530 """capture what run_bzr_subprocess tries to do.""" 2531 self.subprocess_calls.append( 2532 {'process_args': process_args, 2533 'env_changes': env_changes, 2534 'skip_if_plan_to_signal': skip_if_plan_to_signal, 2535 'working_dir': working_dir, 'allow_plugins': allow_plugins}) 2536 return self.next_subprocess 2537 2538 2539class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess): 2540 2541 def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs): 2542 """Run run_bzr_subprocess with args and kwargs using a stubbed process. 2543 2544 Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess 2545 that will return static results. This assertion method populates those 2546 results and also checks the arguments run_bzr_subprocess generates. 2547 """ 2548 self.next_subprocess = process 2549 try: 2550 result = self.run_bzr_subprocess(*args, **kwargs) 2551 except BaseException: 2552 self.next_subprocess = None 2553 for key, expected in expected_args.items(): 2554 self.assertEqual(expected, self.subprocess_calls[-1][key]) 2555 raise 2556 else: 2557 self.next_subprocess = None 2558 for key, expected in expected_args.items(): 2559 self.assertEqual(expected, self.subprocess_calls[-1][key]) 2560 return result 2561 2562 def test_run_bzr_subprocess(self): 2563 """The run_bzr_helper_external command behaves nicely.""" 2564 self.assertRunBzrSubprocess({'process_args': ['--version']}, 2565 StubProcess(), '--version') 2566 self.assertRunBzrSubprocess({'process_args': ['--version']}, 2567 StubProcess(), ['--version']) 2568 # retcode=None disables retcode checking 2569 result = self.assertRunBzrSubprocess( 2570 {}, StubProcess(retcode=3), '--version', retcode=None) 2571 result = self.assertRunBzrSubprocess( 2572 {}, StubProcess(out="is free software"), '--version') 2573 self.assertContainsRe(result[0], 'is free software') 2574 # Running a subcommand that is missing errors 2575 self.assertRaises(AssertionError, self.assertRunBzrSubprocess, 2576 {'process_args': ['--versionn'] 2577 }, StubProcess(retcode=3), 2578 '--versionn') 2579 # Unless it is told to expect the error from the subprocess 2580 result = self.assertRunBzrSubprocess( 2581 {}, StubProcess(retcode=3), '--versionn', retcode=3) 2582 # Or to ignore retcode checking 2583 result = self.assertRunBzrSubprocess( 2584 {}, StubProcess(err="unknown command", retcode=3), 2585 '--versionn', retcode=None) 2586 self.assertContainsRe(result[1], 'unknown command') 2587 2588 def test_env_change_passes_through(self): 2589 self.assertRunBzrSubprocess( 2590 {'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}}, 2591 StubProcess(), '', 2592 env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None}) 2593 2594 def test_no_working_dir_passed_as_None(self): 2595 self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '') 2596 2597 def test_no_working_dir_passed_through(self): 2598 self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '', 2599 working_dir='dir') 2600 2601 def test_run_bzr_subprocess_no_plugins(self): 2602 self.assertRunBzrSubprocess({'allow_plugins': False}, 2603 StubProcess(), '') 2604 2605 def test_allow_plugins(self): 2606 self.assertRunBzrSubprocess({'allow_plugins': True}, 2607 StubProcess(), '', allow_plugins=True) 2608 2609 2610class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess): 2611 2612 def test_finish_bzr_subprocess_with_error(self): 2613 """finish_bzr_subprocess allows specification of the desired exit code. 2614 """ 2615 process = StubProcess(err="unknown command", retcode=3) 2616 result = self.finish_bzr_subprocess(process, retcode=3) 2617 self.assertEqual('', result[0]) 2618 self.assertContainsRe(result[1], 'unknown command') 2619 2620 def test_finish_bzr_subprocess_ignoring_retcode(self): 2621 """finish_bzr_subprocess allows the exit code to be ignored.""" 2622 process = StubProcess(err="unknown command", retcode=3) 2623 result = self.finish_bzr_subprocess(process, retcode=None) 2624 self.assertEqual('', result[0]) 2625 self.assertContainsRe(result[1], 'unknown command') 2626 2627 def test_finish_subprocess_with_unexpected_retcode(self): 2628 """finish_bzr_subprocess raises self.failureException if the retcode is 2629 not the expected one. 2630 """ 2631 process = StubProcess(err="unknown command", retcode=3) 2632 self.assertRaises(self.failureException, self.finish_bzr_subprocess, 2633 process) 2634 2635 2636class _DontSpawnProcess(Exception): 2637 """A simple exception which just allows us to skip unnecessary steps""" 2638 2639 2640class TestStartBzrSubProcess(tests.TestCase): 2641 """Stub test start_bzr_subprocess.""" 2642 2643 def _subprocess_log_cleanup(self): 2644 """Inhibits the base version as we don't produce a log file.""" 2645 2646 def _popen(self, *args, **kwargs): 2647 """Override the base version to record the command that is run. 2648 2649 From there we can ensure it is correct without spawning a real process. 2650 """ 2651 self.check_popen_state() 2652 self._popen_args = args 2653 self._popen_kwargs = kwargs 2654 raise _DontSpawnProcess() 2655 2656 def check_popen_state(self): 2657 """Replace to make assertions when popen is called.""" 2658 2659 def test_run_bzr_subprocess_no_plugins(self): 2660 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, []) 2661 command = self._popen_args[0] 2662 self.assertEqual(sys.executable, command[0]) 2663 self.assertEqual(self.get_brz_path(), command[1]) 2664 self.assertEqual(['--no-plugins'], command[2:]) 2665 2666 def test_allow_plugins(self): 2667 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], 2668 allow_plugins=True) 2669 command = self._popen_args[0] 2670 self.assertEqual([], command[2:]) 2671 2672 def test_set_env(self): 2673 self.assertFalse('EXISTANT_ENV_VAR' in os.environ) 2674 # set in the child 2675 2676 def check_environment(): 2677 self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR']) 2678 self.check_popen_state = check_environment 2679 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], 2680 env_changes={'EXISTANT_ENV_VAR': 'set variable'}) 2681 # not set in theparent 2682 self.assertFalse('EXISTANT_ENV_VAR' in os.environ) 2683 2684 def test_run_bzr_subprocess_env_del(self): 2685 """run_bzr_subprocess can remove environment variables too.""" 2686 self.assertFalse('EXISTANT_ENV_VAR' in os.environ) 2687 2688 def check_environment(): 2689 self.assertFalse('EXISTANT_ENV_VAR' in os.environ) 2690 os.environ['EXISTANT_ENV_VAR'] = 'set variable' 2691 self.check_popen_state = check_environment 2692 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], 2693 env_changes={'EXISTANT_ENV_VAR': None}) 2694 # Still set in parent 2695 self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR']) 2696 del os.environ['EXISTANT_ENV_VAR'] 2697 2698 def test_env_del_missing(self): 2699 self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ) 2700 2701 def check_environment(): 2702 self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ) 2703 self.check_popen_state = check_environment 2704 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], 2705 env_changes={'NON_EXISTANT_ENV_VAR': None}) 2706 2707 def test_working_dir(self): 2708 """Test that we can specify the working dir for the child""" 2709 chdirs = [] 2710 2711 def chdir(path): 2712 chdirs.append(path) 2713 self.overrideAttr(os, 'chdir', chdir) 2714 2715 def getcwd(): 2716 return 'current' 2717 self.overrideAttr(osutils, 'getcwd', getcwd) 2718 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], 2719 working_dir='foo') 2720 self.assertEqual(['foo', 'current'], chdirs) 2721 2722 def test_get_brz_path_with_cwd_breezy(self): 2723 self.get_source_path = lambda: "" 2724 self.overrideAttr(os.path, "isfile", lambda path: True) 2725 self.assertEqual(self.get_brz_path(), "brz") 2726 2727 2728class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport): 2729 """Tests that really need to do things with an external bzr.""" 2730 2731 def test_start_and_stop_bzr_subprocess_send_signal(self): 2732 """finish_bzr_subprocess raises self.failureException if the retcode is 2733 not the expected one. 2734 """ 2735 self.disable_missing_extensions_warning() 2736 process = self.start_bzr_subprocess(['wait-until-signalled'], 2737 skip_if_plan_to_signal=True) 2738 self.assertEqual(b'running\n', process.stdout.readline()) 2739 result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT, 2740 retcode=3) 2741 self.assertEqual(b'', result[0]) 2742 self.assertEqual(b'brz: interrupted\n', result[1]) 2743 2744 2745class TestSelftestFiltering(tests.TestCase): 2746 2747 def setUp(self): 2748 super(TestSelftestFiltering, self).setUp() 2749 self.suite = TestUtil.TestSuite() 2750 self.loader = TestUtil.TestLoader() 2751 self.suite.addTest(self.loader.loadTestsFromModule( 2752 sys.modules['breezy.tests.test_selftest'])) 2753 self.all_names = _test_ids(self.suite) 2754 2755 def test_condition_id_re(self): 2756 test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.' 2757 'test_condition_id_re') 2758 filtered_suite = tests.filter_suite_by_condition( 2759 self.suite, tests.condition_id_re('test_condition_id_re')) 2760 self.assertEqual([test_name], _test_ids(filtered_suite)) 2761 2762 def test_condition_id_in_list(self): 2763 test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.' 2764 'test_condition_id_in_list'] 2765 id_list = tests.TestIdList(test_names) 2766 filtered_suite = tests.filter_suite_by_condition( 2767 self.suite, tests.condition_id_in_list(id_list)) 2768 my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list' 2769 re_filtered = tests.filter_suite_by_re(self.suite, my_pattern) 2770 self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite)) 2771 2772 def test_condition_id_startswith(self): 2773 klass = 'breezy.tests.test_selftest.TestSelftestFiltering.' 2774 start1 = klass + 'test_condition_id_starts' 2775 start2 = klass + 'test_condition_id_in' 2776 test_names = [klass + 'test_condition_id_in_list', 2777 klass + 'test_condition_id_startswith', 2778 ] 2779 filtered_suite = tests.filter_suite_by_condition( 2780 self.suite, tests.condition_id_startswith([start1, start2])) 2781 self.assertEqual(test_names, _test_ids(filtered_suite)) 2782 2783 def test_condition_isinstance(self): 2784 filtered_suite = tests.filter_suite_by_condition( 2785 self.suite, tests.condition_isinstance(self.__class__)) 2786 class_pattern = 'breezy.tests.test_selftest.TestSelftestFiltering.' 2787 re_filtered = tests.filter_suite_by_re(self.suite, class_pattern) 2788 self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite)) 2789 2790 def test_exclude_tests_by_condition(self): 2791 excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.' 2792 'test_exclude_tests_by_condition') 2793 filtered_suite = tests.exclude_tests_by_condition( 2794 self.suite, lambda x: x.id() == excluded_name) 2795 self.assertEqual(len(self.all_names) - 1, 2796 filtered_suite.countTestCases()) 2797 self.assertFalse(excluded_name in _test_ids(filtered_suite)) 2798 remaining_names = list(self.all_names) 2799 remaining_names.remove(excluded_name) 2800 self.assertEqual(remaining_names, _test_ids(filtered_suite)) 2801 2802 def test_exclude_tests_by_re(self): 2803 self.all_names = _test_ids(self.suite) 2804 filtered_suite = tests.exclude_tests_by_re(self.suite, 2805 'exclude_tests_by_re') 2806 excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.' 2807 'test_exclude_tests_by_re') 2808 self.assertEqual(len(self.all_names) - 1, 2809 filtered_suite.countTestCases()) 2810 self.assertFalse(excluded_name in _test_ids(filtered_suite)) 2811 remaining_names = list(self.all_names) 2812 remaining_names.remove(excluded_name) 2813 self.assertEqual(remaining_names, _test_ids(filtered_suite)) 2814 2815 def test_filter_suite_by_condition(self): 2816 test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.' 2817 'test_filter_suite_by_condition') 2818 filtered_suite = tests.filter_suite_by_condition( 2819 self.suite, lambda x: x.id() == test_name) 2820 self.assertEqual([test_name], _test_ids(filtered_suite)) 2821 2822 def test_filter_suite_by_re(self): 2823 filtered_suite = tests.filter_suite_by_re(self.suite, 2824 'test_filter_suite_by_r') 2825 filtered_names = _test_ids(filtered_suite) 2826 self.assertEqual( 2827 filtered_names, ['breezy.tests.test_selftest.' 2828 'TestSelftestFiltering.test_filter_suite_by_re']) 2829 2830 def test_filter_suite_by_id_list(self): 2831 test_list = ['breezy.tests.test_selftest.' 2832 'TestSelftestFiltering.test_filter_suite_by_id_list'] 2833 filtered_suite = tests.filter_suite_by_id_list( 2834 self.suite, tests.TestIdList(test_list)) 2835 filtered_names = _test_ids(filtered_suite) 2836 self.assertEqual( 2837 filtered_names, 2838 ['breezy.tests.test_selftest.' 2839 'TestSelftestFiltering.test_filter_suite_by_id_list']) 2840 2841 def test_filter_suite_by_id_startswith(self): 2842 # By design this test may fail if another test is added whose name also 2843 # begins with one of the start value used. 2844 klass = 'breezy.tests.test_selftest.TestSelftestFiltering.' 2845 start1 = klass + 'test_filter_suite_by_id_starts' 2846 start2 = klass + 'test_filter_suite_by_id_li' 2847 test_list = [klass + 'test_filter_suite_by_id_list', 2848 klass + 'test_filter_suite_by_id_startswith', 2849 ] 2850 filtered_suite = tests.filter_suite_by_id_startswith( 2851 self.suite, [start1, start2]) 2852 self.assertEqual( 2853 test_list, 2854 _test_ids(filtered_suite), 2855 ) 2856 2857 def test_preserve_input(self): 2858 # NB: Surely this is something in the stdlib to do this? 2859 self.assertIs(self.suite, tests.preserve_input(self.suite)) 2860 self.assertEqual("@#$", tests.preserve_input("@#$")) 2861 2862 def test_randomize_suite(self): 2863 randomized_suite = tests.randomize_suite(self.suite) 2864 # randomizing should not add or remove test names. 2865 self.assertEqual(set(_test_ids(self.suite)), 2866 set(_test_ids(randomized_suite))) 2867 # Technically, this *can* fail, because random.shuffle(list) can be 2868 # equal to list. Trying multiple times just pushes the frequency back. 2869 # As its len(self.all_names)!:1, the failure frequency should be low 2870 # enough to ignore. RBC 20071021. 2871 # It should change the order. 2872 self.assertNotEqual(self.all_names, _test_ids(randomized_suite)) 2873 # But not the length. (Possibly redundant with the set test, but not 2874 # necessarily.) 2875 self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite))) 2876 2877 def test_split_suit_by_condition(self): 2878 self.all_names = _test_ids(self.suite) 2879 condition = tests.condition_id_re('test_filter_suite_by_r') 2880 split_suite = tests.split_suite_by_condition(self.suite, condition) 2881 filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.' 2882 'test_filter_suite_by_re') 2883 self.assertEqual([filtered_name], _test_ids(split_suite[0])) 2884 self.assertFalse(filtered_name in _test_ids(split_suite[1])) 2885 remaining_names = list(self.all_names) 2886 remaining_names.remove(filtered_name) 2887 self.assertEqual(remaining_names, _test_ids(split_suite[1])) 2888 2889 def test_split_suit_by_re(self): 2890 self.all_names = _test_ids(self.suite) 2891 split_suite = tests.split_suite_by_re(self.suite, 2892 'test_filter_suite_by_r') 2893 filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.' 2894 'test_filter_suite_by_re') 2895 self.assertEqual([filtered_name], _test_ids(split_suite[0])) 2896 self.assertFalse(filtered_name in _test_ids(split_suite[1])) 2897 remaining_names = list(self.all_names) 2898 remaining_names.remove(filtered_name) 2899 self.assertEqual(remaining_names, _test_ids(split_suite[1])) 2900 2901 2902class TestCheckTreeShape(tests.TestCaseWithTransport): 2903 2904 def test_check_tree_shape(self): 2905 files = ['a', 'b/', 'b/c'] 2906 tree = self.make_branch_and_tree('.') 2907 self.build_tree(files) 2908 tree.add(files) 2909 tree.lock_read() 2910 try: 2911 self.check_tree_shape(tree, files) 2912 finally: 2913 tree.unlock() 2914 2915 2916class TestBlackboxSupport(tests.TestCase): 2917 """Tests for testsuite blackbox features.""" 2918 2919 def test_run_bzr_failure_not_caught(self): 2920 # When we run bzr in blackbox mode, we want any unexpected errors to 2921 # propagate up to the test suite so that it can show the error in the 2922 # usual way, and we won't get a double traceback. 2923 e = self.assertRaises( 2924 AssertionError, 2925 self.run_bzr, ['assert-fail']) 2926 # make sure we got the real thing, not an error from somewhere else in 2927 # the test framework 2928 self.assertEqual('always fails', str(e)) 2929 # check that there's no traceback in the test log 2930 self.assertNotContainsRe(self.get_log(), r'Traceback') 2931 2932 def test_run_bzr_user_error_caught(self): 2933 # Running bzr in blackbox mode, normal/expected/user errors should be 2934 # caught in the regular way and turned into an error message plus exit 2935 # code. 2936 transport_server = memory.MemoryServer() 2937 transport_server.start_server() 2938 self.addCleanup(transport_server.stop_server) 2939 url = transport_server.get_url() 2940 self.permit_url(url) 2941 out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3) 2942 self.assertEqual(out, '') 2943 self.assertContainsRe( 2944 err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n') 2945 2946 2947class TestTestLoader(tests.TestCase): 2948 """Tests for the test loader.""" 2949 2950 def _get_loader_and_module(self): 2951 """Gets a TestLoader and a module with one test in it.""" 2952 loader = TestUtil.TestLoader() 2953 module = {} 2954 2955 class Stub(tests.TestCase): 2956 def test_foo(self): 2957 pass 2958 2959 class MyModule(object): 2960 pass 2961 MyModule.a_class = Stub 2962 module = MyModule() 2963 module.__name__ = 'fake_module' 2964 return loader, module 2965 2966 def test_module_no_load_tests_attribute_loads_classes(self): 2967 loader, module = self._get_loader_and_module() 2968 self.assertEqual(1, loader.loadTestsFromModule( 2969 module).countTestCases()) 2970 2971 def test_module_load_tests_attribute_gets_called(self): 2972 loader, module = self._get_loader_and_module() 2973 2974 def load_tests(loader, standard_tests, pattern): 2975 result = loader.suiteClass() 2976 for test in tests.iter_suite_tests(standard_tests): 2977 result.addTests([test, test]) 2978 return result 2979 # add a load_tests() method which multiplies the tests from the module. 2980 module.__class__.load_tests = staticmethod(load_tests) 2981 self.assertEqual( 2982 2 * [str(module.a_class('test_foo'))], 2983 list(map(str, loader.loadTestsFromModule(module)))) 2984 2985 def test_load_tests_from_module_name_smoke_test(self): 2986 loader = TestUtil.TestLoader() 2987 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 2988 self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'], 2989 _test_ids(suite)) 2990 2991 def test_load_tests_from_module_name_with_bogus_module_name(self): 2992 loader = TestUtil.TestLoader() 2993 self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus') 2994 2995 2996class TestTestIdList(tests.TestCase): 2997 2998 def _create_id_list(self, test_list): 2999 return tests.TestIdList(test_list) 3000 3001 def _create_suite(self, test_id_list): 3002 3003 class Stub(tests.TestCase): 3004 def test_foo(self): 3005 pass 3006 3007 def _create_test_id(id): 3008 return lambda: id 3009 3010 suite = TestUtil.TestSuite() 3011 for id in test_id_list: 3012 t = Stub('test_foo') 3013 t.id = _create_test_id(id) 3014 suite.addTest(t) 3015 return suite 3016 3017 def _test_ids(self, test_suite): 3018 """Get the ids for the tests in a test suite.""" 3019 return [t.id() for t in tests.iter_suite_tests(test_suite)] 3020 3021 def test_empty_list(self): 3022 id_list = self._create_id_list([]) 3023 self.assertEqual({}, id_list.tests) 3024 self.assertEqual({}, id_list.modules) 3025 3026 def test_valid_list(self): 3027 id_list = self._create_id_list( 3028 ['mod1.cl1.meth1', 'mod1.cl1.meth2', 3029 'mod1.func1', 'mod1.cl2.meth2', 3030 'mod1.submod1', 3031 'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2', 3032 ]) 3033 self.assertTrue(id_list.refers_to('mod1')) 3034 self.assertTrue(id_list.refers_to('mod1.submod1')) 3035 self.assertTrue(id_list.refers_to('mod1.submod2')) 3036 self.assertTrue(id_list.includes('mod1.cl1.meth1')) 3037 self.assertTrue(id_list.includes('mod1.submod1')) 3038 self.assertTrue(id_list.includes('mod1.func1')) 3039 3040 def test_bad_chars_in_params(self): 3041 id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)']) 3042 self.assertTrue(id_list.refers_to('mod1')) 3043 self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)')) 3044 3045 def test_module_used(self): 3046 id_list = self._create_id_list(['mod.class.meth']) 3047 self.assertTrue(id_list.refers_to('mod')) 3048 self.assertTrue(id_list.refers_to('mod.class')) 3049 self.assertTrue(id_list.refers_to('mod.class.meth')) 3050 3051 def test_test_suite_matches_id_list_with_unknown(self): 3052 loader = TestUtil.TestLoader() 3053 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3054 test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', 3055 'bogus'] 3056 not_found, duplicates = tests.suite_matches_id_list(suite, test_list) 3057 self.assertEqual(['bogus'], not_found) 3058 self.assertEqual([], duplicates) 3059 3060 def test_suite_matches_id_list_with_duplicates(self): 3061 loader = TestUtil.TestLoader() 3062 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3063 dupes = loader.suiteClass() 3064 for test in tests.iter_suite_tests(suite): 3065 dupes.addTest(test) 3066 dupes.addTest(test) # Add it again 3067 3068 test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', ] 3069 not_found, duplicates = tests.suite_matches_id_list( 3070 dupes, test_list) 3071 self.assertEqual([], not_found) 3072 self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'], 3073 duplicates) 3074 3075 3076class TestTestSuite(tests.TestCase): 3077 3078 def test__test_suite_testmod_names(self): 3079 # Test that a plausible list of test module names are returned 3080 # by _test_suite_testmod_names. 3081 test_list = tests._test_suite_testmod_names() 3082 self.assertSubset([ 3083 'breezy.tests.blackbox', 3084 'breezy.tests.per_transport', 3085 'breezy.tests.test_selftest', 3086 ], 3087 test_list) 3088 3089 def test__test_suite_modules_to_doctest(self): 3090 # Test that a plausible list of modules to doctest is returned 3091 # by _test_suite_modules_to_doctest. 3092 test_list = tests._test_suite_modules_to_doctest() 3093 if __doc__ is None: 3094 # When docstrings are stripped, there are no modules to doctest 3095 self.assertEqual([], test_list) 3096 return 3097 self.assertSubset([ 3098 'breezy.timestamp', 3099 ], 3100 test_list) 3101 3102 def test_test_suite(self): 3103 # test_suite() loads the entire test suite to operate. To avoid this 3104 # overhead, and yet still be confident that things are happening, 3105 # we temporarily replace two functions used by test_suite with 3106 # test doubles that supply a few sample tests to load, and check they 3107 # are loaded. 3108 calls = [] 3109 3110 def testmod_names(): 3111 calls.append("testmod_names") 3112 return [ 3113 'breezy.tests.blackbox.test_branch', 3114 'breezy.tests.per_transport', 3115 'breezy.tests.test_selftest', 3116 ] 3117 self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names) 3118 3119 def doctests(): 3120 calls.append("modules_to_doctest") 3121 if __doc__ is None: 3122 return [] 3123 return ['breezy.timestamp'] 3124 self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests) 3125 expected_test_list = [ 3126 # testmod_names 3127 'breezy.tests.blackbox.test_branch.TestBranch.test_branch', 3128 ('breezy.tests.per_transport.TransportTests' 3129 '.test_abspath(LocalTransport,LocalURLServer)'), 3130 'breezy.tests.test_selftest.TestTestSuite.test_test_suite', 3131 # plugins can't be tested that way since selftest may be run with 3132 # --no-plugins 3133 ] 3134 suite = tests.test_suite() 3135 self.assertEqual({"testmod_names", "modules_to_doctest"}, set(calls)) 3136 self.assertSubset(expected_test_list, _test_ids(suite)) 3137 3138 def test_test_suite_list_and_start(self): 3139 # We cannot test this at the same time as the main load, because we 3140 # want to know that starting_with == None works. So a second load is 3141 # incurred - note that the starting_with parameter causes a partial 3142 # load rather than a full load so this test should be pretty quick. 3143 test_list = [ 3144 'breezy.tests.test_selftest.TestTestSuite.test_test_suite'] 3145 suite = tests.test_suite(test_list, 3146 ['breezy.tests.test_selftest.TestTestSuite']) 3147 # test_test_suite_list_and_start is not included 3148 self.assertEqual(test_list, _test_ids(suite)) 3149 3150 3151class TestLoadTestIdList(tests.TestCaseInTempDir): 3152 3153 def _create_test_list_file(self, file_name, content): 3154 fl = open(file_name, 'wt') 3155 fl.write(content) 3156 fl.close() 3157 3158 def test_load_unknown(self): 3159 self.assertRaises(errors.NoSuchFile, 3160 tests.load_test_id_list, 'i_do_not_exist') 3161 3162 def test_load_test_list(self): 3163 test_list_fname = 'test.list' 3164 self._create_test_list_file(test_list_fname, 3165 'mod1.cl1.meth1\nmod2.cl2.meth2\n') 3166 tlist = tests.load_test_id_list(test_list_fname) 3167 self.assertEqual(2, len(tlist)) 3168 self.assertEqual('mod1.cl1.meth1', tlist[0]) 3169 self.assertEqual('mod2.cl2.meth2', tlist[1]) 3170 3171 def test_load_dirty_file(self): 3172 test_list_fname = 'test.list' 3173 self._create_test_list_file(test_list_fname, 3174 ' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n' 3175 'bar baz\n') 3176 tlist = tests.load_test_id_list(test_list_fname) 3177 self.assertEqual(4, len(tlist)) 3178 self.assertEqual('mod1.cl1.meth1', tlist[0]) 3179 self.assertEqual('', tlist[1]) 3180 self.assertEqual('mod2.cl2.meth2', tlist[2]) 3181 self.assertEqual('bar baz', tlist[3]) 3182 3183 3184class TestFilteredByModuleTestLoader(tests.TestCase): 3185 3186 def _create_loader(self, test_list): 3187 id_filter = tests.TestIdList(test_list) 3188 loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to) 3189 return loader 3190 3191 def test_load_tests(self): 3192 test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing'] 3193 loader = self._create_loader(test_list) 3194 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3195 self.assertEqual(test_list, _test_ids(suite)) 3196 3197 def test_exclude_tests(self): 3198 test_list = ['bogus'] 3199 loader = self._create_loader(test_list) 3200 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3201 self.assertEqual([], _test_ids(suite)) 3202 3203 3204class TestFilteredByNameStartTestLoader(tests.TestCase): 3205 3206 def _create_loader(self, name_start): 3207 def needs_module(name): 3208 return name.startswith(name_start) or name_start.startswith(name) 3209 loader = TestUtil.FilteredByModuleTestLoader(needs_module) 3210 return loader 3211 3212 def test_load_tests(self): 3213 test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing'] 3214 loader = self._create_loader('breezy.tests.test_samp') 3215 3216 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3217 self.assertEqual(test_list, _test_ids(suite)) 3218 3219 def test_load_tests_inside_module(self): 3220 test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing'] 3221 loader = self._create_loader('breezy.tests.test_sampler.Demo') 3222 3223 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3224 self.assertEqual(test_list, _test_ids(suite)) 3225 3226 def test_exclude_tests(self): 3227 loader = self._create_loader('bogus') 3228 3229 suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler') 3230 self.assertEqual([], _test_ids(suite)) 3231 3232 3233class TestTestPrefixRegistry(tests.TestCase): 3234 3235 def _get_registry(self): 3236 tp_registry = tests.TestPrefixAliasRegistry() 3237 return tp_registry 3238 3239 def test_register_new_prefix(self): 3240 tpr = self._get_registry() 3241 tpr.register('foo', 'fff.ooo.ooo') 3242 self.assertEqual('fff.ooo.ooo', tpr.get('foo')) 3243 3244 def test_register_existing_prefix(self): 3245 tpr = self._get_registry() 3246 tpr.register('bar', 'bbb.aaa.rrr') 3247 tpr.register('bar', 'bBB.aAA.rRR') 3248 self.assertEqual('bbb.aaa.rrr', tpr.get('bar')) 3249 self.assertThat(self.get_log(), 3250 DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", 3251 doctest.ELLIPSIS)) 3252 3253 def test_get_unknown_prefix(self): 3254 tpr = self._get_registry() 3255 self.assertRaises(KeyError, tpr.get, 'I am not a prefix') 3256 3257 def test_resolve_prefix(self): 3258 tpr = self._get_registry() 3259 tpr.register('bar', 'bb.aa.rr') 3260 self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar')) 3261 3262 def test_resolve_unknown_alias(self): 3263 tpr = self._get_registry() 3264 self.assertRaises(errors.CommandError, 3265 tpr.resolve_alias, 'I am not a prefix') 3266 3267 def test_predefined_prefixes(self): 3268 tpr = tests.test_prefix_alias_registry 3269 self.assertEqual('breezy', tpr.resolve_alias('breezy')) 3270 self.assertEqual('breezy.doc', tpr.resolve_alias('bd')) 3271 self.assertEqual('breezy.utils', tpr.resolve_alias('bu')) 3272 self.assertEqual('breezy.tests', tpr.resolve_alias('bt')) 3273 self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb')) 3274 self.assertEqual('breezy.plugins', tpr.resolve_alias('bp')) 3275 3276 3277class TestThreadLeakDetection(tests.TestCase): 3278 """Ensure when tests leak threads we detect and report it""" 3279 3280 class LeakRecordingResult(tests.ExtendedTestResult): 3281 def __init__(self): 3282 tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1) 3283 self.leaks = [] 3284 3285 def _report_thread_leak(self, test, leaks, alive): 3286 self.leaks.append((test, leaks)) 3287 3288 def test_testcase_without_addCleanups(self): 3289 """Check old TestCase instances don't break with leak detection""" 3290 class Test(unittest.TestCase): 3291 def runTest(self): 3292 pass 3293 result = self.LeakRecordingResult() 3294 test = Test() 3295 result.startTestRun() 3296 test.run(result) 3297 result.stopTestRun() 3298 self.assertEqual(result._tests_leaking_threads_count, 0) 3299 self.assertEqual(result.leaks, []) 3300 3301 def test_thread_leak(self): 3302 """Ensure a thread that outlives the running of a test is reported 3303 3304 Uses a thread that blocks on an event, and is started by the inner 3305 test case. As the thread outlives the inner case's run, it should be 3306 detected as a leak, but the event is then set so that the thread can 3307 be safely joined in cleanup so it's not leaked for real. 3308 """ 3309 event = threading.Event() 3310 thread = threading.Thread(name="Leaker", target=event.wait) 3311 3312 class Test(tests.TestCase): 3313 def test_leak(self): 3314 thread.start() 3315 result = self.LeakRecordingResult() 3316 test = Test("test_leak") 3317 self.addCleanup(thread.join) 3318 self.addCleanup(event.set) 3319 result.startTestRun() 3320 test.run(result) 3321 result.stopTestRun() 3322 self.assertEqual(result._tests_leaking_threads_count, 1) 3323 self.assertEqual(result._first_thread_leaker_id, test.id()) 3324 self.assertEqual(result.leaks, [(test, {thread})]) 3325 self.assertContainsString(result.stream.getvalue(), "leaking threads") 3326 3327 def test_multiple_leaks(self): 3328 """Check multiple leaks are blamed on the test cases at fault 3329 3330 Same concept as the previous test, but has one inner test method that 3331 leaks two threads, and one that doesn't leak at all. 3332 """ 3333 event = threading.Event() 3334 thread_a = threading.Thread(name="LeakerA", target=event.wait) 3335 thread_b = threading.Thread(name="LeakerB", target=event.wait) 3336 thread_c = threading.Thread(name="LeakerC", target=event.wait) 3337 3338 class Test(tests.TestCase): 3339 def test_first_leak(self): 3340 thread_b.start() 3341 3342 def test_second_no_leak(self): 3343 pass 3344 3345 def test_third_leak(self): 3346 thread_c.start() 3347 thread_a.start() 3348 result = self.LeakRecordingResult() 3349 first_test = Test("test_first_leak") 3350 third_test = Test("test_third_leak") 3351 self.addCleanup(thread_a.join) 3352 self.addCleanup(thread_b.join) 3353 self.addCleanup(thread_c.join) 3354 self.addCleanup(event.set) 3355 result.startTestRun() 3356 unittest.TestSuite( 3357 [first_test, Test("test_second_no_leak"), third_test] 3358 ).run(result) 3359 result.stopTestRun() 3360 self.assertEqual(result._tests_leaking_threads_count, 2) 3361 self.assertEqual(result._first_thread_leaker_id, first_test.id()) 3362 self.assertEqual(result.leaks, [ 3363 (first_test, {thread_b}), 3364 (third_test, {thread_a, thread_c})]) 3365 self.assertContainsString(result.stream.getvalue(), "leaking threads") 3366 3367 3368class TestPostMortemDebugging(tests.TestCase): 3369 """Check post mortem debugging works when tests fail or error""" 3370 3371 class TracebackRecordingResult(tests.ExtendedTestResult): 3372 def __init__(self): 3373 tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1) 3374 self.postcode = None 3375 3376 def _post_mortem(self, tb=None): 3377 """Record the code object at the end of the current traceback""" 3378 tb = tb or sys.exc_info()[2] 3379 if tb is not None: 3380 next = tb.tb_next 3381 while next is not None: 3382 tb = next 3383 next = next.tb_next 3384 self.postcode = tb.tb_frame.f_code 3385 3386 def report_error(self, test, err): 3387 pass 3388 3389 def report_failure(self, test, err): 3390 pass 3391 3392 def test_location_unittest_error(self): 3393 """Needs right post mortem traceback with erroring unittest case""" 3394 class Test(unittest.TestCase): 3395 def runTest(self): 3396 raise RuntimeError 3397 result = self.TracebackRecordingResult() 3398 Test().run(result) 3399 self.assertEqual(result.postcode, Test.runTest.__code__) 3400 3401 def test_location_unittest_failure(self): 3402 """Needs right post mortem traceback with failing unittest case""" 3403 class Test(unittest.TestCase): 3404 def runTest(self): 3405 raise self.failureException 3406 result = self.TracebackRecordingResult() 3407 Test().run(result) 3408 self.assertEqual(result.postcode, Test.runTest.__code__) 3409 3410 def test_location_bt_error(self): 3411 """Needs right post mortem traceback with erroring breezy.tests case""" 3412 class Test(tests.TestCase): 3413 def test_error(self): 3414 raise RuntimeError 3415 result = self.TracebackRecordingResult() 3416 Test("test_error").run(result) 3417 self.assertEqual(result.postcode, Test.test_error.__code__) 3418 3419 def test_location_bt_failure(self): 3420 """Needs right post mortem traceback with failing breezy.tests case""" 3421 class Test(tests.TestCase): 3422 def test_failure(self): 3423 raise self.failureException 3424 result = self.TracebackRecordingResult() 3425 Test("test_failure").run(result) 3426 self.assertEqual(result.postcode, Test.test_failure.__code__) 3427 3428 def test_env_var_triggers_post_mortem(self): 3429 """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set""" 3430 import pdb 3431 result = tests.ExtendedTestResult(StringIO(), 0, 1) 3432 post_mortem_calls = [] 3433 self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append) 3434 self.overrideEnv('BRZ_TEST_PDB', None) 3435 result._post_mortem(1) 3436 self.overrideEnv('BRZ_TEST_PDB', 'on') 3437 result._post_mortem(2) 3438 self.assertEqual([2], post_mortem_calls) 3439 3440 3441class TestRunSuite(tests.TestCase): 3442 3443 def test_runner_class(self): 3444 """run_suite accepts and uses a runner_class keyword argument.""" 3445 class Stub(tests.TestCase): 3446 def test_foo(self): 3447 pass 3448 suite = Stub("test_foo") 3449 calls = [] 3450 3451 class MyRunner(tests.TextTestRunner): 3452 def run(self, test): 3453 calls.append(test) 3454 return tests.ExtendedTestResult(self.stream, self.descriptions, 3455 self.verbosity) 3456 tests.run_suite(suite, runner_class=MyRunner, stream=StringIO()) 3457 self.assertLength(1, calls) 3458 3459 3460class _Selftest(object): 3461 """Mixin for tests needing full selftest output""" 3462 3463 def _inject_stream_into_subunit(self, stream): 3464 """To be overridden by subclasses that run tests out of process""" 3465 3466 def _run_selftest(self, **kwargs): 3467 bio = BytesIO() 3468 sio = TextIOWrapper(bio, 'utf-8') 3469 self._inject_stream_into_subunit(bio) 3470 tests.selftest(stream=sio, stop_on_failure=False, **kwargs) 3471 sio.flush() 3472 return bio.getvalue() 3473 3474 3475class _ForkedSelftest(_Selftest): 3476 """Mixin for tests needing full selftest output with forked children""" 3477 3478 _test_needs_features = [features.subunit] 3479 3480 def _inject_stream_into_subunit(self, stream): 3481 """Monkey-patch subunit so the extra output goes to stream not stdout 3482 3483 Some APIs need rewriting so this kind of bogus hackery can be replaced 3484 by passing the stream param from run_tests down into ProtocolTestCase. 3485 """ 3486 from subunit import ProtocolTestCase 3487 _original_init = ProtocolTestCase.__init__ 3488 3489 def _init_with_passthrough(self, *args, **kwargs): 3490 _original_init(self, *args, **kwargs) 3491 self._passthrough = stream 3492 self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough) 3493 3494 def _run_selftest(self, **kwargs): 3495 # GZ 2011-05-26: Add a PosixSystem feature so this check can go away 3496 if getattr(os, "fork", None) is None: 3497 raise tests.TestNotApplicable("Platform doesn't support forking") 3498 # Make sure the fork code is actually invoked by claiming two cores 3499 self.overrideAttr(osutils, "local_concurrency", lambda: 2) 3500 kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator) 3501 return super(_ForkedSelftest, self)._run_selftest(**kwargs) 3502 3503 3504class TestParallelFork(_ForkedSelftest, tests.TestCase): 3505 """Check operation of --parallel=fork selftest option""" 3506 3507 def test_error_in_child_during_fork(self): 3508 """Error in a forked child during test setup should get reported""" 3509 class Test(tests.TestCase): 3510 def testMethod(self): 3511 pass 3512 # We don't care what, just break something that a child will run 3513 self.overrideAttr(tests, "workaround_zealous_crypto_random", None) 3514 out = self._run_selftest(test_suite_factory=Test) 3515 # Lines from the tracebacks of the two child processes may be mixed 3516 # together due to the way subunit parses and forwards the streams, 3517 # so permit extra lines between each part of the error output. 3518 self.assertContainsRe(out, 3519 b"Traceback.*:\n" 3520 b"(?:.*\n)*" 3521 b".+ in fork_for_tests\n" 3522 b"(?:.*\n)*" 3523 b"\\s*workaround_zealous_crypto_random\\(\\)\n" 3524 b"(?:.*\n)*" 3525 b"TypeError:") 3526 3527 3528class TestUncollectedWarnings(_Selftest, tests.TestCase): 3529 """Check a test case still alive after being run emits a warning""" 3530 3531 class Test(tests.TestCase): 3532 def test_pass(self): 3533 pass 3534 3535 def test_self_ref(self): 3536 self.also_self = self.test_self_ref 3537 3538 def test_skip(self): 3539 self.skipTest("Don't need") 3540 3541 def _get_suite(self): 3542 return TestUtil.TestSuite([ 3543 self.Test("test_pass"), 3544 self.Test("test_self_ref"), 3545 self.Test("test_skip"), 3546 ]) 3547 3548 def _run_selftest_with_suite(self, **kwargs): 3549 old_flags = tests.selftest_debug_flags 3550 tests.selftest_debug_flags = old_flags.union(["uncollected_cases"]) 3551 gc_on = gc.isenabled() 3552 if gc_on: 3553 gc.disable() 3554 try: 3555 output = self._run_selftest(test_suite_factory=self._get_suite, 3556 **kwargs) 3557 finally: 3558 if gc_on: 3559 gc.enable() 3560 tests.selftest_debug_flags = old_flags 3561 self.assertNotContainsRe(output, b"Uncollected test case.*test_pass") 3562 self.assertContainsRe(output, b"Uncollected test case.*test_self_ref") 3563 return output 3564 3565 def test_testsuite(self): 3566 self._run_selftest_with_suite() 3567 3568 def test_pattern(self): 3569 out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$") 3570 self.assertNotContainsRe(out, b"test_skip") 3571 3572 def test_exclude_pattern(self): 3573 out = self._run_selftest_with_suite(exclude_pattern="test_skip$") 3574 self.assertNotContainsRe(out, b"test_skip") 3575 3576 def test_random_seed(self): 3577 self._run_selftest_with_suite(random_seed="now") 3578 3579 def test_matching_tests_first(self): 3580 self._run_selftest_with_suite(matching_tests_first=True, 3581 pattern="test_self_ref$") 3582 3583 def test_starting_with_and_exclude(self): 3584 out = self._run_selftest_with_suite(starting_with=["bt."], 3585 exclude_pattern="test_skip$") 3586 self.assertNotContainsRe(out, b"test_skip") 3587 3588 def test_additonal_decorator(self): 3589 self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator]) 3590 3591 3592class TestUncollectedWarningsSubunit(TestUncollectedWarnings): 3593 """Check warnings from tests staying alive are emitted with subunit""" 3594 3595 _test_needs_features = [features.subunit] 3596 3597 def _run_selftest_with_suite(self, **kwargs): 3598 return TestUncollectedWarnings._run_selftest_with_suite( 3599 self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs) 3600 3601 3602class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings): 3603 """Check warnings from tests staying alive are emitted when forking""" 3604 3605 3606class TestEnvironHandling(tests.TestCase): 3607 3608 def test_overrideEnv_None_called_twice_doesnt_leak(self): 3609 self.assertFalse('MYVAR' in os.environ) 3610 self.overrideEnv('MYVAR', '42') 3611 # We use an embedded test to make sure we fix the _captureVar bug 3612 3613 class Test(tests.TestCase): 3614 def test_me(self): 3615 # The first call save the 42 value 3616 self.overrideEnv('MYVAR', None) 3617 self.assertEqual(None, os.environ.get('MYVAR')) 3618 # Make sure we can call it twice 3619 self.overrideEnv('MYVAR', None) 3620 self.assertEqual(None, os.environ.get('MYVAR')) 3621 output = StringIO() 3622 result = tests.TextTestResult(output, 0, 1) 3623 Test('test_me').run(result) 3624 if not result.wasStrictlySuccessful(): 3625 self.fail(output.getvalue()) 3626 # We get our value back 3627 self.assertEqual('42', os.environ.get('MYVAR')) 3628 3629 3630class TestIsolatedEnv(tests.TestCase): 3631 """Test isolating tests from os.environ. 3632 3633 Since we use tests that are already isolated from os.environ a bit of care 3634 should be taken when designing the tests to avoid bootstrap side-effects. 3635 The tests start an already clean os.environ which allow doing valid 3636 assertions about which variables are present or not and design tests around 3637 these assertions. 3638 """ 3639 3640 class ScratchMonkey(tests.TestCase): 3641 3642 def test_me(self): 3643 pass 3644 3645 def test_basics(self): 3646 # Make sure we know the definition of BRZ_HOME: not part of os.environ 3647 # for tests.TestCase. 3648 self.assertTrue('BRZ_HOME' in tests.isolated_environ) 3649 self.assertEqual(None, tests.isolated_environ['BRZ_HOME']) 3650 # Being part of isolated_environ, BRZ_HOME should not appear here 3651 self.assertFalse('BRZ_HOME' in os.environ) 3652 # Make sure we know the definition of LINES: part of os.environ for 3653 # tests.TestCase 3654 self.assertTrue('LINES' in tests.isolated_environ) 3655 self.assertEqual('25', tests.isolated_environ['LINES']) 3656 self.assertEqual('25', os.environ['LINES']) 3657 3658 def test_injecting_unknown_variable(self): 3659 # BRZ_HOME is known to be absent from os.environ 3660 test = self.ScratchMonkey('test_me') 3661 tests.override_os_environ(test, {'BRZ_HOME': 'foo'}) 3662 self.assertEqual('foo', os.environ['BRZ_HOME']) 3663 tests.restore_os_environ(test) 3664 self.assertFalse('BRZ_HOME' in os.environ) 3665 3666 def test_injecting_known_variable(self): 3667 test = self.ScratchMonkey('test_me') 3668 # LINES is known to be present in os.environ 3669 tests.override_os_environ(test, {'LINES': '42'}) 3670 self.assertEqual('42', os.environ['LINES']) 3671 tests.restore_os_environ(test) 3672 self.assertEqual('25', os.environ['LINES']) 3673 3674 def test_deleting_variable(self): 3675 test = self.ScratchMonkey('test_me') 3676 # LINES is known to be present in os.environ 3677 tests.override_os_environ(test, {'LINES': None}) 3678 self.assertTrue('LINES' not in os.environ) 3679 tests.restore_os_environ(test) 3680 self.assertEqual('25', os.environ['LINES']) 3681 3682 3683class TestDocTestSuiteIsolation(tests.TestCase): 3684 """Test that `tests.DocTestSuite` isolates doc tests from os.environ. 3685 3686 Since tests.TestCase alreay provides an isolation from os.environ, we use 3687 the clean environment as a base for testing. To precisely capture the 3688 isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to 3689 compare against. 3690 3691 We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`, 3692 not `os.environ` so each test overrides it to suit its needs. 3693 3694 """ 3695 3696 def get_doctest_suite_for_string(self, klass, string): 3697 class Finder(doctest.DocTestFinder): 3698 3699 def find(*args, **kwargs): 3700 test = doctest.DocTestParser().get_doctest( 3701 string, {}, 'foo', 'foo.py', 0) 3702 return [test] 3703 3704 suite = klass(test_finder=Finder()) 3705 return suite 3706 3707 def run_doctest_suite_for_string(self, klass, string): 3708 suite = self.get_doctest_suite_for_string(klass, string) 3709 output = StringIO() 3710 result = tests.TextTestResult(output, 0, 1) 3711 suite.run(result) 3712 return result, output 3713 3714 def assertDocTestStringSucceds(self, klass, string): 3715 result, output = self.run_doctest_suite_for_string(klass, string) 3716 if not result.wasStrictlySuccessful(): 3717 self.fail(output.getvalue()) 3718 3719 def assertDocTestStringFails(self, klass, string): 3720 result, output = self.run_doctest_suite_for_string(klass, string) 3721 if result.wasStrictlySuccessful(): 3722 self.fail(output.getvalue()) 3723 3724 def test_injected_variable(self): 3725 self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'}) 3726 test = """ 3727 >>> import os 3728 >>> os.environ['LINES'] 3729 '42' 3730 """ 3731 # doctest.DocTestSuite fails as it sees '25' 3732 self.assertDocTestStringFails(doctest.DocTestSuite, test) 3733 # tests.DocTestSuite sees '42' 3734 self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test) 3735 3736 def test_deleted_variable(self): 3737 self.overrideAttr(tests, 'isolated_environ', {'LINES': None}) 3738 test = """ 3739 >>> import os 3740 >>> os.environ.get('LINES') 3741 """ 3742 # doctest.DocTestSuite fails as it sees '25' 3743 self.assertDocTestStringFails(doctest.DocTestSuite, test) 3744 # tests.DocTestSuite sees None 3745 self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test) 3746 3747 3748class TestSelftestExcludePatterns(tests.TestCase): 3749 3750 def setUp(self): 3751 super(TestSelftestExcludePatterns, self).setUp() 3752 self.overrideAttr(tests, 'test_suite', self.suite_factory) 3753 3754 def suite_factory(self, keep_only=None, starting_with=None): 3755 """A test suite factory with only a few tests.""" 3756 class Test(tests.TestCase): 3757 def id(self): 3758 # We don't need the full class path 3759 return self._testMethodName 3760 3761 def a(self): 3762 pass 3763 3764 def b(self): 3765 pass 3766 3767 def c(self): 3768 pass 3769 return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")]) 3770 3771 def assertTestList(self, expected, *selftest_args): 3772 # We rely on setUp installing the right test suite factory so we can 3773 # test at the command level without loading the whole test suite 3774 out, err = self.run_bzr(('selftest', '--list') + selftest_args) 3775 actual = out.splitlines() 3776 self.assertEqual(expected, actual) 3777 3778 def test_full_list(self): 3779 self.assertTestList(['a', 'b', 'c']) 3780 3781 def test_single_exclude(self): 3782 self.assertTestList(['b', 'c'], '-x', 'a') 3783 3784 def test_mutiple_excludes(self): 3785 self.assertTestList(['c'], '-x', 'a', '-x', 'b') 3786 3787 3788class TestCounterHooks(tests.TestCase, SelfTestHelper): 3789 3790 _test_needs_features = [features.subunit] 3791 3792 def setUp(self): 3793 super(TestCounterHooks, self).setUp() 3794 3795 class Test(tests.TestCase): 3796 3797 def setUp(self): 3798 super(Test, self).setUp() 3799 self.hooks = hooks.Hooks() 3800 self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4)) 3801 self.install_counter_hook(self.hooks, 'myhook') 3802 3803 def no_hook(self): 3804 pass 3805 3806 def run_hook_once(self): 3807 for hook in self.hooks['myhook']: 3808 hook(self) 3809 3810 self.test_class = Test 3811 3812 def assertHookCalls(self, expected_calls, test_name): 3813 test = self.test_class(test_name) 3814 result = unittest.TestResult() 3815 test.run(result) 3816 self.assertTrue(hasattr(test, '_counters')) 3817 self.assertTrue('myhook' in test._counters) 3818 self.assertEqual(expected_calls, test._counters['myhook']) 3819 3820 def test_no_hook(self): 3821 self.assertHookCalls(0, 'no_hook') 3822 3823 def test_run_hook_once(self): 3824 tt = features.testtools 3825 if tt.module.__version__ < (0, 9, 8): 3826 raise tests.TestSkipped('testtools-0.9.8 required for addDetail') 3827 self.assertHookCalls(1, 'run_hook_once') 3828