1from alembic import util
2from alembic.migration import HeadMaintainer
3from alembic.migration import MigrationStep
4from alembic.testing import assert_raises_message
5from alembic.testing import eq_
6from alembic.testing import mock
7from alembic.testing.env import clear_staging_env
8from alembic.testing.env import staging_env
9from alembic.testing.fixtures import TestBase
10
11
class MigrationTest(TestBase):
    """Shared helpers for the revision-path tests in this module.

    The helpers read ``self.env``, which each subclass assigns in its
    own ``setup_class``.

    """

    def up_(self, rev):
        """Return the upgrade ``MigrationStep`` built from script ``rev``."""
        revision_map = self.env.revision_map
        return MigrationStep.upgrade_from_script(revision_map, rev)

    def down_(self, rev):
        """Return the downgrade ``MigrationStep`` built from script ``rev``."""
        revision_map = self.env.revision_map
        return MigrationStep.downgrade_from_script(revision_map, rev)

    def _assert_heads(self, source, steps, expected_heads):
        """Replay ``steps`` on a ``HeadMaintainer`` seeded from ``source``
        and compare the resulting heads with ``expected_heads``.

        """
        starting_heads = set(util.to_tuple(source, default=()))
        maintainer = HeadMaintainer(mock.Mock(), starting_heads)
        for step in steps:
            maintainer.update_to_step(step)
        eq_(maintainer.heads, expected_heads)

    def _assert_downgrade(self, destination, source, expected, expected_heads):
        """Check the downgrade steps from ``source`` to ``destination``
        and the heads left after applying them.

        """
        steps = self.env._downgrade_revs(destination, source)
        eq_(steps, expected)
        self._assert_heads(source, steps, expected_heads)

    def _assert_upgrade(self, destination, source, expected, expected_heads):
        """Check the upgrade steps from ``source`` to ``destination``
        and the heads left after applying them.

        """
        steps = self.env._upgrade_revs(destination, source)
        eq_(steps, expected)
        self._assert_heads(source, steps, expected_heads)
