1# Copyright (C) 2006-2011 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Tests for repository implementations - tests a repository format."""
18
19from io import BytesIO
20import re
21
22from ... import (
23    branch as _mod_branch,
24    commit,
25    controldir,
26    delta as _mod_delta,
27    errors,
28    gpg,
29    info,
30    repository,
31    revision as _mod_revision,
32    tests,
33    transport,
34    upgrade,
35    workingtree,
36    )
37from ...bzr import (
38    branch as _mod_bzrbranch,
39    inventory,
40    remote,
41    repository as bzrrepository,
42    )
43from ...bzr import (
44    knitpack_repo,
45    )
46from .. import (
47    per_repository,
48    test_server,
49    )
50from ..matchers import *
51
52
53class TestRepositoryMakeBranchAndTree(per_repository.TestCaseWithRepository):
54
55    def test_repository_format(self):
56        # make sure the repository on tree.branch is of the desired format,
57        # because developers use this api to setup the tree, branch and
58        # repository for their tests: having it now give the right repository
59        # type would invalidate the tests.
60        tree = self.make_branch_and_tree('repo')
61        self.assertIsInstance(tree.branch.repository._format,
62                              self.repository_format.__class__)
63
64
65class TestRepository(per_repository.TestCaseWithRepository):
66
67    def assertFormatAttribute(self, attribute, allowed_values):
68        """Assert that the format has an attribute 'attribute'."""
69        repo = self.make_repository('repo')
70        self.assertIn(getattr(repo._format, attribute), allowed_values)
71
72    def assertRepositoryAttribute(self, attribute, allowed_values):
73        """Assert that the repo has an attribute 'attribute'."""
74        repo = self.make_repository('repo')
75        self.assertIn(getattr(repo, attribute), allowed_values)
76
77    def test_attribute_fast_deltas(self):
78        """Test the format.fast_deltas attribute."""
79        self.assertFormatAttribute('fast_deltas', (True, False))
80
81    def test_attribute_supports_nesting_repositories(self):
82        """Test the format.supports_nesting_repositories."""
83        self.assertFormatAttribute('supports_nesting_repositories',
84                                   (True, False))
85
86    def test_attribute_supports_multiple_authors(self):
87        """Test the format.supports_multiple_authors."""
88        self.assertFormatAttribute('supports_multiple_authors',
89                                   (True, False))
90
91    def test_attribute_supports_unreferenced_revisions(self):
92        """Test the format.supports_unreferenced_revisions."""
93        self.assertFormatAttribute('supports_unreferenced_revisions',
94                                   (True, False))
95
96    def test_attribute__fetch_reconcile(self):
97        """Test the _fetch_reconcile attribute."""
98        self.assertFormatAttribute('_fetch_reconcile', (True, False))
99
100    def test_attribute_format_experimental(self):
101        self.assertFormatAttribute('experimental', (True, False))
102
103    def test_attribute_format_pack_compresses(self):
104        self.assertFormatAttribute('pack_compresses', (True, False))
105
106    def test_attribute_format_supports_full_versioned_files(self):
107        self.assertFormatAttribute('supports_full_versioned_files',
108                                   (True, False))
109
110    def test_attribute_format_supports_funky_characters(self):
111        self.assertFormatAttribute('supports_funky_characters',
112                                   (True, False))
113
114    def test_attribute_format_supports_leaving_lock(self):
115        self.assertFormatAttribute('supports_leaving_lock',
116                                   (True, False))
117
118    def test_attribute_format_versioned_directories(self):
119        self.assertFormatAttribute(
120            'supports_versioned_directories', (True, False))
121
122    def test_attribute_format_revision_graph_can_have_wrong_parents(self):
123        self.assertFormatAttribute('revision_graph_can_have_wrong_parents',
124                                   (True, False))
125
126    def test_attribute_format_supports_random_access(self):
127        self.assertRepositoryAttribute('supports_random_access', (True, False))
128
129    def test_attribute_format_supports_setting_revision_ids(self):
130        self.assertFormatAttribute('supports_setting_revision_ids',
131                                   (True, False))
132
133    def test_attribute_format_supports_storing_branch_nick(self):
134        self.assertFormatAttribute('supports_storing_branch_nick',
135                                   (True, False))
136
137    def test_attribute_format_supports_custom_revision_properties(self):
138        self.assertFormatAttribute(
139            'supports_custom_revision_properties',
140            (True, False))
141
142    def test_attribute_format_supports_overriding_transport(self):
143        repo = self.make_repository('repo')
144        self.assertIn(
145            repo._format.supports_overriding_transport, (True, False))
146
147        repo.control_transport.copy_tree('.', '../repository.backup')
148        backup_transport = repo.control_transport.clone('../repository.backup')
149        if repo._format.supports_overriding_transport:
150            backup = repo._format.open(
151                repo.controldir,
152                _override_transport=backup_transport)
153            self.assertIs(backup_transport, backup.control_transport)
154        else:
155            self.assertRaises(TypeError, repo._format.open,
156                              repo.controldir, _override_transport=backup_transport)
157
158    def test_format_is_deprecated(self):
159        repo = self.make_repository('repo')
160        self.assertIn(repo._format.is_deprecated(), (True, False))
161
162    def test_format_is_supported(self):
163        repo = self.make_repository('repo')
164        self.assertIn(repo._format.is_supported(), (True, False))
165
166    def test_attribute_format_records_per_file_revision(self):
167        self.assertFormatAttribute('records_per_file_revision',
168                                   (True, False))
169
170    def test_clone_to_default_format(self):
171        # TODO: Test that cloning a repository preserves all the information
172        # such as signatures[not tested yet] etc etc.
173        # when changing to the current default format.
174        tree_a = self.make_branch_and_tree('a')
175        self.build_tree(['a/foo'])
176        tree_a.add('foo')
177        file_id = tree_a.path2id('foo')
178        rev1 = tree_a.commit('rev1')
179        bzrdirb = self.make_controldir('b')
180        repo_b = tree_a.branch.repository.clone(bzrdirb)
181        tree_b = repo_b.revision_tree(rev1)
182        tree_b.lock_read()
183        self.addCleanup(tree_b.unlock)
184        tree_b.get_file_text('foo')
185        repo_b.get_revision(rev1)
186
187    def test_supports_rich_root(self):
188        tree = self.make_branch_and_tree('a')
189        tree.commit('')
190        second_revision = tree.commit('')
191        rev_tree = tree.branch.repository.revision_tree(second_revision)
192        rev_tree.lock_read()
193        self.addCleanup(rev_tree.unlock)
194        root_revision = rev_tree.get_file_revision(u'')
195        rich_root = (root_revision != second_revision)
196        self.assertEqual(rich_root,
197                         tree.branch.repository.supports_rich_root())
198
199    def test_clone_specific_format(self):
200        """todo"""
201
202    def test_format_initialize_find_open(self):
203        # loopback test to check the current format initializes to itself.
204        if not self.repository_format.is_supported():
205            # unsupported formats are not loopback testable
206            # because the default open will not open them and
207            # they may not be initializable.
208            return
209        # supported formats must be able to init and open
210        t = self.get_transport()
211        readonly_t = self.get_readonly_transport()
212        made_control = self.bzrdir_format.initialize(t.base)
213        made_repo = self.repository_format.initialize(made_control)
214        self.assertEqual(made_control, made_repo.controldir)
215
216        # find it via controldir opening:
217        opened_control = controldir.ControlDir.open(readonly_t.base)
218        direct_opened_repo = opened_control.open_repository()
219        self.assertEqual(direct_opened_repo.__class__, made_repo.__class__)
220        self.assertEqual(opened_control, direct_opened_repo.controldir)
221
222        self.assertIsInstance(direct_opened_repo._format,
223                              self.repository_format.__class__)
224        # find it via Repository.open
225        opened_repo = repository.Repository.open(readonly_t.base)
226        self.assertIsInstance(opened_repo, made_repo.__class__)
227        self.assertEqual(made_repo._format.__class__,
228                         opened_repo._format.__class__)
229        # if it has a unique id string, can we probe for it ?
230        try:
231            self.repository_format.get_format_string()
232        except NotImplementedError:
233            return
234        self.assertEqual(self.repository_format,
235                         bzrrepository.RepositoryFormatMetaDir.find_format(opened_control))
236
237    def test_format_matchingcontroldir(self):
238        self.assertEqual(self.repository_format,
239                         self.repository_format._matchingcontroldir.repository_format)
240        self.assertEqual(self.repository_format,
241                         self.bzrdir_format.repository_format)
242
243    def test_format_network_name(self):
244        repo = self.make_repository('r')
245        format = repo._format
246        network_name = format.network_name()
247        self.assertIsInstance(network_name, bytes)
248        # We want to test that the network_name matches the actual format on
249        # disk.  For local repositories, that means that using network_name as
250        # a key in the registry gives back the same format.  For remote
251        # repositories, that means that the network_name of the
252        # RemoteRepositoryFormat we have locally matches the actual format
253        # present on the remote side.
254        if isinstance(format, remote.RemoteRepositoryFormat):
255            repo._ensure_real()
256            real_repo = repo._real_repository
257            self.assertEqual(real_repo._format.network_name(), network_name)
258        else:
259            registry = repository.network_format_registry
260            looked_up_format = registry.get(network_name)
261            self.assertEqual(format.__class__, looked_up_format.__class__)
262
263    def test_create_repository(self):
264        # bzrdir can construct a repository for itself.
265        if not self.bzrdir_format.is_supported():
266            # unsupported formats are not loopback testable
267            # because the default open will not open them and
268            # they may not be initializable.
269            return
270        t = self.get_transport()
271        made_control = self.bzrdir_format.initialize(t.base)
272        made_repo = made_control.create_repository()
273        # Check that we have a repository object.
274        made_repo.has_revision(b'foo')
275        self.assertEqual(made_control, made_repo.controldir)
276
277    def test_create_repository_shared(self):
278        # bzrdir can construct a shared repository.
279        if not self.bzrdir_format.is_supported():
280            # unsupported formats are not loopback testable
281            # because the default open will not open them and
282            # they may not be initializable.
283            return
284        t = self.get_transport()
285        made_control = self.bzrdir_format.initialize(t.base)
286        try:
287            made_repo = made_control.create_repository(shared=True)
288        except errors.IncompatibleFormat:
289            # not all repository formats understand being shared, or
290            # may only be shared in some circumstances.
291            return
292        # Check that we have a repository object.
293        made_repo.has_revision(b'foo')
294        self.assertEqual(made_control, made_repo.controldir)
295        self.assertTrue(made_repo.is_shared())
296
297    def test_revision_tree(self):
298        wt = self.make_branch_and_tree('.')
299        rev1 = wt.commit('lala!', allow_pointless=True)
300        root_id = wt.path2id('')
301        tree = wt.branch.repository.revision_tree(rev1)
302        with tree.lock_read():
303            self.assertEqual(rev1, tree.get_file_revision(u''))
304            expected = inventory.InventoryDirectory(root_id, '', None)
305            expected.revision = rev1
306            self.assertEqual([('', 'V', 'directory', expected)],
307                             list(tree.list_files(include_root=True)))
308        self.assertRaises(ValueError, wt.branch.repository.revision_tree, None)
309        tree = wt.branch.repository.revision_tree(_mod_revision.NULL_REVISION)
310        with tree.lock_read():
311            self.assertEqual([], list(tree.list_files(include_root=True)))
312
313    def test_get_revision_delta(self):
314        tree_a = self.make_branch_and_tree('a')
315        self.build_tree(['a/foo'])
316        tree_a.add('foo')
317        rev1 = tree_a.commit('rev1')
318        self.build_tree(['a/vla'])
319        tree_a.add('vla')
320        rev2 = tree_a.commit('rev2')
321
322        delta = tree_a.branch.repository.get_revision_delta(rev1)
323        self.assertIsInstance(delta, _mod_delta.TreeDelta)
324        self.assertEqual([('foo', 'file')], [(c.path[1], c.kind[1]) for c in delta.added])
325        delta = tree_a.branch.repository.get_revision_delta(rev2)
326        self.assertIsInstance(delta, _mod_delta.TreeDelta)
327        self.assertEqual([('vla', 'file')], [(c.path[1], c.kind[1]) for c in delta.added])
328
329    def test_clone_bzrdir_repository_revision(self):
330        # make a repository with some revisions,
331        # and clone it, this should not have unreferenced revisions.
332        # also: test cloning with a revision id of NULL_REVISION -> empty repo.
333        raise tests.TestSkipped('revision limiting is not implemented yet.')
334
335    def test_clone_repository_basis_revision(self):
336        raise tests.TestSkipped(
337            'the use of a basis should not add noise data to the result.')
338
339    def test_clone_shared_no_tree(self):
340        # cloning a shared repository keeps it shared
341        # and preserves the make_working_tree setting.
342        made_control = self.make_controldir('source')
343        try:
344            made_repo = made_control.create_repository(shared=True)
345        except errors.IncompatibleFormat:
346            # not all repository formats understand being shared, or
347            # may only be shared in some circumstances.
348            return
349        try:
350            made_repo.set_make_working_trees(False)
351        except errors.UnsupportedOperation:
352            # the repository does not support having its tree-making flag
353            # toggled.
354            return
355        result = made_control.clone(self.get_url('target'))
356        # Check that we have a repository object.
357        made_repo.has_revision(b'foo')
358
359        self.assertEqual(made_control, made_repo.controldir)
360        self.assertTrue(result.open_repository().is_shared())
361        self.assertFalse(result.open_repository().make_working_trees())
362
363    def test_upgrade_preserves_signatures(self):
364        if not self.repository_format.supports_revision_signatures:
365            raise tests.TestNotApplicable(
366                "repository does not support signing revisions")
367        wt = self.make_branch_and_tree('source')
368        a = wt.commit('A', allow_pointless=True)
369        repo = wt.branch.repository
370        repo.lock_write()
371        repo.start_write_group()
372        try:
373            repo.sign_revision(a, gpg.LoopbackGPGStrategy(None))
374        except errors.UnsupportedOperation:
375            self.assertFalse(repo._format.supports_revision_signatures)
376            raise tests.TestNotApplicable(
377                "signatures not supported by repository format")
378        repo.commit_write_group()
379        repo.unlock()
380        old_signature = repo.get_signature_text(a)
381        try:
382            old_format = controldir.ControlDirFormat.get_default_format()
383            # This gives metadir branches something they can convert to.
384            # it would be nice to have a 'latest' vs 'default' concept.
385            format = controldir.format_registry.make_controldir(
386                'development-subtree')
387            upgrade.upgrade(repo.controldir.root_transport.base, format=format)
388        except errors.UpToDateFormat:
389            # this is in the most current format already.
390            return
391        except errors.BadConversionTarget as e:
392            raise tests.TestSkipped(str(e))
393        wt = workingtree.WorkingTree.open(wt.basedir)
394        new_signature = wt.branch.repository.get_signature_text(a)
395        self.assertEqual(old_signature, new_signature)
396
397    def test_format_description(self):
398        repo = self.make_repository('.')
399        text = repo._format.get_format_description()
400        self.assertTrue(len(text))
401
402    def test_format_supports_external_lookups(self):
403        repo = self.make_repository('.')
404        self.assertIn(repo._format.supports_external_lookups, (True, False))
405
406    def assertMessageRoundtrips(self, message):
407        """Assert that message roundtrips to a repository and back intact."""
408        tree = self.make_branch_and_tree('.')
409        a = tree.commit(message, allow_pointless=True)
410        rev = tree.branch.repository.get_revision(a)
411        serializer = getattr(tree.branch.repository, "_serializer", None)
412        if serializer is not None and serializer.squashes_xml_invalid_characters:
413            # we have to manually escape this as we dont try to
414            # roundtrip xml invalid characters in the xml-based serializers.
415            escaped_message, escape_count = re.subn(
416                u'[^\x09\x0A\x0D\u0020-\uD7FF\uE000-\uFFFD]+',
417                lambda match: match.group(0).encode(
418                    'unicode_escape').decode('ascii'),
419                message)
420            self.assertEqual(rev.message, escaped_message)
421        else:
422            self.assertEqual(rev.message, message)
423        # insist the class is unicode no matter what came in for
424        # consistency.
425        self.assertIsInstance(rev.message, str)
426
427    def test_commit_unicode_message(self):
428        # a siple unicode message should be preserved
429        self.assertMessageRoundtrips(u'foo bar gamm\xae plop')
430
431    def test_commit_unicode_control_characters(self):
432        # a unicode message with control characters should roundtrip too.
433        unichars = [chr(x) for x in range(256)]
434        # '\r' is not directly allowed anymore, as it used to be translated
435        # into '\n' anyway
436        unichars[ord('\r')] = u'\n'
437        self.assertMessageRoundtrips(
438            u"All 8-bit chars: " + ''.join(unichars))
439
440    def test_check_repository(self):
441        """Check a fairly simple repository's history"""
442        tree = self.make_branch_and_tree('.')
443        a_rev = tree.commit('initial empty commit', allow_pointless=True)
444        result = tree.branch.repository.check()
445        # writes to log; should accept both verbose or non-verbose
446        result.report_results(verbose=True)
447        result.report_results(verbose=False)
448
449    def test_get_revisions(self):
450        tree = self.make_branch_and_tree('.')
451        a_rev = tree.commit('initial empty commit', allow_pointless=True)
452        b_rev = tree.commit('second empty commit', allow_pointless=True)
453        c_rev = tree.commit('third empty commit', allow_pointless=True)
454        repo = tree.branch.repository
455        revision_ids = [a_rev, b_rev, c_rev]
456        revisions = repo.get_revisions(revision_ids)
457        self.assertEqual(len(revisions), 3)
458        zipped = list(zip(revisions, revision_ids))
459        self.assertEqual(len(zipped), 3)
460        for revision, revision_id in zipped:
461            self.assertEqual(revision.revision_id, revision_id)
462            self.assertEqual(revision, repo.get_revision(revision_id))
463
464    def test_iter_revisions(self):
465        tree = self.make_branch_and_tree('.')
466        a_rev = tree.commit('initial empty commit', allow_pointless=True)
467        b_rev = tree.commit('second empty commit', allow_pointless=True)
468        c_rev = tree.commit('third empty commit', allow_pointless=True)
469        d_rev = b'd-rev'
470        repo = tree.branch.repository
471        revision_ids = [a_rev, c_rev, b_rev, d_rev]
472        revid_with_rev = repo.iter_revisions(revision_ids)
473        self.assertEqual(
474            set((revid, rev.revision_id if rev is not None else None)
475                for (revid, rev) in revid_with_rev),
476            {(a_rev, a_rev),
477             (b_rev, b_rev),
478             (c_rev, c_rev),
479             (d_rev, None)})
480
481    def test_root_entry_has_revision(self):
482        tree = self.make_branch_and_tree('.')
483        revid = tree.commit('message')
484        rev_tree = tree.branch.repository.revision_tree(tree.last_revision())
485        rev_tree.lock_read()
486        self.addCleanup(rev_tree.unlock)
487        self.assertEqual(revid, rev_tree.get_file_revision(u''))
488
489    def test_pointless_commit(self):
490        tree = self.make_branch_and_tree('.')
491        self.assertRaises(commit.PointlessCommit, tree.commit, 'pointless',
492                          allow_pointless=False)
493        tree.commit('pointless', allow_pointless=True)
494
495    def test_format_attributes(self):
496        """All repository formats should have some basic attributes."""
497        # create a repository to get a real format instance, not the
498        # template from the test suite parameterization.
499        repo = self.make_repository('.')
500        repo._format.rich_root_data
501        repo._format.supports_tree_reference
502
503    def test_iter_files_bytes(self):
504        tree = self.make_branch_and_tree('tree')
505        self.build_tree_contents([('tree/file1', b'foo'),
506                                  ('tree/file2', b'bar')])
507        tree.add(['file1', 'file2'])
508        file1_id = tree.path2id('file1')
509        file2_id = tree.path2id('file2')
510        rev1 = tree.commit('rev1')
511        self.build_tree_contents([('tree/file1', b'baz')])
512        rev2 = tree.commit('rev2')
513        repository = tree.branch.repository
514        repository.lock_read()
515        self.addCleanup(repository.unlock)
516        extracted = dict((i, b''.join(b)) for i, b in
517                         repository.iter_files_bytes(
518                         [(file1_id, rev1, 'file1-old'),
519                          (file1_id, rev2, 'file1-new'),
520                          (file2_id, rev1, 'file2'),
521                          ]))
522        self.assertEqual(b'foo', extracted['file1-old'])
523        self.assertEqual(b'bar', extracted['file2'])
524        self.assertEqual(b'baz', extracted['file1-new'])
525        self.assertRaises(errors.RevisionNotPresent, list,
526                          repository.iter_files_bytes(
527                              [(file1_id, b'rev3', 'file1-notpresent')]))
528        self.assertRaises((errors.RevisionNotPresent, errors.NoSuchId), list,
529                          repository.iter_files_bytes(
530                          [(b'file3-id', b'rev3', 'file1-notpresent')]))
531
532    def test_get_graph(self):
533        """Bare-bones smoketest that all repositories implement get_graph."""
534        repo = self.make_repository('repo')
535        repo.lock_read()
536        self.addCleanup(repo.unlock)
537        repo.get_graph()
538
539    def test_graph_ghost_handling(self):
540        if not self.repository_format.supports_ghosts:
541            raise tests.TestNotApplicable('format does not support ghosts')
542        tree = self.make_branch_and_tree('here')
543        tree.lock_write()
544        self.addCleanup(tree.unlock)
545        rev1 = tree.commit('initial commit')
546        tree.add_parent_tree_id(b'ghost')
547        rev2 = tree.commit('commit-with-ghost')
548        graph = tree.branch.repository.get_graph()
549        parents = graph.get_parent_map([b'ghost', rev2])
550        self.assertTrue(b'ghost' not in parents)
551        self.assertEqual(parents[rev2], (rev1, b'ghost'))
552
553    def test_get_known_graph_ancestry(self):
554        tree = self.make_branch_and_tree('here')
555        tree.lock_write()
556        self.addCleanup(tree.unlock)
557        # A
558        # |\
559        # | B
560        # |/
561        # C
562        a = tree.commit('initial commit')
563        tree_other = tree.controldir.sprout('there').open_workingtree()
564        b = tree_other.commit('another')
565        tree.merge_from_branch(tree_other.branch)
566        c = tree.commit('another')
567        kg = tree.branch.repository.get_known_graph_ancestry(
568            [c])
569        self.assertEqual([c], list(kg.heads([a, b, c])))
570        self.assertEqual([a, b, c], list(kg.topo_sort()))
571
572    def test_parent_map_type(self):
573        tree = self.make_branch_and_tree('here')
574        tree.lock_write()
575        self.addCleanup(tree.unlock)
576        rev1 = tree.commit('initial commit')
577        rev2 = tree.commit('next commit')
578        graph = tree.branch.repository.get_graph()
579        parents = graph.get_parent_map(
580            [_mod_revision.NULL_REVISION, rev1, rev2])
581        for value in parents.values():
582            self.assertIsInstance(value, tuple)
583
584    def test_implements_revision_graph_can_have_wrong_parents(self):
585        """All repositories should implement
586        revision_graph_can_have_wrong_parents, so that check and reconcile can
587        work correctly.
588        """
589        repo = self.make_repository('.')
590        # This should work, not raise NotImplementedError:
591        if not repo._format.revision_graph_can_have_wrong_parents:
592            return
593        repo.lock_read()
594        self.addCleanup(repo.unlock)
595        # This repo must also implement
596        # _find_inconsistent_revision_parents and
597        # _check_for_inconsistent_revision_parents.  So calling these
598        # should not raise NotImplementedError.
599        list(repo._find_inconsistent_revision_parents())
600        repo._check_for_inconsistent_revision_parents()
601
602    def test_add_signature_text(self):
603        builder = self.make_branch_builder('.')
604        builder.start_series()
605        rev_a = builder.build_snapshot(None, [
606            ('add', ('', None, 'directory', None))])
607        builder.finish_series()
608        b = builder.get_branch()
609        b.lock_write()
610        self.addCleanup(b.unlock)
611        if b.repository._format.supports_revision_signatures:
612            b.repository.start_write_group()
613            b.repository.add_signature_text(
614                rev_a, b'This might be a signature')
615            b.repository.commit_write_group()
616            self.assertEqual(b'This might be a signature',
617                             b.repository.get_signature_text(rev_a))
618        else:
619            b.repository.start_write_group()
620            self.addCleanup(b.repository.abort_write_group)
621            self.assertRaises(errors.UnsupportedOperation,
622                              b.repository.add_signature_text, rev_a,
623                              b'This might be a signature')
624
625    # XXX: this helper duplicated from tests.test_repository
626    def make_remote_repository(self, path, shared=None):
627        """Make a RemoteRepository object backed by a real repository that will
628        be created at the given path."""
629        repo = self.make_repository(path, shared=shared)
630        smart_server = test_server.SmartTCPServer_for_testing()
631        self.start_server(smart_server, self.get_server())
632        remote_transport = transport.get_transport_from_url(
633            smart_server.get_url()).clone(path)
634        if not repo.controldir._format.supports_transport(remote_transport):
635            raise tests.TestNotApplicable(
636                "format does not support transport")
637        remote_bzrdir = controldir.ControlDir.open_from_transport(
638            remote_transport)
639        remote_repo = remote_bzrdir.open_repository()
640        return remote_repo
641
642    def test_sprout_from_hpss_preserves_format(self):
643        """repo.sprout from a smart server preserves the repository format."""
644        remote_repo = self.make_remote_repository('remote')
645        local_bzrdir = self.make_controldir('local')
646        try:
647            local_repo = remote_repo.sprout(local_bzrdir)
648        except errors.TransportNotPossible:
649            raise tests.TestNotApplicable(
650                "Cannot lock_read old formats like AllInOne over HPSS.")
651        remote_backing_repo = controldir.ControlDir.open(
652            self.get_vfs_only_url('remote')).open_repository()
653        self.assertEqual(
654            remote_backing_repo._format.network_name(),
655            local_repo._format.network_name())
656
657    def test_sprout_branch_from_hpss_preserves_repo_format(self):
658        """branch.sprout from a smart server preserves the repository format.
659        """
660        if not self.repository_format.supports_leaving_lock:
661            raise tests.TestNotApplicable(
662                "Format can not be used over HPSS")
663        remote_repo = self.make_remote_repository('remote')
664        remote_branch = remote_repo.controldir.create_branch()
665        try:
666            local_bzrdir = remote_branch.controldir.sprout('local')
667        except errors.TransportNotPossible:
668            raise tests.TestNotApplicable(
669                "Cannot lock_read old formats like AllInOne over HPSS.")
670        local_repo = local_bzrdir.open_repository()
671        remote_backing_repo = controldir.ControlDir.open(
672            self.get_vfs_only_url('remote')).open_repository()
673        self.assertEqual(remote_backing_repo._format, local_repo._format)
674
675    def test_sprout_branch_from_hpss_preserves_shared_repo_format(self):
676        """branch.sprout from a smart server preserves the repository format of
677        a branch from a shared repository.
678        """
679        if not self.repository_format.supports_leaving_lock:
680            raise tests.TestNotApplicable(
681                "Format can not be used over HPSS")
682        # Make a shared repo
683        remote_repo = self.make_remote_repository('remote', shared=True)
684        remote_backing_repo = controldir.ControlDir.open(
685            self.get_vfs_only_url('remote')).open_repository()
686        # Make a branch in that repo in an old format that isn't the default
687        # branch format for the repo.
688        from breezy.bzr.fullhistory import BzrBranchFormat5
689        format = remote_backing_repo.controldir.cloning_metadir()
690        format._branch_format = BzrBranchFormat5()
691        remote_transport = remote_repo.controldir.root_transport.clone(
692            'branch')
693        controldir.ControlDir.create_branch_convenience(
694            remote_transport.base, force_new_repo=False, format=format)
695        remote_branch = controldir.ControlDir.open_from_transport(
696            remote_transport).open_branch()
697        try:
698            local_bzrdir = remote_branch.controldir.sprout('local')
699        except errors.TransportNotPossible:
700            raise tests.TestNotApplicable(
701                "Cannot lock_read old formats like AllInOne over HPSS.")
702        local_repo = local_bzrdir.open_repository()
703        self.assertEqual(remote_backing_repo._format, local_repo._format)
704
705    def test_clone_to_hpss(self):
706        if not self.repository_format.supports_leaving_lock:
707            raise tests.TestNotApplicable(
708                "Cannot lock pre_metadir_formats remotely.")
709        remote_transport = self.make_smart_server('remote')
710        local_branch = self.make_branch('local')
711        remote_branch = local_branch.create_clone_on_transport(
712            remote_transport)
713        self.assertEqual(
714            local_branch.repository._format.supports_external_lookups,
715            remote_branch.repository._format.supports_external_lookups)
716
717    def test_clone_stacking_policy_upgrades(self):
718        """Cloning an unstackable branch format to somewhere with a default
719        stack-on branch upgrades branch and repo to match the target and honour
720        the policy.
721        """
722        try:
723            repo = self.make_repository('repo', shared=True)
724        except errors.IncompatibleFormat:
725            raise tests.TestNotApplicable('Cannot make a shared repository')
726        if repo.controldir._format.fixed_components:
727            self.knownFailure(
728                "pre metadir branches do not upgrade on push "
729                "with stacking policy")
730        if isinstance(repo._format,
731                      knitpack_repo.RepositoryFormatKnitPack5RichRootBroken):
732            raise tests.TestNotApplicable("unsupported format")
733        # Make a source branch in 'repo' in an unstackable branch format
734        bzrdir_format = self.repository_format._matchingcontroldir
735        transport = self.get_transport('repo/branch')
736        transport.mkdir('.')
737        target_bzrdir = bzrdir_format.initialize_on_transport(transport)
738        branch = _mod_bzrbranch.BzrBranchFormat6().initialize(target_bzrdir)
739        # Ensure that stack_on will be stackable and match the serializer of
740        # repo.
741        if isinstance(repo, remote.RemoteRepository):
742            repo._ensure_real()
743            info_repo = repo._real_repository
744        else:
745            info_repo = repo
746        format_description = info.describe_format(info_repo.controldir,
747                                                  info_repo, None, None)
748        formats = format_description.split(' or ')
749        stack_on_format = formats[0]
750        if stack_on_format in ["pack-0.92", "dirstate", "metaweave"]:
751            stack_on_format = "1.9"
752        elif stack_on_format in ["dirstate-with-subtree", "rich-root",
753                                 "rich-root-pack", "pack-0.92-subtree"]:
754            stack_on_format = "1.9-rich-root"
755        # formats not tested for above are already stackable, so we can use the
756        # format as-is.
757        stack_on = self.make_branch('stack-on-me', format=stack_on_format)
758        self.make_controldir('.').get_config(
759        ).set_default_stack_on('stack-on-me')
760        target = branch.controldir.clone(self.get_url('target'))
761        # The target branch supports stacking.
762        self.assertTrue(target.open_branch()._format.supports_stacking())
763        if isinstance(repo, remote.RemoteRepository):
764            repo._ensure_real()
765            repo = repo._real_repository
766        target_repo = target.open_repository()
767        if isinstance(target_repo, remote.RemoteRepository):
768            target_repo._ensure_real()
769            target_repo = target_repo._real_repository
770        # The repository format is unchanged if it could already stack, or the
771        # same as the stack on.
772        if repo._format.supports_external_lookups:
773            self.assertEqual(repo._format, target_repo._format)
774        else:
775            self.assertEqual(stack_on.repository._format, target_repo._format)
776
777    def test__make_parents_provider(self):
778        """Repositories must have a _make_parents_provider method that returns
779        an object with a get_parent_map method.
780        """
781        repo = self.make_repository('repo')
782        repo._make_parents_provider().get_parent_map
783
784    def make_repository_and_foo_bar(self, shared=None):
785        made_control = self.make_controldir('repository')
786        repo = made_control.create_repository(shared=shared)
787        if not repo._format.supports_nesting_repositories:
788            raise tests.TestNotApplicable("repository does not support "
789                                          "nesting repositories")
790        controldir.ControlDir.create_branch_convenience(
791            self.get_url('repository/foo'), force_new_repo=False)
792        controldir.ControlDir.create_branch_convenience(
793            self.get_url('repository/bar'), force_new_repo=True)
794        baz = self.make_controldir('repository/baz')
795        qux = self.make_branch('repository/baz/qux')
796        quxx = self.make_branch('repository/baz/qux/quxx')
797        return repo
798
799    def test_find_branches(self):
800        repo = self.make_repository_and_foo_bar()
801        branches = list(repo.find_branches())
802        self.assertContainsRe(branches[-1].base, 'repository/foo/$')
803        self.assertContainsRe(branches[-3].base, 'repository/baz/qux/$')
804        self.assertContainsRe(branches[-2].base, 'repository/baz/qux/quxx/$')
805        # in some formats, creating a repo creates a branch
806        if len(branches) == 6:
807            self.assertContainsRe(branches[-4].base, 'repository/baz/$')
808            self.assertContainsRe(branches[-5].base, 'repository/bar/$')
809            self.assertContainsRe(branches[-6].base, 'repository/$')
810        else:
811            self.assertEqual(4, len(branches))
812            self.assertContainsRe(branches[-4].base, 'repository/bar/$')
813
814    def test_find_branches_using(self):
815        try:
816            repo = self.make_repository_and_foo_bar(shared=True)
817        except errors.IncompatibleFormat:
818            raise tests.TestNotApplicable
819        branches = list(repo.find_branches(using=True))
820        self.assertContainsRe(branches[-1].base, 'repository/foo/$')
821        # in some formats, creating a repo creates a branch
822        if len(branches) == 2:
823            self.assertContainsRe(branches[-2].base, 'repository/$')
824        else:
825            self.assertEqual(1, len(branches))
826
827    def test_find_branches_using_standalone(self):
828        branch = self.make_branch('branch')
829        if not branch.repository._format.supports_nesting_repositories:
830            raise tests.TestNotApplicable("format does not support nesting "
831                                          "repositories")
832        contained = self.make_branch('branch/contained')
833        branches = branch.repository.find_branches(using=True)
834        self.assertEqual([branch.base], [b.base for b in branches])
835        branches = branch.repository.find_branches(using=False)
836        self.assertEqual([branch.base, contained.base],
837                         [b.base for b in branches])
838
839    def test_find_branches_using_empty_standalone_repo(self):
840        try:
841            repo = self.make_repository('repo', shared=False)
842        except errors.IncompatibleFormat:
843            raise tests.TestNotApplicable("format does not support standalone "
844                                          "repositories")
845        try:
846            repo.controldir.open_branch()
847        except errors.NotBranchError:
848            self.assertEqual([], list(repo.find_branches(using=True)))
849        else:
850            self.assertEqual([repo.controldir.root_transport.base],
851                             [b.base for b in repo.find_branches(using=True)])
852
853    def test_set_get_make_working_trees_true(self):
854        repo = self.make_repository('repo')
855        try:
856            repo.set_make_working_trees(True)
857        except (errors.RepositoryUpgradeRequired, errors.UnsupportedOperation) as e:
858            raise tests.TestNotApplicable('Format does not support this flag.')
859        self.assertTrue(repo.make_working_trees())
860
861    def test_set_get_make_working_trees_false(self):
862        repo = self.make_repository('repo')
863        try:
864            repo.set_make_working_trees(False)
865        except (errors.RepositoryUpgradeRequired, errors.UnsupportedOperation) as e:
866            raise tests.TestNotApplicable('Format does not support this flag.')
867        self.assertFalse(repo.make_working_trees())
868
869
870class TestRepositoryLocking(per_repository.TestCaseWithRepository):
871
872    def test_leave_lock_in_place(self):
873        repo = self.make_repository('r')
874        # Lock the repository, then use leave_lock_in_place so that when we
875        # unlock the repository the lock is still held on disk.
876        token = repo.lock_write().repository_token
877        try:
878            if token is None:
879                # This test does not apply, because this repository refuses lock
880                # tokens.
881                self.assertRaises(NotImplementedError,
882                                  repo.leave_lock_in_place)
883                return
884            repo.leave_lock_in_place()
885        finally:
886            repo.unlock()
887        # We should be unable to relock the repo.
888        self.assertRaises(errors.LockContention, repo.lock_write)
889        # Cleanup
890        repo.lock_write(token)
891        repo.dont_leave_lock_in_place()
892        repo.unlock()
893
894    def test_dont_leave_lock_in_place(self):
895        repo = self.make_repository('r')
896        # Create a lock on disk.
897        token = repo.lock_write().repository_token
898        try:
899            if token is None:
900                # This test does not apply, because this repository refuses lock
901                # tokens.
902                self.assertRaises(NotImplementedError,
903                                  repo.dont_leave_lock_in_place)
904                return
905            try:
906                repo.leave_lock_in_place()
907            except NotImplementedError:
908                # This repository doesn't support this API.
909                return
910        finally:
911            repo.unlock()
912        # Reacquire the lock (with a different repository object) by using the
913        # token.
914        new_repo = repo.controldir.open_repository()
915        new_repo.lock_write(token=token)
916        # Call dont_leave_lock_in_place, so that the lock will be released by
917        # this instance, even though the lock wasn't originally acquired by it.
918        new_repo.dont_leave_lock_in_place()
919        new_repo.unlock()
920        # Now the repository is unlocked.  Test this by locking it (without a
921        # token).
922        repo.lock_write()
923        repo.unlock()
924
925    def test_lock_read_then_unlock(self):
926        # Calling lock_read then unlocking should work without errors.
927        repo = self.make_repository('r')
928        repo.lock_read()
929        repo.unlock()
930
931    def test_lock_read_returns_unlockable(self):
932        repo = self.make_repository('r')
933        self.assertThat(repo.lock_read, ReturnsUnlockable(repo))
934
935    def test_lock_write_returns_unlockable(self):
936        repo = self.make_repository('r')
937        self.assertThat(repo.lock_write, ReturnsUnlockable(repo))
938
939
940# FIXME: document why this is a TestCaseWithTransport rather than a
941#        TestCaseWithRepository
942class TestEscaping(tests.TestCaseWithTransport):
943    """Test that repositories can be stored correctly on VFAT transports.
944
945    Makes sure we have proper escaping of invalid characters, etc.
946
947    It'd be better to test all operations on the FakeVFATTransportDecorator,
948    but working trees go straight to the os not through the Transport layer.
949    Therefore we build some history first in the regular way and then
950    check it's safe to access for vfat.
951    """
952
953    def test_on_vfat(self):
954        # dont bother with remote repository testing, because this test is
955        # about local disk layout/support.
956        if isinstance(self.repository_format, remote.RemoteRepositoryFormat):
957            return
958        self.transport_server = test_server.FakeVFATServer
959        FOO_ID = b'foo<:>ID'
960        # this makes a default format repository always, which is wrong:
961        # it should be a TestCaseWithRepository in order to get the
962        # default format.
963        wt = self.make_branch_and_tree('repo')
964        if not wt.supports_setting_file_ids():
965            self.skip("format does not support setting file ids")
966        self.build_tree(["repo/foo"], line_endings='binary')
967        # add file with id containing wierd characters
968        wt.add(['foo'], [FOO_ID])
969        rev1 = wt.commit('this is my new commit')
970        # now access over vfat; should be safe
971        branch = controldir.ControlDir.open(self.get_url('repo')).open_branch()
972        revtree = branch.repository.revision_tree(rev1)
973        revtree.lock_read()
974        self.addCleanup(revtree.unlock)
975        contents = revtree.get_file_text('foo')
976        self.assertEqual(contents, b'contents of repo/foo\n')
977
978    def test_create_bundle(self):
979        wt = self.make_branch_and_tree('repo')
980        self.build_tree(['repo/file1'])
981        wt.add('file1')
982        rev1 = wt.commit('file1')
983        fileobj = BytesIO()
984        wt.branch.repository.create_bundle(
985            rev1, _mod_revision.NULL_REVISION, fileobj)
986
987
988class TestRepositoryControlComponent(per_repository.TestCaseWithRepository):
989    """Repository implementations adequately implement ControlComponent."""
990
991    def test_urls(self):
992        repo = self.make_repository('repo')
993        self.assertIsInstance(repo.user_url, str)
994        self.assertEqual(repo.user_url, repo.user_transport.base)
995        # for all current bzrdir implementations the user dir must be
996        # above the control dir but we might need to relax that?
997        self.assertEqual(repo.control_url.find(repo.user_url), 0)
998        self.assertEqual(repo.control_url, repo.control_transport.base)
999
1000
1001class TestDeltaRevisionFilesFiltered(per_repository.TestCaseWithRepository):
1002
1003    def setUp(self):
1004        super(TestDeltaRevisionFilesFiltered, self).setUp()
1005        self.tree_a = self.make_branch_and_tree('a')
1006        self.build_tree(
1007            ['a/foo', 'a/bar/', 'a/bar/b1', 'a/bar/b2', 'a/baz', 'a/oldname'])
1008        self.tree_a.add(['foo', 'bar', 'bar/b1', 'bar/b2', 'baz', 'oldname'])
1009        self.rev1 = self.tree_a.commit('rev1')
1010        self.build_tree(['a/bar/b3'])
1011        self.tree_a.add('bar/b3')
1012        self.tree_a.rename_one('oldname', 'newname')
1013        self.rev2 = self.tree_a.commit('rev2')
1014        self.repository = self.tree_a.branch.repository
1015        self.addCleanup(self.repository.lock_read().unlock)
1016
1017    def test_multiple_files(self):
1018        # Test multiple files
1019        delta = list(self.repository.get_revision_deltas(
1020            [self.repository.get_revision(self.rev1)], specific_files=[
1021                'foo', 'baz']))[0]
1022        self.assertIsInstance(delta, _mod_delta.TreeDelta)
1023        self.assertEqual([
1024            ('baz', 'file'),
1025            ('foo', 'file'),
1026            ], [(c.path[1], c.kind[1]) for c in delta.added])
1027
1028    def test_directory(self):
1029        # Test a directory
1030        delta = list(self.repository.get_revision_deltas(
1031            [self.repository.get_revision(self.rev1)],
1032            specific_files=['bar']))[0]
1033        self.assertIsInstance(delta, _mod_delta.TreeDelta)
1034        self.assertEqual([
1035            ('bar', 'directory'),
1036            ('bar/b1', 'file'),
1037            ('bar/b2', 'file'),
1038            ], [(c.path[1], c.kind[1]) for c in delta.added])
1039
1040    def test_unrelated(self):
1041        # Try another revision
1042        delta = list(self.repository.get_revision_deltas(
1043            [self.repository.get_revision(self.rev2)],
1044            specific_files=['foo']))[0]
1045        self.assertIsInstance(delta, _mod_delta.TreeDelta)
1046        self.assertEqual([], delta.added)
1047
1048    def test_renamed(self):
1049        # Try another revision
1050        self.assertTrue(
1051            self.repository.revision_tree(self.rev2).has_filename('newname'))
1052        self.assertTrue(
1053            self.repository.revision_tree(self.rev1).has_filename('oldname'))
1054        revs = [
1055            self.repository.get_revision(self.rev2),
1056            self.repository.get_revision(self.rev1)]
1057        delta2, delta1 = list(self.repository.get_revision_deltas(
1058            revs, specific_files=['newname']))
1059        self.assertIsInstance(delta1, _mod_delta.TreeDelta)
1060        self.assertEqual([('oldname', 'newname')], [c.path for c in delta2.renamed])
1061        self.assertIsInstance(delta2, _mod_delta.TreeDelta)
1062        self.assertEqual(['oldname'], [c.path[1] for c in delta1.added])
1063
1064    def test_file_in_directory(self):
1065        # Test a file in a directory, both of which were added
1066        delta = list(self.repository.get_revision_deltas(
1067            [self.repository.get_revision(self.rev1)],
1068            specific_files=['bar/b2']))[0]
1069        self.assertIsInstance(delta, _mod_delta.TreeDelta)
1070        self.assertEqual([
1071            ('bar', 'directory'),
1072            ('bar/b2', 'file'),
1073            ], [(c.path[1], c.kind[1]) for c in delta.added])
1074
1075    def test_file_in_unchanged_directory(self):
1076        delta = list(self.repository.get_revision_deltas(
1077            [self.repository.get_revision(self.rev2)],
1078            specific_files=['bar/b3']))[0]
1079        self.assertIsInstance(delta, _mod_delta.TreeDelta)
1080        if [(c.path[1], c.kind[1]) for c in delta.added] == [
1081                ('bar', 'directory'), ('bar/b3', 'file')]:
1082            self.knownFailure("bzr incorrectly reports 'bar' as added - "
1083                              "bug 878217")
1084        self.assertEqual([
1085            ('bar/b3', 'file'),
1086            ], [(c.path[1], c.kind[1]) for c in delta.added])
1087