1# Copyright (C) 2005-2013, 2016 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17import bz2
18from io import BytesIO
19import os
20import sys
21
22from ... import (
23    diff,
24    errors,
25    merge,
26    osutils,
27    revision as _mod_revision,
28    tests,
29    treebuilder,
30    )
31from .. import (
32    bzrdir,
33    inventory,
34    )
35from ..bundle.apply_bundle import install_bundle, merge_bundle
36from ..bundle.bundle_data import BundleTree
37from ..bundle.serializer import write_bundle, read_bundle, v09, v4
38from ..bundle.serializer.v08 import BundleSerializerV08
39from ..bundle.serializer.v09 import BundleSerializerV09
40from ..bundle.serializer.v4 import BundleSerializerV4
41from ..import knitrepo
42from ..inventorytree import InventoryTree
43from ...tests import (
44    features,
45    test_commit,
46    )
47from ...tree import InterTree
48
49
50def get_text(vf, key):
51    """Get the fulltext for a given revision id that is present in the vf"""
52    stream = vf.get_record_stream([key], 'unordered', True)
53    record = next(stream)
54    return record.get_bytes_as('fulltext')
55
56
57def get_inventory_text(repo, revision_id):
58    """Get the fulltext for the inventory at revision id"""
59    with repo.lock_read():
60        return get_text(repo.inventories, (revision_id,))
61
62
63class MockTree(InventoryTree):
64
65    def __init__(self):
66        from ..inventory import InventoryDirectory, ROOT_ID
67        object.__init__(self)
68        self.paths = {ROOT_ID: ""}
69        self.ids = {"": ROOT_ID}
70        self.contents = {}
71        self.root = InventoryDirectory(ROOT_ID, '', None)
72
73    inventory = property(lambda x: x)
74    root_inventory = property(lambda x: x)
75
76    def get_root_id(self):
77        return self.root.file_id
78
79    def all_file_ids(self):
80        return set(self.paths.keys())
81
82    def all_versioned_paths(self):
83        return set(self.paths.values())
84
85    def is_executable(self, path):
86        # Not all the files are executable.
87        return False
88
89    def __getitem__(self, file_id):
90        if file_id == self.root.file_id:
91            return self.root
92        else:
93            return self.make_entry(file_id, self.paths[file_id])
94
95    def get_entry_by_path(self, path):
96        return self[self.path2id(path)]
97
98    def parent_id(self, file_id):
99        parent_dir = os.path.dirname(self.paths[file_id])
100        if parent_dir == "":
101            return None
102        return self.ids[parent_dir]
103
104    def iter_entries(self):
105        for path, file_id in self.ids.items():
106            yield path, self[file_id]
107
108    def kind(self, path):
109        if path in self.contents:
110            kind = 'file'
111        else:
112            kind = 'directory'
113        return kind
114
115    def make_entry(self, file_id, path):
116        from ..inventory import (InventoryFile, InventoryDirectory,
117                                 InventoryLink)
118        if not isinstance(file_id, bytes):
119            raise TypeError(file_id)
120        name = os.path.basename(path)
121        kind = self.kind(path)
122        parent_id = self.parent_id(file_id)
123        text_sha_1, text_size = self.contents_stats(path)
124        if kind == 'directory':
125            ie = InventoryDirectory(file_id, name, parent_id)
126        elif kind == 'file':
127            ie = InventoryFile(file_id, name, parent_id)
128            ie.text_sha1 = text_sha_1
129            ie.text_size = text_size
130        elif kind == 'symlink':
131            ie = InventoryLink(file_id, name, parent_id)
132        else:
133            raise errors.BzrError('unknown kind %r' % kind)
134        return ie
135
136    def add_dir(self, file_id, path):
137        if not isinstance(file_id, bytes):
138            raise TypeError(file_id)
139        self.paths[file_id] = path
140        self.ids[path] = file_id
141
142    def add_file(self, file_id, path, contents):
143        if not isinstance(file_id, bytes):
144            raise TypeError(file_id)
145        self.add_dir(file_id, path)
146        self.contents[path] = contents
147
148    def path2id(self, path):
149        return self.ids.get(path)
150
151    def id2path(self, file_id, recurse='down'):
152        try:
153            return self.paths[file_id]
154        except KeyError:
155            raise errors.NoSuchId(file_id, self)
156
157    def get_file(self, path):
158        result = BytesIO()
159        try:
160            result.write(self.contents[path])
161        except KeyError:
162            raise errors.NoSuchFile(path)
163        result.seek(0, 0)
164        return result
165
166    def get_file_revision(self, path):
167        return self.inventory.get_entry_by_path(path).revision
168
169    def get_file_size(self, path):
170        return self.inventory.get_entry_by_path(path).text_size
171
172    def get_file_sha1(self, path, file_id=None):
173        return self.inventory.get_entry_by_path(path).text_sha1
174
175    def contents_stats(self, path):
176        if path not in self.contents:
177            return None, None
178        text_sha1 = osutils.sha_file(self.get_file(path))
179        return text_sha1, len(self.contents[path])
180
181
182class BTreeTester(tests.TestCase):
183    """A simple unittest tester for the BundleTree class."""
184
185    def make_tree_1(self):
186        mtree = MockTree()
187        mtree.add_dir(b"a", "grandparent")
188        mtree.add_dir(b"b", "grandparent/parent")
189        mtree.add_file(b"c", "grandparent/parent/file", b"Hello\n")
190        mtree.add_dir(b"d", "grandparent/alt_parent")
191        return BundleTree(mtree, b''), mtree
192
193    def test_renames(self):
194        """Ensure that file renames have the proper effect on children"""
195        btree = self.make_tree_1()[0]
196        self.assertEqual(btree.old_path("grandparent"), "grandparent")
197        self.assertEqual(btree.old_path("grandparent/parent"),
198                         "grandparent/parent")
199        self.assertEqual(btree.old_path("grandparent/parent/file"),
200                         "grandparent/parent/file")
201
202        self.assertEqual(btree.id2path(b"a"), "grandparent")
203        self.assertEqual(btree.id2path(b"b"), "grandparent/parent")
204        self.assertEqual(btree.id2path(b"c"), "grandparent/parent/file")
205
206        self.assertEqual(btree.path2id("grandparent"), b"a")
207        self.assertEqual(btree.path2id("grandparent/parent"), b"b")
208        self.assertEqual(btree.path2id("grandparent/parent/file"), b"c")
209
210        self.assertIs(btree.path2id("grandparent2"), None)
211        self.assertIs(btree.path2id("grandparent2/parent"), None)
212        self.assertIs(btree.path2id("grandparent2/parent/file"), None)
213
214        btree.note_rename("grandparent", "grandparent2")
215        self.assertIs(btree.old_path("grandparent"), None)
216        self.assertIs(btree.old_path("grandparent/parent"), None)
217        self.assertIs(btree.old_path("grandparent/parent/file"), None)
218
219        self.assertEqual(btree.id2path(b"a"), "grandparent2")
220        self.assertEqual(btree.id2path(b"b"), "grandparent2/parent")
221        self.assertEqual(btree.id2path(b"c"), "grandparent2/parent/file")
222
223        self.assertEqual(btree.path2id("grandparent2"), b"a")
224        self.assertEqual(btree.path2id("grandparent2/parent"), b"b")
225        self.assertEqual(btree.path2id("grandparent2/parent/file"), b"c")
226
227        self.assertTrue(btree.path2id("grandparent") is None)
228        self.assertTrue(btree.path2id("grandparent/parent") is None)
229        self.assertTrue(btree.path2id("grandparent/parent/file") is None)
230
231        btree.note_rename("grandparent/parent", "grandparent2/parent2")
232        self.assertEqual(btree.id2path(b"a"), "grandparent2")
233        self.assertEqual(btree.id2path(b"b"), "grandparent2/parent2")
234        self.assertEqual(btree.id2path(b"c"), "grandparent2/parent2/file")
235
236        self.assertEqual(btree.path2id("grandparent2"), b"a")
237        self.assertEqual(btree.path2id("grandparent2/parent2"), b"b")
238        self.assertEqual(btree.path2id("grandparent2/parent2/file"), b"c")
239
240        self.assertTrue(btree.path2id("grandparent2/parent") is None)
241        self.assertTrue(btree.path2id("grandparent2/parent/file") is None)
242
243        btree.note_rename("grandparent/parent/file",
244                          "grandparent2/parent2/file2")
245        self.assertEqual(btree.id2path(b"a"), "grandparent2")
246        self.assertEqual(btree.id2path(b"b"), "grandparent2/parent2")
247        self.assertEqual(btree.id2path(b"c"), "grandparent2/parent2/file2")
248
249        self.assertEqual(btree.path2id("grandparent2"), b"a")
250        self.assertEqual(btree.path2id("grandparent2/parent2"), b"b")
251        self.assertEqual(btree.path2id("grandparent2/parent2/file2"), b"c")
252
253        self.assertTrue(btree.path2id("grandparent2/parent2/file") is None)
254
255    def test_moves(self):
256        """Ensure that file moves have the proper effect on children"""
257        btree = self.make_tree_1()[0]
258        btree.note_rename("grandparent/parent/file",
259                          "grandparent/alt_parent/file")
260        self.assertEqual(btree.id2path(b"c"), "grandparent/alt_parent/file")
261        self.assertEqual(btree.path2id("grandparent/alt_parent/file"), b"c")
262        self.assertTrue(btree.path2id("grandparent/parent/file") is None)
263
264    def unified_diff(self, old, new):
265        out = BytesIO()
266        diff.internal_diff("old", old, "new", new, out)
267        out.seek(0, 0)
268        return out.read()
269
270    def make_tree_2(self):
271        btree = self.make_tree_1()[0]
272        btree.note_rename("grandparent/parent/file",
273                          "grandparent/alt_parent/file")
274        self.assertRaises(errors.NoSuchId, btree.id2path, b"e")
275        self.assertFalse(btree.is_versioned("grandparent/parent/file"))
276        btree.note_id(b"e", "grandparent/parent/file")
277        return btree
278
279    def test_adds(self):
280        """File/inventory adds"""
281        btree = self.make_tree_2()
282        add_patch = self.unified_diff([], [b"Extra cheese\n"])
283        btree.note_patch("grandparent/parent/file", add_patch)
284        btree.note_id(b'f', 'grandparent/parent/symlink', kind='symlink')
285        btree.note_target('grandparent/parent/symlink', 'venus')
286        self.adds_test(btree)
287
288    def adds_test(self, btree):
289        self.assertEqual(btree.id2path(b"e"), "grandparent/parent/file")
290        self.assertEqual(btree.path2id("grandparent/parent/file"), b"e")
291        with btree.get_file("grandparent/parent/file") as f:
292            self.assertEqual(f.read(), b"Extra cheese\n")
293        self.assertEqual(
294            btree.get_symlink_target('grandparent/parent/symlink'), 'venus')
295
296    def make_tree_3(self):
297        btree, mtree = self.make_tree_1()
298        mtree.add_file(b"e", "grandparent/parent/topping", b"Anchovies\n")
299        btree.note_rename("grandparent/parent/file",
300                          "grandparent/alt_parent/file")
301        btree.note_rename("grandparent/parent/topping",
302                          "grandparent/alt_parent/stopping")
303        return btree
304
305    def get_file_test(self, btree):
306        with btree.get_file(btree.id2path(b"e")) as f:
307            self.assertEqual(f.read(), b"Lemon\n")
308        with btree.get_file(btree.id2path(b"c")) as f:
309            self.assertEqual(f.read(), b"Hello\n")
310
311    def test_get_file(self):
312        """Get file contents"""
313        btree = self.make_tree_3()
314        mod_patch = self.unified_diff([b"Anchovies\n"], [b"Lemon\n"])
315        btree.note_patch("grandparent/alt_parent/stopping", mod_patch)
316        self.get_file_test(btree)
317
318    def test_delete(self):
319        "Deletion by bundle"
320        btree = self.make_tree_1()[0]
321        with btree.get_file(btree.id2path(b"c")) as f:
322            self.assertEqual(f.read(), b"Hello\n")
323        btree.note_deletion("grandparent/parent/file")
324        self.assertRaises(errors.NoSuchId, btree.id2path, b"c")
325        self.assertFalse(btree.is_versioned("grandparent/parent/file"))
326
327    def sorted_ids(self, tree):
328        ids = sorted(tree.all_file_ids())
329        return ids
330
331    def test_iteration(self):
332        """Ensure that iteration through ids works properly"""
333        btree = self.make_tree_1()[0]
334        self.assertEqual(self.sorted_ids(btree),
335                         [inventory.ROOT_ID, b'a', b'b', b'c', b'd'])
336        btree.note_deletion("grandparent/parent/file")
337        btree.note_id(b"e", "grandparent/alt_parent/fool", kind="directory")
338        btree.note_last_changed("grandparent/alt_parent/fool",
339                                "revisionidiguess")
340        self.assertEqual(self.sorted_ids(btree),
341                         [inventory.ROOT_ID, b'a', b'b', b'd', b'e'])
342
343
344class BundleTester1(tests.TestCaseWithTransport):
345
346    def test_mismatched_bundle(self):
347        format = bzrdir.BzrDirMetaFormat1()
348        format.repository_format = knitrepo.RepositoryFormatKnit3()
349        serializer = BundleSerializerV08('0.8')
350        b = self.make_branch('.', format=format)
351        self.assertRaises(errors.IncompatibleBundleFormat, serializer.write,
352                          b.repository, [], {}, BytesIO())
353
354    def test_matched_bundle(self):
355        """Don't raise IncompatibleBundleFormat for knit2 and bundle0.9"""
356        format = bzrdir.BzrDirMetaFormat1()
357        format.repository_format = knitrepo.RepositoryFormatKnit3()
358        serializer = BundleSerializerV09('0.9')
359        b = self.make_branch('.', format=format)
360        serializer.write(b.repository, [], {}, BytesIO())
361
362    def test_mismatched_model(self):
363        """Try copying a bundle from knit2 to knit1"""
364        format = bzrdir.BzrDirMetaFormat1()
365        format.repository_format = knitrepo.RepositoryFormatKnit3()
366        source = self.make_branch_and_tree('source', format=format)
367        source.commit('one', rev_id=b'one-id')
368        source.commit('two', rev_id=b'two-id')
369        text = BytesIO()
370        write_bundle(source.branch.repository, b'two-id', b'null:', text,
371                     format='0.9')
372        text.seek(0)
373
374        format = bzrdir.BzrDirMetaFormat1()
375        format.repository_format = knitrepo.RepositoryFormatKnit1()
376        target = self.make_branch('target', format=format)
377        self.assertRaises(errors.IncompatibleRevision, install_bundle,
378                          target.repository, read_bundle(text))
379
380
381class BundleTester(object):
382
383    def bzrdir_format(self):
384        format = bzrdir.BzrDirMetaFormat1()
385        format.repository_format = knitrepo.RepositoryFormatKnit1()
386        return format
387
388    def make_branch_and_tree(self, path, format=None):
389        if format is None:
390            format = self.bzrdir_format()
391        return tests.TestCaseWithTransport.make_branch_and_tree(
392            self, path, format)
393
394    def make_branch(self, path, format=None):
395        if format is None:
396            format = self.bzrdir_format()
397        return tests.TestCaseWithTransport.make_branch(self, path, format)
398
399    def create_bundle_text(self, base_rev_id, rev_id):
400        bundle_txt = BytesIO()
401        rev_ids = write_bundle(self.b1.repository, rev_id, base_rev_id,
402                               bundle_txt, format=self.format)
403        bundle_txt.seek(0)
404        self.assertEqual(bundle_txt.readline(),
405                         b'# Bazaar revision bundle v%s\n' % self.format.encode('ascii'))
406        self.assertEqual(bundle_txt.readline(), b'#\n')
407
408        rev = self.b1.repository.get_revision(rev_id)
409        self.assertEqual(bundle_txt.readline().decode('utf-8'),
410                         u'# message:\n')
411        bundle_txt.seek(0)
412        return bundle_txt, rev_ids
413
414    def get_valid_bundle(self, base_rev_id, rev_id, checkout_dir=None):
415        """Create a bundle from base_rev_id -> rev_id in built-in branch.
416        Make sure that the text generated is valid, and that it
417        can be applied against the base, and generate the same information.
418
419        :return: The in-memory bundle
420        """
421        bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
422
423        # This should also validate the generated bundle
424        bundle = read_bundle(bundle_txt)
425        repository = self.b1.repository
426        for bundle_rev in bundle.real_revisions:
427            # These really should have already been checked when we read the
428            # bundle, since it computes the sha1 hash for the revision, which
429            # only will match if everything is okay, but lets be explicit about
430            # it
431            branch_rev = repository.get_revision(bundle_rev.revision_id)
432            for a in ('inventory_sha1', 'revision_id', 'parent_ids',
433                      'timestamp', 'timezone', 'message', 'committer',
434                      'parent_ids', 'properties'):
435                self.assertEqual(getattr(branch_rev, a),
436                                 getattr(bundle_rev, a))
437            self.assertEqual(len(branch_rev.parent_ids),
438                             len(bundle_rev.parent_ids))
439        self.assertEqual(rev_ids,
440                         [r.revision_id for r in bundle.real_revisions])
441        self.valid_apply_bundle(base_rev_id, bundle, checkout_dir=checkout_dir)
442
443        return bundle
444
445    def get_invalid_bundle(self, base_rev_id, rev_id):
446        """Create a bundle from base_rev_id -> rev_id in built-in branch.
447        Munge the text so that it's invalid.
448
449        :return: The in-memory bundle
450        """
451        bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
452        new_text = bundle_txt.getvalue().replace(b'executable:no',
453                                                 b'executable:yes')
454        bundle_txt = BytesIO(new_text)
455        bundle = read_bundle(bundle_txt)
456        self.valid_apply_bundle(base_rev_id, bundle)
457        return bundle
458
459    def test_non_bundle(self):
460        self.assertRaises(errors.NotABundle,
461                          read_bundle, BytesIO(b'#!/bin/sh\n'))
462
463    def test_malformed(self):
464        self.assertRaises(errors.BadBundle, read_bundle,
465                          BytesIO(b'# Bazaar revision bundle v'))
466
467    def test_crlf_bundle(self):
468        try:
469            read_bundle(BytesIO(b'# Bazaar revision bundle v0.8\r\n'))
470        except errors.BadBundle:
471            # It is currently permitted for bundles with crlf line endings to
472            # make read_bundle raise a BadBundle, but this should be fixed.
473            # Anything else, especially NotABundle, is an error.
474            pass
475
476    def get_checkout(self, rev_id, checkout_dir=None):
477        """Get a new tree, with the specified revision in it.
478        """
479
480        if checkout_dir is None:
481            checkout_dir = osutils.mkdtemp(prefix='test-branch-', dir='.')
482        else:
483            if not os.path.exists(checkout_dir):
484                os.mkdir(checkout_dir)
485        tree = self.make_branch_and_tree(checkout_dir)
486        s = BytesIO()
487        ancestors = write_bundle(self.b1.repository, rev_id, b'null:', s,
488                                 format=self.format)
489        s.seek(0)
490        self.assertIsInstance(s.getvalue(), bytes)
491        install_bundle(tree.branch.repository, read_bundle(s))
492        for ancestor in ancestors:
493            old = self.b1.repository.revision_tree(ancestor)
494            new = tree.branch.repository.revision_tree(ancestor)
495            with old.lock_read(), new.lock_read():
496                # Check that there aren't any inventory level changes
497                delta = new.changes_from(old)
498                self.assertFalse(delta.has_changed(),
499                                 'Revision %s not copied correctly.'
500                                 % (ancestor,))
501
502                # Now check that the file contents are all correct
503                for path in old.all_versioned_paths():
504                    try:
505                        old_file = old.get_file(path)
506                    except errors.NoSuchFile:
507                        continue
508                    self.assertEqual(
509                        old_file.read(), new.get_file(path).read())
510        if not _mod_revision.is_null(rev_id):
511            tree.branch.generate_revision_history(rev_id)
512            tree.update()
513            delta = tree.changes_from(self.b1.repository.revision_tree(rev_id))
514            self.assertFalse(delta.has_changed(),
515                             'Working tree has modifications: %s' % delta)
516        return tree
517
518    def valid_apply_bundle(self, base_rev_id, info, checkout_dir=None):
519        """Get the base revision, apply the changes, and make
520        sure everything matches the builtin branch.
521        """
522        to_tree = self.get_checkout(base_rev_id, checkout_dir=checkout_dir)
523        to_tree.lock_write()
524        try:
525            self._valid_apply_bundle(base_rev_id, info, to_tree)
526        finally:
527            to_tree.unlock()
528
529    def _valid_apply_bundle(self, base_rev_id, info, to_tree):
530        original_parents = to_tree.get_parent_ids()
531        repository = to_tree.branch.repository
532        original_parents = to_tree.get_parent_ids()
533        self.assertIs(repository.has_revision(base_rev_id), True)
534        for rev in info.real_revisions:
535            self.assertTrue(not repository.has_revision(rev.revision_id),
536                            'Revision {%s} present before applying bundle'
537                            % rev.revision_id)
538        merge_bundle(info, to_tree, True, merge.Merge3Merger, False, False)
539
540        for rev in info.real_revisions:
541            self.assertTrue(repository.has_revision(rev.revision_id),
542                            'Missing revision {%s} after applying bundle'
543                            % rev.revision_id)
544
545        self.assertTrue(to_tree.branch.repository.has_revision(info.target))
546        # Do we also want to verify that all the texts have been added?
547
548        self.assertEqual(original_parents + [info.target],
549                         to_tree.get_parent_ids())
550
551        rev = info.real_revisions[-1]
552        base_tree = self.b1.repository.revision_tree(rev.revision_id)
553        to_tree = to_tree.branch.repository.revision_tree(rev.revision_id)
554
555        # TODO: make sure the target tree is identical to base tree
556        #       we might also check the working tree.
557
558        base_files = list(base_tree.list_files())
559        to_files = list(to_tree.list_files())
560        self.assertEqual(len(base_files), len(to_files))
561        for base_file, to_file in zip(base_files, to_files):
562            self.assertEqual(base_file, to_file)
563
564        for path, status, kind, entry in base_files:
565            # Check that the meta information is the same
566            to_path = InterTree.get(base_tree, to_tree).find_target_path(path)
567            self.assertEqual(
568                base_tree.get_file_size(path),
569                to_tree.get_file_size(to_path))
570            self.assertEqual(
571                base_tree.get_file_sha1(path),
572                to_tree.get_file_sha1(to_path))
573            # Check that the contents are the same
574            # This is pretty expensive
575            # self.assertEqual(base_tree.get_file(fileid).read(),
576            #         to_tree.get_file(fileid).read())
577
578    def test_bundle(self):
579        self.tree1 = self.make_branch_and_tree('b1')
580        self.b1 = self.tree1.branch
581
582        self.build_tree_contents([('b1/one', b'one\n')])
583        self.tree1.add('one', b'one-id')
584        self.tree1.set_root_id(b'root-id')
585        self.tree1.commit('add one', rev_id=b'a@cset-0-1')
586
587        bundle = self.get_valid_bundle(b'null:', b'a@cset-0-1')
588
589        # Make sure we can handle files with spaces, tabs, other
590        # bogus characters
591        self.build_tree([
592            'b1/with space.txt', 'b1/dir/', 'b1/dir/filein subdir.c', 'b1/dir/WithCaps.txt', 'b1/dir/ pre space', 'b1/sub/', 'b1/sub/sub/', 'b1/sub/sub/nonempty.txt'
593            ])
594        self.build_tree_contents([('b1/sub/sub/emptyfile.txt', b''),
595                                  ('b1/dir/nolastnewline.txt', b'bloop')])
596        tt = self.tree1.transform()
597        tt.new_file('executable', tt.root, [b'#!/bin/sh\n'], b'exe-1', True)
598        tt.apply()
599        # have to fix length of file-id so that we can predictably rewrite
600        # a (length-prefixed) record containing it later.
601        self.tree1.add('with space.txt', b'withspace-id')
602        self.tree1.add([
603            'dir', 'dir/filein subdir.c', 'dir/WithCaps.txt', 'dir/ pre space', 'dir/nolastnewline.txt', 'sub', 'sub/sub', 'sub/sub/nonempty.txt', 'sub/sub/emptyfile.txt'
604            ])
605        self.tree1.commit('add whitespace', rev_id=b'a@cset-0-2')
606
607        bundle = self.get_valid_bundle(b'a@cset-0-1', b'a@cset-0-2')
608
609        # Check a rollup bundle
610        bundle = self.get_valid_bundle(b'null:', b'a@cset-0-2')
611
612        # Now delete entries
613        self.tree1.remove(
614            ['sub/sub/nonempty.txt', 'sub/sub/emptyfile.txt', 'sub/sub'
615             ])
616        tt = self.tree1.transform()
617        trans_id = tt.trans_id_tree_path('executable')
618        tt.set_executability(False, trans_id)
619        tt.apply()
620        self.tree1.commit('removed', rev_id=b'a@cset-0-3')
621
622        bundle = self.get_valid_bundle(b'a@cset-0-2', b'a@cset-0-3')
623        self.assertRaises((errors.TestamentMismatch,
624                           errors.VersionedFileInvalidChecksum,
625                           errors.BadBundle), self.get_invalid_bundle,
626                          b'a@cset-0-2', b'a@cset-0-3')
627        # Check a rollup bundle
628        bundle = self.get_valid_bundle(b'null:', b'a@cset-0-3')
629
630        # Now move the directory
631        self.tree1.rename_one('dir', 'sub/dir')
632        self.tree1.commit('rename dir', rev_id=b'a@cset-0-4')
633
634        bundle = self.get_valid_bundle(b'a@cset-0-3', b'a@cset-0-4')
635        # Check a rollup bundle
636        bundle = self.get_valid_bundle(b'null:', b'a@cset-0-4')
637
638        # Modified files
639        with open('b1/sub/dir/WithCaps.txt', 'ab') as f:
640            f.write(b'\nAdding some text\n')
641        with open('b1/sub/dir/ pre space', 'ab') as f:
642            f.write(
643                b'\r\nAdding some\r\nDOS format lines\r\n')
644        with open('b1/sub/dir/nolastnewline.txt', 'ab') as f:
645            f.write(b'\n')
646        self.tree1.rename_one('sub/dir/ pre space',
647                              'sub/ start space')
648        self.tree1.commit('Modified files', rev_id=b'a@cset-0-5')
649        bundle = self.get_valid_bundle(b'a@cset-0-4', b'a@cset-0-5')
650
651        self.tree1.rename_one('sub/dir/WithCaps.txt', 'temp')
652        self.tree1.rename_one('with space.txt', 'WithCaps.txt')
653        self.tree1.rename_one('temp', 'with space.txt')
654        self.tree1.commit(u'swap filenames', rev_id=b'a@cset-0-6',
655                          verbose=False)
656        bundle = self.get_valid_bundle(b'a@cset-0-5', b'a@cset-0-6')
657        other = self.get_checkout(b'a@cset-0-5')
658        tree1_inv = get_inventory_text(self.tree1.branch.repository,
659                                       b'a@cset-0-5')
660        tree2_inv = get_inventory_text(other.branch.repository,
661                                       b'a@cset-0-5')
662        self.assertEqualDiff(tree1_inv, tree2_inv)
663        other.rename_one('sub/dir/nolastnewline.txt', 'sub/nolastnewline.txt')
664        other.commit('rename file', rev_id=b'a@cset-0-6b')
665        self.tree1.merge_from_branch(other.branch)
666        self.tree1.commit(u'Merge', rev_id=b'a@cset-0-7',
667                          verbose=False)
668        bundle = self.get_valid_bundle(b'a@cset-0-6', b'a@cset-0-7')
669
670    def _test_symlink_bundle(self, link_name, link_target, new_link_target):
671        link_id = b'link-1'
672
673        self.requireFeature(features.SymlinkFeature)
674        self.tree1 = self.make_branch_and_tree('b1')
675        self.b1 = self.tree1.branch
676
677        tt = self.tree1.transform()
678        tt.new_symlink(link_name, tt.root, link_target, link_id)
679        tt.apply()
680        self.tree1.commit('add symlink', rev_id=b'l@cset-0-1')
681        bundle = self.get_valid_bundle(b'null:', b'l@cset-0-1')
682        if getattr(bundle, 'revision_tree', None) is not None:
683            # Not all bundle formats supports revision_tree
684            bund_tree = bundle.revision_tree(self.b1.repository, b'l@cset-0-1')
685            self.assertEqual(
686                link_target, bund_tree.get_symlink_target(link_name))
687
688        tt = self.tree1.transform()
689        trans_id = tt.trans_id_tree_path(link_name)
690        tt.adjust_path('link2', tt.root, trans_id)
691        tt.delete_contents(trans_id)
692        tt.create_symlink(new_link_target, trans_id)
693        tt.apply()
694        self.tree1.commit('rename and change symlink', rev_id=b'l@cset-0-2')
695        bundle = self.get_valid_bundle(b'l@cset-0-1', b'l@cset-0-2')
696        if getattr(bundle, 'revision_tree', None) is not None:
697            # Not all bundle formats supports revision_tree
698            bund_tree = bundle.revision_tree(self.b1.repository, b'l@cset-0-2')
699            self.assertEqual(new_link_target,
700                             bund_tree.get_symlink_target('link2'))
701
702        tt = self.tree1.transform()
703        trans_id = tt.trans_id_tree_path('link2')
704        tt.delete_contents(trans_id)
705        tt.create_symlink('jupiter', trans_id)
706        tt.apply()
707        self.tree1.commit('just change symlink target', rev_id=b'l@cset-0-3')
708        bundle = self.get_valid_bundle(b'l@cset-0-2', b'l@cset-0-3')
709
710        tt = self.tree1.transform()
711        trans_id = tt.trans_id_tree_path('link2')
712        tt.delete_contents(trans_id)
713        tt.apply()
714        self.tree1.commit('Delete symlink', rev_id=b'l@cset-0-4')
715        bundle = self.get_valid_bundle(b'l@cset-0-3', b'l@cset-0-4')
716
717    def test_symlink_bundle(self):
718        self._test_symlink_bundle('link', 'bar/foo', 'mars')
719
720    def test_unicode_symlink_bundle(self):
721        self.requireFeature(features.UnicodeFilenameFeature)
722        self._test_symlink_bundle(u'\N{Euro Sign}link',
723                                  u'bar/\N{Euro Sign}foo',
724                                  u'mars\N{Euro Sign}')
725
726    def test_binary_bundle(self):
727        self.tree1 = self.make_branch_and_tree('b1')
728        self.b1 = self.tree1.branch
729        tt = self.tree1.transform()
730
731        # Add
732        tt.new_file('file', tt.root, [
733                    b'\x00\n\x00\r\x01\n\x02\r\xff'], b'binary-1')
734        tt.new_file('file2', tt.root, [b'\x01\n\x02\r\x03\n\x04\r\xff'],
735                    b'binary-2')
736        tt.apply()
737        self.tree1.commit('add binary', rev_id=b'b@cset-0-1')
738        self.get_valid_bundle(b'null:', b'b@cset-0-1')
739
740        # Delete
741        tt = self.tree1.transform()
742        trans_id = tt.trans_id_tree_path('file')
743        tt.delete_contents(trans_id)
744        tt.apply()
745        self.tree1.commit('delete binary', rev_id=b'b@cset-0-2')
746        self.get_valid_bundle(b'b@cset-0-1', b'b@cset-0-2')
747
748        # Rename & modify
749        tt = self.tree1.transform()
750        trans_id = tt.trans_id_tree_path('file2')
751        tt.adjust_path('file3', tt.root, trans_id)
752        tt.delete_contents(trans_id)
753        tt.create_file([b'file\rcontents\x00\n\x00'], trans_id)
754        tt.apply()
755        self.tree1.commit('rename and modify binary', rev_id=b'b@cset-0-3')
756        self.get_valid_bundle(b'b@cset-0-2', b'b@cset-0-3')
757
758        # Modify
759        tt = self.tree1.transform()
760        trans_id = tt.trans_id_tree_path('file3')
761        tt.delete_contents(trans_id)
762        tt.create_file([b'\x00file\rcontents'], trans_id)
763        tt.apply()
764        self.tree1.commit('just modify binary', rev_id=b'b@cset-0-4')
765        self.get_valid_bundle(b'b@cset-0-3', b'b@cset-0-4')
766
767        # Rollup
768        self.get_valid_bundle(b'null:', b'b@cset-0-4')
769
770    def test_last_modified(self):
771        self.tree1 = self.make_branch_and_tree('b1')
772        self.b1 = self.tree1.branch
773        tt = self.tree1.transform()
774        tt.new_file('file', tt.root, [b'file'], b'file')
775        tt.apply()
776        self.tree1.commit('create file', rev_id=b'a@lmod-0-1')
777
778        tt = self.tree1.transform()
779        trans_id = tt.trans_id_tree_path('file')
780        tt.delete_contents(trans_id)
781        tt.create_file([b'file2'], trans_id)
782        tt.apply()
783        self.tree1.commit('modify text', rev_id=b'a@lmod-0-2a')
784
785        other = self.get_checkout(b'a@lmod-0-1')
786        tt = other.transform()
787        trans_id = tt.trans_id_tree_path('file2')
788        tt.delete_contents(trans_id)
789        tt.create_file([b'file2'], trans_id)
790        tt.apply()
791        other.commit('modify text in another tree', rev_id=b'a@lmod-0-2b')
792        self.tree1.merge_from_branch(other.branch)
793        self.tree1.commit(u'Merge', rev_id=b'a@lmod-0-3',
794                          verbose=False)
795        self.tree1.commit(u'Merge', rev_id=b'a@lmod-0-4')
796        bundle = self.get_valid_bundle(b'a@lmod-0-2a', b'a@lmod-0-4')
797
798    def test_hide_history(self):
799        self.tree1 = self.make_branch_and_tree('b1')
800        self.b1 = self.tree1.branch
801
802        with open('b1/one', 'wb') as f:
803            f.write(b'one\n')
804        self.tree1.add('one')
805        self.tree1.commit('add file', rev_id=b'a@cset-0-1')
806        with open('b1/one', 'wb') as f:
807            f.write(b'two\n')
808        self.tree1.commit('modify', rev_id=b'a@cset-0-2')
809        with open('b1/one', 'wb') as f:
810            f.write(b'three\n')
811        self.tree1.commit('modify', rev_id=b'a@cset-0-3')
812        bundle_file = BytesIO()
813        rev_ids = write_bundle(self.tree1.branch.repository, b'a@cset-0-3',
814                               b'a@cset-0-1', bundle_file, format=self.format)
815        self.assertNotContainsRe(bundle_file.getvalue(), b'\btwo\b')
816        self.assertContainsRe(self.get_raw(bundle_file), b'one')
817        self.assertContainsRe(self.get_raw(bundle_file), b'three')
818
819    def test_bundle_same_basis(self):
820        """Ensure using the basis as the target doesn't cause an error"""
821        self.tree1 = self.make_branch_and_tree('b1')
822        self.tree1.commit('add file', rev_id=b'a@cset-0-1')
823        bundle_file = BytesIO()
824        rev_ids = write_bundle(self.tree1.branch.repository, b'a@cset-0-1',
825                               b'a@cset-0-1', bundle_file)
826
827    @staticmethod
828    def get_raw(bundle_file):
829        return bundle_file.getvalue()
830
831    def test_unicode_bundle(self):
832        self.requireFeature(features.UnicodeFilenameFeature)
833        # Handle international characters
834        os.mkdir('b1')
835        f = open(u'b1/with Dod\N{Euro Sign}', 'wb')
836
837        self.tree1 = self.make_branch_and_tree('b1')
838        self.b1 = self.tree1.branch
839
840        f.write((u'A file\n'
841                 u'With international man of mystery\n'
842                 u'William Dod\xe9\n').encode('utf-8'))
843        f.close()
844
845        self.tree1.add([u'with Dod\N{Euro Sign}'], [b'withdod-id'])
846        self.tree1.commit(u'i18n commit from William Dod\xe9',
847                          rev_id=b'i18n-1', committer=u'William Dod\xe9')
848
849        # Add
850        bundle = self.get_valid_bundle(b'null:', b'i18n-1')
851
852        # Modified
853        f = open(u'b1/with Dod\N{Euro Sign}', 'wb')
854        f.write(u'Modified \xb5\n'.encode('utf8'))
855        f.close()
856        self.tree1.commit(u'modified', rev_id=b'i18n-2')
857
858        bundle = self.get_valid_bundle(b'i18n-1', b'i18n-2')
859
860        # Renamed
861        self.tree1.rename_one(u'with Dod\N{Euro Sign}', u'B\N{Euro Sign}gfors')
862        self.tree1.commit(u'renamed, the new i18n man', rev_id=b'i18n-3',
863                          committer=u'Erik B\xe5gfors')
864
865        bundle = self.get_valid_bundle(b'i18n-2', b'i18n-3')
866
867        # Removed
868        self.tree1.remove([u'B\N{Euro Sign}gfors'])
869        self.tree1.commit(u'removed', rev_id=b'i18n-4')
870
871        bundle = self.get_valid_bundle(b'i18n-3', b'i18n-4')
872
873        # Rollup
874        bundle = self.get_valid_bundle(b'null:', b'i18n-4')
875
876    def test_whitespace_bundle(self):
877        if sys.platform in ('win32', 'cygwin'):
878            raise tests.TestSkipped('Windows doesn\'t support filenames'
879                                    ' with tabs or trailing spaces')
880        self.tree1 = self.make_branch_and_tree('b1')
881        self.b1 = self.tree1.branch
882
883        self.build_tree(['b1/trailing space '])
884        self.tree1.add(['trailing space '])
885        # TODO: jam 20060701 Check for handling files with '\t' characters
886        #       once we actually support them
887
888        # Added
889        self.tree1.commit('funky whitespace', rev_id=b'white-1')
890
891        bundle = self.get_valid_bundle(b'null:', b'white-1')
892
893        # Modified
894        with open('b1/trailing space ', 'ab') as f:
895            f.write(b'add some text\n')
896        self.tree1.commit('add text', rev_id=b'white-2')
897
898        bundle = self.get_valid_bundle(b'white-1', b'white-2')
899
900        # Renamed
901        self.tree1.rename_one('trailing space ', ' start and end space ')
902        self.tree1.commit('rename', rev_id=b'white-3')
903
904        bundle = self.get_valid_bundle(b'white-2', b'white-3')
905
906        # Removed
907        self.tree1.remove([' start and end space '])
908        self.tree1.commit('removed', rev_id=b'white-4')
909
910        bundle = self.get_valid_bundle(b'white-3', b'white-4')
911
912        # Now test a complet roll-up
913        bundle = self.get_valid_bundle(b'null:', b'white-4')
914
915    def test_alt_timezone_bundle(self):
916        self.tree1 = self.make_branch_and_memory_tree('b1')
917        self.b1 = self.tree1.branch
918        builder = treebuilder.TreeBuilder()
919
920        self.tree1.lock_write()
921        builder.start_tree(self.tree1)
922        builder.build(['newfile'])
923        builder.finish_tree()
924
925        # Asia/Colombo offset = 5 hours 30 minutes
926        self.tree1.commit('non-hour offset timezone', rev_id=b'tz-1',
927                          timezone=19800, timestamp=1152544886.0)
928
929        bundle = self.get_valid_bundle(b'null:', b'tz-1')
930
931        rev = bundle.revisions[0]
932        self.assertEqual('Mon 2006-07-10 20:51:26.000000000 +0530', rev.date)
933        self.assertEqual(19800, rev.timezone)
934        self.assertEqual(1152544886.0, rev.timestamp)
935        self.tree1.unlock()
936
937    def test_bundle_root_id(self):
938        self.tree1 = self.make_branch_and_tree('b1')
939        self.b1 = self.tree1.branch
940        self.tree1.commit('message', rev_id=b'revid1')
941        bundle = self.get_valid_bundle(b'null:', b'revid1')
942        tree = self.get_bundle_tree(bundle, b'revid1')
943        root_revision = tree.get_file_revision(u'')
944        self.assertEqual(b'revid1', root_revision)
945
946    def test_install_revisions(self):
947        self.tree1 = self.make_branch_and_tree('b1')
948        self.b1 = self.tree1.branch
949        self.tree1.commit('message', rev_id=b'rev2a')
950        bundle = self.get_valid_bundle(b'null:', b'rev2a')
951        branch2 = self.make_branch('b2')
952        self.assertFalse(branch2.repository.has_revision(b'rev2a'))
953        target_revision = bundle.install_revisions(branch2.repository)
954        self.assertTrue(branch2.repository.has_revision(b'rev2a'))
955        self.assertEqual(b'rev2a', target_revision)
956
957    def test_bundle_empty_property(self):
958        """Test serializing revision properties with an empty value."""
959        tree = self.make_branch_and_memory_tree('tree')
960        tree.lock_write()
961        self.addCleanup(tree.unlock)
962        tree.add([''], [b'TREE_ROOT'])
963        tree.commit('One', revprops={u'one': 'two',
964                                     u'empty': ''}, rev_id=b'rev1')
965        self.b1 = tree.branch
966        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
967        bundle = read_bundle(bundle_sio)
968        revision_info = bundle.revisions[0]
969        self.assertEqual(b'rev1', revision_info.revision_id)
970        rev = revision_info.as_revision()
971        self.assertEqual({'branch-nick': 'tree', 'empty': '', 'one': 'two'},
972                         rev.properties)
973
974    def test_bundle_sorted_properties(self):
975        """For stability the writer should write properties in sorted order."""
976        tree = self.make_branch_and_memory_tree('tree')
977        tree.lock_write()
978        self.addCleanup(tree.unlock)
979
980        tree.add([''], [b'TREE_ROOT'])
981        tree.commit('One', rev_id=b'rev1',
982                    revprops={u'a': '4', u'b': '3', u'c': '2', u'd': '1'})
983        self.b1 = tree.branch
984        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
985        bundle = read_bundle(bundle_sio)
986        revision_info = bundle.revisions[0]
987        self.assertEqual(b'rev1', revision_info.revision_id)
988        rev = revision_info.as_revision()
989        self.assertEqual({'branch-nick': 'tree', 'a': '4', 'b': '3', 'c': '2',
990                          'd': '1'}, rev.properties)
991
992    def test_bundle_unicode_properties(self):
993        """We should be able to round trip a non-ascii property."""
994        tree = self.make_branch_and_memory_tree('tree')
995        tree.lock_write()
996        self.addCleanup(tree.unlock)
997
998        tree.add([''], [b'TREE_ROOT'])
999        # Revisions themselves do not require anything about revision property
1000        # keys, other than that they are a basestring, and do not contain
1001        # whitespace.
1002        # However, Testaments assert than they are str(), and thus should not
1003        # be Unicode.
1004        tree.commit('One', rev_id=b'rev1',
1005                    revprops={u'omega': u'\u03a9', u'alpha': u'\u03b1'})
1006        self.b1 = tree.branch
1007        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
1008        bundle = read_bundle(bundle_sio)
1009        revision_info = bundle.revisions[0]
1010        self.assertEqual(b'rev1', revision_info.revision_id)
1011        rev = revision_info.as_revision()
1012        self.assertEqual({'branch-nick': 'tree', 'omega': u'\u03a9',
1013                          'alpha': u'\u03b1'}, rev.properties)
1014
1015    def test_bundle_with_ghosts(self):
1016        tree = self.make_branch_and_tree('tree')
1017        self.b1 = tree.branch
1018        self.build_tree_contents([('tree/file', b'content1')])
1019        tree.add(['file'])
1020        tree.commit('rev1')
1021        self.build_tree_contents([('tree/file', b'content2')])
1022        tree.add_parent_tree_id(b'ghost')
1023        tree.commit('rev2', rev_id=b'rev2')
1024        bundle = self.get_valid_bundle(b'null:', b'rev2')
1025
1026    def make_simple_tree(self, format=None):
1027        tree = self.make_branch_and_tree('b1', format=format)
1028        self.b1 = tree.branch
1029        self.build_tree(['b1/file'])
1030        tree.add('file')
1031        return tree
1032
1033    def test_across_serializers(self):
1034        tree = self.make_simple_tree('knit')
1035        tree.commit('hello', rev_id=b'rev1')
1036        tree.commit('hello', rev_id=b'rev2')
1037        bundle = read_bundle(self.create_bundle_text(b'null:', b'rev2')[0])
1038        repo = self.make_repository('repo', format='dirstate-with-subtree')
1039        bundle.install_revisions(repo)
1040        inv_text = b''.join(repo._get_inventory_xml(b'rev2'))
1041        self.assertNotContainsRe(inv_text, b'format="5"')
1042        self.assertContainsRe(inv_text, b'format="7"')
1043
1044    def make_repo_with_installed_revisions(self):
1045        tree = self.make_simple_tree('knit')
1046        tree.commit('hello', rev_id=b'rev1')
1047        tree.commit('hello', rev_id=b'rev2')
1048        bundle = read_bundle(self.create_bundle_text(b'null:', b'rev2')[0])
1049        repo = self.make_repository('repo', format='dirstate-with-subtree')
1050        bundle.install_revisions(repo)
1051        return repo
1052
1053    def test_across_models(self):
1054        repo = self.make_repo_with_installed_revisions()
1055        inv = repo.get_inventory(b'rev2')
1056        self.assertEqual(b'rev2', inv.root.revision)
1057        root_id = inv.root.file_id
1058        repo.lock_read()
1059        self.addCleanup(repo.unlock)
1060        self.assertEqual({(root_id, b'rev1'): (),
1061                          (root_id, b'rev2'): ((root_id, b'rev1'),)},
1062                         repo.texts.get_parent_map([(root_id, b'rev1'), (root_id, b'rev2')]))
1063
1064    def test_inv_hash_across_serializers(self):
1065        repo = self.make_repo_with_installed_revisions()
1066        recorded_inv_sha1 = repo.get_revision(b'rev2').inventory_sha1
1067        xml = b''.join(repo._get_inventory_xml(b'rev2'))
1068        self.assertEqual(osutils.sha_string(xml), recorded_inv_sha1)
1069
1070    def test_across_models_incompatible(self):
1071        tree = self.make_simple_tree('dirstate-with-subtree')
1072        tree.commit('hello', rev_id=b'rev1')
1073        tree.commit('hello', rev_id=b'rev2')
1074        try:
1075            bundle = read_bundle(self.create_bundle_text(b'null:', b'rev1')[0])
1076        except errors.IncompatibleBundleFormat:
1077            raise tests.TestSkipped("Format 0.8 doesn't work with knit3")
1078        repo = self.make_repository('repo', format='knit')
1079        bundle.install_revisions(repo)
1080
1081        bundle = read_bundle(self.create_bundle_text(b'null:', b'rev2')[0])
1082        self.assertRaises(errors.IncompatibleRevision,
1083                          bundle.install_revisions, repo)
1084
1085    def test_get_merge_request(self):
1086        tree = self.make_simple_tree()
1087        tree.commit('hello', rev_id=b'rev1')
1088        tree.commit('hello', rev_id=b'rev2')
1089        bundle = read_bundle(self.create_bundle_text(b'null:', b'rev1')[0])
1090        result = bundle.get_merge_request(tree.branch.repository)
1091        self.assertEqual((None, b'rev1', 'inapplicable'), result)
1092
1093    def test_with_subtree(self):
1094        tree = self.make_branch_and_tree('tree',
1095                                         format='dirstate-with-subtree')
1096        self.b1 = tree.branch
1097        subtree = self.make_branch_and_tree('tree/subtree',
1098                                            format='dirstate-with-subtree')
1099        tree.add('subtree')
1100        tree.commit('hello', rev_id=b'rev1')
1101        try:
1102            bundle = read_bundle(self.create_bundle_text(b'null:', b'rev1')[0])
1103        except errors.IncompatibleBundleFormat:
1104            raise tests.TestSkipped("Format 0.8 doesn't work with knit3")
1105        if isinstance(bundle, v09.BundleInfo09):
1106            raise tests.TestSkipped("Format 0.9 doesn't work with subtrees")
1107        repo = self.make_repository('repo', format='knit')
1108        self.assertRaises(errors.IncompatibleRevision,
1109                          bundle.install_revisions, repo)
1110        repo2 = self.make_repository('repo2', format='dirstate-with-subtree')
1111        bundle.install_revisions(repo2)
1112
1113    def test_revision_id_with_slash(self):
1114        self.tree1 = self.make_branch_and_tree('tree')
1115        self.b1 = self.tree1.branch
1116        try:
1117            self.tree1.commit('Revision/id/with/slashes', rev_id=b'rev/id')
1118        except ValueError:
1119            raise tests.TestSkipped(
1120                "Repository doesn't support revision ids with slashes")
1121        bundle = self.get_valid_bundle(b'null:', b'rev/id')
1122
1123    def test_skip_file(self):
1124        """Make sure we don't accidentally write to the wrong versionedfile"""
1125        self.tree1 = self.make_branch_and_tree('tree')
1126        self.b1 = self.tree1.branch
1127        # rev1 is not present in bundle, done by fetch
1128        self.build_tree_contents([('tree/file2', b'contents1')])
1129        self.tree1.add('file2', b'file2-id')
1130        self.tree1.commit('rev1', rev_id=b'reva')
1131        self.build_tree_contents([('tree/file3', b'contents2')])
1132        # rev2 is present in bundle, and done by fetch
1133        # having file1 in the bunle causes file1's versionedfile to be opened.
1134        self.tree1.add('file3', b'file3-id')
1135        rev2 = self.tree1.commit('rev2')
1136        # Updating file2 should not cause an attempt to add to file1's vf
1137        target = self.tree1.controldir.sprout('target').open_workingtree()
1138        self.build_tree_contents([('tree/file2', b'contents3')])
1139        self.tree1.commit('rev3', rev_id=b'rev3')
1140        bundle = self.get_valid_bundle(b'reva', b'rev3')
1141        if getattr(bundle, 'get_bundle_reader', None) is None:
1142            raise tests.TestSkipped('Bundle format cannot provide reader')
1143        file_ids = set(
1144            (f, r) for b, m, k, r, f in bundle.get_bundle_reader().iter_records()
1145            if f is not None)
1146        self.assertEqual(
1147            {(b'file2-id', b'rev3'), (b'file3-id', rev2)}, file_ids)
1148        bundle.install_revisions(target.branch.repository)
1149
1150
1151class V08BundleTester(BundleTester, tests.TestCaseWithTransport):
1152
1153    format = '0.8'
1154
1155    def test_bundle_empty_property(self):
1156        """Test serializing revision properties with an empty value."""
1157        tree = self.make_branch_and_memory_tree('tree')
1158        tree.lock_write()
1159        self.addCleanup(tree.unlock)
1160        tree.add([''], [b'TREE_ROOT'])
1161        tree.commit('One', revprops={u'one': 'two',
1162                                     u'empty': ''}, rev_id=b'rev1')
1163        self.b1 = tree.branch
1164        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
1165        self.assertContainsRe(bundle_sio.getvalue(),
1166                              b'# properties:\n'
1167                              b'#   branch-nick: tree\n'
1168                              b'#   empty: \n'
1169                              b'#   one: two\n'
1170                              )
1171        bundle = read_bundle(bundle_sio)
1172        revision_info = bundle.revisions[0]
1173        self.assertEqual(b'rev1', revision_info.revision_id)
1174        rev = revision_info.as_revision()
1175        self.assertEqual({'branch-nick': 'tree', 'empty': '', 'one': 'two'},
1176                         rev.properties)
1177
1178    def get_bundle_tree(self, bundle, revision_id):
1179        repository = self.make_repository('repo')
1180        return bundle.revision_tree(repository, b'revid1')
1181
1182    def test_bundle_empty_property_alt(self):
1183        """Test serializing revision properties with an empty value.
1184
1185        Older readers had a bug when reading an empty property.
1186        They assumed that all keys ended in ': \n'. However they would write an
1187        empty value as ':\n'. This tests make sure that all newer bzr versions
1188        can handle th second form.
1189        """
1190        tree = self.make_branch_and_memory_tree('tree')
1191        tree.lock_write()
1192        self.addCleanup(tree.unlock)
1193        tree.add([''], [b'TREE_ROOT'])
1194        tree.commit('One', revprops={u'one': 'two',
1195                                     u'empty': ''}, rev_id=b'rev1')
1196        self.b1 = tree.branch
1197        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
1198        txt = bundle_sio.getvalue()
1199        loc = txt.find(b'#   empty: ') + len(b'#   empty:')
1200        # Create a new bundle, which strips the trailing space after empty
1201        bundle_sio = BytesIO(txt[:loc] + txt[loc + 1:])
1202
1203        self.assertContainsRe(bundle_sio.getvalue(),
1204                              b'# properties:\n'
1205                              b'#   branch-nick: tree\n'
1206                              b'#   empty:\n'
1207                              b'#   one: two\n'
1208                              )
1209        bundle = read_bundle(bundle_sio)
1210        revision_info = bundle.revisions[0]
1211        self.assertEqual(b'rev1', revision_info.revision_id)
1212        rev = revision_info.as_revision()
1213        self.assertEqual({'branch-nick': 'tree', 'empty': '', 'one': 'two'},
1214                         rev.properties)
1215
1216    def test_bundle_sorted_properties(self):
1217        """For stability the writer should write properties in sorted order."""
1218        tree = self.make_branch_and_memory_tree('tree')
1219        tree.lock_write()
1220        self.addCleanup(tree.unlock)
1221
1222        tree.add([''], [b'TREE_ROOT'])
1223        tree.commit('One', rev_id=b'rev1',
1224                    revprops={u'a': '4', u'b': '3', u'c': '2', u'd': '1'})
1225        self.b1 = tree.branch
1226        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
1227        self.assertContainsRe(bundle_sio.getvalue(),
1228                              b'# properties:\n'
1229                              b'#   a: 4\n'
1230                              b'#   b: 3\n'
1231                              b'#   branch-nick: tree\n'
1232                              b'#   c: 2\n'
1233                              b'#   d: 1\n'
1234                              )
1235        bundle = read_bundle(bundle_sio)
1236        revision_info = bundle.revisions[0]
1237        self.assertEqual(b'rev1', revision_info.revision_id)
1238        rev = revision_info.as_revision()
1239        self.assertEqual({'branch-nick': 'tree', 'a': '4', 'b': '3', 'c': '2',
1240                          'd': '1'}, rev.properties)
1241
1242    def test_bundle_unicode_properties(self):
1243        """We should be able to round trip a non-ascii property."""
1244        tree = self.make_branch_and_memory_tree('tree')
1245        tree.lock_write()
1246        self.addCleanup(tree.unlock)
1247
1248        tree.add([''], [b'TREE_ROOT'])
1249        # Revisions themselves do not require anything about revision property
1250        # keys, other than that they are a basestring, and do not contain
1251        # whitespace.
1252        # However, Testaments assert than they are str(), and thus should not
1253        # be Unicode.
1254        tree.commit('One', rev_id=b'rev1',
1255                    revprops={u'omega': u'\u03a9', u'alpha': u'\u03b1'})
1256        self.b1 = tree.branch
1257        bundle_sio, revision_ids = self.create_bundle_text(b'null:', b'rev1')
1258        self.assertContainsRe(bundle_sio.getvalue(),
1259                              b'# properties:\n'
1260                              b'#   alpha: \xce\xb1\n'
1261                              b'#   branch-nick: tree\n'
1262                              b'#   omega: \xce\xa9\n'
1263                              )
1264        bundle = read_bundle(bundle_sio)
1265        revision_info = bundle.revisions[0]
1266        self.assertEqual(b'rev1', revision_info.revision_id)
1267        rev = revision_info.as_revision()
1268        self.assertEqual({'branch-nick': 'tree', 'omega': u'\u03a9',
1269                          'alpha': u'\u03b1'}, rev.properties)
1270
1271
1272class V09BundleKnit2Tester(V08BundleTester):
1273
1274    format = '0.9'
1275
1276    def bzrdir_format(self):
1277        format = bzrdir.BzrDirMetaFormat1()
1278        format.repository_format = knitrepo.RepositoryFormatKnit3()
1279        return format
1280
1281
1282class V09BundleKnit1Tester(V08BundleTester):
1283
1284    format = '0.9'
1285
1286    def bzrdir_format(self):
1287        format = bzrdir.BzrDirMetaFormat1()
1288        format.repository_format = knitrepo.RepositoryFormatKnit1()
1289        return format
1290
1291
1292class V4BundleTester(BundleTester, tests.TestCaseWithTransport):
1293
1294    format = '4'
1295
1296    def get_valid_bundle(self, base_rev_id, rev_id, checkout_dir=None):
1297        """Create a bundle from base_rev_id -> rev_id in built-in branch.
1298        Make sure that the text generated is valid, and that it
1299        can be applied against the base, and generate the same information.
1300
1301        :return: The in-memory bundle
1302        """
1303        bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
1304
1305        # This should also validate the generated bundle
1306        bundle = read_bundle(bundle_txt)
1307        repository = self.b1.repository
1308        for bundle_rev in bundle.real_revisions:
1309            # These really should have already been checked when we read the
1310            # bundle, since it computes the sha1 hash for the revision, which
1311            # only will match if everything is okay, but lets be explicit about
1312            # it
1313            branch_rev = repository.get_revision(bundle_rev.revision_id)
1314            for a in ('inventory_sha1', 'revision_id', 'parent_ids',
1315                      'timestamp', 'timezone', 'message', 'committer',
1316                      'parent_ids', 'properties'):
1317                self.assertEqual(getattr(branch_rev, a),
1318                                 getattr(bundle_rev, a))
1319            self.assertEqual(len(branch_rev.parent_ids),
1320                             len(bundle_rev.parent_ids))
1321        self.assertEqual(set(rev_ids),
1322                         {r.revision_id for r in bundle.real_revisions})
1323        self.valid_apply_bundle(base_rev_id, bundle,
1324                                checkout_dir=checkout_dir)
1325
1326        return bundle
1327
1328    def get_invalid_bundle(self, base_rev_id, rev_id):
1329        """Create a bundle from base_rev_id -> rev_id in built-in branch.
1330        Munge the text so that it's invalid.
1331
1332        :return: The in-memory bundle
1333        """
1334        from ..bundle import serializer
1335        bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
1336        new_text = self.get_raw(BytesIO(b''.join(bundle_txt)))
1337        new_text = new_text.replace(b'<file file_id="exe-1"',
1338                                    b'<file executable="y" file_id="exe-1"')
1339        new_text = new_text.replace(b'B260', b'B275')
1340        bundle_txt = BytesIO()
1341        bundle_txt.write(serializer._get_bundle_header('4'))
1342        bundle_txt.write(b'\n')
1343        bundle_txt.write(bz2.compress(new_text))
1344        bundle_txt.seek(0)
1345        bundle = read_bundle(bundle_txt)
1346        self.valid_apply_bundle(base_rev_id, bundle)
1347        return bundle
1348
1349    def create_bundle_text(self, base_rev_id, rev_id):
1350        bundle_txt = BytesIO()
1351        rev_ids = write_bundle(self.b1.repository, rev_id, base_rev_id,
1352                               bundle_txt, format=self.format)
1353        bundle_txt.seek(0)
1354        self.assertEqual(bundle_txt.readline(),
1355                         b'# Bazaar revision bundle v%s\n' % self.format.encode('ascii'))
1356        self.assertEqual(bundle_txt.readline(), b'#\n')
1357        rev = self.b1.repository.get_revision(rev_id)
1358        bundle_txt.seek(0)
1359        return bundle_txt, rev_ids
1360
1361    def get_bundle_tree(self, bundle, revision_id):
1362        repository = self.make_repository('repo')
1363        bundle.install_revisions(repository)
1364        return repository.revision_tree(revision_id)
1365
1366    def test_creation(self):
1367        tree = self.make_branch_and_tree('tree')
1368        self.build_tree_contents([('tree/file', b'contents1\nstatic\n')])
1369        tree.add('file', b'fileid-2')
1370        tree.commit('added file', rev_id=b'rev1')
1371        self.build_tree_contents([('tree/file', b'contents2\nstatic\n')])
1372        tree.commit('changed file', rev_id=b'rev2')
1373        s = BytesIO()
1374        serializer = BundleSerializerV4('1.0')
1375        with tree.lock_read():
1376            serializer.write_bundle(
1377                tree.branch.repository, b'rev2', b'null:', s)
1378        s.seek(0)
1379        tree2 = self.make_branch_and_tree('target')
1380        target_repo = tree2.branch.repository
1381        install_bundle(target_repo, serializer.read(s))
1382        target_repo.lock_read()
1383        self.addCleanup(target_repo.unlock)
1384        # Turn the 'iterators_of_bytes' back into simple strings for comparison
1385        repo_texts = dict((i, b''.join(content)) for i, content
1386                          in target_repo.iter_files_bytes(
1387            [(b'fileid-2', b'rev1', '1'),
1388             (b'fileid-2', b'rev2', '2')]))
1389        self.assertEqual({'1': b'contents1\nstatic\n',
1390                          '2': b'contents2\nstatic\n'},
1391                         repo_texts)
1392        rtree = target_repo.revision_tree(b'rev2')
1393        inventory_vf = target_repo.inventories
1394        # If the inventory store has a graph, it must match the revision graph.
1395        self.assertSubset(
1396            [inventory_vf.get_parent_map([(b'rev2',)])[(b'rev2',)]],
1397            [None, ((b'rev1',),)])
1398        self.assertEqual('changed file',
1399                         target_repo.get_revision(b'rev2').message)
1400
1401    @staticmethod
1402    def get_raw(bundle_file):
1403        bundle_file.seek(0)
1404        line = bundle_file.readline()
1405        line = bundle_file.readline()
1406        lines = bundle_file.readlines()
1407        return bz2.decompress(b''.join(lines))
1408
1409    def test_copy_signatures(self):
1410        tree_a = self.make_branch_and_tree('tree_a')
1411        import breezy.gpg
1412        import breezy.commit as commit
1413        oldstrategy = breezy.gpg.GPGStrategy
1414        branch = tree_a.branch
1415        repo_a = branch.repository
1416        tree_a.commit("base", allow_pointless=True, rev_id=b'A')
1417        self.assertFalse(branch.repository.has_signature_for_revision_id(b'A'))
1418        try:
1419            from ..testament import Testament
1420            # monkey patch gpg signing mechanism
1421            breezy.gpg.GPGStrategy = breezy.gpg.LoopbackGPGStrategy
1422            new_config = test_commit.MustSignConfig()
1423            commit.Commit(config_stack=new_config).commit(message="base",
1424                                                          allow_pointless=True,
1425                                                          rev_id=b'B',
1426                                                          working_tree=tree_a)
1427
1428            def sign(text):
1429                return breezy.gpg.LoopbackGPGStrategy(None).sign(text)
1430            self.assertTrue(repo_a.has_signature_for_revision_id(b'B'))
1431        finally:
1432            breezy.gpg.GPGStrategy = oldstrategy
1433        tree_b = self.make_branch_and_tree('tree_b')
1434        repo_b = tree_b.branch.repository
1435        s = BytesIO()
1436        serializer = BundleSerializerV4('4')
1437        with tree_a.lock_read():
1438            serializer.write_bundle(
1439                tree_a.branch.repository, b'B', b'null:', s)
1440        s.seek(0)
1441        install_bundle(repo_b, serializer.read(s))
1442        self.assertTrue(repo_b.has_signature_for_revision_id(b'B'))
1443        self.assertEqual(repo_b.get_signature_text(b'B'),
1444                         repo_a.get_signature_text(b'B'))
1445        s.seek(0)
1446        # ensure repeat installs are harmless
1447        install_bundle(repo_b, serializer.read(s))
1448
1449
1450class V4_2aBundleTester(V4BundleTester):
1451
1452    def bzrdir_format(self):
1453        return '2a'
1454
1455    def get_invalid_bundle(self, base_rev_id, rev_id):
1456        """Create a bundle from base_rev_id -> rev_id in built-in branch.
1457        Munge the text so that it's invalid.
1458
1459        :return: The in-memory bundle
1460        """
1461        from ..bundle import serializer
1462        bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
1463        new_text = self.get_raw(BytesIO(b''.join(bundle_txt)))
1464        # We are going to be replacing some text to set the executable bit on a
1465        # file. Make sure the text replacement actually works correctly.
1466        self.assertContainsRe(new_text, b'(?m)B244\n\ni 1\n<inventory')
1467        new_text = new_text.replace(b'<file file_id="exe-1"',
1468                                    b'<file executable="y" file_id="exe-1"')
1469        new_text = new_text.replace(b'B244', b'B259')
1470        bundle_txt = BytesIO()
1471        bundle_txt.write(serializer._get_bundle_header('4'))
1472        bundle_txt.write(b'\n')
1473        bundle_txt.write(bz2.compress(new_text))
1474        bundle_txt.seek(0)
1475        bundle = read_bundle(bundle_txt)
1476        self.valid_apply_bundle(base_rev_id, bundle)
1477        return bundle
1478
1479    def make_merged_branch(self):
1480        builder = self.make_branch_builder('source')
1481        builder.start_series()
1482        builder.build_snapshot(None, [
1483            ('add', ('', b'root-id', 'directory', None)),
1484            ('add', ('file', b'file-id', 'file', b'original content\n')),
1485            ], revision_id=b'a@cset-0-1')
1486        builder.build_snapshot([b'a@cset-0-1'], [
1487            ('modify', ('file', b'new-content\n')),
1488            ], revision_id=b'a@cset-0-2a')
1489        builder.build_snapshot([b'a@cset-0-1'], [
1490            ('add', ('other-file', b'file2-id', 'file', b'file2-content\n')),
1491            ], revision_id=b'a@cset-0-2b')
1492        builder.build_snapshot([b'a@cset-0-2a', b'a@cset-0-2b'], [
1493            ('add', ('other-file', b'file2-id', 'file', b'file2-content\n')),
1494            ], revision_id=b'a@cset-0-3')
1495        builder.finish_series()
1496        self.b1 = builder.get_branch()
1497        self.b1.lock_read()
1498        self.addCleanup(self.b1.unlock)
1499
1500    def make_bundle_just_inventories(self, base_revision_id,
1501                                     target_revision_id,
1502                                     revision_ids):
1503        sio = BytesIO()
1504        writer = v4.BundleWriteOperation(base_revision_id, target_revision_id,
1505                                         self.b1.repository, sio)
1506        writer.bundle.begin()
1507        writer._add_inventory_mpdiffs_from_serializer(revision_ids)
1508        writer.bundle.end()
1509        sio.seek(0)
1510        return sio
1511
1512    def test_single_inventory_multiple_parents_as_xml(self):
1513        self.make_merged_branch()
1514        sio = self.make_bundle_just_inventories(b'a@cset-0-1', b'a@cset-0-3',
1515                                                [b'a@cset-0-3'])
1516        reader = v4.BundleReader(sio, stream_input=False)
1517        records = list(reader.iter_records())
1518        self.assertEqual(1, len(records))
1519        (bytes, metadata, repo_kind, revision_id,
1520         file_id) = records[0]
1521        self.assertIs(None, file_id)
1522        self.assertEqual(b'a@cset-0-3', revision_id)
1523        self.assertEqual('inventory', repo_kind)
1524        self.assertEqual({b'parents': [b'a@cset-0-2a', b'a@cset-0-2b'],
1525                          b'sha1': b'09c53b0c4de0895e11a2aacc34fef60a6e70865c',
1526                          b'storage_kind': b'mpdiff',
1527                          }, metadata)
1528        # We should have an mpdiff that takes some lines from both parents.
1529        self.assertEqualDiff(
1530            b'i 1\n'
1531            b'<inventory format="10" revision_id="a@cset-0-3">\n'
1532            b'\n'
1533            b'c 0 1 1 2\n'
1534            b'c 1 3 3 2\n', bytes)
1535
1536    def test_single_inv_no_parents_as_xml(self):
1537        self.make_merged_branch()
1538        sio = self.make_bundle_just_inventories(b'null:', b'a@cset-0-1',
1539                                                [b'a@cset-0-1'])
1540        reader = v4.BundleReader(sio, stream_input=False)
1541        records = list(reader.iter_records())
1542        self.assertEqual(1, len(records))
1543        (bytes, metadata, repo_kind, revision_id,
1544         file_id) = records[0]
1545        self.assertIs(None, file_id)
1546        self.assertEqual(b'a@cset-0-1', revision_id)
1547        self.assertEqual('inventory', repo_kind)
1548        self.assertEqual({b'parents': [],
1549                          b'sha1': b'a13f42b142d544aac9b085c42595d304150e31a2',
1550                          b'storage_kind': b'mpdiff',
1551                          }, metadata)
1552        # We should have an mpdiff that takes some lines from both parents.
1553        self.assertEqualDiff(
1554            b'i 4\n'
1555            b'<inventory format="10" revision_id="a@cset-0-1">\n'
1556            b'<directory file_id="root-id" name=""'
1557            b' revision="a@cset-0-1" />\n'
1558            b'<file file_id="file-id" name="file" parent_id="root-id"'
1559            b' revision="a@cset-0-1"'
1560            b' text_sha1="09c2f8647e14e49e922b955c194102070597c2d1"'
1561            b' text_size="17" />\n'
1562            b'</inventory>\n'
1563            b'\n', bytes)
1564
1565    def test_multiple_inventories_as_xml(self):
1566        self.make_merged_branch()
1567        sio = self.make_bundle_just_inventories(b'a@cset-0-1', b'a@cset-0-3',
1568                                                [b'a@cset-0-2a', b'a@cset-0-2b', b'a@cset-0-3'])
1569        reader = v4.BundleReader(sio, stream_input=False)
1570        records = list(reader.iter_records())
1571        self.assertEqual(3, len(records))
1572        revision_ids = [rev_id for b, m, k, rev_id, f in records]
1573        self.assertEqual([b'a@cset-0-2a', b'a@cset-0-2b', b'a@cset-0-3'],
1574                         revision_ids)
1575        metadata_2a = records[0][1]
1576        self.assertEqual({b'parents': [b'a@cset-0-1'],
1577                          b'sha1': b'1e105886d62d510763e22885eec733b66f5f09bf',
1578                          b'storage_kind': b'mpdiff',
1579                          }, metadata_2a)
1580        metadata_2b = records[1][1]
1581        self.assertEqual({b'parents': [b'a@cset-0-1'],
1582                          b'sha1': b'f03f12574bdb5ed2204c28636c98a8547544ccd8',
1583                          b'storage_kind': b'mpdiff',
1584                          }, metadata_2b)
1585        metadata_3 = records[2][1]
1586        self.assertEqual({b'parents': [b'a@cset-0-2a', b'a@cset-0-2b'],
1587                          b'sha1': b'09c53b0c4de0895e11a2aacc34fef60a6e70865c',
1588                          b'storage_kind': b'mpdiff',
1589                          }, metadata_3)
1590        bytes_2a = records[0][0]
1591        self.assertEqualDiff(
1592            b'i 1\n'
1593            b'<inventory format="10" revision_id="a@cset-0-2a">\n'
1594            b'\n'
1595            b'c 0 1 1 1\n'
1596            b'i 1\n'
1597            b'<file file_id="file-id" name="file" parent_id="root-id"'
1598            b' revision="a@cset-0-2a"'
1599            b' text_sha1="50f545ff40e57b6924b1f3174b267ffc4576e9a9"'
1600            b' text_size="12" />\n'
1601            b'\n'
1602            b'c 0 3 3 1\n', bytes_2a)
1603        bytes_2b = records[1][0]
1604        self.assertEqualDiff(
1605            b'i 1\n'
1606            b'<inventory format="10" revision_id="a@cset-0-2b">\n'
1607            b'\n'
1608            b'c 0 1 1 2\n'
1609            b'i 1\n'
1610            b'<file file_id="file2-id" name="other-file" parent_id="root-id"'
1611            b' revision="a@cset-0-2b"'
1612            b' text_sha1="b46c0c8ea1e5ef8e46fc8894bfd4752a88ec939e"'
1613            b' text_size="14" />\n'
1614            b'\n'
1615            b'c 0 3 4 1\n', bytes_2b)
1616        bytes_3 = records[2][0]
1617        self.assertEqualDiff(
1618            b'i 1\n'
1619            b'<inventory format="10" revision_id="a@cset-0-3">\n'
1620            b'\n'
1621            b'c 0 1 1 2\n'
1622            b'c 1 3 3 2\n', bytes_3)
1623
1624    def test_creating_bundle_preserves_chk_pages(self):
1625        self.make_merged_branch()
1626        target = self.b1.controldir.sprout('target',
1627                                           revision_id=b'a@cset-0-2a').open_branch()
1628        bundle_txt, rev_ids = self.create_bundle_text(b'a@cset-0-2a',
1629                                                      b'a@cset-0-3')
1630        self.assertEqual(set([b'a@cset-0-2b', b'a@cset-0-3']), set(rev_ids))
1631        bundle = read_bundle(bundle_txt)
1632        target.lock_write()
1633        self.addCleanup(target.unlock)
1634        install_bundle(target.repository, bundle)
1635        inv1 = next(self.b1.repository.inventories.get_record_stream([
1636            (b'a@cset-0-3',)], 'unordered',
1637            True)).get_bytes_as('fulltext')
1638        inv2 = next(target.repository.inventories.get_record_stream([
1639            (b'a@cset-0-3',)], 'unordered',
1640            True)).get_bytes_as('fulltext')
1641        self.assertEqualDiff(inv1, inv2)
1642
1643
1644class MungedBundleTester(object):
1645
1646    def build_test_bundle(self):
1647        wt = self.make_branch_and_tree('b1')
1648
1649        self.build_tree(['b1/one'])
1650        wt.add('one')
1651        wt.commit('add one', rev_id=b'a@cset-0-1')
1652        self.build_tree(['b1/two'])
1653        wt.add('two')
1654        wt.commit('add two', rev_id=b'a@cset-0-2',
1655                  revprops={u'branch-nick': 'test'})
1656
1657        bundle_txt = BytesIO()
1658        rev_ids = write_bundle(wt.branch.repository, b'a@cset-0-2',
1659                               b'a@cset-0-1', bundle_txt, self.format)
1660        self.assertEqual({b'a@cset-0-2'}, set(rev_ids))
1661        bundle_txt.seek(0, 0)
1662        return bundle_txt
1663
1664    def check_valid(self, bundle):
1665        """Check that after whatever munging, the final object is valid."""
1666        self.assertEqual([b'a@cset-0-2'],
1667                         [r.revision_id for r in bundle.real_revisions])
1668
1669    def test_extra_whitespace(self):
1670        bundle_txt = self.build_test_bundle()
1671
1672        # Seek to the end of the file
1673        # Adding one extra newline used to give us
1674        # TypeError: float() argument must be a string or a number
1675        bundle_txt.seek(0, 2)
1676        bundle_txt.write(b'\n')
1677        bundle_txt.seek(0)
1678
1679        bundle = read_bundle(bundle_txt)
1680        self.check_valid(bundle)
1681
1682    def test_extra_whitespace_2(self):
1683        bundle_txt = self.build_test_bundle()
1684
1685        # Seek to the end of the file
1686        # Adding two extra newlines used to give us
1687        # MalformedPatches: The first line of all patches should be ...
1688        bundle_txt.seek(0, 2)
1689        bundle_txt.write(b'\n\n')
1690        bundle_txt.seek(0)
1691
1692        bundle = read_bundle(bundle_txt)
1693        self.check_valid(bundle)
1694
1695
1696class MungedBundleTesterV09(tests.TestCaseWithTransport, MungedBundleTester):
1697
1698    format = '0.9'
1699
1700    def test_missing_trailing_whitespace(self):
1701        bundle_txt = self.build_test_bundle()
1702
1703        # Remove a trailing newline, it shouldn't kill the parser
1704        raw = bundle_txt.getvalue()
1705        # The contents of the bundle don't have to be this, but this
1706        # test is concerned with the exact case where the serializer
1707        # creates a blank line at the end, and fails if that
1708        # line is stripped
1709        self.assertEqual(b'\n\n', raw[-2:])
1710        bundle_txt = BytesIO(raw[:-1])
1711
1712        bundle = read_bundle(bundle_txt)
1713        self.check_valid(bundle)
1714
1715    def test_opening_text(self):
1716        bundle_txt = self.build_test_bundle()
1717
1718        bundle_txt = BytesIO(
1719            b"Some random\nemail comments\n" + bundle_txt.getvalue())
1720
1721        bundle = read_bundle(bundle_txt)
1722        self.check_valid(bundle)
1723
1724    def test_trailing_text(self):
1725        bundle_txt = self.build_test_bundle()
1726
1727        bundle_txt = BytesIO(
1728            bundle_txt.getvalue() + b"Some trailing\nrandom\ntext\n")
1729
1730        bundle = read_bundle(bundle_txt)
1731        self.check_valid(bundle)
1732
1733
1734class MungedBundleTesterV4(tests.TestCaseWithTransport, MungedBundleTester):
1735
1736    format = '4'
1737
1738
1739class TestBundleWriterReader(tests.TestCase):
1740
1741    def test_roundtrip_record(self):
1742        fileobj = BytesIO()
1743        writer = v4.BundleWriter(fileobj)
1744        writer.begin()
1745        writer.add_info_record({b'foo': b'bar'})
1746        writer._add_record(b"Record body", {b'parents': [b'1', b'3'],
1747                                            b'storage_kind': b'fulltext'}, 'file', b'revid', b'fileid')
1748        writer.end()
1749        fileobj.seek(0)
1750        reader = v4.BundleReader(fileobj, stream_input=True)
1751        record_iter = reader.iter_records()
1752        record = next(record_iter)
1753        self.assertEqual((None, {b'foo': b'bar', b'storage_kind': b'header'},
1754                          'info', None, None), record)
1755        record = next(record_iter)
1756        self.assertEqual((b"Record body", {b'storage_kind': b'fulltext',
1757                                           b'parents': [b'1', b'3']}, 'file', b'revid', b'fileid'),
1758                         record)
1759
1760    def test_roundtrip_record_memory_hungry(self):
1761        fileobj = BytesIO()
1762        writer = v4.BundleWriter(fileobj)
1763        writer.begin()
1764        writer.add_info_record({b'foo': b'bar'})
1765        writer._add_record(b"Record body", {b'parents': [b'1', b'3'],
1766                                            b'storage_kind': b'fulltext'}, 'file', b'revid', b'fileid')
1767        writer.end()
1768        fileobj.seek(0)
1769        reader = v4.BundleReader(fileobj, stream_input=False)
1770        record_iter = reader.iter_records()
1771        record = next(record_iter)
1772        self.assertEqual((None, {b'foo': b'bar', b'storage_kind': b'header'},
1773                          'info', None, None), record)
1774        record = next(record_iter)
1775        self.assertEqual((b"Record body", {b'storage_kind': b'fulltext',
1776                                           b'parents': [b'1', b'3']}, 'file', b'revid', b'fileid'),
1777                         record)
1778
1779    def test_encode_name(self):
1780        self.assertEqual(b'revision/rev1',
1781                         v4.BundleWriter.encode_name('revision', b'rev1'))
1782        self.assertEqual(b'file/rev//1/file-id-1',
1783                         v4.BundleWriter.encode_name('file', b'rev/1', b'file-id-1'))
1784        self.assertEqual(b'info',
1785                         v4.BundleWriter.encode_name('info', None, None))
1786
1787    def test_decode_name(self):
1788        self.assertEqual(('revision', b'rev1', None),
1789                         v4.BundleReader.decode_name(b'revision/rev1'))
1790        self.assertEqual(('file', b'rev/1', b'file-id-1'),
1791                         v4.BundleReader.decode_name(b'file/rev//1/file-id-1'))
1792        self.assertEqual(('info', None, None),
1793                         v4.BundleReader.decode_name(b'info'))
1794
1795    def test_too_many_names(self):
1796        fileobj = BytesIO()
1797        writer = v4.BundleWriter(fileobj)
1798        writer.begin()
1799        writer.add_info_record({b'foo': b'bar'})
1800        writer._container.add_bytes_record([b'blah'], len(b'blah'), [(b'two', ), (b'names', )])
1801        writer.end()
1802        fileobj.seek(0)
1803        record_iter = v4.BundleReader(fileobj).iter_records()
1804        record = next(record_iter)
1805        self.assertEqual((None, {b'foo': b'bar', b'storage_kind': b'header'},
1806                          'info', None, None), record)
1807        self.assertRaises(errors.BadBundle, next, record_iter)
1808