36
37
class RevisionPathTest(MigrationTest):
    """Upgrade, downgrade and stamp paths over one series of revisions.

    ``setup_class`` generates five revisions one after another, with the
    messages "->a" through "d->e", and stores them as ``cls.a`` ..
    ``cls.e``.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        messages = (
            ("a", "->a"),
            ("b", "a->b"),
            ("c", "b->c"),
            ("d", "c->d"),
            ("e", "d->e"),
        )
        # generated strictly in this order
        for attr, message in messages:
            setattr(cls, attr, env.generate_revision(util.rev_id(), message))

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_upgrade_path(self):
        # from c up to e
        self._assert_upgrade(
            destination=self.e.revision,
            source=self.c.revision,
            expected=[self.up_(self.d), self.up_(self.e)],
            expected_heads={self.e.revision},
        )

        # from no current revision up to c
        self._assert_upgrade(
            destination=self.c.revision,
            source=None,
            expected=[self.up_(self.a), self.up_(self.b), self.up_(self.c)],
            expected_heads={self.c.revision},
        )

    def test_relative_upgrade_path(self):
        a, b, c, d, e = self.a, self.b, self.c, self.d, self.e

        # (destination, starting script, scripts upgraded, resulting head)
        cases = [
            ("+2", a, [b, c], c),
            ("+1", a, [b], b),
            ("+3", b, [c, d, e], e),
            ("%s+2" % b.revision, a, [b, c, d], d),
            ("%s-2" % d.revision, a, [b], b),
        ]
        for destination, start, path, final in cases:
            self._assert_upgrade(
                destination,
                start.revision,
                [self.up_(script) for script in path],
                {final.revision},
            )

    def test_invalid_relative_upgrade_path(self):
        # (expected error pattern, relative destination)
        failures = (
            ("Relative revision -2 didn't produce 2 migrations", "-2"),
            (r"Relative revision \+5 didn't produce 5 migrations", "+5"),
        )
        for message, destination in failures:
            assert_raises_message(
                util.CommandError,
                message,
                self.env._upgrade_revs,
                destination,
                self.b.revision,
            )

    def test_downgrade_path(self):
        # from e down to c
        self._assert_downgrade(
            destination=self.c.revision,
            source=self.e.revision,
            expected=[self.down_(self.e), self.down_(self.d)],
            expected_heads={self.c.revision},
        )

        # from c all the way down, leaving no heads
        self._assert_downgrade(
            destination=None,
            source=self.c.revision,
            expected=[
                self.down_(self.c),
                self.down_(self.b),
                self.down_(self.a),
            ],
            expected_heads=set(),
        )

    def test_relative_downgrade_path(self):
        a, b, c, d, e = self.a, self.b, self.c, self.d, self.e

        # (destination, starting script, scripts downgraded, resulting head)
        cases = [
            ("-1", c, [c], b),
            ("-3", e, [e, d, c], b),
            ("%s+2" % a.revision, d, [d], c),
            ("%s-2" % c.revision, d, [d, c, b], a),
        ]
        for destination, start, path, final in cases:
            self._assert_downgrade(
                destination,
                start.revision,
                [self.down_(script) for script in path],
                {final.revision},
            )

    def test_invalid_relative_downgrade_path(self):
        # (expected error pattern, relative destination)
        failures = (
            ("Relative revision -5 didn't produce 5 migrations", "-5"),
            (r"Relative revision \+2 didn't produce 2 migrations", "+2"),
        )
        for message, destination in failures:
            assert_raises_message(
                util.CommandError,
                message,
                self.env._downgrade_revs,
                destination,
                self.b.revision,
            )

    def test_invalid_move_rev_to_none(self):
        # a three-character prefix of b is used as the destination
        partial = self.b.revision[0:3]
        assert_raises_message(
            util.CommandError,
            r"Destination %s is not a valid downgrade "
            r"target from current head\(s\)" % partial,
            self.env._downgrade_revs,
            partial,
            None,
        )

    def test_invalid_move_higher_to_lower(self):
        # a four-character prefix of c is used as the destination,
        # starting from b
        partial = self.c.revision[0:4]
        assert_raises_message(
            util.CommandError,
            r"Destination %s is not a valid downgrade "
            r"target from current head\(s\)" % partial,
            self.env._downgrade_revs,
            partial,
            self.b.revision,
        )

    def test_stamp_to_base(self):
        steps = self.env._stamp_revs("base", self.d.revision)
        eq_(len(steps), 1)
        step = steps[0]
        assert step.should_delete_branch
        eq_(step.delete_version_num, self.d.revision)
206
207
class BranchedPathTest(MigrationTest):
    """Paths over two labeled branches, "c1branch" and "c2branch".

    Revisions a and b are generated first; c1/d1 follow, and c2/d2 are
    spliced in with ``head`` set to b and c2 respectively.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a = new_rev(util.rev_id(), "->a")
        cls.b = new_rev(util.rev_id(), "a->b")

        # first branch, labeled "c1branch"
        cls.c1 = new_rev(
            util.rev_id(), "b->c1", branch_labels="c1branch", refresh=True
        )
        cls.d1 = new_rev(util.rev_id(), "c1->d1")

        # second branch, labeled "c2branch", spliced onto b
        cls.c2 = new_rev(
            util.rev_id(),
            "b->c2",
            branch_labels="c2branch",
            head=cls.b.revision,
            splice=True,
        )
        cls.d2 = new_rev(util.rev_id(), "c2->d2", head=cls.c2.revision)

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_stamp_down_across_multiple_branch_to_branchpoint(self):
        heads = [self.d1.revision, self.c2.revision]
        steps = self.env._stamp_revs(self.b.revision, heads)
        eq_(len(steps), 1)

        # DELETE d1 revision, UPDATE c2 to b
        expected = ([self.d1.revision], self.c2.revision, self.b.revision)
        eq_(steps[0].merge_branch_idents(heads), expected)

    def test_stamp_to_labeled_base_multiple_heads(self):
        heads = [self.d1.revision, self.c2.revision]
        steps = self.env._stamp_revs("c1branch@base", heads)
        eq_(len(steps), 1)
        step = steps[0]
        assert step.should_delete_branch
        eq_(step.delete_version_num, self.d1.revision)

    def test_stamp_to_labeled_head_multiple_heads(self):
        heads = [self.d1.revision, self.c2.revision]
        steps = self.env._stamp_revs("c2branch@head", heads)
        eq_(len(steps), 1)

        # the c1branch remains unchanged
        expected = ([], self.c2.revision, self.d2.revision)
        eq_(steps[0].merge_branch_idents(heads), expected)

    def test_upgrade_single_branch(self):
        self._assert_upgrade(
            destination=self.d1.revision,
            source=self.b.revision,
            expected=[self.up_(self.c1), self.up_(self.d1)],
            expected_heads={self.d1.revision},
        )

    def test_upgrade_multiple_branch(self):
        # move from a single head to multiple heads
        path = (self.b, self.c2, self.d2, self.c1, self.d1)
        self._assert_upgrade(
            destination=(self.d1.revision, self.d2.revision),
            source=self.a.revision,
            expected=[self.up_(script) for script in path],
            expected_heads={self.d1.revision, self.d2.revision},
        )

    def test_downgrade_multiple_branch(self):
        # move from multiple heads back to the single revision a
        path = (self.d1, self.c1, self.d2, self.c2, self.b)
        self._assert_downgrade(
            destination=self.a.revision,
            source=(self.d1.revision, self.d2.revision),
            expected=[self.down_(script) for script in path],
            expected_heads={self.a.revision},
        )

    def test_relative_upgrade(self):
        self._assert_upgrade(
            destination="c2branch@head-1",
            source=self.b.revision,
            expected=[self.up_(self.c2)],
            expected_heads={self.c2.revision},
        )

    def test_relative_downgrade(self):
        path = (self.d2, self.c2, self.d1)
        self._assert_downgrade(
            destination="c2branch@base+2",
            source=[self.d2.revision, self.d1.revision],
            expected=[self.down_(script) for script in path],
            expected_heads={self.c1.revision},
        )
