1# Copyright (C) 2006-2011 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17"""Tests for repository implementations - tests a repository format.""" 18 19from io import BytesIO 20import re 21 22from ... import ( 23 branch as _mod_branch, 24 commit, 25 controldir, 26 delta as _mod_delta, 27 errors, 28 gpg, 29 info, 30 repository, 31 revision as _mod_revision, 32 tests, 33 transport, 34 upgrade, 35 workingtree, 36 ) 37from ...bzr import ( 38 branch as _mod_bzrbranch, 39 inventory, 40 remote, 41 repository as bzrrepository, 42 ) 43from ...bzr import ( 44 knitpack_repo, 45 ) 46from .. import ( 47 per_repository, 48 test_server, 49 ) 50from ..matchers import * 51 52 53class TestRepositoryMakeBranchAndTree(per_repository.TestCaseWithRepository): 54 55 def test_repository_format(self): 56 # make sure the repository on tree.branch is of the desired format, 57 # because developers use this api to setup the tree, branch and 58 # repository for their tests: having it now give the right repository 59 # type would invalidate the tests. 60 tree = self.make_branch_and_tree('repo') 61 self.assertIsInstance(tree.branch.repository._format, 62 self.repository_format.__class__) 63 64 65class TestRepository(per_repository.TestCaseWithRepository): 66 67 def assertFormatAttribute(self, attribute, allowed_values): 68 """Assert that the format has an attribute 'attribute'.""" 69 repo = self.make_repository('repo') 70 self.assertIn(getattr(repo._format, attribute), allowed_values) 71 72 def assertRepositoryAttribute(self, attribute, allowed_values): 73 """Assert that the repo has an attribute 'attribute'.""" 74 repo = self.make_repository('repo') 75 self.assertIn(getattr(repo, attribute), allowed_values) 76 77 def test_attribute_fast_deltas(self): 78 """Test the format.fast_deltas attribute.""" 79 self.assertFormatAttribute('fast_deltas', (True, False)) 80 81 def test_attribute_supports_nesting_repositories(self): 82 """Test the format.supports_nesting_repositories.""" 83 self.assertFormatAttribute('supports_nesting_repositories', 84 (True, False)) 85 86 def test_attribute_supports_multiple_authors(self): 87 """Test the format.supports_multiple_authors.""" 88 self.assertFormatAttribute('supports_multiple_authors', 89 (True, False)) 90 91 def test_attribute_supports_unreferenced_revisions(self): 92 """Test the format.supports_unreferenced_revisions.""" 93 self.assertFormatAttribute('supports_unreferenced_revisions', 94 (True, False)) 95 96 def test_attribute__fetch_reconcile(self): 97 """Test the _fetch_reconcile attribute.""" 98 self.assertFormatAttribute('_fetch_reconcile', (True, False)) 99 100 def test_attribute_format_experimental(self): 101 self.assertFormatAttribute('experimental', (True, False)) 102 103 def test_attribute_format_pack_compresses(self): 104 self.assertFormatAttribute('pack_compresses', (True, False)) 105 106 def test_attribute_format_supports_full_versioned_files(self): 107 self.assertFormatAttribute('supports_full_versioned_files', 108 (True, False)) 109 110 def test_attribute_format_supports_funky_characters(self): 111 self.assertFormatAttribute('supports_funky_characters', 112 (True, False)) 113 114 def test_attribute_format_supports_leaving_lock(self): 115 self.assertFormatAttribute('supports_leaving_lock', 116 (True, False)) 117 118 def test_attribute_format_versioned_directories(self): 119 self.assertFormatAttribute( 120 'supports_versioned_directories', (True, False)) 121 122 def test_attribute_format_revision_graph_can_have_wrong_parents(self): 123 self.assertFormatAttribute('revision_graph_can_have_wrong_parents', 124 (True, False)) 125 126 def test_attribute_format_supports_random_access(self): 127 self.assertRepositoryAttribute('supports_random_access', (True, False)) 128 129 def test_attribute_format_supports_setting_revision_ids(self): 130 self.assertFormatAttribute('supports_setting_revision_ids', 131 (True, False)) 132 133 def test_attribute_format_supports_storing_branch_nick(self): 134 self.assertFormatAttribute('supports_storing_branch_nick', 135 (True, False)) 136 137 def test_attribute_format_supports_custom_revision_properties(self): 138 self.assertFormatAttribute( 139 'supports_custom_revision_properties', 140 (True, False)) 141 142 def test_attribute_format_supports_overriding_transport(self): 143 repo = self.make_repository('repo') 144 self.assertIn( 145 repo._format.supports_overriding_transport, (True, False)) 146 147 repo.control_transport.copy_tree('.', '../repository.backup') 148 backup_transport = repo.control_transport.clone('../repository.backup') 149 if repo._format.supports_overriding_transport: 150 backup = repo._format.open( 151 repo.controldir, 152 _override_transport=backup_transport) 153 self.assertIs(backup_transport, backup.control_transport) 154 else: 155 self.assertRaises(TypeError, repo._format.open, 156 repo.controldir, _override_transport=backup_transport) 157 158 def test_format_is_deprecated(self): 159 repo = self.make_repository('repo') 160 self.assertIn(repo._format.is_deprecated(), (True, False)) 161 162 def test_format_is_supported(self): 163 repo = self.make_repository('repo') 164 self.assertIn(repo._format.is_supported(), (True, False)) 165 166 def test_attribute_format_records_per_file_revision(self): 167 self.assertFormatAttribute('records_per_file_revision', 168 (True, False)) 169 170 def test_clone_to_default_format(self): 171 # TODO: Test that cloning a repository preserves all the information 172 # such as signatures[not tested yet] etc etc. 173 # when changing to the current default format. 174 tree_a = self.make_branch_and_tree('a') 175 self.build_tree(['a/foo']) 176 tree_a.add('foo') 177 file_id = tree_a.path2id('foo') 178 rev1 = tree_a.commit('rev1') 179 bzrdirb = self.make_controldir('b') 180 repo_b = tree_a.branch.repository.clone(bzrdirb) 181 tree_b = repo_b.revision_tree(rev1) 182 tree_b.lock_read() 183 self.addCleanup(tree_b.unlock) 184 tree_b.get_file_text('foo') 185 repo_b.get_revision(rev1) 186 187 def test_supports_rich_root(self): 188 tree = self.make_branch_and_tree('a') 189 tree.commit('') 190 second_revision = tree.commit('') 191 rev_tree = tree.branch.repository.revision_tree(second_revision) 192 rev_tree.lock_read() 193 self.addCleanup(rev_tree.unlock) 194 root_revision = rev_tree.get_file_revision(u'') 195 rich_root = (root_revision != second_revision) 196 self.assertEqual(rich_root, 197 tree.branch.repository.supports_rich_root()) 198 199 def test_clone_specific_format(self): 200 """todo""" 201 202 def test_format_initialize_find_open(self): 203 # loopback test to check the current format initializes to itself. 204 if not self.repository_format.is_supported(): 205 # unsupported formats are not loopback testable 206 # because the default open will not open them and 207 # they may not be initializable. 208 return 209 # supported formats must be able to init and open 210 t = self.get_transport() 211 readonly_t = self.get_readonly_transport() 212 made_control = self.bzrdir_format.initialize(t.base) 213 made_repo = self.repository_format.initialize(made_control) 214 self.assertEqual(made_control, made_repo.controldir) 215 216 # find it via controldir opening: 217 opened_control = controldir.ControlDir.open(readonly_t.base) 218 direct_opened_repo = opened_control.open_repository() 219 self.assertEqual(direct_opened_repo.__class__, made_repo.__class__) 220 self.assertEqual(opened_control, direct_opened_repo.controldir) 221 222 self.assertIsInstance(direct_opened_repo._format, 223 self.repository_format.__class__) 224 # find it via Repository.open 225 opened_repo = repository.Repository.open(readonly_t.base) 226 self.assertIsInstance(opened_repo, made_repo.__class__) 227 self.assertEqual(made_repo._format.__class__, 228 opened_repo._format.__class__) 229 # if it has a unique id string, can we probe for it ? 230 try: 231 self.repository_format.get_format_string() 232 except NotImplementedError: 233 return 234 self.assertEqual(self.repository_format, 235 bzrrepository.RepositoryFormatMetaDir.find_format(opened_control)) 236 237 def test_format_matchingcontroldir(self): 238 self.assertEqual(self.repository_format, 239 self.repository_format._matchingcontroldir.repository_format) 240 self.assertEqual(self.repository_format, 241 self.bzrdir_format.repository_format) 242 243 def test_format_network_name(self): 244 repo = self.make_repository('r') 245 format = repo._format 246 network_name = format.network_name() 247 self.assertIsInstance(network_name, bytes) 248 # We want to test that the network_name matches the actual format on 249 # disk. For local repositories, that means that using network_name as 250 # a key in the registry gives back the same format. For remote 251 # repositories, that means that the network_name of the 252 # RemoteRepositoryFormat we have locally matches the actual format 253 # present on the remote side. 254 if isinstance(format, remote.RemoteRepositoryFormat): 255 repo._ensure_real() 256 real_repo = repo._real_repository 257 self.assertEqual(real_repo._format.network_name(), network_name) 258 else: 259 registry = repository.network_format_registry 260 looked_up_format = registry.get(network_name) 261 self.assertEqual(format.__class__, looked_up_format.__class__) 262 263 def test_create_repository(self): 264 # bzrdir can construct a repository for itself. 265 if not self.bzrdir_format.is_supported(): 266 # unsupported formats are not loopback testable 267 # because the default open will not open them and 268 # they may not be initializable. 269 return 270 t = self.get_transport() 271 made_control = self.bzrdir_format.initialize(t.base) 272 made_repo = made_control.create_repository() 273 # Check that we have a repository object. 274 made_repo.has_revision(b'foo') 275 self.assertEqual(made_control, made_repo.controldir) 276 277 def test_create_repository_shared(self): 278 # bzrdir can construct a shared repository. 279 if not self.bzrdir_format.is_supported(): 280 # unsupported formats are not loopback testable 281 # because the default open will not open them and 282 # they may not be initializable. 283 return 284 t = self.get_transport() 285 made_control = self.bzrdir_format.initialize(t.base) 286 try: 287 made_repo = made_control.create_repository(shared=True) 288 except errors.IncompatibleFormat: 289 # not all repository formats understand being shared, or 290 # may only be shared in some circumstances. 291 return 292 # Check that we have a repository object. 293 made_repo.has_revision(b'foo') 294 self.assertEqual(made_control, made_repo.controldir) 295 self.assertTrue(made_repo.is_shared()) 296 297 def test_revision_tree(self): 298 wt = self.make_branch_and_tree('.') 299 rev1 = wt.commit('lala!', allow_pointless=True) 300 root_id = wt.path2id('') 301 tree = wt.branch.repository.revision_tree(rev1) 302 with tree.lock_read(): 303 self.assertEqual(rev1, tree.get_file_revision(u'')) 304 expected = inventory.InventoryDirectory(root_id, '', None) 305 expected.revision = rev1 306 self.assertEqual([('', 'V', 'directory', expected)], 307 list(tree.list_files(include_root=True))) 308 self.assertRaises(ValueError, wt.branch.repository.revision_tree, None) 309 tree = wt.branch.repository.revision_tree(_mod_revision.NULL_REVISION) 310 with tree.lock_read(): 311 self.assertEqual([], list(tree.list_files(include_root=True))) 312 313 def test_get_revision_delta(self): 314 tree_a = self.make_branch_and_tree('a') 315 self.build_tree(['a/foo']) 316 tree_a.add('foo') 317 rev1 = tree_a.commit('rev1') 318 self.build_tree(['a/vla']) 319 tree_a.add('vla') 320 rev2 = tree_a.commit('rev2') 321 322 delta = tree_a.branch.repository.get_revision_delta(rev1) 323 self.assertIsInstance(delta, _mod_delta.TreeDelta) 324 self.assertEqual([('foo', 'file')], [(c.path[1], c.kind[1]) for c in delta.added]) 325 delta = tree_a.branch.repository.get_revision_delta(rev2) 326 self.assertIsInstance(delta, _mod_delta.TreeDelta) 327 self.assertEqual([('vla', 'file')], [(c.path[1], c.kind[1]) for c in delta.added]) 328 329 def test_clone_bzrdir_repository_revision(self): 330 # make a repository with some revisions, 331 # and clone it, this should not have unreferenced revisions. 332 # also: test cloning with a revision id of NULL_REVISION -> empty repo. 333 raise tests.TestSkipped('revision limiting is not implemented yet.') 334 335 def test_clone_repository_basis_revision(self): 336 raise tests.TestSkipped( 337 'the use of a basis should not add noise data to the result.') 338 339 def test_clone_shared_no_tree(self): 340 # cloning a shared repository keeps it shared 341 # and preserves the make_working_tree setting. 342 made_control = self.make_controldir('source') 343 try: 344 made_repo = made_control.create_repository(shared=True) 345 except errors.IncompatibleFormat: 346 # not all repository formats understand being shared, or 347 # may only be shared in some circumstances. 348 return 349 try: 350 made_repo.set_make_working_trees(False) 351 except errors.UnsupportedOperation: 352 # the repository does not support having its tree-making flag 353 # toggled. 354 return 355 result = made_control.clone(self.get_url('target')) 356 # Check that we have a repository object. 357 made_repo.has_revision(b'foo') 358 359 self.assertEqual(made_control, made_repo.controldir) 360 self.assertTrue(result.open_repository().is_shared()) 361 self.assertFalse(result.open_repository().make_working_trees()) 362 363 def test_upgrade_preserves_signatures(self): 364 if not self.repository_format.supports_revision_signatures: 365 raise tests.TestNotApplicable( 366 "repository does not support signing revisions") 367 wt = self.make_branch_and_tree('source') 368 a = wt.commit('A', allow_pointless=True) 369 repo = wt.branch.repository 370 repo.lock_write() 371 repo.start_write_group() 372 try: 373 repo.sign_revision(a, gpg.LoopbackGPGStrategy(None)) 374 except errors.UnsupportedOperation: 375 self.assertFalse(repo._format.supports_revision_signatures) 376 raise tests.TestNotApplicable( 377 "signatures not supported by repository format") 378 repo.commit_write_group() 379 repo.unlock() 380 old_signature = repo.get_signature_text(a) 381 try: 382 old_format = controldir.ControlDirFormat.get_default_format() 383 # This gives metadir branches something they can convert to. 384 # it would be nice to have a 'latest' vs 'default' concept. 385 format = controldir.format_registry.make_controldir( 386 'development-subtree') 387 upgrade.upgrade(repo.controldir.root_transport.base, format=format) 388 except errors.UpToDateFormat: 389 # this is in the most current format already. 390 return 391 except errors.BadConversionTarget as e: 392 raise tests.TestSkipped(str(e)) 393 wt = workingtree.WorkingTree.open(wt.basedir) 394 new_signature = wt.branch.repository.get_signature_text(a) 395 self.assertEqual(old_signature, new_signature) 396 397 def test_format_description(self): 398 repo = self.make_repository('.') 399 text = repo._format.get_format_description() 400 self.assertTrue(len(text)) 401 402 def test_format_supports_external_lookups(self): 403 repo = self.make_repository('.') 404 self.assertIn(repo._format.supports_external_lookups, (True, False)) 405 406 def assertMessageRoundtrips(self, message): 407 """Assert that message roundtrips to a repository and back intact.""" 408 tree = self.make_branch_and_tree('.') 409 a = tree.commit(message, allow_pointless=True) 410 rev = tree.branch.repository.get_revision(a) 411 serializer = getattr(tree.branch.repository, "_serializer", None) 412 if serializer is not None and serializer.squashes_xml_invalid_characters: 413 # we have to manually escape this as we dont try to 414 # roundtrip xml invalid characters in the xml-based serializers. 415 escaped_message, escape_count = re.subn( 416 u'[^\x09\x0A\x0D\u0020-\uD7FF\uE000-\uFFFD]+', 417 lambda match: match.group(0).encode( 418 'unicode_escape').decode('ascii'), 419 message) 420 self.assertEqual(rev.message, escaped_message) 421 else: 422 self.assertEqual(rev.message, message) 423 # insist the class is unicode no matter what came in for 424 # consistency. 425 self.assertIsInstance(rev.message, str) 426 427 def test_commit_unicode_message(self): 428 # a siple unicode message should be preserved 429 self.assertMessageRoundtrips(u'foo bar gamm\xae plop') 430 431 def test_commit_unicode_control_characters(self): 432 # a unicode message with control characters should roundtrip too. 433 unichars = [chr(x) for x in range(256)] 434 # '\r' is not directly allowed anymore, as it used to be translated 435 # into '\n' anyway 436 unichars[ord('\r')] = u'\n' 437 self.assertMessageRoundtrips( 438 u"All 8-bit chars: " + ''.join(unichars)) 439 440 def test_check_repository(self): 441 """Check a fairly simple repository's history""" 442 tree = self.make_branch_and_tree('.') 443 a_rev = tree.commit('initial empty commit', allow_pointless=True) 444 result = tree.branch.repository.check() 445 # writes to log; should accept both verbose or non-verbose 446 result.report_results(verbose=True) 447 result.report_results(verbose=False) 448 449 def test_get_revisions(self): 450 tree = self.make_branch_and_tree('.') 451 a_rev = tree.commit('initial empty commit', allow_pointless=True) 452 b_rev = tree.commit('second empty commit', allow_pointless=True) 453 c_rev = tree.commit('third empty commit', allow_pointless=True) 454 repo = tree.branch.repository 455 revision_ids = [a_rev, b_rev, c_rev] 456 revisions = repo.get_revisions(revision_ids) 457 self.assertEqual(len(revisions), 3) 458 zipped = list(zip(revisions, revision_ids)) 459 self.assertEqual(len(zipped), 3) 460 for revision, revision_id in zipped: 461 self.assertEqual(revision.revision_id, revision_id) 462 self.assertEqual(revision, repo.get_revision(revision_id)) 463 464 def test_iter_revisions(self): 465 tree = self.make_branch_and_tree('.') 466 a_rev = tree.commit('initial empty commit', allow_pointless=True) 467 b_rev = tree.commit('second empty commit', allow_pointless=True) 468 c_rev = tree.commit('third empty commit', allow_pointless=True) 469 d_rev = b'd-rev' 470 repo = tree.branch.repository 471 revision_ids = [a_rev, c_rev, b_rev, d_rev] 472 revid_with_rev = repo.iter_revisions(revision_ids) 473 self.assertEqual( 474 set((revid, rev.revision_id if rev is not None else None) 475 for (revid, rev) in revid_with_rev), 476 {(a_rev, a_rev), 477 (b_rev, b_rev), 478 (c_rev, c_rev), 479 (d_rev, None)}) 480 481 def test_root_entry_has_revision(self): 482 tree = self.make_branch_and_tree('.') 483 revid = tree.commit('message') 484 rev_tree = tree.branch.repository.revision_tree(tree.last_revision()) 485 rev_tree.lock_read() 486 self.addCleanup(rev_tree.unlock) 487 self.assertEqual(revid, rev_tree.get_file_revision(u'')) 488 489 def test_pointless_commit(self): 490 tree = self.make_branch_and_tree('.') 491 self.assertRaises(commit.PointlessCommit, tree.commit, 'pointless', 492 allow_pointless=False) 493 tree.commit('pointless', allow_pointless=True) 494 495 def test_format_attributes(self): 496 """All repository formats should have some basic attributes.""" 497 # create a repository to get a real format instance, not the 498 # template from the test suite parameterization. 499 repo = self.make_repository('.') 500 repo._format.rich_root_data 501 repo._format.supports_tree_reference 502 503 def test_iter_files_bytes(self): 504 tree = self.make_branch_and_tree('tree') 505 self.build_tree_contents([('tree/file1', b'foo'), 506 ('tree/file2', b'bar')]) 507 tree.add(['file1', 'file2']) 508 file1_id = tree.path2id('file1') 509 file2_id = tree.path2id('file2') 510 rev1 = tree.commit('rev1') 511 self.build_tree_contents([('tree/file1', b'baz')]) 512 rev2 = tree.commit('rev2') 513 repository = tree.branch.repository 514 repository.lock_read() 515 self.addCleanup(repository.unlock) 516 extracted = dict((i, b''.join(b)) for i, b in 517 repository.iter_files_bytes( 518 [(file1_id, rev1, 'file1-old'), 519 (file1_id, rev2, 'file1-new'), 520 (file2_id, rev1, 'file2'), 521 ])) 522 self.assertEqual(b'foo', extracted['file1-old']) 523 self.assertEqual(b'bar', extracted['file2']) 524 self.assertEqual(b'baz', extracted['file1-new']) 525 self.assertRaises(errors.RevisionNotPresent, list, 526 repository.iter_files_bytes( 527 [(file1_id, b'rev3', 'file1-notpresent')])) 528 self.assertRaises((errors.RevisionNotPresent, errors.NoSuchId), list, 529 repository.iter_files_bytes( 530 [(b'file3-id', b'rev3', 'file1-notpresent')])) 531 532 def test_get_graph(self): 533 """Bare-bones smoketest that all repositories implement get_graph.""" 534 repo = self.make_repository('repo') 535 repo.lock_read() 536 self.addCleanup(repo.unlock) 537 repo.get_graph() 538 539 def test_graph_ghost_handling(self): 540 if not self.repository_format.supports_ghosts: 541 raise tests.TestNotApplicable('format does not support ghosts') 542 tree = self.make_branch_and_tree('here') 543 tree.lock_write() 544 self.addCleanup(tree.unlock) 545 rev1 = tree.commit('initial commit') 546 tree.add_parent_tree_id(b'ghost') 547 rev2 = tree.commit('commit-with-ghost') 548 graph = tree.branch.repository.get_graph() 549 parents = graph.get_parent_map([b'ghost', rev2]) 550 self.assertTrue(b'ghost' not in parents) 551 self.assertEqual(parents[rev2], (rev1, b'ghost')) 552 553 def test_get_known_graph_ancestry(self): 554 tree = self.make_branch_and_tree('here') 555 tree.lock_write() 556 self.addCleanup(tree.unlock) 557 # A 558 # |\ 559 # | B 560 # |/ 561 # C 562 a = tree.commit('initial commit') 563 tree_other = tree.controldir.sprout('there').open_workingtree() 564 b = tree_other.commit('another') 565 tree.merge_from_branch(tree_other.branch) 566 c = tree.commit('another') 567 kg = tree.branch.repository.get_known_graph_ancestry( 568 [c]) 569 self.assertEqual([c], list(kg.heads([a, b, c]))) 570 self.assertEqual([a, b, c], list(kg.topo_sort())) 571 572 def test_parent_map_type(self): 573 tree = self.make_branch_and_tree('here') 574 tree.lock_write() 575 self.addCleanup(tree.unlock) 576 rev1 = tree.commit('initial commit') 577 rev2 = tree.commit('next commit') 578 graph = tree.branch.repository.get_graph() 579 parents = graph.get_parent_map( 580 [_mod_revision.NULL_REVISION, rev1, rev2]) 581 for value in parents.values(): 582 self.assertIsInstance(value, tuple) 583 584 def test_implements_revision_graph_can_have_wrong_parents(self): 585 """All repositories should implement 586 revision_graph_can_have_wrong_parents, so that check and reconcile can 587 work correctly. 588 """ 589 repo = self.make_repository('.') 590 # This should work, not raise NotImplementedError: 591 if not repo._format.revision_graph_can_have_wrong_parents: 592 return 593 repo.lock_read() 594 self.addCleanup(repo.unlock) 595 # This repo must also implement 596 # _find_inconsistent_revision_parents and 597 # _check_for_inconsistent_revision_parents. So calling these 598 # should not raise NotImplementedError. 599 list(repo._find_inconsistent_revision_parents()) 600 repo._check_for_inconsistent_revision_parents() 601 602 def test_add_signature_text(self): 603 builder = self.make_branch_builder('.') 604 builder.start_series() 605 rev_a = builder.build_snapshot(None, [ 606 ('add', ('', None, 'directory', None))]) 607 builder.finish_series() 608 b = builder.get_branch() 609 b.lock_write() 610 self.addCleanup(b.unlock) 611 if b.repository._format.supports_revision_signatures: 612 b.repository.start_write_group() 613 b.repository.add_signature_text( 614 rev_a, b'This might be a signature') 615 b.repository.commit_write_group() 616 self.assertEqual(b'This might be a signature', 617 b.repository.get_signature_text(rev_a)) 618 else: 619 b.repository.start_write_group() 620 self.addCleanup(b.repository.abort_write_group) 621 self.assertRaises(errors.UnsupportedOperation, 622 b.repository.add_signature_text, rev_a, 623 b'This might be a signature') 624 625 # XXX: this helper duplicated from tests.test_repository 626 def make_remote_repository(self, path, shared=None): 627 """Make a RemoteRepository object backed by a real repository that will 628 be created at the given path.""" 629 repo = self.make_repository(path, shared=shared) 630 smart_server = test_server.SmartTCPServer_for_testing() 631 self.start_server(smart_server, self.get_server()) 632 remote_transport = transport.get_transport_from_url( 633 smart_server.get_url()).clone(path) 634 if not repo.controldir._format.supports_transport(remote_transport): 635 raise tests.TestNotApplicable( 636 "format does not support transport") 637 remote_bzrdir = controldir.ControlDir.open_from_transport( 638 remote_transport) 639 remote_repo = remote_bzrdir.open_repository() 640 return remote_repo 641 642 def test_sprout_from_hpss_preserves_format(self): 643 """repo.sprout from a smart server preserves the repository format.""" 644 remote_repo = self.make_remote_repository('remote') 645 local_bzrdir = self.make_controldir('local') 646 try: 647 local_repo = remote_repo.sprout(local_bzrdir) 648 except errors.TransportNotPossible: 649 raise tests.TestNotApplicable( 650 "Cannot lock_read old formats like AllInOne over HPSS.") 651 remote_backing_repo = controldir.ControlDir.open( 652 self.get_vfs_only_url('remote')).open_repository() 653 self.assertEqual( 654 remote_backing_repo._format.network_name(), 655 local_repo._format.network_name()) 656 657 def test_sprout_branch_from_hpss_preserves_repo_format(self): 658 """branch.sprout from a smart server preserves the repository format. 659 """ 660 if not self.repository_format.supports_leaving_lock: 661 raise tests.TestNotApplicable( 662 "Format can not be used over HPSS") 663 remote_repo = self.make_remote_repository('remote') 664 remote_branch = remote_repo.controldir.create_branch() 665 try: 666 local_bzrdir = remote_branch.controldir.sprout('local') 667 except errors.TransportNotPossible: 668 raise tests.TestNotApplicable( 669 "Cannot lock_read old formats like AllInOne over HPSS.") 670 local_repo = local_bzrdir.open_repository() 671 remote_backing_repo = controldir.ControlDir.open( 672 self.get_vfs_only_url('remote')).open_repository() 673 self.assertEqual(remote_backing_repo._format, local_repo._format) 674 675 def test_sprout_branch_from_hpss_preserves_shared_repo_format(self): 676 """branch.sprout from a smart server preserves the repository format of 677 a branch from a shared repository. 678 """ 679 if not self.repository_format.supports_leaving_lock: 680 raise tests.TestNotApplicable( 681 "Format can not be used over HPSS") 682 # Make a shared repo 683 remote_repo = self.make_remote_repository('remote', shared=True) 684 remote_backing_repo = controldir.ControlDir.open( 685 self.get_vfs_only_url('remote')).open_repository() 686 # Make a branch in that repo in an old format that isn't the default 687 # branch format for the repo. 688 from breezy.bzr.fullhistory import BzrBranchFormat5 689 format = remote_backing_repo.controldir.cloning_metadir() 690 format._branch_format = BzrBranchFormat5() 691 remote_transport = remote_repo.controldir.root_transport.clone( 692 'branch') 693 controldir.ControlDir.create_branch_convenience( 694 remote_transport.base, force_new_repo=False, format=format) 695 remote_branch = controldir.ControlDir.open_from_transport( 696 remote_transport).open_branch() 697 try: 698 local_bzrdir = remote_branch.controldir.sprout('local') 699 except errors.TransportNotPossible: 700 raise tests.TestNotApplicable( 701 "Cannot lock_read old formats like AllInOne over HPSS.") 702 local_repo = local_bzrdir.open_repository() 703 self.assertEqual(remote_backing_repo._format, local_repo._format) 704 705 def test_clone_to_hpss(self): 706 if not self.repository_format.supports_leaving_lock: 707 raise tests.TestNotApplicable( 708 "Cannot lock pre_metadir_formats remotely.") 709 remote_transport = self.make_smart_server('remote') 710 local_branch = self.make_branch('local') 711 remote_branch = local_branch.create_clone_on_transport( 712 remote_transport) 713 self.assertEqual( 714 local_branch.repository._format.supports_external_lookups, 715 remote_branch.repository._format.supports_external_lookups) 716 717 def test_clone_stacking_policy_upgrades(self): 718 """Cloning an unstackable branch format to somewhere with a default 719 stack-on branch upgrades branch and repo to match the target and honour 720 the policy. 721 """ 722 try: 723 repo = self.make_repository('repo', shared=True) 724 except errors.IncompatibleFormat: 725 raise tests.TestNotApplicable('Cannot make a shared repository') 726 if repo.controldir._format.fixed_components: 727 self.knownFailure( 728 "pre metadir branches do not upgrade on push " 729 "with stacking policy") 730 if isinstance(repo._format, 731 knitpack_repo.RepositoryFormatKnitPack5RichRootBroken): 732 raise tests.TestNotApplicable("unsupported format") 733 # Make a source branch in 'repo' in an unstackable branch format 734 bzrdir_format = self.repository_format._matchingcontroldir 735 transport = self.get_transport('repo/branch') 736 transport.mkdir('.') 737 target_bzrdir = bzrdir_format.initialize_on_transport(transport) 738 branch = _mod_bzrbranch.BzrBranchFormat6().initialize(target_bzrdir) 739 # Ensure that stack_on will be stackable and match the serializer of 740 # repo. 741 if isinstance(repo, remote.RemoteRepository): 742 repo._ensure_real() 743 info_repo = repo._real_repository 744 else: 745 info_repo = repo 746 format_description = info.describe_format(info_repo.controldir, 747 info_repo, None, None) 748 formats = format_description.split(' or ') 749 stack_on_format = formats[0] 750 if stack_on_format in ["pack-0.92", "dirstate", "metaweave"]: 751 stack_on_format = "1.9" 752 elif stack_on_format in ["dirstate-with-subtree", "rich-root", 753 "rich-root-pack", "pack-0.92-subtree"]: 754 stack_on_format = "1.9-rich-root" 755 # formats not tested for above are already stackable, so we can use the 756 # format as-is. 757 stack_on = self.make_branch('stack-on-me', format=stack_on_format) 758 self.make_controldir('.').get_config( 759 ).set_default_stack_on('stack-on-me') 760 target = branch.controldir.clone(self.get_url('target')) 761 # The target branch supports stacking. 762 self.assertTrue(target.open_branch()._format.supports_stacking()) 763 if isinstance(repo, remote.RemoteRepository): 764 repo._ensure_real() 765 repo = repo._real_repository 766 target_repo = target.open_repository() 767 if isinstance(target_repo, remote.RemoteRepository): 768 target_repo._ensure_real() 769 target_repo = target_repo._real_repository 770 # The repository format is unchanged if it could already stack, or the 771 # same as the stack on. 772 if repo._format.supports_external_lookups: 773 self.assertEqual(repo._format, target_repo._format) 774 else: 775 self.assertEqual(stack_on.repository._format, target_repo._format) 776 777 def test__make_parents_provider(self): 778 """Repositories must have a _make_parents_provider method that returns 779 an object with a get_parent_map method. 780 """ 781 repo = self.make_repository('repo') 782 repo._make_parents_provider().get_parent_map 783 784 def make_repository_and_foo_bar(self, shared=None): 785 made_control = self.make_controldir('repository') 786 repo = made_control.create_repository(shared=shared) 787 if not repo._format.supports_nesting_repositories: 788 raise tests.TestNotApplicable("repository does not support " 789 "nesting repositories") 790 controldir.ControlDir.create_branch_convenience( 791 self.get_url('repository/foo'), force_new_repo=False) 792 controldir.ControlDir.create_branch_convenience( 793 self.get_url('repository/bar'), force_new_repo=True) 794 baz = self.make_controldir('repository/baz') 795 qux = self.make_branch('repository/baz/qux') 796 quxx = self.make_branch('repository/baz/qux/quxx') 797 return repo 798 799 def test_find_branches(self): 800 repo = self.make_repository_and_foo_bar() 801 branches = list(repo.find_branches()) 802 self.assertContainsRe(branches[-1].base, 'repository/foo/$') 803 self.assertContainsRe(branches[-3].base, 'repository/baz/qux/$') 804 self.assertContainsRe(branches[-2].base, 'repository/baz/qux/quxx/$') 805 # in some formats, creating a repo creates a branch 806 if len(branches) == 6: 807 self.assertContainsRe(branches[-4].base, 'repository/baz/$') 808 self.assertContainsRe(branches[-5].base, 'repository/bar/$') 809 self.assertContainsRe(branches[-6].base, 'repository/$') 810 else: 811 self.assertEqual(4, len(branches)) 812 self.assertContainsRe(branches[-4].base, 'repository/bar/$') 813 814 def test_find_branches_using(self): 815 try: 816 repo = self.make_repository_and_foo_bar(shared=True) 817 except errors.IncompatibleFormat: 818 raise tests.TestNotApplicable 819 branches = list(repo.find_branches(using=True)) 820 self.assertContainsRe(branches[-1].base, 'repository/foo/$') 821 # in some formats, creating a repo creates a branch 822 if len(branches) == 2: 823 self.assertContainsRe(branches[-2].base, 'repository/$') 824 else: 825 self.assertEqual(1, len(branches)) 826 827 def test_find_branches_using_standalone(self): 828 branch = self.make_branch('branch') 829 if not branch.repository._format.supports_nesting_repositories: 830 raise tests.TestNotApplicable("format does not support nesting " 831 "repositories") 832 contained = self.make_branch('branch/contained') 833 branches = branch.repository.find_branches(using=True) 834 self.assertEqual([branch.base], [b.base for b in branches]) 835 branches = branch.repository.find_branches(using=False) 836 self.assertEqual([branch.base, contained.base], 837 [b.base for b in branches]) 838 839 def test_find_branches_using_empty_standalone_repo(self): 840 try: 841 repo = self.make_repository('repo', shared=False) 842 except errors.IncompatibleFormat: 843 raise tests.TestNotApplicable("format does not support standalone " 844 "repositories") 845 try: 846 repo.controldir.open_branch() 847 except errors.NotBranchError: 848 self.assertEqual([], list(repo.find_branches(using=True))) 849 else: 850 self.assertEqual([repo.controldir.root_transport.base], 851 [b.base for b in repo.find_branches(using=True)]) 852 853 def test_set_get_make_working_trees_true(self): 854 repo = self.make_repository('repo') 855 try: 856 repo.set_make_working_trees(True) 857 except (errors.RepositoryUpgradeRequired, errors.UnsupportedOperation) as e: 858 raise tests.TestNotApplicable('Format does not support this flag.') 859 self.assertTrue(repo.make_working_trees()) 860 861 def test_set_get_make_working_trees_false(self): 862 repo = self.make_repository('repo') 863 try: 864 repo.set_make_working_trees(False) 865 except (errors.RepositoryUpgradeRequired, errors.UnsupportedOperation) as e: 866 raise tests.TestNotApplicable('Format does not support this flag.') 867 self.assertFalse(repo.make_working_trees()) 868 869 870class TestRepositoryLocking(per_repository.TestCaseWithRepository): 871 872 def test_leave_lock_in_place(self): 873 repo = self.make_repository('r') 874 # Lock the repository, then use leave_lock_in_place so that when we 875 # unlock the repository the lock is still held on disk. 876 token = repo.lock_write().repository_token 877 try: 878 if token is None: 879 # This test does not apply, because this repository refuses lock 880 # tokens. 881 self.assertRaises(NotImplementedError, 882 repo.leave_lock_in_place) 883 return 884 repo.leave_lock_in_place() 885 finally: 886 repo.unlock() 887 # We should be unable to relock the repo. 888 self.assertRaises(errors.LockContention, repo.lock_write) 889 # Cleanup 890 repo.lock_write(token) 891 repo.dont_leave_lock_in_place() 892 repo.unlock() 893 894 def test_dont_leave_lock_in_place(self): 895 repo = self.make_repository('r') 896 # Create a lock on disk. 897 token = repo.lock_write().repository_token 898 try: 899 if token is None: 900 # This test does not apply, because this repository refuses lock 901 # tokens. 902 self.assertRaises(NotImplementedError, 903 repo.dont_leave_lock_in_place) 904 return 905 try: 906 repo.leave_lock_in_place() 907 except NotImplementedError: 908 # This repository doesn't support this API. 909 return 910 finally: 911 repo.unlock() 912 # Reacquire the lock (with a different repository object) by using the 913 # token. 914 new_repo = repo.controldir.open_repository() 915 new_repo.lock_write(token=token) 916 # Call dont_leave_lock_in_place, so that the lock will be released by 917 # this instance, even though the lock wasn't originally acquired by it. 918 new_repo.dont_leave_lock_in_place() 919 new_repo.unlock() 920 # Now the repository is unlocked. Test this by locking it (without a 921 # token). 922 repo.lock_write() 923 repo.unlock() 924 925 def test_lock_read_then_unlock(self): 926 # Calling lock_read then unlocking should work without errors. 927 repo = self.make_repository('r') 928 repo.lock_read() 929 repo.unlock() 930 931 def test_lock_read_returns_unlockable(self): 932 repo = self.make_repository('r') 933 self.assertThat(repo.lock_read, ReturnsUnlockable(repo)) 934 935 def test_lock_write_returns_unlockable(self): 936 repo = self.make_repository('r') 937 self.assertThat(repo.lock_write, ReturnsUnlockable(repo)) 938 939 940# FIXME: document why this is a TestCaseWithTransport rather than a 941# TestCaseWithRepository 942class TestEscaping(tests.TestCaseWithTransport): 943 """Test that repositories can be stored correctly on VFAT transports. 944 945 Makes sure we have proper escaping of invalid characters, etc. 946 947 It'd be better to test all operations on the FakeVFATTransportDecorator, 948 but working trees go straight to the os not through the Transport layer. 949 Therefore we build some history first in the regular way and then 950 check it's safe to access for vfat. 951 """ 952 953 def test_on_vfat(self): 954 # dont bother with remote repository testing, because this test is 955 # about local disk layout/support. 956 if isinstance(self.repository_format, remote.RemoteRepositoryFormat): 957 return 958 self.transport_server = test_server.FakeVFATServer 959 FOO_ID = b'foo<:>ID' 960 # this makes a default format repository always, which is wrong: 961 # it should be a TestCaseWithRepository in order to get the 962 # default format. 963 wt = self.make_branch_and_tree('repo') 964 if not wt.supports_setting_file_ids(): 965 self.skip("format does not support setting file ids") 966 self.build_tree(["repo/foo"], line_endings='binary') 967 # add file with id containing wierd characters 968 wt.add(['foo'], [FOO_ID]) 969 rev1 = wt.commit('this is my new commit') 970 # now access over vfat; should be safe 971 branch = controldir.ControlDir.open(self.get_url('repo')).open_branch() 972 revtree = branch.repository.revision_tree(rev1) 973 revtree.lock_read() 974 self.addCleanup(revtree.unlock) 975 contents = revtree.get_file_text('foo') 976 self.assertEqual(contents, b'contents of repo/foo\n') 977 978 def test_create_bundle(self): 979 wt = self.make_branch_and_tree('repo') 980 self.build_tree(['repo/file1']) 981 wt.add('file1') 982 rev1 = wt.commit('file1') 983 fileobj = BytesIO() 984 wt.branch.repository.create_bundle( 985 rev1, _mod_revision.NULL_REVISION, fileobj) 986 987 988class TestRepositoryControlComponent(per_repository.TestCaseWithRepository): 989 """Repository implementations adequately implement ControlComponent.""" 990 991 def test_urls(self): 992 repo = self.make_repository('repo') 993 self.assertIsInstance(repo.user_url, str) 994 self.assertEqual(repo.user_url, repo.user_transport.base) 995 # for all current bzrdir implementations the user dir must be 996 # above the control dir but we might need to relax that? 997 self.assertEqual(repo.control_url.find(repo.user_url), 0) 998 self.assertEqual(repo.control_url, repo.control_transport.base) 999 1000 1001class TestDeltaRevisionFilesFiltered(per_repository.TestCaseWithRepository): 1002 1003 def setUp(self): 1004 super(TestDeltaRevisionFilesFiltered, self).setUp() 1005 self.tree_a = self.make_branch_and_tree('a') 1006 self.build_tree( 1007 ['a/foo', 'a/bar/', 'a/bar/b1', 'a/bar/b2', 'a/baz', 'a/oldname']) 1008 self.tree_a.add(['foo', 'bar', 'bar/b1', 'bar/b2', 'baz', 'oldname']) 1009 self.rev1 = self.tree_a.commit('rev1') 1010 self.build_tree(['a/bar/b3']) 1011 self.tree_a.add('bar/b3') 1012 self.tree_a.rename_one('oldname', 'newname') 1013 self.rev2 = self.tree_a.commit('rev2') 1014 self.repository = self.tree_a.branch.repository 1015 self.addCleanup(self.repository.lock_read().unlock) 1016 1017 def test_multiple_files(self): 1018 # Test multiple files 1019 delta = list(self.repository.get_revision_deltas( 1020 [self.repository.get_revision(self.rev1)], specific_files=[ 1021 'foo', 'baz']))[0] 1022 self.assertIsInstance(delta, _mod_delta.TreeDelta) 1023 self.assertEqual([ 1024 ('baz', 'file'), 1025 ('foo', 'file'), 1026 ], [(c.path[1], c.kind[1]) for c in delta.added]) 1027 1028 def test_directory(self): 1029 # Test a directory 1030 delta = list(self.repository.get_revision_deltas( 1031 [self.repository.get_revision(self.rev1)], 1032 specific_files=['bar']))[0] 1033 self.assertIsInstance(delta, _mod_delta.TreeDelta) 1034 self.assertEqual([ 1035 ('bar', 'directory'), 1036 ('bar/b1', 'file'), 1037 ('bar/b2', 'file'), 1038 ], [(c.path[1], c.kind[1]) for c in delta.added]) 1039 1040 def test_unrelated(self): 1041 # Try another revision 1042 delta = list(self.repository.get_revision_deltas( 1043 [self.repository.get_revision(self.rev2)], 1044 specific_files=['foo']))[0] 1045 self.assertIsInstance(delta, _mod_delta.TreeDelta) 1046 self.assertEqual([], delta.added) 1047 1048 def test_renamed(self): 1049 # Try another revision 1050 self.assertTrue( 1051 self.repository.revision_tree(self.rev2).has_filename('newname')) 1052 self.assertTrue( 1053 self.repository.revision_tree(self.rev1).has_filename('oldname')) 1054 revs = [ 1055 self.repository.get_revision(self.rev2), 1056 self.repository.get_revision(self.rev1)] 1057 delta2, delta1 = list(self.repository.get_revision_deltas( 1058 revs, specific_files=['newname'])) 1059 self.assertIsInstance(delta1, _mod_delta.TreeDelta) 1060 self.assertEqual([('oldname', 'newname')], [c.path for c in delta2.renamed]) 1061 self.assertIsInstance(delta2, _mod_delta.TreeDelta) 1062 self.assertEqual(['oldname'], [c.path[1] for c in delta1.added]) 1063 1064 def test_file_in_directory(self): 1065 # Test a file in a directory, both of which were added 1066 delta = list(self.repository.get_revision_deltas( 1067 [self.repository.get_revision(self.rev1)], 1068 specific_files=['bar/b2']))[0] 1069 self.assertIsInstance(delta, _mod_delta.TreeDelta) 1070 self.assertEqual([ 1071 ('bar', 'directory'), 1072 ('bar/b2', 'file'), 1073 ], [(c.path[1], c.kind[1]) for c in delta.added]) 1074 1075 def test_file_in_unchanged_directory(self): 1076 delta = list(self.repository.get_revision_deltas( 1077 [self.repository.get_revision(self.rev2)], 1078 specific_files=['bar/b3']))[0] 1079 self.assertIsInstance(delta, _mod_delta.TreeDelta) 1080 if [(c.path[1], c.kind[1]) for c in delta.added] == [ 1081 ('bar', 'directory'), ('bar/b3', 'file')]: 1082 self.knownFailure("bzr incorrectly reports 'bar' as added - " 1083 "bug 878217") 1084 self.assertEqual([ 1085 ('bar/b3', 'file'), 1086 ], [(c.path[1], c.kind[1]) for c in delta.added]) 1087