1# Copyright (C) 2007-2011 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Tests for branch.push behaviour."""
18
19from io import BytesIO
20import os
21
22from ... import (
23    branch,
24    builtins,
25    controldir,
26    check,
27    errors,
28    push,
29    revision,
30    tests,
31    transport,
32    )
33from ...bzr import (
34    branch as bzrbranch,
35    )
36from ...bzr.smart import (
37    client,
38    )
39from .. import (
40    per_branch,
41    test_server,
42    )
43
44
45class TestPush(per_branch.TestCaseWithBranch):
46
47    def test_push_convergence_simple(self):
48        # when revisions are pushed, the left-most accessible parents must
49        # become the revision-history.
50        mine = self.make_branch_and_tree('mine')
51        mine.commit('1st post', allow_pointless=True)
52        other = mine.controldir.sprout('other').open_workingtree()
53        m1 = other.commit('my change', allow_pointless=True)
54        mine.merge_from_branch(other.branch)
55        p2 = mine.commit('merge my change')
56        result = mine.branch.push(other.branch)
57        self.assertEqual(p2, other.branch.last_revision())
58        # result object contains some structured data
59        self.assertEqual(result.old_revid, m1)
60        self.assertEqual(result.new_revid, p2)
61
62    def test_push_merged_indirect(self):
63        # it should be possible to do a push from one branch into another
64        # when the tip of the target was merged into the source branch
65        # via a third branch - so its buried in the ancestry and is not
66        # directly accessible.
67        mine = self.make_branch_and_tree('mine')
68        mine.commit('1st post', allow_pointless=True)
69        target = mine.controldir.sprout('target').open_workingtree()
70        target.commit('my change', allow_pointless=True)
71        other = mine.controldir.sprout('other').open_workingtree()
72        other.merge_from_branch(target.branch)
73        other.commit('merge my change')
74        mine.merge_from_branch(other.branch)
75        p2 = mine.commit('merge other')
76        mine.branch.push(target.branch)
77        self.assertEqual(p2, target.branch.last_revision())
78
79    def test_push_to_checkout_updates_master(self):
80        """Pushing into a checkout updates the checkout and the master branch"""
81        master_tree = self.make_branch_and_tree('master')
82        checkout = self.make_branch_and_tree('checkout')
83        try:
84            checkout.branch.bind(master_tree.branch)
85        except errors.UpgradeRequired:
86            # cant bind this format, the test is irrelevant.
87            return
88        rev1 = checkout.commit('master')
89
90        other = master_tree.branch.controldir.sprout(
91            'other').open_workingtree()
92        rev2 = other.commit('other commit')
93        # now push, which should update both checkout and master.
94        other.branch.push(checkout.branch)
95        self.assertEqual(rev2, checkout.branch.last_revision())
96        self.assertEqual(rev2, master_tree.branch.last_revision())
97
98    def test_push_raises_specific_error_on_master_connection_error(self):
99        master_tree = self.make_branch_and_tree('master')
100        checkout = self.make_branch_and_tree('checkout')
101        try:
102            checkout.branch.bind(master_tree.branch)
103        except errors.UpgradeRequired:
104            # cant bind this format, the test is irrelevant.
105            return
106        other = master_tree.branch.controldir.sprout(
107            'other').open_workingtree()
108        # move the branch out of the way on disk to cause a connection
109        # error.
110        os.rename('master', 'master_gone')
111        # try to push, which should raise a BoundBranchConnectionFailure.
112        self.assertRaises(errors.BoundBranchConnectionFailure,
113                          other.branch.push, checkout.branch)
114
115    def test_push_new_tag_to_bound_branch(self):
116        master = self.make_branch('master')
117        bound = self.make_branch('bound')
118        try:
119            bound.bind(master)
120        except errors.UpgradeRequired:
121            raise tests.TestNotApplicable(
122                'Format does not support bound branches')
123        other = bound.controldir.sprout('other').open_branch()
124        try:
125            other.tags.set_tag('new-tag', b'some-rev')
126        except errors.TagsNotSupported:
127            raise tests.TestNotApplicable('Format does not support tags')
128        other.push(bound)
129        self.assertEqual({'new-tag': b'some-rev'}, bound.tags.get_tag_dict())
130        self.assertEqual({'new-tag': b'some-rev'}, master.tags.get_tag_dict())
131
132    def test_push_uses_read_lock(self):
133        """Push should only need a read lock on the source side."""
134        source = self.make_branch_and_tree('source')
135        target = self.make_branch('target')
136
137        self.build_tree(['source/a'])
138        source.add(['a'])
139        source.commit('a')
140
141        with source.branch.lock_read(), target.lock_write():
142            source.branch.push(
143                target, stop_revision=source.last_revision())
144
145    def test_push_within_repository(self):
146        """Push from one branch to another inside the same repository."""
147        try:
148            repo = self.make_repository('repo', shared=True)
149        except (errors.IncompatibleFormat, errors.UninitializableFormat):
150            # This Branch format cannot create shared repositories
151            return
152        if not repo._format.supports_nesting_repositories:
153            return
154        # This is a little bit trickier because make_branch_and_tree will not
155        # re-use a shared repository.
156        a_controldir = self.make_controldir('repo/tree')
157        try:
158            a_branch = self.branch_format.initialize(a_controldir)
159        except (errors.UninitializableFormat):
160            # Cannot create these branches
161            return
162        try:
163            tree = a_branch.controldir.create_workingtree()
164        except errors.NotLocalUrl:
165            if self.vfs_transport_factory is test_server.LocalURLServer:
166                # the branch is colocated on disk, we cannot create a checkout.
167                # hopefully callers will expect this.
168                local_controldir = controldir.ControlDir.open(
169                    self.get_vfs_only_url('repo/tree'))
170                tree = local_controldir.create_workingtree()
171            else:
172                tree = a_branch.create_checkout('repo/tree', lightweight=True)
173        self.build_tree(['repo/tree/a'])
174        tree.add(['a'])
175        tree.commit('a')
176
177        to_bzrdir = self.make_controldir('repo/branch')
178        to_branch = self.branch_format.initialize(to_bzrdir)
179        tree.branch.push(to_branch)
180
181        self.assertEqual(tree.branch.last_revision(),
182                         to_branch.last_revision())
183
184    def test_push_overwrite_with_older_mainline_rev(self):
185        """Pushing an older mainline revision with overwrite.
186
187        This was <https://bugs.launchpad.net/bzr/+bug/386576>.
188        """
189        source = self.make_branch_and_tree('source')
190        target = self.make_branch('target')
191
192        source.commit('1st commit')
193        rev2 = source.commit('2nd commit')
194        source.commit('3rd commit')
195        source.branch.push(target)
196        source.branch.push(target, stop_revision=rev2, overwrite=True)
197        self.assertEqual(rev2, target.last_revision())
198
199    def test_push_overwrite_of_non_tip_with_stop_revision(self):
200        """Combining the stop_revision and overwrite options works.
201
202        This was <https://bugs.launchpad.net/bzr/+bug/234229>.
203        """
204        source = self.make_branch_and_tree('source')
205        target = self.make_branch('target')
206
207        source.commit('1st commit')
208        source.branch.push(target)
209        rev2 = source.commit('2nd commit')
210        source.commit('3rd commit')
211
212        source.branch.push(target, stop_revision=rev2, overwrite=True)
213        self.assertEqual(rev2, target.last_revision())
214
215    def test_push_repository_no_branch_doesnt_fetch_all_revs(self):
216        # See https://bugs.launchpad.net/bzr/+bug/465517
217        t = self.get_transport('target')
218        t.ensure_base()
219        try:
220            bzrdir = self.bzrdir_format.initialize_on_transport(t)
221        except errors.UninitializableFormat:
222            raise tests.TestNotApplicable('cannot initialize this format')
223        try:
224            bzrdir.open_branch()
225        except errors.NotBranchError:
226            pass
227        else:
228            raise tests.TestNotApplicable('some formats can\'t have a repo'
229                                          ' without a branch')
230        try:
231            source = self.make_branch_builder('source',
232                                              format=self.bzrdir_format)
233        except errors.UninitializableFormat:
234            raise tests.TestNotApplicable('cannot initialize this format')
235        source.start_series()
236        revid_a = source.build_snapshot(None, [
237            ('add', ('', b'root-id', 'directory', None))])
238        revid_b = source.build_snapshot([revid_a], [])
239        revid_c = source.build_snapshot([revid_a], [])
240        source.finish_series()
241        b = source.get_branch()
242        # Note: We can't read lock the source branch. Some formats take a write
243        # lock to 'set_push_location', which breaks
244        self.addCleanup(b.lock_write().unlock)
245        repo = bzrdir.create_repository()
246        # This means 'push the source branch into this dir'
247        bzrdir.push_branch(b)
248        self.addCleanup(repo.lock_read().unlock)
249        # We should have pushed revid_c, but not revid_b, since it isn't in the
250        # ancestry
251        self.assertEqual(set([revid_a, revid_c]), set(repo.all_revision_ids()))
252
253    def test_push_with_default_stacking_does_not_create_broken_branch(self):
254        """Pushing a new standalone branch works even when there's a default
255        stacking policy at the destination.
256
257        The new branch will preserve the repo format (even if it isn't the
258        default for the branch), and will be stacked when the repo format
259        allows (which means that the branch format isn't necessarly preserved).
260        """
261        if self.bzrdir_format.fixed_components:
262            raise tests.TestNotApplicable('Not a metadir format.')
263        if isinstance(self.branch_format, bzrbranch.BranchReferenceFormat):
264            # This test could in principle apply to BranchReferenceFormat, but
265            # make_branch_builder doesn't support it.
266            raise tests.TestSkipped(
267                "BranchBuilder can't make reference branches.")
268        # Make a branch called "local" in a stackable repository
269        # The branch has 3 revisions:
270        #   - rev-1, adds a file
271        #   - rev-2, no changes
272        #   - rev-3, modifies the file.
273        repo = self.make_repository('repo', shared=True, format='1.6')
274        builder = self.make_branch_builder('repo/local')
275        builder.start_series()
276        revid1 = builder.build_snapshot(None, [
277            ('add', ('', b'root-id', 'directory', b'')),
278            ('add', ('filename', b'f-id', 'file', b'content\n'))])
279        revid2 = builder.build_snapshot([revid1], [])
280        revid3 = builder.build_snapshot([revid2],
281                                        [('modify', ('filename', b'new-content\n'))])
282        builder.finish_series()
283        trunk = builder.get_branch()
284        # Sprout rev-1 to "trunk", so that we can stack on it.
285        trunk.controldir.sprout(self.get_url('trunk'), revision_id=revid1)
286        # Set a default stacking policy so that new branches will automatically
287        # stack on trunk.
288        self.make_controldir('.').get_config().set_default_stack_on('trunk')
289        # Push rev-2 to a new branch "remote".  It will be stacked on "trunk".
290        output = BytesIO()
291        push._show_push_branch(trunk, revid2, self.get_url('remote'), output)
292        # Push rev-3 onto "remote".  If "remote" not stacked and is missing the
293        # fulltext record for f-id @ rev-1, then this will fail.
294        remote_branch = branch.Branch.open(self.get_url('remote'))
295        trunk.push(remote_branch)
296        check.check_dwim(remote_branch.base, False, True, True)
297
298
299class TestPushHook(per_branch.TestCaseWithBranch):
300
301    def setUp(self):
302        self.hook_calls = []
303        super(TestPushHook, self).setUp()
304
305    def capture_post_push_hook(self, result):
306        """Capture post push hook calls to self.hook_calls.
307
308        The call is logged, as is some state of the two branches.
309        """
310        if result.local_branch:
311            local_locked = result.local_branch.is_locked()
312            local_base = result.local_branch.base
313        else:
314            local_locked = None
315            local_base = None
316        self.hook_calls.append(
317            ('post_push', result.source_branch, local_base,
318             result.master_branch.base,
319             result.old_revno, result.old_revid,
320             result.new_revno, result.new_revid,
321             result.source_branch.is_locked(), local_locked,
322             result.master_branch.is_locked()))
323
324    def test_post_push_empty_history(self):
325        target = self.make_branch('target')
326        source = self.make_branch('source')
327        branch.Branch.hooks.install_named_hook(
328            'post_push', self.capture_post_push_hook, None)
329        source.push(target)
330        # with nothing there we should still get a notification, and
331        # have both branches locked at the notification time.
332        self.assertEqual([
333            ('post_push', source, None, target.base, 0, revision.NULL_REVISION,
334             0, revision.NULL_REVISION, True, None, True)
335            ],
336            self.hook_calls)
337
338    def test_post_push_bound_branch(self):
339        # pushing to a bound branch should pass in the master branch to the
340        # hook, allowing the correct number of emails to be sent, while still
341        # allowing hooks that want to modify the target to do so to both
342        # instances.
343        target = self.make_branch('target')
344        local = self.make_branch('local')
345        try:
346            local.bind(target)
347        except errors.UpgradeRequired:
348            # We can't bind this format to itself- typically it is the local
349            # branch that doesn't support binding.  As of May 2007
350            # remotebranches can't be bound.  Let's instead make a new local
351            # branch of the default type, which does allow binding.
352            # See https://bugs.launchpad.net/bzr/+bug/112020
353            local = controldir.ControlDir.create_branch_convenience('local2')
354            local.bind(target)
355        source = self.make_branch('source')
356        branch.Branch.hooks.install_named_hook(
357            'post_push', self.capture_post_push_hook, None)
358        source.push(local)
359        # with nothing there we should still get a notification, and
360        # have both branches locked at the notification time.
361        self.assertEqual([
362            ('post_push', source, local.base, target.base, 0,
363             revision.NULL_REVISION, 0, revision.NULL_REVISION,
364             True, True, True)
365            ],
366            self.hook_calls)
367
368    def test_post_push_nonempty_history(self):
369        target = self.make_branch_and_memory_tree('target')
370        target.lock_write()
371        target.add('')
372        rev1 = target.commit('rev 1')
373        target.unlock()
374        sourcedir = target.controldir.clone(self.get_url('source'))
375        source = sourcedir.open_branch().create_memorytree()
376        rev2 = source.commit('rev 2')
377        branch.Branch.hooks.install_named_hook(
378            'post_push', self.capture_post_push_hook, None)
379        source.branch.push(target.branch)
380        # with nothing there we should still get a notification, and
381        # have both branches locked at the notification time.
382        self.assertEqual([
383            ('post_push', source.branch, None, target.branch.base, 1, rev1,
384             2, rev2, True, None, True)
385            ],
386            self.hook_calls)
387
388
389class EmptyPushSmartEffortTests(per_branch.TestCaseWithBranch):
390    """Tests that a push of 0 revisions should make a limited number of smart
391    protocol RPCs.
392    """
393
394    def setUp(self):
395        # Skip some scenarios that don't apply to these tests.
396        if (self.transport_server is not None and
397            issubclass(self.transport_server,
398                       test_server.SmartTCPServer_for_testing)):
399            raise tests.TestNotApplicable(
400                'Does not apply when remote backing branch is also '
401                'a smart branch')
402        if not self.branch_format.supports_leaving_lock():
403            raise tests.TestNotApplicable(
404                'Branch format is not usable via HPSS.')
405        super(EmptyPushSmartEffortTests, self).setUp()
406        # Create a smart server that publishes whatever the backing VFS server
407        # does.
408        self.smart_server = test_server.SmartTCPServer_for_testing()
409        self.start_server(self.smart_server, self.get_server())
410        # Make two empty branches, 'empty' and 'target'.
411        self.empty_branch = self.make_branch('empty')
412        self.make_branch('target')
413        # Log all HPSS calls into self.hpss_calls.
414        client._SmartClient.hooks.install_named_hook(
415            'call', self.capture_hpss_call, None)
416        self.hpss_calls = []
417
418    def capture_hpss_call(self, params):
419        self.hpss_calls.append(params.method)
420
421    def test_empty_branch_api(self):
422        """The branch_obj.push API should make a limited number of HPSS calls.
423        """
424        t = transport.get_transport_from_url(
425            self.smart_server.get_url()).clone('target')
426        target = branch.Branch.open_from_transport(t)
427        self.empty_branch.push(target)
428        self.assertEqual(
429            [b'BzrDir.open_2.1',
430             b'BzrDir.open_branchV3',
431             b'BzrDir.find_repositoryV3',
432             b'Branch.get_stacked_on_url',
433             b'Branch.lock_write',
434             b'Branch.last_revision_info',
435             b'Branch.unlock'],
436            self.hpss_calls)
437
438    def test_empty_branch_command(self):
439        """The 'bzr push' command should make a limited number of HPSS calls.
440        """
441        cmd = builtins.cmd_push()
442        cmd.outf = BytesIO()
443        cmd.run(
444            directory=self.get_url('empty'),
445            location=self.smart_server.get_url() + 'target')
446        # HPSS calls as of 2008/09/22:
447        # [BzrDir.open, BzrDir.open_branch, BzrDir.find_repositoryV2,
448        # Branch.get_stacked_on_url, get, get, Branch.lock_write,
449        # Branch.last_revision_info, Branch.unlock]
450        self.assertTrue(len(self.hpss_calls) <= 9, self.hpss_calls)
451
452
453class TestLossyPush(per_branch.TestCaseWithBranch):
454
455    def setUp(self):
456        self.hook_calls = []
457        super(TestLossyPush, self).setUp()
458
459    def test_lossy_push_raises_same_vcs(self):
460        target = self.make_branch('target')
461        source = self.make_branch('source')
462        self.assertRaises(errors.LossyPushToSameVCS,
463                          source.push, target, lossy=True)
464