319
320
class BranchFromMergepointTest(MigrationTest):

    """A branch continuing past one side of a two-way mergepoint.

    This is a form that will come up frequently in the
    "many independent roots with cross-dependencies" case.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        # first root
        cls.a1 = new_rev(util.rev_id(), "->a1")
        cls.b1 = new_rev(util.rev_id(), "a1->b1")
        cls.c1 = new_rev(util.rev_id(), "b1->c1")

        # second root, started with no head
        cls.a2 = new_rev(util.rev_id(), "->a2", head=(), refresh=True)
        cls.b2 = new_rev(util.rev_id(), "a2->b2", head=cls.a2.revision)
        cls.c2 = new_rev(util.rev_id(), "b2->c2", head=cls.b2.revision)

        # d1 is the mergepoint between c1 and c2
        merge_heads = (cls.c1.revision, cls.c2.revision)
        cls.d1 = new_rev(util.rev_id(), "d1", head=merge_heads, refresh=True)

        # but then c2 keeps going into d2
        cls.d2 = new_rev(
            util.rev_id(),
            "d2",
            head=cls.c2.revision,
            refresh=True,
            splice=True,
        )

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_mergepoint_to_only_one_side_upgrade(self):
        self._assert_upgrade(
            destination=self.d1.revision,
            source=(self.d2.revision, self.b1.revision),
            expected=[self.up_(self.c1), self.up_(self.d1)],
            expected_heads={self.d2.revision, self.d1.revision},
        )

    def test_mergepoint_to_only_one_side_downgrade(self):
        self._assert_downgrade(
            destination=self.b1.revision,
            source=(self.d2.revision, self.d1.revision),
            expected=[self.down_(self.d1), self.down_(self.c1)],
            expected_heads={self.d2.revision, self.b1.revision},
        )
384
385
class BranchFrom3WayMergepointTest(MigrationTest):

    """Branches continuing past two sides of a three-way mergepoint.

    This is a form that will come up frequently in the
    "many independent roots with cross-dependencies" case.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        # first root
        cls.a1 = new_rev(util.rev_id(), "->a1")
        cls.b1 = new_rev(util.rev_id(), "a1->b1")
        cls.c1 = new_rev(util.rev_id(), "b1->c1")

        # second root, started with no head
        cls.a2 = new_rev(util.rev_id(), "->a2", head=(), refresh=True)
        cls.b2 = new_rev(util.rev_id(), "a2->b2", head=cls.a2.revision)
        cls.c2 = new_rev(util.rev_id(), "b2->c2", head=cls.b2.revision)

        # third root, started with no head
        cls.a3 = new_rev(util.rev_id(), "->a3", head=(), refresh=True)
        cls.b3 = new_rev(util.rev_id(), "a3->b3", head=cls.a3.revision)
        cls.c3 = new_rev(util.rev_id(), "b3->c3", head=cls.b3.revision)

        # d1 is the mergepoint between c1, c2 and c3
        merge_heads = (cls.c1.revision, cls.c2.revision, cls.c3.revision)
        cls.d1 = new_rev(util.rev_id(), "d1", head=merge_heads, refresh=True)

        # but then c2 keeps going into d2
        cls.d2 = new_rev(
            util.rev_id(),
            "d2",
            head=cls.c2.revision,
            refresh=True,
            splice=True,
        )

        # and c3 keeps going into d3
        cls.d3 = new_rev(
            util.rev_id(),
            "d3",
            head=cls.c3.revision,
            refresh=True,
            splice=True,
        )

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_mergepoint_to_only_one_side_upgrade(self):
        d1, d2, d3 = self.d1.revision, self.d2.revision, self.d3.revision
        self._assert_upgrade(
            destination=d1,
            source=(d3, d2, self.b1.revision),
            expected=[self.up_(self.c1), self.up_(self.d1)],
            expected_heads={d3, d2, d1},
        )

    def test_mergepoint_to_only_one_side_downgrade(self):
        d1, d2, d3 = self.d1.revision, self.d2.revision, self.d3.revision
        self._assert_downgrade(
            destination=self.b1.revision,
            source=(d3, d2, d1),
            expected=[self.down_(self.d1), self.down_(self.c1)],
            expected_heads={d3, d2, self.b1.revision},
        )

    def test_mergepoint_to_two_sides_upgrade(self):
        d1, d2, d3 = self.d1.revision, self.d2.revision, self.d3.revision

        # this will merge b2 and b1 into d1
        self._assert_upgrade(
            destination=d1,
            source=(d3, self.b2.revision, self.b1.revision),
            expected=[self.up_(self.c2), self.up_(self.c1), self.up_(self.d1)],
            expected_heads={d3, d1},
        )

        # but then!  b2 will break out again if we keep going with it
        self._assert_upgrade(
            destination=d2,
            source=(d3, d1),
            expected=[self.up_(self.d2)],
            expected_heads={d3, d2, d1},
        )
