1# Copyright (C) 2007 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17import re 18 19from .. import ( 20 errors, 21 gpg, 22 mail_client, 23 merge_directive, 24 tests, 25 trace, 26 ) 27 28 29OUTPUT1 = b"""# Bazaar merge directive format 1 30# revision_id: example: 31# target_branch: http://example.com 32# testament_sha1: sha 33# timestamp: 1970-01-01 00:09:33 +0002 34#\x20 35booga""" 36 37OUTPUT1_2 = b"""# Bazaar merge directive format 2 (Bazaar 0.90) 38# revision_id: example: 39# target_branch: http://example.com 40# testament_sha1: sha 41# timestamp: 1970-01-01 00:09:33 +0002 42# base_revision_id: null: 43#\x20 44# Begin bundle 45booga""" 46 47OUTPUT2 = b"""# Bazaar merge directive format 1 48# revision_id: example: 49# target_branch: http://example.com 50# testament_sha1: sha 51# timestamp: 1970-01-01 00:09:33 +0002 52# source_branch: http://example.org 53# message: Hi mom! 54#\x20 55booga""" 56 57OUTPUT2_2 = b"""# Bazaar merge directive format 2 (Bazaar 0.90) 58# revision_id: example: 59# target_branch: http://example.com 60# testament_sha1: sha 61# timestamp: 1970-01-01 00:09:33 +0002 62# source_branch: http://example.org 63# message: Hi mom! 64# base_revision_id: null: 65#\x20 66# Begin patch 67booga""" 68 69INPUT1 = b""" 70I was thinking today about creating a merge directive. 71 72So I did. 73 74Here it is. 75 76(I've pasted it in the body of this message) 77 78Aaron 79 80# Bazaar merge directive format 1\r 81# revision_id: example: 82# target_branch: http://example.com 83# testament_sha1: sha 84# timestamp: 1970-01-01 00:09:33 +0002 85# source_branch: http://example.org 86# message: Hi mom! 87#\x20 88booga""".splitlines(True) 89 90 91INPUT1_2 = b""" 92I was thinking today about creating a merge directive. 93 94So I did. 95 96Here it is. 97 98(I've pasted it in the body of this message) 99 100Aaron 101 102# Bazaar merge directive format 2 (Bazaar 0.90)\r 103# revision_id: example: 104# target_branch: http://example.com 105# testament_sha1: sha 106# timestamp: 1970-01-01 00:09:33 +0002 107# source_branch: http://example.org 108# base_revision_id: null: 109# message: Hi mom! 110#\x20 111# Begin patch 112booga""".splitlines(True) 113 114 115INPUT1_2_OLD = b""" 116I was thinking today about creating a merge directive. 117 118So I did. 119 120Here it is. 121 122(I've pasted it in the body of this message) 123 124Aaron 125 126# Bazaar merge directive format 2 (Bazaar 0.19)\r 127# revision_id: example: 128# target_branch: http://example.com 129# testament_sha1: sha 130# timestamp: 1970-01-01 00:09:33 +0002 131# source_branch: http://example.org 132# base_revision_id: null: 133# message: Hi mom! 134#\x20 135# Begin patch 136booga""".splitlines(True) 137 138 139OLD_DIRECTIVE_2 = b"""# Bazaar merge directive format 2 (Bazaar 0.19) 140# revision_id: abentley@panoramicfeedback.com-20070807234458-\ 141# nzhkoyza56lan7z5 142# target_branch: http://panoramicfeedback.com/opensource/bzr/repo\ 143# /bzr.ab 144# testament_sha1: d825a5cdb267a90ec2ba86b00895f3d8a9bed6bf 145# timestamp: 2007-08-10 16:15:02 -0400 146# source_branch: http://panoramicfeedback.com/opensource/bzr/repo\ 147# /bzr.ab 148# base_revision_id: abentley@panoramicfeedback.com-20070731163346-\ 149# 623xwcycwij91xen 150# 151""".splitlines(True) 152 153 154class TestMergeDirective(object): 155 156 def test_merge_source(self): 157 time = 500000.0 158 timezone = 5 * 3600 159 self.assertRaises(errors.NoMergeSource, self.make_merge_directive, 160 b'example:', b'sha', time, timezone, 'http://example.com') 161 self.assertRaises(errors.NoMergeSource, self.make_merge_directive, 162 b'example:', b'sha', time, timezone, 'http://example.com', 163 patch_type='diff') 164 self.make_merge_directive(b'example:', b'sha', time, timezone, 165 'http://example.com', source_branch='http://example.org') 166 md = self.make_merge_directive(b'null:', b'sha', time, timezone, 167 'http://example.com', patch=b'blah', patch_type='bundle') 168 self.assertIs(None, md.source_branch) 169 md2 = self.make_merge_directive(b'null:', b'sha', time, timezone, 170 'http://example.com', patch=b'blah', patch_type='bundle', 171 source_branch='bar') 172 self.assertEqual('bar', md2.source_branch) 173 174 def test_serialization(self): 175 time = 453 176 timezone = 120 177 md = self.make_merge_directive(b'example:', b'sha', time, timezone, 178 'http://example.com', patch=b'booga', patch_type='bundle') 179 self.assertEqualDiff(self.OUTPUT1, b''.join(md.to_lines())) 180 md = self.make_merge_directive(b'example:', b'sha', time, timezone, 181 'http://example.com', source_branch="http://example.org", 182 patch=b'booga', patch_type='diff', message="Hi mom!") 183 self.assertEqualDiff(self.OUTPUT2, b''.join(md.to_lines())) 184 185 def test_deserialize_junk(self): 186 time = 501 187 self.assertRaises(errors.NotAMergeDirective, 188 merge_directive.MergeDirective.from_lines, [b'lala']) 189 190 def test_deserialize_empty(self): 191 self.assertRaises(errors.NotAMergeDirective, 192 merge_directive.MergeDirective.from_lines, []) 193 194 def test_deserialize_leading_junk(self): 195 md = merge_directive.MergeDirective.from_lines(self.INPUT1) 196 self.assertEqual(b'example:', md.revision_id) 197 self.assertEqual(b'sha', md.testament_sha1) 198 self.assertEqual('http://example.com', md.target_branch) 199 self.assertEqual('http://example.org', md.source_branch) 200 self.assertEqual(453, md.time) 201 self.assertEqual(120, md.timezone) 202 self.assertEqual(b'booga', md.patch) 203 self.assertEqual('diff', md.patch_type) 204 self.assertEqual('Hi mom!', md.message) 205 206 def test_roundtrip(self): 207 time = 500000 208 timezone = 7.5 * 3600 209 md = self.make_merge_directive(b'example:', b'sha', time, timezone, 210 'http://example.com', source_branch="http://example.org", 211 patch=b'booga', patch_type='diff') 212 md2 = merge_directive.MergeDirective.from_lines(md.to_lines()) 213 self.assertEqual(b'example:', md2.revision_id) 214 self.assertIsInstance(md2.revision_id, bytes) 215 self.assertEqual(b'sha', md2.testament_sha1) 216 self.assertEqual('http://example.com', md2.target_branch) 217 self.assertEqual('http://example.org', md2.source_branch) 218 self.assertEqual(time, md2.time) 219 self.assertEqual(timezone, md2.timezone) 220 self.assertEqual('diff', md2.patch_type) 221 self.assertEqual(b'booga', md2.patch) 222 self.assertEqual(None, md2.message) 223 self.set_bundle(md, b"# Bazaar revision bundle v0.9\n#\n") 224 md.message = "Hi mom!" 225 lines = md.to_lines() 226 md3 = merge_directive.MergeDirective.from_lines(lines) 227 self.assertEqual(b"# Bazaar revision bundle v0.9\n#\n", md3.bundle) 228 self.assertEqual("bundle", md3.patch_type) 229 self.assertContainsRe(md3.to_lines()[0], 230 b'^# Bazaar merge directive format ') 231 self.assertEqual("Hi mom!", md3.message) 232 md3.clear_payload() 233 self.assertIs(None, md3.get_raw_bundle()) 234 md4 = merge_directive.MergeDirective.from_lines(md3.to_lines()) 235 self.assertIs(None, md4.patch_type) 236 237 238class TestMergeDirective1(tests.TestCase, TestMergeDirective): 239 """Test merge directive format 1""" 240 241 INPUT1 = INPUT1 242 243 OUTPUT1 = OUTPUT1 244 245 OUTPUT2 = OUTPUT2 246 247 def make_merge_directive(self, revision_id, testament_sha1, time, timezone, 248 target_branch, patch=None, patch_type=None, 249 source_branch=None, message=None): 250 return merge_directive.MergeDirective(revision_id, testament_sha1, 251 time, timezone, target_branch, patch, patch_type, 252 source_branch, message) 253 254 @staticmethod 255 def set_bundle(md, value): 256 md.patch = value 257 258 def test_require_patch(self): 259 time = 500.0 260 timezone = 120 261 self.assertRaises(errors.PatchMissing, merge_directive.MergeDirective, 262 b'example:', b'sha', time, timezone, 'http://example.com', 263 patch_type='bundle') 264 md = merge_directive.MergeDirective(b'example:', b'sha1', time, timezone, 265 'http://example.com', source_branch="http://example.org", 266 patch=b'', patch_type='diff') 267 self.assertEqual(md.patch, b'') 268 269 270class TestMergeDirective2(tests.TestCase, TestMergeDirective): 271 """Test merge directive format 2""" 272 273 INPUT1 = INPUT1_2 274 275 OUTPUT1 = OUTPUT1_2 276 277 OUTPUT2 = OUTPUT2_2 278 279 def make_merge_directive(self, revision_id, testament_sha1, time, timezone, 280 target_branch, patch=None, patch_type=None, 281 source_branch=None, message=None, base_revision_id=b'null:'): 282 if patch_type == 'bundle': 283 bundle = patch 284 patch = None 285 else: 286 bundle = None 287 return merge_directive.MergeDirective2(revision_id, testament_sha1, 288 time, timezone, target_branch, patch, source_branch, message, 289 bundle, base_revision_id) 290 291 @staticmethod 292 def set_bundle(md, value): 293 md.bundle = value 294 295 296EMAIL1 = """From: "J. Random Hacker" <jrandom@example.com> 297Subject: Commit of rev2a 298To: pqm@example.com 299User-Agent: Bazaar \\(.*\\) 300 301# Bazaar merge directive format 1 302# revision_id: rev2a 303# target_branch: (.|\n)* 304# testament_sha1: .* 305# timestamp: 1970-01-01 00:08:56 \\+0001 306# source_branch: (.|\n)* 307""" 308 309 310EMAIL1_2 = """From: "J. Random Hacker" <jrandom@example.com> 311Subject: Commit of rev2a 312To: pqm@example.com 313User-Agent: Bazaar \\(.*\\) 314 315# Bazaar merge directive format 2 \\(Bazaar 0.90\\) 316# revision_id: rev2a 317# target_branch: (.|\n)* 318# testament_sha1: .* 319# timestamp: 1970-01-01 00:08:56 \\+0001 320# source_branch: (.|\n)* 321""" 322 323 324EMAIL2 = """From: "J. Random Hacker" <jrandom@example.com> 325Subject: Commit of rev2a with special message 326To: pqm@example.com 327User-Agent: Bazaar \\(.*\\) 328 329# Bazaar merge directive format 1 330# revision_id: rev2a 331# target_branch: (.|\n)* 332# testament_sha1: .* 333# timestamp: 1970-01-01 00:08:56 \\+0001 334# source_branch: (.|\n)* 335# message: Commit of rev2a with special message 336""" 337 338EMAIL2_2 = """From: "J. Random Hacker" <jrandom@example.com> 339Subject: Commit of rev2a with special message 340To: pqm@example.com 341User-Agent: Bazaar \\(.*\\) 342 343# Bazaar merge directive format 2 \\(Bazaar 0.90\\) 344# revision_id: rev2a 345# target_branch: (.|\n)* 346# testament_sha1: .* 347# timestamp: 1970-01-01 00:08:56 \\+0001 348# source_branch: (.|\n)* 349# message: Commit of rev2a with special message 350""" 351 352 353class TestMergeDirectiveBranch(object): 354 355 def make_trees(self): 356 tree_a = self.make_branch_and_tree('tree_a') 357 tree_a.branch.get_config_stack().set( 358 'email', 'J. Random Hacker <jrandom@example.com>') 359 self.build_tree_contents([('tree_a/file', b'content_a\ncontent_b\n'), 360 ('tree_a/file_2', b'content_x\rcontent_y\r')]) 361 tree_a.add(['file', 'file_2']) 362 tree_a.commit('message', rev_id=b'rev1') 363 tree_b = tree_a.controldir.sprout('tree_b').open_workingtree() 364 branch_c = tree_a.controldir.sprout('branch_c').open_branch() 365 tree_b.commit('message', rev_id=b'rev2b') 366 self.build_tree_contents([('tree_a/file', b'content_a\ncontent_c \n'), 367 ('tree_a/file_2', b'content_x\rcontent_z\r')]) 368 tree_a.commit('Commit of rev2a', rev_id=b'rev2a') 369 return tree_a, tree_b, branch_c 370 371 def test_empty_target(self): 372 tree_a, tree_b, branch_c = self.make_trees() 373 tree_d = self.make_branch_and_tree('tree_d') 374 md2 = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 120, 375 tree_d.branch.base, patch_type='diff', 376 public_branch=tree_a.branch.base) 377 378 def test_disk_name(self): 379 tree_a, tree_b, branch_c = self.make_trees() 380 tree_a.branch.nick = 'fancy <name>' 381 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 120, 382 tree_b.branch.base) 383 self.assertEqual('fancy-name-2', md.get_disk_name(tree_a.branch)) 384 385 def test_disk_name_old_revno(self): 386 tree_a, tree_b, branch_c = self.make_trees() 387 tree_a.branch.nick = 'fancy-name' 388 md = self.from_objects(tree_a.branch.repository, b'rev1', 500, 120, 389 tree_b.branch.base) 390 self.assertEqual('fancy-name-1', md.get_disk_name(tree_a.branch)) 391 392 def test_generate_patch(self): 393 tree_a, tree_b, branch_c = self.make_trees() 394 md2 = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 120, 395 tree_b.branch.base, patch_type='diff', 396 public_branch=tree_a.branch.base) 397 self.assertNotContainsRe(md2.patch, b'Bazaar revision bundle') 398 self.assertContainsRe(md2.patch, b'\\+content_c') 399 self.assertNotContainsRe(md2.patch, b'\\+\\+\\+ b/') 400 self.assertContainsRe(md2.patch, b'\\+\\+\\+ file') 401 402 def test_public_branch(self): 403 tree_a, tree_b, branch_c = self.make_trees() 404 self.assertRaises(errors.PublicBranchOutOfDate, 405 self.from_objects, tree_a.branch.repository, b'rev2a', 500, 144, 406 tree_b.branch.base, public_branch=branch_c.base, patch_type='diff') 407 self.assertRaises(errors.PublicBranchOutOfDate, 408 self.from_objects, tree_a.branch.repository, b'rev2a', 500, 144, 409 tree_b.branch.base, public_branch=branch_c.base, patch_type=None) 410 # public branch is not checked if patch format is bundle. 411 md1 = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 144, 412 tree_b.branch.base, public_branch=branch_c.base) 413 # public branch is provided with a bundle, despite possibly being out 414 # of date, because it's not required if a bundle is present. 415 self.assertEqual(md1.source_branch, branch_c.base) 416 # Once we update the public branch, we can generate a diff. 417 branch_c.pull(tree_a.branch) 418 md3 = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 144, 419 tree_b.branch.base, patch_type=None, public_branch=branch_c.base) 420 421 def test_use_public_submit_branch(self): 422 tree_a, tree_b, branch_c = self.make_trees() 423 branch_c.pull(tree_a.branch) 424 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 144, 425 tree_b.branch.base, patch_type=None, public_branch=branch_c.base) 426 self.assertEqual(md.target_branch, tree_b.branch.base) 427 tree_b.branch.set_public_branch('http://example.com') 428 md2 = self.from_objects( 429 tree_a.branch.repository, b'rev2a', 500, 144, tree_b.branch.base, 430 patch_type=None, public_branch=branch_c.base) 431 self.assertEqual(md2.target_branch, 'http://example.com') 432 433 def test_message(self): 434 tree_a, tree_b, branch_c = self.make_trees() 435 md3 = self.from_objects(tree_a.branch.repository, b'rev1', 500, 120, 436 tree_b.branch.base, patch_type=None, public_branch=branch_c.base, 437 message='Merge message') 438 md3.to_lines() 439 self.assertIs(None, md3.patch) 440 self.assertEqual('Merge message', md3.message) 441 442 def test_generate_bundle(self): 443 tree_a, tree_b, branch_c = self.make_trees() 444 md1 = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 120, 445 tree_b.branch.base, public_branch=branch_c.base) 446 447 self.assertContainsRe(md1.get_raw_bundle(), b'Bazaar revision bundle') 448 self.assertContainsRe(md1.patch, b'\\+content_c') 449 self.assertNotContainsRe(md1.patch, b'\\+content_a') 450 self.assertContainsRe(md1.patch, b'\\+content_c') 451 self.assertNotContainsRe(md1.patch, b'\\+content_a') 452 453 def test_broken_bundle(self): 454 tree_a, tree_b, branch_c = self.make_trees() 455 md1 = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 120, 456 tree_b.branch.base, public_branch=branch_c.base) 457 lines = md1.to_lines() 458 lines = [l.replace(b'\n', b'\r\n') for l in lines] 459 md2 = merge_directive.MergeDirective.from_lines(lines) 460 self.assertEqual(b'rev2a', md2.revision_id) 461 462 def test_signing(self): 463 time = 453 464 timezone = 7200 465 466 class FakeBranch(object): 467 def get_config_stack(self): 468 return self 469 md = self.make_merge_directive(b'example:', b'sha', time, timezone, 470 'http://example.com', source_branch="http://example.org", 471 patch=b'booga', patch_type='diff') 472 old_strategy = gpg.GPGStrategy 473 gpg.GPGStrategy = gpg.LoopbackGPGStrategy 474 try: 475 signed = md.to_signed(FakeBranch()) 476 finally: 477 gpg.GPGStrategy = old_strategy 478 self.assertContainsRe(signed, b'^-----BEGIN PSEUDO-SIGNED CONTENT') 479 self.assertContainsRe(signed, b'example.org') 480 self.assertContainsRe(signed, b'booga') 481 482 def test_email(self): 483 tree_a, tree_b, branch_c = self.make_trees() 484 md = self.from_objects(tree_a.branch.repository, b'rev2a', 476, 60, 485 tree_b.branch.base, patch_type=None, 486 public_branch=tree_a.branch.base) 487 message = md.to_email('pqm@example.com', tree_a.branch) 488 self.assertContainsRe(message.as_string(), self.EMAIL1) 489 md.message = 'Commit of rev2a with special message' 490 message = md.to_email('pqm@example.com', tree_a.branch) 491 self.assertContainsRe(message.as_string(), self.EMAIL2) 492 493 def test_install_revisions_branch(self): 494 tree_a, tree_b, branch_c = self.make_trees() 495 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 36, 496 tree_b.branch.base, patch_type=None, 497 public_branch=tree_a.branch.base) 498 self.assertFalse(tree_b.branch.repository.has_revision(b'rev2a')) 499 revision = md.install_revisions(tree_b.branch.repository) 500 self.assertEqual(b'rev2a', revision) 501 self.assertTrue(tree_b.branch.repository.has_revision(b'rev2a')) 502 503 def test_get_merge_request(self): 504 tree_a, tree_b, branch_c = self.make_trees() 505 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 36, 506 tree_b.branch.base, patch_type='bundle', 507 public_branch=tree_a.branch.base) 508 self.assertFalse(tree_b.branch.repository.has_revision(b'rev2a')) 509 md.install_revisions(tree_b.branch.repository) 510 base, revision, verified = md.get_merge_request( 511 tree_b.branch.repository) 512 if isinstance(md, merge_directive.MergeDirective): 513 self.assertIs(None, base) 514 self.assertEqual('inapplicable', verified) 515 else: 516 self.assertEqual(b'rev1', base) 517 self.assertEqual('verified', verified) 518 self.assertEqual(b'rev2a', revision) 519 self.assertTrue(tree_b.branch.repository.has_revision(b'rev2a')) 520 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 36, 521 tree_b.branch.base, patch_type=None, 522 public_branch=tree_a.branch.base) 523 base, revision, verified = md.get_merge_request( 524 tree_b.branch.repository) 525 if isinstance(md, merge_directive.MergeDirective): 526 self.assertIs(None, base) 527 self.assertEqual('inapplicable', verified) 528 else: 529 self.assertEqual(b'rev1', base) 530 self.assertEqual('inapplicable', verified) 531 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 36, 532 tree_b.branch.base, patch_type='diff', 533 public_branch=tree_a.branch.base) 534 base, revision, verified = md.get_merge_request( 535 tree_b.branch.repository) 536 if isinstance(md, merge_directive.MergeDirective): 537 self.assertIs(None, base) 538 self.assertEqual('inapplicable', verified) 539 else: 540 self.assertEqual(b'rev1', base) 541 self.assertEqual('verified', verified) 542 md.patch = b'asdf' 543 base, revision, verified = md.get_merge_request( 544 tree_b.branch.repository) 545 if isinstance(md, merge_directive.MergeDirective): 546 self.assertIs(None, base) 547 self.assertEqual('inapplicable', verified) 548 else: 549 self.assertEqual(b'rev1', base) 550 self.assertEqual('failed', verified) 551 552 def test_install_revisions_bundle(self): 553 tree_a, tree_b, branch_c = self.make_trees() 554 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 36, 555 tree_b.branch.base, patch_type='bundle', 556 public_branch=tree_a.branch.base) 557 self.assertFalse(tree_b.branch.repository.has_revision(b'rev2a')) 558 revision = md.install_revisions(tree_b.branch.repository) 559 self.assertEqual(b'rev2a', revision) 560 self.assertTrue(tree_b.branch.repository.has_revision(b'rev2a')) 561 562 def test_get_target_revision_nofetch(self): 563 tree_a, tree_b, branch_c = self.make_trees() 564 tree_b.branch.fetch(tree_a.branch) 565 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 36, 566 tree_b.branch.base, patch_type=None, 567 public_branch=tree_a.branch.base) 568 md.source_branch = '/dev/null' 569 revision = md.install_revisions(tree_b.branch.repository) 570 self.assertEqual(b'rev2a', revision) 571 572 def test_use_submit_for_missing_dependency(self): 573 tree_a, tree_b, branch_c = self.make_trees() 574 branch_c.pull(tree_a.branch) 575 self.build_tree_contents([('tree_a/file', b'content_q\ncontent_r\n')]) 576 tree_a.commit('rev3a', rev_id=b'rev3a') 577 md = self.from_objects(tree_a.branch.repository, b'rev3a', 500, 36, 578 branch_c.base, base_revision_id=b'rev2a') 579 revision = md.install_revisions(tree_b.branch.repository) 580 581 def test_handle_target_not_a_branch(self): 582 tree_a, tree_b, branch_c = self.make_trees() 583 branch_c.pull(tree_a.branch) 584 self.build_tree_contents([('tree_a/file', b'content_q\ncontent_r\n')]) 585 tree_a.commit('rev3a', rev_id=b'rev3a') 586 md = self.from_objects(tree_a.branch.repository, b'rev3a', 500, 36, 587 branch_c.base, base_revision_id=b'rev2a') 588 md.target_branch = self.get_url('not-a-branch') 589 self.assertRaises(errors.TargetNotBranch, md.install_revisions, 590 tree_b.branch.repository) 591 592 593class TestMergeDirective1Branch(tests.TestCaseWithTransport, 594 TestMergeDirectiveBranch): 595 """Test merge directive format 1 with a branch""" 596 597 EMAIL1 = EMAIL1 598 599 EMAIL2 = EMAIL2 600 601 def from_objects(self, repository, revision_id, time, timezone, 602 target_branch, patch_type='bundle', local_target_branch=None, 603 public_branch=None, message=None, base_revision_id=None): 604 if base_revision_id is not None: 605 raise tests.TestNotApplicable('This format does not support' 606 ' explicit bases.') 607 with repository.lock_write(): 608 return merge_directive.MergeDirective.from_objects(repository, 609 revision_id, time, timezone, target_branch, patch_type, 610 local_target_branch, public_branch, message) 611 612 def make_merge_directive(self, revision_id, testament_sha1, time, timezone, 613 target_branch, patch=None, patch_type=None, 614 source_branch=None, message=None): 615 return merge_directive.MergeDirective(revision_id, testament_sha1, 616 time, timezone, target_branch, patch, patch_type, 617 source_branch, message) 618 619 620class TestMergeDirective2Branch(tests.TestCaseWithTransport, 621 TestMergeDirectiveBranch): 622 """Test merge directive format 2 with a branch""" 623 624 EMAIL1 = EMAIL1_2 625 626 EMAIL2 = EMAIL2_2 627 628 def from_objects(self, repository, revision_id, time, timezone, 629 target_branch, patch_type='bundle', local_target_branch=None, 630 public_branch=None, message=None, base_revision_id=None): 631 include_patch = (patch_type in ('bundle', 'diff')) 632 include_bundle = (patch_type == 'bundle') 633 self.assertTrue(patch_type in ('bundle', 'diff', None)) 634 return merge_directive.MergeDirective2.from_objects( 635 repository, revision_id, time, timezone, target_branch, 636 include_patch, include_bundle, local_target_branch, public_branch, 637 message, base_revision_id) 638 639 def make_merge_directive(self, revision_id, testament_sha1, time, timezone, 640 target_branch, patch=None, patch_type=None, 641 source_branch=None, message=None, base_revision_id=b'null:'): 642 if patch_type == 'bundle': 643 bundle = patch 644 patch = None 645 else: 646 bundle = None 647 return merge_directive.MergeDirective2(revision_id, testament_sha1, 648 time, timezone, target_branch, patch, source_branch, message, 649 bundle, base_revision_id) 650 651 def test_base_revision(self): 652 tree_a, tree_b, branch_c = self.make_trees() 653 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 60, 654 tree_b.branch.base, patch_type='bundle', 655 public_branch=tree_a.branch.base, base_revision_id=None) 656 self.assertEqual(b'rev1', md.base_revision_id) 657 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 60, 658 tree_b.branch.base, patch_type='bundle', 659 public_branch=tree_a.branch.base, base_revision_id=b'null:') 660 self.assertEqual(b'null:', md.base_revision_id) 661 lines = md.to_lines() 662 md2 = merge_directive.MergeDirective.from_lines(lines) 663 self.assertEqual(md2.base_revision_id, md.base_revision_id) 664 665 def test_patch_verification(self): 666 tree_a, tree_b, branch_c = self.make_trees() 667 md = self.from_objects(tree_a.branch.repository, b'rev2a', 500, 60, 668 tree_b.branch.base, patch_type='bundle', 669 public_branch=tree_a.branch.base) 670 lines = md.to_lines() 671 md2 = merge_directive.MergeDirective.from_lines(lines) 672 md2._verify_patch(tree_a.branch.repository) 673 # Strip trailing whitespace 674 md2.patch = md2.patch.replace(b' \n', b'\n') 675 md2._verify_patch(tree_a.branch.repository) 676 # Convert to Mac line-endings 677 md2.patch = re.sub(b'(\r\n|\r|\n)', b'\r', md2.patch) 678 self.assertTrue(md2._verify_patch(tree_a.branch.repository)) 679 # Convert to DOS line-endings 680 md2.patch = re.sub(b'(\r\n|\r|\n)', b'\r\n', md2.patch) 681 self.assertTrue(md2._verify_patch(tree_a.branch.repository)) 682 md2.patch = md2.patch.replace(b'content_c', b'content_d') 683 self.assertFalse(md2._verify_patch(tree_a.branch.repository)) 684 685 686class TestParseOldMergeDirective2(tests.TestCase): 687 688 def test_parse_old_merge_directive(self): 689 md = merge_directive.MergeDirective.from_lines(INPUT1_2_OLD) 690 self.assertEqual(b'example:', md.revision_id) 691 self.assertEqual(b'sha', md.testament_sha1) 692 self.assertEqual('http://example.com', md.target_branch) 693 self.assertEqual('http://example.org', md.source_branch) 694 self.assertEqual(453, md.time) 695 self.assertEqual(120, md.timezone) 696 self.assertEqual(b'booga', md.patch) 697 self.assertEqual('diff', md.patch_type) 698 self.assertEqual('Hi mom!', md.message) 699 700 701class TestHook(object): 702 """Hook callback for test purposes.""" 703 704 def __init__(self, result=None): 705 self.calls = [] 706 self.result = result 707 708 def __call__(self, params): 709 self.calls.append(params) 710 return self.result 711 712 713class HookMailClient(mail_client.MailClient): 714 """Mail client for testing hooks.""" 715 716 def __init__(self, config): 717 self.body = None 718 self.config = config 719 720 def compose(self, prompt, to, subject, attachment, mime_subtype, 721 extension, basename=None, body=None): 722 self.body = body 723 724 725class TestBodyHook(tests.TestCaseWithTransport): 726 727 def compose_with_hooks(self, test_hooks, supports_body=True): 728 client = HookMailClient({}) 729 client.supports_body = supports_body 730 for test_hook in test_hooks: 731 merge_directive.MergeDirective.hooks.install_named_hook( 732 'merge_request_body', test_hook, 'test') 733 tree = self.make_branch_and_tree('foo') 734 tree.commit('foo') 735 directive = merge_directive.MergeDirective2( 736 tree.branch.last_revision(), b'sha', 0, 0, b'sha', 737 source_branch=tree.branch.base, 738 base_revision_id=tree.branch.last_revision(), 739 message='This code rox') 740 directive.compose_merge_request(client, 'jrandom@example.com', 741 None, tree.branch) 742 return client, directive 743 744 def test_no_supports_body(self): 745 test_hook = TestHook('foo') 746 old_warn = trace.warning 747 warnings = [] 748 749 def warn(*args): 750 warnings.append(args) 751 trace.warning = warn 752 try: 753 client, directive = self.compose_with_hooks([test_hook], 754 supports_body=False) 755 finally: 756 trace.warning = old_warn 757 self.assertEqual(0, len(test_hook.calls)) 758 self.assertEqual(('Cannot run merge_request_body hooks because mail' 759 ' client %s does not support message bodies.', 760 'HookMailClient'), warnings[0]) 761 762 def test_body_hook(self): 763 test_hook = TestHook('foo') 764 client, directive = self.compose_with_hooks([test_hook]) 765 self.assertEqual(1, len(test_hook.calls)) 766 self.assertEqual('foo', client.body) 767 params = test_hook.calls[0] 768 self.assertIsInstance(params, 769 merge_directive.MergeRequestBodyParams) 770 self.assertIs(None, params.body) 771 self.assertIs(None, params.orig_body) 772 self.assertEqual('jrandom@example.com', params.to) 773 self.assertEqual('[MERGE] This code rox', params.subject) 774 self.assertEqual(directive, params.directive) 775 self.assertEqual('foo-1', params.basename) 776 777 def test_body_hook_chaining(self): 778 test_hook1 = TestHook('foo') 779 test_hook2 = TestHook('bar') 780 client = self.compose_with_hooks([test_hook1, test_hook2])[0] 781 self.assertEqual(None, test_hook1.calls[0].body) 782 self.assertEqual(None, test_hook1.calls[0].orig_body) 783 self.assertEqual('foo', test_hook2.calls[0].body) 784 self.assertEqual(None, test_hook2.calls[0].orig_body) 785 self.assertEqual('bar', client.body) 786