# Copyright (C) 2007-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for repository write groups."""
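
# The tests below exercise roughly this write group lifecycle (an illustrative
# sketch of the API under test, not code that the tests import):
#
#   repo.lock_write()
#   repo.start_write_group()
#   repo.texts.add_lines((b'file-id', b'rev-id'), (), [b'lines'])
#   tokens = repo.suspend_write_group()  # may raise UnsuspendableWriteGroup
#   reopened = repo.controldir.open_repository()
#   reopened.lock_write()
#   reopened.resume_write_group(tokens)
#   reopened.commit_write_group()  # or abort_write_group()
#   reopened.unlock()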

import sys

from breezy import (
    branch,
    controldir,
    errors,
    memorytree,
    tests,
    )
from breezy.bzr import (
    branch as bzrbranch,
    remote,
    versionedfile,
    )
from breezy.bzr.tests.per_repository_vf import (
    TestCaseWithRepository,
    all_repository_vf_format_scenarios,
    )
from breezy.tests.scenarios import load_tests_apply_scenarios


load_tests = load_tests_apply_scenarios


class TestGetMissingParentInventories(TestCaseWithRepository):
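    """Tests for get_missing_parent_inventories during write groups."""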

    scenarios = all_repository_vf_format_scenarios()

    def test_empty_get_missing_parent_inventories(self):
        """A new write group has no missing parent inventories."""
        repo = self.make_repository('.')
        repo.lock_write()
        repo.start_write_group()
        try:
            self.assertEqual(set(), set(repo.get_missing_parent_inventories()))
        finally:
            repo.commit_write_group()
            repo.unlock()

    def branch_trunk_and_make_tree(self, trunk_repo, relpath):
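        """Branch trunk_repo into a new memory tree at relpath.

        The new tree's sole pending parent is rev-1, fetched from trunk_repo.
        """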
        tree = self.make_branch_and_memory_tree(relpath)
        trunk_repo.lock_read()
        self.addCleanup(trunk_repo.unlock)
        tree.branch.repository.fetch(trunk_repo, revision_id=b'rev-1')
        tree.set_parent_ids([b'rev-1'])
        return tree

    def make_first_commit(self, repo):
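        """Make two commits (rev-0, rev-1) on a new trunk branch in repo."""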
        trunk = repo.controldir.create_branch()
        tree = memorytree.MemoryTree.create_on_branch(trunk)
        tree.lock_write()
        tree.add([''], [b'TREE_ROOT'], ['directory'])
        tree.add(['dir'], [b'dir-id'], ['directory'])
        tree.add(['filename'], [b'file-id'], ['file'])
        tree.put_file_bytes_non_atomic('filename', b'content\n')
        tree.commit('Trunk commit', rev_id=b'rev-0')
        tree.commit('Trunk commit', rev_id=b'rev-1')
        tree.unlock()

    def make_new_commit_in_new_repo(self, trunk_repo, parents=None):
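        """Branch trunk_repo and commit rev-2 with the given parents.

        Returns the new branch's repository, read-locked.
        """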
        tree = self.branch_trunk_and_make_tree(trunk_repo, 'branch')
        tree.set_parent_ids(parents)
        tree.commit('Branch commit', rev_id=b'rev-2')
        branch_repo = tree.branch.repository
        branch_repo.lock_read()
        self.addCleanup(branch_repo.unlock)
        return branch_repo

    def make_stackable_repo(self, relpath='trunk'):
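        """Make a repository at relpath whose format supports stacking.

        Raises TestNotApplicable if the format under test is not stackable.
        """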
        if isinstance(self.repository_format, remote.RemoteRepositoryFormat):
            # RemoteRepository by default builds a default format real
            # repository, but the default format is unstackable.  So explicitly
            # make a stackable real repository and use that.
            repo = self.make_repository(relpath, format='1.9')
            dir = controldir.ControlDir.open(self.get_url(relpath))
            repo = dir.open_repository()
        else:
            repo = self.make_repository(relpath)
        if not repo._format.supports_external_lookups:
            raise tests.TestNotApplicable('format not stackable')
        repo.controldir._format.set_branch_format(bzrbranch.BzrBranchFormat7())
        return repo

    def reopen_repo_and_resume_write_group(self, repo):
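        """Suspend repo's write group and resume it in a freshly opened repo.

        Returns the reopened repository with the write group resumed, or None
        (after unlocking repo) if the format cannot suspend write groups.
        """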
        try:
            resume_tokens = repo.suspend_write_group()
        except errors.UnsuspendableWriteGroup:
            # If we got this far, and this repo does not support resuming write
            # groups, then get_missing_parent_inventories works in all
            # cases this repo supports.
            repo.unlock()
            return
        repo.unlock()
        reopened_repo = repo.controldir.open_repository()
        reopened_repo.lock_write()
        self.addCleanup(reopened_repo.unlock)
        reopened_repo.resume_write_group(resume_tokens)
        return reopened_repo

    def test_ghost_revision(self):
        """A parent inventory may be absent if all the needed texts are present.

        I.e., a ghost revision isn't (necessarily) considered to be a missing
        parent inventory.
        """
        # Make a trunk with one commit.
        trunk_repo = self.make_stackable_repo()
        self.make_first_commit(trunk_repo)
        trunk_repo.lock_read()
        self.addCleanup(trunk_repo.unlock)
        # Branch the trunk, add a new commit.
        branch_repo = self.make_new_commit_in_new_repo(
            trunk_repo, parents=[b'rev-1', b'ghost-rev'])
        inv = branch_repo.get_inventory(b'rev-2')
        # Make a new repo stacked on trunk, and then copy into it:
        #  - all texts in rev-2
        #  - the new inventory (rev-2)
        #  - the new revision (rev-2)
        repo = self.make_stackable_repo('stacked')
        repo.lock_write()
        repo.start_write_group()
        # Add all texts from the rev-2 inventory.  Note that this has to
        # exclude the root if the repo format does not support rich roots.
        rich_root = branch_repo._format.rich_root_data
        all_texts = [
            (ie.file_id, ie.revision) for ie in inv.iter_just_entries()
            if rich_root or inv.id2path(ie.file_id) != '']
        repo.texts.insert_record_stream(
            branch_repo.texts.get_record_stream(all_texts, 'unordered', False))
        # Add inventory and revision for rev-2.
        repo.add_inventory(b'rev-2', inv, [b'rev-1', b'ghost-rev'])
        repo.revisions.insert_record_stream(
            branch_repo.revisions.get_record_stream(
                [(b'rev-2',)], 'unordered', False))
        # Now, no inventories are reported as missing, even though there is a
        # ghost.
        self.assertEqual(set(), repo.get_missing_parent_inventories())
        # Resuming the write group does not affect
        # get_missing_parent_inventories.
        reopened_repo = self.reopen_repo_and_resume_write_group(repo)
        self.assertEqual(set(), reopened_repo.get_missing_parent_inventories())
        reopened_repo.abort_write_group()

    def test_get_missing_parent_inventories(self):
        """A stacked repo with a single revision and inventory (no parent
        inventory) in it must have all the texts in its inventory (even if not
        changed w.r.t. the absent parent), otherwise it will report missing
        texts/parent inventory.

        The core of this test is that a file was changed in rev-1, but in a
        stacked repo that only has rev-2.
        """
        # Make a trunk with one commit.
        trunk_repo = self.make_stackable_repo()
        self.make_first_commit(trunk_repo)
        trunk_repo.lock_read()
        self.addCleanup(trunk_repo.unlock)
        # Branch the trunk, add a new commit.
        branch_repo = self.make_new_commit_in_new_repo(
            trunk_repo, parents=[b'rev-1'])
        inv = branch_repo.get_inventory(b'rev-2')
        # Make a new repo stacked on trunk, and copy the new commit's revision
        # and inventory records to it.
        repo = self.make_stackable_repo('stacked')
        repo.lock_write()
        repo.start_write_group()
        # Insert a single fulltext inv (using add_inventory because it's
        # simpler than insert_record_stream)
        repo.add_inventory(b'rev-2', inv, [b'rev-1'])
        repo.revisions.insert_record_stream(
            branch_repo.revisions.get_record_stream(
                [(b'rev-2',)], 'unordered', False))
        # There should be no missing compression parents
        self.assertEqual(set(),
                         repo.inventories.get_missing_compression_parent_keys())
        self.assertEqual(
            {('inventories', b'rev-1')},
            repo.get_missing_parent_inventories())
        # Resuming the write group does not affect
        # get_missing_parent_inventories.
        reopened_repo = self.reopen_repo_and_resume_write_group(repo)
        self.assertEqual(
            {('inventories', b'rev-1')},
            reopened_repo.get_missing_parent_inventories())
        # Adding the parent inventory satisfies get_missing_parent_inventories.
        reopened_repo.inventories.insert_record_stream(
            branch_repo.inventories.get_record_stream(
                [(b'rev-1',)], 'unordered', False))
        self.assertEqual(
            set(), reopened_repo.get_missing_parent_inventories())
        reopened_repo.abort_write_group()

    def test_get_missing_parent_inventories_check(self):
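        """check_for_missing_texts controls get_missing_parent_inventories.

        With all of A-id's texts present, the ghost parent's inventory is only
        reported missing when check_for_missing_texts is False, and only by
        formats that support external lookups.
        """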
        builder = self.make_branch_builder('test')
        builder.build_snapshot([b'ghost-parent-id'], [
            ('add', ('', b'root-id', 'directory', None)),
            ('add', ('file', b'file-id', 'file', b'content\n'))],
            allow_leftmost_as_ghost=True, revision_id=b'A-id')
        b = builder.get_branch()
        b.lock_read()
        self.addCleanup(b.unlock)
        repo = self.make_repository('test-repo')
        repo.lock_write()
        self.addCleanup(repo.unlock)
        repo.start_write_group()
        self.addCleanup(repo.abort_write_group)
        # Now, add the objects manually
        text_keys = [(b'file-id', b'A-id')]
        if repo.supports_rich_root():
            text_keys.append((b'root-id', b'A-id'))
        # Directly add the texts, inventory, and revision object for b'A-id'
        repo.texts.insert_record_stream(b.repository.texts.get_record_stream(
            text_keys, 'unordered', True))
        repo.add_revision(b'A-id', b.repository.get_revision(b'A-id'),
                          b.repository.get_inventory(b'A-id'))
        get_missing = repo.get_missing_parent_inventories
        if repo._format.supports_external_lookups:
            self.assertEqual({('inventories', b'ghost-parent-id')},
                             get_missing(check_for_missing_texts=False))
            self.assertEqual(set(), get_missing(check_for_missing_texts=True))
            self.assertEqual(set(), get_missing())
        else:
            # If we don't support external lookups, we always return empty
            self.assertEqual(set(), get_missing(check_for_missing_texts=False))
            self.assertEqual(set(), get_missing(check_for_missing_texts=True))
            self.assertEqual(set(), get_missing())

    def test_insert_stream_passes_resume_info(self):
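        """insert_stream passes resume information through to the checks.

        The sink calls get_missing_parent_inventories with
        check_for_missing_texts=False for a fresh stream, and with True when
        resuming from suspended write group tokens.
        """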
        repo = self.make_repository('test-repo')
        if (not repo._format.supports_external_lookups or
                isinstance(repo, remote.RemoteRepository)):
            raise tests.TestNotApplicable(
                'only valid for direct connections to resumable repos')
        # Log calls to get_missing_parent_inventories, so that we can assert
        # it is called with the correct parameters.
        call_log = []
        orig = repo.get_missing_parent_inventories

        def get_missing(check_for_missing_texts=True):
            call_log.append(check_for_missing_texts)
            return orig(check_for_missing_texts=check_for_missing_texts)
        repo.get_missing_parent_inventories = get_missing
        repo.lock_write()
        self.addCleanup(repo.unlock)
        sink = repo._get_sink()
        sink.insert_stream((), repo._format, [])
        self.assertEqual([False], call_log)
        del call_log[:]
        repo.start_write_group()
        # We need to insert something, or suspend_write_group won't actually
        # create a token
        repo.texts.insert_record_stream([versionedfile.FulltextContentFactory(
            (b'file-id', b'rev-id'), (), None, b'lines\n')])
        tokens = repo.suspend_write_group()
        self.assertNotEqual([], tokens)
        sink.insert_stream((), repo._format, tokens)
        self.assertEqual([True], call_log)

    def test_insert_stream_without_locking_fails_without_lock(self):
        repo = self.make_repository('test-repo')
        sink = repo._get_sink()
        stream = [('texts', [versionedfile.FulltextContentFactory(
            (b'file-id', b'rev-id'), (), None, b'lines\n')])]
        self.assertRaises(errors.ObjectNotLocked,
                          sink.insert_stream_without_locking, stream, repo._format)

    def test_insert_stream_without_locking_fails_without_write_group(self):
        repo = self.make_repository('test-repo')
        self.addCleanup(repo.lock_write().unlock)
        sink = repo._get_sink()
        stream = [('texts', [versionedfile.FulltextContentFactory(
            (b'file-id', b'rev-id'), (), None, b'lines\n')])]
        self.assertRaises(errors.BzrError,
                          sink.insert_stream_without_locking, stream, repo._format)

    def test_insert_stream_without_locking(self):
        repo = self.make_repository('test-repo')
        self.addCleanup(repo.lock_write().unlock)
        repo.start_write_group()
        sink = repo._get_sink()
        stream = [('texts', [versionedfile.FulltextContentFactory(
            (b'file-id', b'rev-id'), (), None, b'lines\n')])]
        missing_keys = sink.insert_stream_without_locking(stream, repo._format)
        repo.commit_write_group()
        self.assertEqual(set(), missing_keys)


class TestResumeableWriteGroup(TestCaseWithRepository):
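    """Tests for suspending and resuming repository write groups."""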

    scenarios = all_repository_vf_format_scenarios()

    def make_write_locked_repo(self, relpath='repo'):
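        """Make a repository at relpath and write-lock it until cleanup."""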
        repo = self.make_repository(relpath)
        repo.lock_write()
        self.addCleanup(repo.unlock)
        return repo

    def reopen_repo(self, repo):
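        """Open and write-lock another instance of repo's repository."""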
        same_repo = repo.controldir.open_repository()
        same_repo.lock_write()
        self.addCleanup(same_repo.unlock)
        return same_repo

    def require_suspendable_write_groups(self, reason):
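        """Raise TestNotApplicable if write groups cannot be suspended."""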
        repo = self.make_repository('__suspend_test')
        repo.lock_write()
        self.addCleanup(repo.unlock)
        repo.start_write_group()
        try:
            wg_tokens = repo.suspend_write_group()
        except errors.UnsuspendableWriteGroup:
            repo.abort_write_group()
            raise tests.TestNotApplicable(reason)

    def test_suspend_write_group(self):
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        repo.texts.add_lines((b'file-id', b'revid'), (), [b'lines'])
        try:
            wg_tokens = repo.suspend_write_group()
        except errors.UnsuspendableWriteGroup:
            # The contract for repos that don't support suspending write groups
            # is that suspend_write_group raises UnsuspendableWriteGroup, but
            # is otherwise a no-op.  So we can still e.g. abort the write group
            # as usual.
            self.assertTrue(repo.is_in_write_group())
            repo.abort_write_group()
        else:
            # After suspending a write group we are no longer in a write group
            self.assertFalse(repo.is_in_write_group())
            # suspend_write_group returns a list of tokens, which are strs.  If
            # no other write groups were resumed, there will only be one token.
            self.assertEqual(1, len(wg_tokens))
            self.assertIsInstance(wg_tokens[0], str)
            # See also test_pack_repository's test of the same name.

    def test_resume_write_group_then_abort(self):
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        try:
            wg_tokens = repo.suspend_write_group()
        except errors.UnsuspendableWriteGroup:
            # If the repo does not support suspending write groups, it doesn't
            # support resuming them either.
            repo.abort_write_group()
            self.assertRaises(
                errors.UnsuspendableWriteGroup, repo.resume_write_group, [])
        else:
            #self.assertEqual([], list(repo.texts.keys()))
            same_repo = self.reopen_repo(repo)
            same_repo.resume_write_group(wg_tokens)
            self.assertEqual([text_key], list(same_repo.texts.keys()))
            self.assertTrue(same_repo.is_in_write_group())
            same_repo.abort_write_group()
            self.assertEqual([], list(repo.texts.keys()))
            # See also test_pack_repository's test of the same name.

    def test_multiple_resume_write_group(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        first_key = (b'file-id', b'revid')
        repo.texts.add_lines(first_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        self.assertTrue(same_repo.is_in_write_group())
        second_key = (b'file-id', b'second-revid')
        same_repo.texts.add_lines(second_key, (first_key,), [b'more lines'])
        try:
            new_wg_tokens = same_repo.suspend_write_group()
        except:
            same_repo.abort_write_group(suppress_errors=True)
            raise
        self.assertEqual(2, len(new_wg_tokens))
        self.assertSubset(wg_tokens, new_wg_tokens)
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(new_wg_tokens)
        both_keys = {first_key, second_key}
        self.assertEqual(both_keys, same_repo.texts.keys())
        same_repo.abort_write_group()

    def test_no_op_suspend_resume(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        new_wg_tokens = same_repo.suspend_write_group()
        self.assertEqual(wg_tokens, new_wg_tokens)
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        self.assertEqual([text_key], list(same_repo.texts.keys()))
        same_repo.abort_write_group()

    def test_read_after_suspend_fails(self):
        self.require_suspendable_write_groups(
            'Cannot test suspend on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        self.assertEqual([], list(repo.texts.keys()))

    def test_read_after_second_suspend_fails(self):
        self.require_suspendable_write_groups(
            'Cannot test suspend on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        same_repo.suspend_write_group()
        self.assertEqual([], list(same_repo.texts.keys()))

    def test_read_after_resume_abort_fails(self):
        self.require_suspendable_write_groups(
            'Cannot test suspend on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        same_repo.abort_write_group()
        self.assertEqual([], list(same_repo.texts.keys()))

    def test_cannot_resume_aborted_write_group(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        same_repo.abort_write_group()
        same_repo = self.reopen_repo(repo)
        self.assertRaises(
            errors.UnresumableWriteGroup, same_repo.resume_write_group,
            wg_tokens)

    def test_commit_resumed_write_group_no_new_data(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        same_repo.commit_write_group()
        self.assertEqual([text_key], list(same_repo.texts.keys()))
        self.assertEqual(
            b'lines',
            next(same_repo.texts.get_record_stream(
                [text_key], 'unordered', True)).get_bytes_as('fulltext'))
        self.assertRaises(
            errors.UnresumableWriteGroup, same_repo.resume_write_group,
            wg_tokens)

    def test_commit_resumed_write_group_plus_new_data(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        first_key = (b'file-id', b'revid')
        repo.texts.add_lines(first_key, (), [b'lines'])
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        second_key = (b'file-id', b'second-revid')
        same_repo.texts.add_lines(second_key, (first_key,), [b'more lines'])
        same_repo.commit_write_group()
        self.assertEqual(
            {first_key, second_key}, set(same_repo.texts.keys()))
        self.assertEqual(
            b'lines',
            next(same_repo.texts.get_record_stream(
                [first_key], 'unordered', True)).get_bytes_as('fulltext'))
        self.assertEqual(
            b'more lines',
            next(same_repo.texts.get_record_stream(
                [second_key], 'unordered', True)).get_bytes_as('fulltext'))

    def make_source_with_delta_record(self):
        # Make a source repository with a delta record in it.
        source_repo = self.make_write_locked_repo('source')
        source_repo.start_write_group()
        key_base = (b'file-id', b'base')
        key_delta = (b'file-id', b'delta')

        def text_stream():
            yield versionedfile.FulltextContentFactory(
                key_base, (), None, b'lines\n')
            yield versionedfile.FulltextContentFactory(
                key_delta, (key_base,), None, b'more\nlines\n')
        source_repo.texts.insert_record_stream(text_stream())
        source_repo.commit_write_group()
        return source_repo

    def test_commit_resumed_write_group_with_missing_parents(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        source_repo = self.make_source_with_delta_record()
        key_base = (b'file-id', b'base')
        key_delta = (b'file-id', b'delta')
        # Start a write group, insert just a delta.
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        stream = source_repo.texts.get_record_stream(
            [key_delta], 'unordered', False)
        repo.texts.insert_record_stream(stream)
        # It's either not committable due to the missing compression parent, or
        # the stacked location has already filled in the fulltext.
        try:
            repo.commit_write_group()
        except errors.BzrCheckError:
            # It refused to commit because we have a missing parent
            pass
        else:
            same_repo = self.reopen_repo(repo)
            same_repo.lock_read()
            record = next(same_repo.texts.get_record_stream([key_delta],
                                                            'unordered', True))
            self.assertEqual(b'more\nlines\n', record.get_bytes_as('fulltext'))
            return
        # Merely suspending and resuming doesn't make it committable either.
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        self.assertRaises(
            errors.BzrCheckError, same_repo.commit_write_group)
        same_repo.abort_write_group()

    def test_commit_resumed_write_group_adding_missing_parents(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        source_repo = self.make_source_with_delta_record()
        key_base = (b'file-id', b'base')
        key_delta = (b'file-id', b'delta')
        # Start a write group.
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        # Add some content so this isn't an empty write group (which may return
        # 0 tokens)
        text_key = (b'file-id', b'revid')
        repo.texts.add_lines(text_key, (), [b'lines'])
        # Suspend it, then resume it.
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        # Add a record with a missing compression parent
        stream = source_repo.texts.get_record_stream(
            [key_delta], 'unordered', False)
        same_repo.texts.insert_record_stream(stream)
        # Just like if we'd added that record without a suspend/resume cycle,
        # commit_write_group fails.
        try:
            same_repo.commit_write_group()
        except errors.BzrCheckError:
            pass
        else:
            # If the commit_write_group didn't fail, that is because the
            # insert_record_stream already gave it a fulltext.
            same_repo = self.reopen_repo(repo)
            same_repo.lock_read()
            record = next(same_repo.texts.get_record_stream([key_delta],
                                                            'unordered', True))
            self.assertEqual(b'more\nlines\n', record.get_bytes_as('fulltext'))
            return
        same_repo.abort_write_group()

    def test_add_missing_parent_after_resume(self):
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        source_repo = self.make_source_with_delta_record()
        key_base = (b'file-id', b'base')
        key_delta = (b'file-id', b'delta')
        # Start a write group, insert just a delta.
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        stream = source_repo.texts.get_record_stream(
            [key_delta], 'unordered', False)
        repo.texts.insert_record_stream(stream)
        # Suspend it, then resume it.
        wg_tokens = repo.suspend_write_group()
        same_repo = self.reopen_repo(repo)
        same_repo.resume_write_group(wg_tokens)
        # Fill in the missing compression parent.
        stream = source_repo.texts.get_record_stream(
            [key_base], 'unordered', False)
        same_repo.texts.insert_record_stream(stream)
        same_repo.commit_write_group()

    def test_suspend_empty_initial_write_group(self):
        """Suspending a write group with no writes returns an empty token
        list.
        """
        self.require_suspendable_write_groups(
            'Cannot test suspend on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.start_write_group()
        wg_tokens = repo.suspend_write_group()
        self.assertEqual([], wg_tokens)

    def test_resume_empty_initial_write_group(self):
        """Resuming an empty token list is equivalent to start_write_group."""
        self.require_suspendable_write_groups(
            'Cannot test resume on repo that does not support suspending')
        repo = self.make_write_locked_repo()
        repo.resume_write_group([])
        repo.abort_write_group()