485
486
class TwinMergeTest(MigrationTest):
    """Test #297, where we have two mergepoints from the same set of
    originating branches.

    """

    @classmethod
    def setup_class(cls):
        """Build two mergepoints, c1 and c2, over the same b1/b2/b3.

        Revision ids as noted for the original case::

            33e21c000cfe -> 178d4e761bbd (head),
            2bef33cb3a58, 3904558db1c6, 968330f320d
                -> 33e21c000cfe (mergepoint)
            46c99f866004 -> 18f46b42410d (head),
            2bef33cb3a58, 3904558db1c6, 968330f320d
                -> 46c99f866004 (mergepoint)
            f0fa4315825 -> 3904558db1c6 (branchpoint),

        The same shape using the names in this test::

            A -> B2 (branchpoint),

            B1, B2, B3 -> C1 (mergepoint)
            B1, B2, B3 -> C2 (mergepoint)

            C1 -> D1 (head),

            C2 -> D2 (head),

        """
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a = new_rev("a", "a")
        cls.b1 = new_rev("b1", "b1", head=cls.a.revision)
        cls.b2 = new_rev("b2", "b2", splice=True, head=cls.a.revision)
        cls.b3 = new_rev("b3", "b3", splice=True, head=cls.a.revision)

        cls.c1 = new_rev(
            "c1",
            "c1",
            head=(cls.b1.revision, cls.b2.revision, cls.b3.revision),
        )
        cls.c2 = new_rev(
            "c2",
            "c2",
            splice=True,
            head=(cls.b1.revision, cls.b2.revision, cls.b3.revision),
        )

        cls.d1 = new_rev("d1", "d1", head=cls.c1.revision)
        cls.d2 = new_rev("d2", "d2", head=cls.c2.revision)

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_upgrade(self):
        head = HeadMaintainer(mock.Mock(), [self.a.revision])

        # (script to upgrade, heads expected once that step is applied)
        progression = [
            (self.b3, {"b3"}),
            (self.b1, {"b1", "b3"}),
            (self.b2, {"b1", "b2", "b3"}),
            (self.c2, {"c2"}),
            (self.d2, {"d2"}),
            (self.c1, {"c1", "d2"}),
            (self.d1, {"d1", "d2"}),
        ]
        for script, heads_after in progression:
            head.update_to_step(self.up_(script))
            eq_(head.heads, heads_after)
563
564
class NotQuiteTwinMergeTest(MigrationTest):
    """Test a variant of #297.

    Here the two mergepoints share only b2 among their parents.

    """

    @classmethod
    def setup_class(cls):
        """Build two mergepoints that overlap on b2 only::

            A -> B2 (branchpoint),

            B1, B2 -> C1 (mergepoint)
            B2, B3 -> C2 (mergepoint)

            C1 -> D1 (head),

            C2 -> D2 (head),

        """
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a = new_rev("a", "a")
        cls.b1 = new_rev("b1", "b1", head=cls.a.revision)
        cls.b2 = new_rev("b2", "b2", splice=True, head=cls.a.revision)
        cls.b3 = new_rev("b3", "b3", splice=True, head=cls.a.revision)

        cls.c1 = new_rev("c1", "c1", head=(cls.b1.revision, cls.b2.revision))
        cls.c2 = new_rev(
            "c2", "c2", splice=True, head=(cls.b2.revision, cls.b3.revision)
        )

        cls.d1 = new_rev("d1", "d1", head=cls.c1.revision)
        cls.d2 = new_rev("d2", "d2", head=cls.c2.revision)

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_upgrade(self):
        head = HeadMaintainer(mock.Mock(), [self.a.revision])

        # the steps being replayed:
        #
        #   upgrade a -> b2, b2
        #   upgrade a -> b3, b3
        #   upgrade b2, b3 -> c2, c2
        #   upgrade c2 -> d2, d2
        #   upgrade a -> b1, b1
        #   upgrade b1, b2 -> c1, c1
        #   upgrade c1 -> d1, d1

        # (script to upgrade, heads expected once that step is applied)
        progression = [
            (self.b2, {"b2"}),
            (self.b3, {"b2", "b3"}),
            (self.c2, {"c2"}),
            (self.d2, {"d2"}),
            (self.b1, {"b1", "d2"}),
            (self.c1, {"c1", "d2"}),
            (self.d1, {"d1", "d2"}),
        ]
        for script, heads_after in progression:
            head.update_to_step(self.up_(script))
            eq_(head.heads, heads_after)
636
637
class DependsOnBranchTestOne(MigrationTest):
    """Head tracking when c2 declares ``depends_on`` c1.

    The a1..f1 series carries the branch label "lib1"; a2..c2 is
    generated separately starting from ``head=()``.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a1 = new_rev(util.rev_id(), "->a1", branch_labels=["lib1"])
        cls.b1 = new_rev(util.rev_id(), "a1->b1")
        cls.c1 = new_rev(util.rev_id(), "b1->c1")

        cls.a2 = new_rev(util.rev_id(), "->a2", head=())
        cls.b2 = new_rev(util.rev_id(), "a2->b2", head=cls.a2.revision)
        # c2 depends on c1 of the other series
        cls.c2 = new_rev(
            util.rev_id(),
            "b2->c2",
            head=cls.b2.revision,
            depends_on=cls.c1.revision,
        )

        # the first series continues past c1
        cls.d1 = new_rev(util.rev_id(), "c1->d1", head=cls.c1.revision)
        cls.e1 = new_rev(util.rev_id(), "d1->e1", head=cls.d1.revision)
        cls.f1 = new_rev(util.rev_id(), "e1->f1", head=cls.e1.revision)

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_downgrade_to_dependency(self):
        starting = [self.c2.revision, self.d1.revision]
        maintainer = HeadMaintainer(mock.Mock(), starting)
        maintainer.update_to_step(self.down_(self.d1))
        # only c2 is expected to remain as a head
        eq_(maintainer.heads, {self.c2.revision})

    def test_stamp_across_dependency(self):
        starting = [self.e1.revision, self.c2.revision]
        maintainer = HeadMaintainer(mock.Mock(), starting)
        stamp_steps = self.env._stamp_revs(self.b1.revision, starting)
        for step in stamp_steps:
            maintainer.update_to_step(step)
        eq_(maintainer.heads, {self.b1.revision})
685
686
class DependsOnBranchTestTwo(MigrationTest):
    """Downgrades around a revision that depends on three other roots."""

    @classmethod
    def setup_class(cls):
        """Generate three merged groups plus d1, which depends on one
        root revision of each group (a3, b2 and c1).

        Structure::

            a1 ---+
                  |
            a2 ---+--> amerge
                  |
            a3 ---+
             ^
             |
             +---------------------------+
                                         |
            b1 ---+                      |
                  +--> bmerge        overmerge / d1
            b2 ---+                     |  |
             ^                          |  |
             |                          |  |
             +--------------------------+  |
                                           |
             +-----------------------------+
             |
             v
            c1 ---+
                  |
            c2 ---+--> cmerge
                  |
            c3 ---+

        """
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        # the "a" group
        cls.a1 = new_rev("a1", "->a1", head="base")
        cls.a2 = new_rev("a2", "->a2", head="base")
        cls.a3 = new_rev("a3", "->a3", head="base")
        cls.amerge = new_rev(
            "amerge",
            "amerge",
            head=[cls.a1.revision, cls.a2.revision, cls.a3.revision],
        )

        # the "b" group
        cls.b1 = new_rev("b1", "->b1", head="base")
        cls.b2 = new_rev("b2", "->b2", head="base")
        cls.bmerge = new_rev(
            "bmerge", "bmerge", head=[cls.b1.revision, cls.b2.revision]
        )

        # the "c" group
        cls.c1 = new_rev("c1", "->c1", head="base")
        cls.c2 = new_rev("c2", "->c2", head="base")
        cls.c3 = new_rev("c3", "->c3", head="base")
        cls.cmerge = new_rev(
            "cmerge",
            "cmerge",
            head=[cls.c1.revision, cls.c2.revision, cls.c3.revision],
        )

        # d1 starts from base but depends on a3, b2 and c1
        cls.d1 = new_rev(
            "d1",
            "o",
            head="base",
            depends_on=[cls.a3.revision, cls.b2.revision, cls.c1.revision],
        )

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_kaboom(self):
        # here's the upgrade path:
        # ['->c1', '->b2', '->a3', 'overmerge', '->c3', '->c2', 'cmerge',
        # '->b1', 'bmerge', '->a2', '->a1', 'amerge'],

        amerge = self.amerge.revision
        bmerge = self.bmerge.revision
        cmerge = self.cmerge.revision
        d1 = self.d1.revision
        b1 = self.b1.revision
        b2 = self.b2.revision

        # phase one: take bmerge down to b2
        self._assert_downgrade(
            destination=b2,
            source=[amerge, bmerge, cmerge, d1],
            expected=[self.down_(self.bmerge)],
            expected_heads={amerge, b1, cmerge, d1},
        )

        # phase two: start with those heads and downgrade d1...
        self._assert_downgrade(
            destination="d1@base",
            source=[amerge, d1, b1, cmerge],
            expected=[self.down_(self.d1)],
            # b2 has to be INSERTed, because it was implied by d1
            expected_heads={amerge, b1, b2, cmerge},
        )

        # phase three: start with those heads and go all the way to base
        remaining = (
            self.amerge,
            self.a1,
            self.a2,
            self.a3,
            self.b1,
            self.b2,
            self.cmerge,
            self.c1,
            self.c2,
            self.c3,
        )
        self._assert_downgrade(
            destination="base",
            source=[amerge, b1, b2, cmerge],
            expected=[self.down_(script) for script in remaining],
            expected_heads=set(),
        )
830
831
class DependsOnBranchTestThree(MigrationTest):
    """Downgrade across two series that depend on each other (#377)."""

    @classmethod
    def setup_class(cls):
        """Generate the criss-crossed structure from issue #377.

        b2 declares ``depends_on`` a2, and a3 declares ``depends_on``
        b1.

        Structure::

            <base> -> a1 --+--> a2 -------> a3
                           |     ^          |
                           |     |   +------+
                           |     |   |
                           |     +---|------+
                           |         |      |
                           |         v      |
                           +-------> b1 --> b2 --> b3

        """
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a1 = new_rev("a1", "->a1", head="base")
        cls.a2 = new_rev("a2", "->a2")

        cls.b1 = new_rev("b1", "->b1", head="base")
        cls.b2 = new_rev("b2", "->b2", depends_on="a2", head="b1")
        cls.b3 = new_rev("b3", "->b3", head="b2")

        cls.a3 = new_rev("a3", "->a3", head="a2", depends_on="b1")

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_downgrade_over_crisscross(self):
        # Per the note that accompanied this test, this state was not
        # possible prior to #377: a3 would be considered half of a
        # merge point between a3 and b2, and the head would be forced
        # down to b1.  What is asserted here is that only b2 is
        # downgraded, i.e. half of that merge point is removed, while
        # a3 stays in place as the single remaining head.
        self._assert_downgrade(
            destination="b1",
            source=["a3", "b2"],
            expected=[self.down_(self.b2)],
            # we have b1 also, which is implied by a3
            expected_heads={"a3"},
        )
881
882
class DependsOnBranchLabelTest(MigrationTest):
    """Upgrade path when ``depends_on`` names a branch label.

    c1 carries the label "c1lib", and c2 is generated with
    ``depends_on=["c1lib"]``.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a1 = new_rev(util.rev_id(), "->a1", branch_labels=["lib1"])
        cls.b1 = new_rev(util.rev_id(), "a1->b1")
        cls.c1 = new_rev(util.rev_id(), "b1->c1", branch_labels=["c1lib"])

        cls.a2 = new_rev(util.rev_id(), "->a2", head=())
        cls.b2 = new_rev(util.rev_id(), "a2->b2", head=cls.a2.revision)
        # the dependency is given as a label rather than a revision id
        cls.c2 = new_rev(
            util.rev_id(), "b2->c2", head=cls.b2.revision, depends_on=["c1lib"]
        )

        cls.d1 = new_rev(util.rev_id(), "c1->d1", head=cls.c1.revision)
        cls.e1 = new_rev(util.rev_id(), "d1->e1", head=cls.d1.revision)
        cls.f1 = new_rev(util.rev_id(), "e1->f1", head=cls.e1.revision)

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_upgrade_path(self):
        # a1, b1 and c1 are expected ahead of b2 and c2
        path = (self.a1, self.b1, self.c1, self.b2, self.c2)
        self._assert_upgrade(
            destination=self.c2.revision,
            source=self.a2.revision,
            expected=[self.up_(script) for script in path],
            expected_heads={self.c2.revision},
        )
930
931
class ForestTest(MigrationTest):
    """Paths and stamps over two separately generated series.

    a1/b1 are generated first; a2 is generated with ``head=()`` and b2
    follows it.

    """

    @classmethod
    def setup_class(cls):
        env = staging_env()
        cls.env = env
        new_rev = env.generate_revision

        cls.a1 = new_rev(util.rev_id(), "->a1")
        cls.b1 = new_rev(util.rev_id(), "a1->b1")

        cls.a2 = new_rev(util.rev_id(), "->a2", head=(), refresh=True)
        cls.b2 = new_rev(util.rev_id(), "a2->b2", head=cls.a2.revision)

    @classmethod
    def teardown_class(cls):
        clear_staging_env()

    def test_base_to_heads(self):
        steps = self.env._upgrade_revs("heads", "base")
        path = (self.a2, self.b2, self.a1, self.b1)
        eq_(steps, [self.up_(script) for script in path])

    def test_stamp_to_heads(self):
        steps = self.env._stamp_revs("heads", ())
        eq_(len(steps), 2)
        # one stamp step per head, in either order
        targets = {step.to_revisions for step in steps}
        eq_(targets, {(self.b1.revision,), (self.b2.revision,)})

    def test_stamp_to_heads_no_moves_needed(self):
        current = (self.b1.revision, self.b2.revision)
        steps = self.env._stamp_revs("heads", current)
        eq_(len(steps), 0)
974
975
976class MergedPathTest(MigrationTest):
977    @classmethod
978    def setup_class(cls):
979        cls.env = env = staging_env()
980        cls.a = env.generate_revision(util.rev_id(), "->a")
981        cls.b = env.generate_revision(util.rev_id(), "a->b")
982
983        cls.c1 = env.generate_revision(util.rev_id(), "b->c1")
984        cls.d1 = env.generate_revision(util.rev_id(), "c1->d1")
985
986        cls.c2 = env.generate_revision(
987            util.rev_id(),
988            "b->c2",
989            branch_labels="c2branch",
990            head=cls.b.revision,
991            splice=True,
992        )
993        cls.d2 = env.generate_revision(
994            util.rev_id(), "c2->d2", head=cls.c2.revision
995        )
996
997        cls.e = env.generate_revision(
998            util.rev_id(),
999            "merge d1 and d2",
1000            head=(cls.d1.revision, cls.d2.revision),
1001        )
1002
1003        cls.f = env.generate_revision(util.rev_id(), "e->f")
1004
    @classmethod
    def teardown_class(cls):
        """Class-level teardown: clear the staging environment."""
        clear_staging_env()
1008
1009    def test_stamp_down_across_merge_point_branch(self):
1010        heads = [self.e.revision]
1011        revs = self.env._stamp_revs(self.c2.revision, heads)
1012        eq_(len(revs), 1)
1013        eq_(
1014            revs[0].merge_branch_idents(heads),
1015            # no deletes, UPDATE e to c2
1016            ([], self.e.revision, self.c2.revision),
1017        )
1018
1019    def test_stamp_down_across_merge_prior_branching(self):
1020        heads = [self.e.revision]
1021        revs = self.env._stamp_revs(self.a.revision, heads)
1022        eq_(len(revs), 1)
1023        eq_(
1024            revs[0].merge_branch_idents(heads),
1025            # no deletes, UPDATE e to c2
1026            ([], self.e.revision, self.a.revision),
1027        )
1028
1029    def test_stamp_up_across_merge_from_single_branch(self):
1030        revs = self.env._stamp_revs(self.e.revision, [self.c2.revision])
1031        eq_(len(revs), 1)
1032        eq_(
1033            revs[0].merge_branch_idents([self.c2.revision]),
1034            # no deletes, UPDATE e to c2
1035            ([], self.c2.revision, self.e.revision),
1036        )
1037
1038    def test_stamp_labled_head_across_merge_from_multiple_branch(self):
1039        # this is testing that filter_for_lineage() checks for
1040        # d1 both in terms of "c2branch" as well as that the "head"
1041        # revision "f" is the head of both d1 and d2
1042        revs = self.env._stamp_revs(
1043            "c2branch@head", [self.d1.revision, self.c2.revision]
1044        )
1045        eq_(len(revs), 1)
1046        eq_(
1047            revs[0].merge_branch_idents([self.d1.revision, self.c2.revision]),
1048            # DELETE d1 revision, UPDATE c2 to e
1049            ([self.d1.revision], self.c2.revision, self.f.revision),
1050        )
1051
1052    def test_stamp_up_across_merge_from_multiple_branch(self):
1053        heads = [self.d1.revision, self.c2.revision]
1054        revs = self.env._stamp_revs(self.e.revision, heads)
1055        eq_(len(revs), 1)
1056        eq_(
1057            revs[0].merge_branch_idents(heads),
1058            # DELETE d1 revision, UPDATE c2 to e
1059            ([self.d1.revision], self.c2.revision, self.e.revision),
1060        )
1061
1062    def test_stamp_up_across_merge_prior_branching(self):
1063        heads = [self.b.revision]
1064        revs = self.env._stamp_revs(self.e.revision, heads)
1065        eq_(len(revs), 1)
1066        eq_(
1067            revs[0].merge_branch_idents(heads),
1068            # no deletes, UPDATE e to c2
1069            ([], self.b.revision, self.e.revision),
1070        )
1071
1072    def test_upgrade_across_merge_point(self):
1073
1074        eq_(
1075            self.env._upgrade_revs(self.f.revision, self.b.revision),
1076            [
1077                self.up_(self.c2),
1078                self.up_(self.d2),
1079                self.up_(self.c1),  # b->c1, create new branch
1080                self.up_(self.d1),
1081                self.up_(self.e),  # d1/d2 -> e, merge branches
1082                # (DELETE d2, UPDATE d1->e)
1083                self.up_(self.f),
1084            ],
1085        )
1086
1087    def test_downgrade_across_merge_point(self):
1088
1089        eq_(
1090            self.env._downgrade_revs(self.b.revision, self.f.revision),
1091            [
1092                self.down_(self.f),
1093                self.down_(self.e),  # e -> d1 and d2, unmerge branches
1094                # (UPDATE e->d1, INSERT d2)
1095                self.down_(self.d1),
1096                self.down_(self.c1),
1097                self.down_(self.d2),
1098                self.down_(self.c2),  # c2->b, delete branch
1099            ],
1100        )
1101