1# -*- coding: utf-8 -*- 2# test_repository.py -- tests for repository.py 3# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 4# 5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 6# General Public License as public by the Free Software Foundation; version 2.0 7# or (at your option) any later version. You can redistribute it and/or 8# modify it under the terms of either of these two licenses. 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16# You should have received a copy of the licenses; if not, see 17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 19# License, Version 2.0. 20# 21 22"""Tests for the repository.""" 23 24import locale 25import os 26import stat 27import shutil 28import sys 29import tempfile 30import warnings 31 32from dulwich import errors 33from dulwich.object_store import ( 34 tree_lookup_path, 35 ) 36from dulwich import objects 37from dulwich.config import Config 38from dulwich.errors import NotGitRepository 39from dulwich.repo import ( 40 InvalidUserIdentity, 41 Repo, 42 MemoryRepo, 43 check_user_identity, 44 ) 45from dulwich.tests import ( 46 TestCase, 47 skipIf, 48 ) 49from dulwich.tests.utils import ( 50 open_repo, 51 tear_down_repo, 52 setup_warning_catcher, 53 ) 54 55missing_sha = b'b91fa4d900e17e99b433218e988c4eb4a3e9a097' 56 57 58class CreateRepositoryTests(TestCase): 59 60 def assertFileContentsEqual(self, expected, repo, path): 61 f = repo.get_named_file(path) 62 if not f: 63 self.assertEqual(expected, None) 64 else: 65 with f: 66 self.assertEqual(expected, f.read()) 67 68 def _check_repo_contents(self, repo, expect_bare): 69 self.assertEqual(expect_bare, repo.bare) 70 self.assertFileContentsEqual( 71 b'Unnamed repository', repo, 'description') 72 self.assertFileContentsEqual( 73 b'', repo, os.path.join('info', 'exclude')) 74 self.assertFileContentsEqual(None, repo, 'nonexistent file') 75 barestr = b'bare = ' + str(expect_bare).lower().encode('ascii') 76 with repo.get_named_file('config') as f: 77 config_text = f.read() 78 self.assertTrue(barestr in config_text, "%r" % config_text) 79 expect_filemode = sys.platform != 'win32' 80 barestr = b'filemode = ' + str(expect_filemode).lower().encode('ascii') 81 with repo.get_named_file('config') as f: 82 config_text = f.read() 83 self.assertTrue(barestr in config_text, "%r" % config_text) 84 85 def test_create_memory(self): 86 repo = MemoryRepo.init_bare([], {}) 87 self._check_repo_contents(repo, True) 88 89 def test_create_disk_bare(self): 90 tmp_dir = tempfile.mkdtemp() 91 self.addCleanup(shutil.rmtree, tmp_dir) 92 repo = Repo.init_bare(tmp_dir) 93 self.assertEqual(tmp_dir, repo._controldir) 94 self._check_repo_contents(repo, True) 95 96 def test_create_disk_non_bare(self): 97 tmp_dir = tempfile.mkdtemp() 98 self.addCleanup(shutil.rmtree, tmp_dir) 99 repo = Repo.init(tmp_dir) 100 self.assertEqual(os.path.join(tmp_dir, '.git'), repo._controldir) 101 self._check_repo_contents(repo, False) 102 103 def test_create_disk_non_bare_mkdir(self): 104 tmp_dir = tempfile.mkdtemp() 105 target_dir = os.path.join(tmp_dir, "target") 106 self.addCleanup(shutil.rmtree, tmp_dir) 107 repo = Repo.init(target_dir, mkdir=True) 108 self.assertEqual(os.path.join(target_dir, '.git'), repo._controldir) 109 self._check_repo_contents(repo, False) 110 111 def test_create_disk_bare_mkdir(self): 112 tmp_dir = tempfile.mkdtemp() 113 target_dir = os.path.join(tmp_dir, "target") 114 self.addCleanup(shutil.rmtree, tmp_dir) 115 repo = Repo.init_bare(target_dir, mkdir=True) 116 self.assertEqual(target_dir, repo._controldir) 117 self._check_repo_contents(repo, True) 118 119 120class MemoryRepoTests(TestCase): 121 122 def test_set_description(self): 123 r = MemoryRepo.init_bare([], {}) 124 description = b"Some description" 125 r.set_description(description) 126 self.assertEqual(description, r.get_description()) 127 128 129class RepositoryRootTests(TestCase): 130 131 def mkdtemp(self): 132 return tempfile.mkdtemp() 133 134 def open_repo(self, name): 135 temp_dir = self.mkdtemp() 136 repo = open_repo(name, temp_dir) 137 self.addCleanup(tear_down_repo, repo) 138 return repo 139 140 def test_simple_props(self): 141 r = self.open_repo('a.git') 142 self.assertEqual(r.controldir(), r.path) 143 144 def test_setitem(self): 145 r = self.open_repo('a.git') 146 r[b"refs/tags/foo"] = b'a90fa2d900a17e99b433217e988c4eb4a2e9a097' 147 self.assertEqual(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', 148 r[b"refs/tags/foo"].id) 149 150 def test_getitem_unicode(self): 151 r = self.open_repo('a.git') 152 153 test_keys = [ 154 (b'refs/heads/master', True), 155 (b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', True), 156 (b'11' * 19 + b'--', False), 157 ] 158 159 for k, contained in test_keys: 160 self.assertEqual(k in r, contained) 161 162 # Avoid deprecation warning under Py3.2+ 163 if getattr(self, 'assertRaisesRegex', None): 164 assertRaisesRegexp = self.assertRaisesRegex 165 else: 166 assertRaisesRegexp = self.assertRaisesRegexp 167 for k, _ in test_keys: 168 assertRaisesRegexp( 169 TypeError, "'name' must be bytestring, not int", 170 r.__getitem__, 12 171 ) 172 173 def test_delitem(self): 174 r = self.open_repo('a.git') 175 176 del r[b'refs/heads/master'] 177 self.assertRaises(KeyError, lambda: r[b'refs/heads/master']) 178 179 del r[b'HEAD'] 180 self.assertRaises(KeyError, lambda: r[b'HEAD']) 181 182 self.assertRaises(ValueError, r.__delitem__, b'notrefs/foo') 183 184 def test_get_refs(self): 185 r = self.open_repo('a.git') 186 self.assertEqual({ 187 b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', 188 b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', 189 b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', 190 b'refs/tags/mytag-packed': 191 b'b0931cadc54336e78a1d980420e3268903b57a50', 192 }, r.get_refs()) 193 194 def test_head(self): 195 r = self.open_repo('a.git') 196 self.assertEqual(r.head(), b'a90fa2d900a17e99b433217e988c4eb4a2e9a097') 197 198 def test_get_object(self): 199 r = self.open_repo('a.git') 200 obj = r.get_object(r.head()) 201 self.assertEqual(obj.type_name, b'commit') 202 203 def test_get_object_non_existant(self): 204 r = self.open_repo('a.git') 205 self.assertRaises(KeyError, r.get_object, missing_sha) 206 207 def test_contains_object(self): 208 r = self.open_repo('a.git') 209 self.assertTrue(r.head() in r) 210 211 def test_contains_ref(self): 212 r = self.open_repo('a.git') 213 self.assertTrue(b"HEAD" in r) 214 215 def test_get_no_description(self): 216 r = self.open_repo('a.git') 217 self.assertIs(None, r.get_description()) 218 219 def test_get_description(self): 220 r = self.open_repo('a.git') 221 with open(os.path.join(r.path, 'description'), 'wb') as f: 222 f.write(b"Some description") 223 self.assertEqual(b"Some description", r.get_description()) 224 225 def test_set_description(self): 226 r = self.open_repo('a.git') 227 description = b"Some description" 228 r.set_description(description) 229 self.assertEqual(description, r.get_description()) 230 231 def test_contains_missing(self): 232 r = self.open_repo('a.git') 233 self.assertFalse(b"bar" in r) 234 235 def test_get_peeled(self): 236 # unpacked ref 237 r = self.open_repo('a.git') 238 tag_sha = b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a' 239 self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head()) 240 self.assertEqual(r.get_peeled(b'refs/tags/mytag'), r.head()) 241 242 # packed ref with cached peeled value 243 packed_tag_sha = b'b0931cadc54336e78a1d980420e3268903b57a50' 244 parent_sha = r[r.head()].parents[0] 245 self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha) 246 self.assertEqual(r.get_peeled(b'refs/tags/mytag-packed'), parent_sha) 247 248 # TODO: add more corner cases to test repo 249 250 def test_get_peeled_not_tag(self): 251 r = self.open_repo('a.git') 252 self.assertEqual(r.get_peeled(b'HEAD'), r.head()) 253 254 def test_get_walker(self): 255 r = self.open_repo('a.git') 256 # include defaults to [r.head()] 257 self.assertEqual( 258 [e.commit.id for e in r.get_walker()], 259 [r.head(), b'2a72d929692c41d8554c07f6301757ba18a65d91']) 260 self.assertEqual( 261 [e.commit.id for e in 262 r.get_walker([b'2a72d929692c41d8554c07f6301757ba18a65d91'])], 263 [b'2a72d929692c41d8554c07f6301757ba18a65d91']) 264 self.assertEqual( 265 [e.commit.id for e in 266 r.get_walker(b'2a72d929692c41d8554c07f6301757ba18a65d91')], 267 [b'2a72d929692c41d8554c07f6301757ba18a65d91']) 268 269 def assertFilesystemHidden(self, path): 270 if sys.platform != 'win32': 271 return 272 import ctypes 273 from ctypes.wintypes import DWORD, LPCWSTR 274 GetFileAttributesW = ctypes.WINFUNCTYPE(DWORD, LPCWSTR)( 275 ('GetFileAttributesW', ctypes.windll.kernel32)) 276 self.assertTrue(2 & GetFileAttributesW(path)) 277 278 def test_init_existing(self): 279 tmp_dir = self.mkdtemp() 280 self.addCleanup(shutil.rmtree, tmp_dir) 281 t = Repo.init(tmp_dir) 282 self.addCleanup(t.close) 283 self.assertEqual(os.listdir(tmp_dir), ['.git']) 284 self.assertFilesystemHidden(os.path.join(tmp_dir, '.git')) 285 286 def test_init_mkdir(self): 287 tmp_dir = self.mkdtemp() 288 self.addCleanup(shutil.rmtree, tmp_dir) 289 repo_dir = os.path.join(tmp_dir, 'a-repo') 290 291 t = Repo.init(repo_dir, mkdir=True) 292 self.addCleanup(t.close) 293 self.assertEqual(os.listdir(repo_dir), ['.git']) 294 self.assertFilesystemHidden(os.path.join(repo_dir, '.git')) 295 296 def test_init_mkdir_unicode(self): 297 repo_name = u'\xa7' 298 try: 299 repo_name.encode(sys.getfilesystemencoding()) 300 except UnicodeEncodeError: 301 self.skipTest('filesystem lacks unicode support') 302 tmp_dir = self.mkdtemp() 303 self.addCleanup(shutil.rmtree, tmp_dir) 304 repo_dir = os.path.join(tmp_dir, repo_name) 305 306 t = Repo.init(repo_dir, mkdir=True) 307 self.addCleanup(t.close) 308 self.assertEqual(os.listdir(repo_dir), ['.git']) 309 self.assertFilesystemHidden(os.path.join(repo_dir, '.git')) 310 311 @skipIf(sys.platform == 'win32', 'fails on Windows') 312 def test_fetch(self): 313 r = self.open_repo('a.git') 314 tmp_dir = self.mkdtemp() 315 self.addCleanup(shutil.rmtree, tmp_dir) 316 t = Repo.init(tmp_dir) 317 self.addCleanup(t.close) 318 r.fetch(t) 319 self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t) 320 self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t) 321 self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t) 322 self.assertIn(b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', t) 323 self.assertIn(b'b0931cadc54336e78a1d980420e3268903b57a50', t) 324 325 @skipIf(sys.platform == 'win32', 'fails on Windows') 326 def test_fetch_ignores_missing_refs(self): 327 r = self.open_repo('a.git') 328 missing = b'1234566789123456789123567891234657373833' 329 r.refs[b'refs/heads/blah'] = missing 330 tmp_dir = self.mkdtemp() 331 self.addCleanup(shutil.rmtree, tmp_dir) 332 t = Repo.init(tmp_dir) 333 self.addCleanup(t.close) 334 r.fetch(t) 335 self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t) 336 self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t) 337 self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t) 338 self.assertIn(b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', t) 339 self.assertIn(b'b0931cadc54336e78a1d980420e3268903b57a50', t) 340 self.assertNotIn(missing, t) 341 342 def test_clone(self): 343 r = self.open_repo('a.git') 344 tmp_dir = self.mkdtemp() 345 self.addCleanup(shutil.rmtree, tmp_dir) 346 with r.clone(tmp_dir, mkdir=False) as t: 347 self.assertEqual({ 348 b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', 349 b'refs/remotes/origin/master': 350 b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', 351 b'refs/heads/master': 352 b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', 353 b'refs/tags/mytag': 354 b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', 355 b'refs/tags/mytag-packed': 356 b'b0931cadc54336e78a1d980420e3268903b57a50', 357 }, t.refs.as_dict()) 358 shas = [e.commit.id for e in r.get_walker()] 359 self.assertEqual(shas, [t.head(), 360 b'2a72d929692c41d8554c07f6301757ba18a65d91']) 361 c = t.get_config() 362 encoded_path = r.path 363 if not isinstance(encoded_path, bytes): 364 encoded_path = encoded_path.encode(sys.getfilesystemencoding()) 365 self.assertEqual(encoded_path, 366 c.get((b'remote', b'origin'), b'url')) 367 self.assertEqual( 368 b'+refs/heads/*:refs/remotes/origin/*', 369 c.get((b'remote', b'origin'), b'fetch')) 370 371 def test_clone_no_head(self): 372 temp_dir = self.mkdtemp() 373 self.addCleanup(shutil.rmtree, temp_dir) 374 repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos') 375 dest_dir = os.path.join(temp_dir, 'a.git') 376 shutil.copytree(os.path.join(repo_dir, 'a.git'), 377 dest_dir, symlinks=True) 378 r = Repo(dest_dir) 379 del r.refs[b"refs/heads/master"] 380 del r.refs[b"HEAD"] 381 t = r.clone(os.path.join(temp_dir, 'b.git'), mkdir=True) 382 self.assertEqual({ 383 b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', 384 b'refs/tags/mytag-packed': 385 b'b0931cadc54336e78a1d980420e3268903b57a50', 386 }, t.refs.as_dict()) 387 388 def test_clone_empty(self): 389 """Test clone() doesn't crash if HEAD points to a non-existing ref. 390 391 This simulates cloning server-side bare repository either when it is 392 still empty or if user renames master branch and pushes private repo 393 to the server. 394 Non-bare repo HEAD always points to an existing ref. 395 """ 396 r = self.open_repo('empty.git') 397 tmp_dir = self.mkdtemp() 398 self.addCleanup(shutil.rmtree, tmp_dir) 399 r.clone(tmp_dir, mkdir=False, bare=True) 400 401 def test_clone_bare(self): 402 r = self.open_repo('a.git') 403 tmp_dir = self.mkdtemp() 404 self.addCleanup(shutil.rmtree, tmp_dir) 405 t = r.clone(tmp_dir, mkdir=False) 406 t.close() 407 408 def test_clone_checkout_and_bare(self): 409 r = self.open_repo('a.git') 410 tmp_dir = self.mkdtemp() 411 self.addCleanup(shutil.rmtree, tmp_dir) 412 self.assertRaises(ValueError, r.clone, tmp_dir, mkdir=False, 413 checkout=True, bare=True) 414 415 def test_merge_history(self): 416 r = self.open_repo('simple_merge.git') 417 shas = [e.commit.id for e in r.get_walker()] 418 self.assertEqual(shas, [b'5dac377bdded4c9aeb8dff595f0faeebcc8498cc', 419 b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', 420 b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', 421 b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e', 422 b'0d89f20333fbb1d2f3a94da77f4981373d8f4310']) 423 424 def test_out_of_order_merge(self): 425 """Test that revision history is ordered by date, not parent order.""" 426 r = self.open_repo('ooo_merge.git') 427 shas = [e.commit.id for e in r.get_walker()] 428 self.assertEqual(shas, [b'7601d7f6231db6a57f7bbb79ee52e4d462fd44d1', 429 b'f507291b64138b875c28e03469025b1ea20bc614', 430 b'fb5b0425c7ce46959bec94d54b9a157645e114f5', 431 b'f9e39b120c68182a4ba35349f832d0e4e61f485c']) 432 433 def test_get_tags_empty(self): 434 r = self.open_repo('ooo_merge.git') 435 self.assertEqual({}, r.refs.as_dict(b'refs/tags')) 436 437 def test_get_config(self): 438 r = self.open_repo('ooo_merge.git') 439 self.assertIsInstance(r.get_config(), Config) 440 441 def test_get_config_stack(self): 442 r = self.open_repo('ooo_merge.git') 443 self.assertIsInstance(r.get_config_stack(), Config) 444 445 @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support') 446 def test_submodule(self): 447 temp_dir = self.mkdtemp() 448 self.addCleanup(shutil.rmtree, temp_dir) 449 repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos') 450 shutil.copytree(os.path.join(repo_dir, 'a.git'), 451 os.path.join(temp_dir, 'a.git'), symlinks=True) 452 rel = os.path.relpath(os.path.join(repo_dir, 'submodule'), temp_dir) 453 os.symlink(os.path.join(rel, 'dotgit'), os.path.join(temp_dir, '.git')) 454 with Repo(temp_dir) as r: 455 self.assertEqual(r.head(), 456 b'a90fa2d900a17e99b433217e988c4eb4a2e9a097') 457 458 def test_common_revisions(self): 459 """ 460 This test demonstrates that ``find_common_revisions()`` actually 461 returns common heads, not revisions; dulwich already uses 462 ``find_common_revisions()`` in such a manner (see 463 ``Repo.fetch_objects()``). 464 """ 465 466 expected_shas = set([b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e']) 467 468 # Source for objects. 469 r_base = self.open_repo('simple_merge.git') 470 471 # Re-create each-side of the merge in simple_merge.git. 472 # 473 # Since the trees and blobs are missing, the repository created is 474 # corrupted, but we're only checking for commits for the purpose of 475 # this test, so it's immaterial. 476 r1_dir = self.mkdtemp() 477 self.addCleanup(shutil.rmtree, r1_dir) 478 r1_commits = [b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', # HEAD 479 b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e', 480 b'0d89f20333fbb1d2f3a94da77f4981373d8f4310'] 481 482 r2_dir = self.mkdtemp() 483 self.addCleanup(shutil.rmtree, r2_dir) 484 r2_commits = [b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', # HEAD 485 b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e', 486 b'0d89f20333fbb1d2f3a94da77f4981373d8f4310'] 487 488 r1 = Repo.init_bare(r1_dir) 489 for c in r1_commits: 490 r1.object_store.add_object(r_base.get_object(c)) 491 r1.refs[b'HEAD'] = r1_commits[0] 492 493 r2 = Repo.init_bare(r2_dir) 494 for c in r2_commits: 495 r2.object_store.add_object(r_base.get_object(c)) 496 r2.refs[b'HEAD'] = r2_commits[0] 497 498 # Finally, the 'real' testing! 499 shas = r2.object_store.find_common_revisions(r1.get_graph_walker()) 500 self.assertEqual(set(shas), expected_shas) 501 502 shas = r1.object_store.find_common_revisions(r2.get_graph_walker()) 503 self.assertEqual(set(shas), expected_shas) 504 505 def test_shell_hook_pre_commit(self): 506 if os.name != 'posix': 507 self.skipTest('shell hook tests requires POSIX shell') 508 509 pre_commit_fail = """#!/bin/sh 510exit 1 511""" 512 513 pre_commit_success = """#!/bin/sh 514exit 0 515""" 516 517 repo_dir = os.path.join(self.mkdtemp()) 518 self.addCleanup(shutil.rmtree, repo_dir) 519 r = Repo.init(repo_dir) 520 self.addCleanup(r.close) 521 522 pre_commit = os.path.join(r.controldir(), 'hooks', 'pre-commit') 523 524 with open(pre_commit, 'w') as f: 525 f.write(pre_commit_fail) 526 os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) 527 528 self.assertRaises(errors.CommitError, r.do_commit, 'failed commit', 529 committer='Test Committer <test@nodomain.com>', 530 author='Test Author <test@nodomain.com>', 531 commit_timestamp=12345, commit_timezone=0, 532 author_timestamp=12345, author_timezone=0) 533 534 with open(pre_commit, 'w') as f: 535 f.write(pre_commit_success) 536 os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) 537 538 commit_sha = r.do_commit( 539 b'empty commit', 540 committer=b'Test Committer <test@nodomain.com>', 541 author=b'Test Author <test@nodomain.com>', 542 commit_timestamp=12395, commit_timezone=0, 543 author_timestamp=12395, author_timezone=0) 544 self.assertEqual([], r[commit_sha].parents) 545 546 def test_shell_hook_commit_msg(self): 547 if os.name != 'posix': 548 self.skipTest('shell hook tests requires POSIX shell') 549 550 commit_msg_fail = """#!/bin/sh 551exit 1 552""" 553 554 commit_msg_success = """#!/bin/sh 555exit 0 556""" 557 558 repo_dir = self.mkdtemp() 559 self.addCleanup(shutil.rmtree, repo_dir) 560 r = Repo.init(repo_dir) 561 self.addCleanup(r.close) 562 563 commit_msg = os.path.join(r.controldir(), 'hooks', 'commit-msg') 564 565 with open(commit_msg, 'w') as f: 566 f.write(commit_msg_fail) 567 os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) 568 569 self.assertRaises(errors.CommitError, r.do_commit, b'failed commit', 570 committer=b'Test Committer <test@nodomain.com>', 571 author=b'Test Author <test@nodomain.com>', 572 commit_timestamp=12345, commit_timezone=0, 573 author_timestamp=12345, author_timezone=0) 574 575 with open(commit_msg, 'w') as f: 576 f.write(commit_msg_success) 577 os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) 578 579 commit_sha = r.do_commit( 580 b'empty commit', 581 committer=b'Test Committer <test@nodomain.com>', 582 author=b'Test Author <test@nodomain.com>', 583 commit_timestamp=12395, commit_timezone=0, 584 author_timestamp=12395, author_timezone=0) 585 self.assertEqual([], r[commit_sha].parents) 586 587 def test_shell_hook_post_commit(self): 588 if os.name != 'posix': 589 self.skipTest('shell hook tests requires POSIX shell') 590 591 repo_dir = self.mkdtemp() 592 self.addCleanup(shutil.rmtree, repo_dir) 593 594 r = Repo.init(repo_dir) 595 self.addCleanup(r.close) 596 597 (fd, path) = tempfile.mkstemp(dir=repo_dir) 598 os.close(fd) 599 post_commit_msg = """#!/bin/sh 600rm """ + path + """ 601""" 602 603 root_sha = r.do_commit( 604 b'empty commit', 605 committer=b'Test Committer <test@nodomain.com>', 606 author=b'Test Author <test@nodomain.com>', 607 commit_timestamp=12345, commit_timezone=0, 608 author_timestamp=12345, author_timezone=0) 609 self.assertEqual([], r[root_sha].parents) 610 611 post_commit = os.path.join(r.controldir(), 'hooks', 'post-commit') 612 613 with open(post_commit, 'wb') as f: 614 f.write(post_commit_msg.encode(locale.getpreferredencoding())) 615 os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) 616 617 commit_sha = r.do_commit( 618 b'empty commit', 619 committer=b'Test Committer <test@nodomain.com>', 620 author=b'Test Author <test@nodomain.com>', 621 commit_timestamp=12345, commit_timezone=0, 622 author_timestamp=12345, author_timezone=0) 623 self.assertEqual([root_sha], r[commit_sha].parents) 624 625 self.assertFalse(os.path.exists(path)) 626 627 post_commit_msg_fail = """#!/bin/sh 628exit 1 629""" 630 with open(post_commit, 'w') as f: 631 f.write(post_commit_msg_fail) 632 os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) 633 634 warnings.simplefilter("always", UserWarning) 635 self.addCleanup(warnings.resetwarnings) 636 warnings_list, restore_warnings = setup_warning_catcher() 637 self.addCleanup(restore_warnings) 638 639 commit_sha2 = r.do_commit( 640 b'empty commit', 641 committer=b'Test Committer <test@nodomain.com>', 642 author=b'Test Author <test@nodomain.com>', 643 commit_timestamp=12345, commit_timezone=0, 644 author_timestamp=12345, author_timezone=0) 645 expected_warning = UserWarning( 646 'post-commit hook failed: Hook post-commit exited with ' 647 'non-zero status',) 648 for w in warnings_list: 649 if (type(w) == type(expected_warning) and 650 w.args == expected_warning.args): 651 break 652 else: 653 raise AssertionError( 654 'Expected warning %r not in %r' % 655 (expected_warning, warnings_list)) 656 self.assertEqual([commit_sha], r[commit_sha2].parents) 657 658 def test_as_dict(self): 659 def check(repo): 660 self.assertEqual( 661 repo.refs.subkeys(b'refs/tags'), 662 repo.refs.subkeys(b'refs/tags/')) 663 self.assertEqual( 664 repo.refs.as_dict(b'refs/tags'), 665 repo.refs.as_dict(b'refs/tags/')) 666 self.assertEqual( 667 repo.refs.as_dict(b'refs/heads'), 668 repo.refs.as_dict(b'refs/heads/')) 669 670 bare = self.open_repo('a.git') 671 tmp_dir = self.mkdtemp() 672 self.addCleanup(shutil.rmtree, tmp_dir) 673 with bare.clone(tmp_dir, mkdir=False) as nonbare: 674 check(nonbare) 675 check(bare) 676 677 def test_working_tree(self): 678 temp_dir = tempfile.mkdtemp() 679 self.addCleanup(shutil.rmtree, temp_dir) 680 worktree_temp_dir = tempfile.mkdtemp() 681 self.addCleanup(shutil.rmtree, worktree_temp_dir) 682 r = Repo.init(temp_dir) 683 self.addCleanup(r.close) 684 root_sha = r.do_commit( 685 b'empty commit', 686 committer=b'Test Committer <test@nodomain.com>', 687 author=b'Test Author <test@nodomain.com>', 688 commit_timestamp=12345, commit_timezone=0, 689 author_timestamp=12345, author_timezone=0) 690 r.refs[b'refs/heads/master'] = root_sha 691 w = Repo._init_new_working_directory(worktree_temp_dir, r) 692 self.addCleanup(w.close) 693 new_sha = w.do_commit( 694 b'new commit', 695 committer=b'Test Committer <test@nodomain.com>', 696 author=b'Test Author <test@nodomain.com>', 697 commit_timestamp=12345, commit_timezone=0, 698 author_timestamp=12345, author_timezone=0) 699 w.refs[b'HEAD'] = new_sha 700 self.assertEqual(os.path.abspath(r.controldir()), 701 os.path.abspath(w.commondir())) 702 self.assertEqual(r.refs.keys(), w.refs.keys()) 703 self.assertNotEqual(r.head(), w.head()) 704 705 706class BuildRepoRootTests(TestCase): 707 """Tests that build on-disk repos from scratch. 708 709 Repos live in a temp dir and are torn down after each test. They start with 710 a single commit in master having single file named 'a'. 711 """ 712 713 def get_repo_dir(self): 714 return os.path.join(tempfile.mkdtemp(), 'test') 715 716 def setUp(self): 717 super(BuildRepoRootTests, self).setUp() 718 self._repo_dir = self.get_repo_dir() 719 os.makedirs(self._repo_dir) 720 r = self._repo = Repo.init(self._repo_dir) 721 self.addCleanup(tear_down_repo, r) 722 self.assertFalse(r.bare) 723 self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD')) 724 self.assertRaises(KeyError, lambda: r.refs[b'refs/heads/master']) 725 726 with open(os.path.join(r.path, 'a'), 'wb') as f: 727 f.write(b'file contents') 728 r.stage(['a']) 729 commit_sha = r.do_commit( 730 b'msg', 731 committer=b'Test Committer <test@nodomain.com>', 732 author=b'Test Author <test@nodomain.com>', 733 commit_timestamp=12345, commit_timezone=0, 734 author_timestamp=12345, author_timezone=0) 735 self.assertEqual([], r[commit_sha].parents) 736 self._root_commit = commit_sha 737 738 def test_get_shallow(self): 739 self.assertEqual(set(), self._repo.get_shallow()) 740 with open(os.path.join(self._repo.path, '.git', 'shallow'), 'wb') as f: 741 f.write(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097\n') 742 self.assertEqual({b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'}, 743 self._repo.get_shallow()) 744 745 def test_update_shallow(self): 746 self._repo.update_shallow(None, None) # no op 747 self.assertEqual(set(), self._repo.get_shallow()) 748 self._repo.update_shallow( 749 [b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'], 750 None) 751 self.assertEqual( 752 {b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'}, 753 self._repo.get_shallow()) 754 self._repo.update_shallow( 755 [b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'], 756 [b'f9e39b120c68182a4ba35349f832d0e4e61f485c']) 757 self.assertEqual({b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'}, 758 self._repo.get_shallow()) 759 760 def test_build_repo(self): 761 r = self._repo 762 self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD')) 763 self.assertEqual(self._root_commit, r.refs[b'refs/heads/master']) 764 expected_blob = objects.Blob.from_string(b'file contents') 765 self.assertEqual(expected_blob.data, r[expected_blob.id].data) 766 actual_commit = r[self._root_commit] 767 self.assertEqual(b'msg', actual_commit.message) 768 769 def test_commit_modified(self): 770 r = self._repo 771 with open(os.path.join(r.path, 'a'), 'wb') as f: 772 f.write(b'new contents') 773 r.stage(['a']) 774 commit_sha = r.do_commit( 775 b'modified a', 776 committer=b'Test Committer <test@nodomain.com>', 777 author=b'Test Author <test@nodomain.com>', 778 commit_timestamp=12395, commit_timezone=0, 779 author_timestamp=12395, author_timezone=0) 780 self.assertEqual([self._root_commit], r[commit_sha].parents) 781 a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'a') 782 self.assertEqual(stat.S_IFREG | 0o644, a_mode) 783 self.assertEqual(b'new contents', r[a_id].data) 784 785 @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support') 786 def test_commit_symlink(self): 787 r = self._repo 788 os.symlink('a', os.path.join(r.path, 'b')) 789 r.stage(['a', 'b']) 790 commit_sha = r.do_commit( 791 b'Symlink b', 792 committer=b'Test Committer <test@nodomain.com>', 793 author=b'Test Author <test@nodomain.com>', 794 commit_timestamp=12395, commit_timezone=0, 795 author_timestamp=12395, author_timezone=0) 796 self.assertEqual([self._root_commit], r[commit_sha].parents) 797 b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'b') 798 self.assertTrue(stat.S_ISLNK(b_mode)) 799 self.assertEqual(b'a', r[b_id].data) 800 801 def test_commit_merge_heads_file(self): 802 tmp_dir = tempfile.mkdtemp() 803 self.addCleanup(shutil.rmtree, tmp_dir) 804 r = Repo.init(tmp_dir) 805 with open(os.path.join(r.path, 'a'), 'w') as f: 806 f.write('initial text') 807 c1 = r.do_commit( 808 b'initial commit', 809 committer=b'Test Committer <test@nodomain.com>', 810 author=b'Test Author <test@nodomain.com>', 811 commit_timestamp=12395, commit_timezone=0, 812 author_timestamp=12395, author_timezone=0) 813 with open(os.path.join(r.path, 'a'), 'w') as f: 814 f.write('merged text') 815 with open(os.path.join(r.path, '.git', 'MERGE_HEADS'), 'w') as f: 816 f.write('c27a2d21dd136312d7fa9e8baabb82561a1727d0\n') 817 r.stage(['a']) 818 commit_sha = r.do_commit( 819 b'deleted a', 820 committer=b'Test Committer <test@nodomain.com>', 821 author=b'Test Author <test@nodomain.com>', 822 commit_timestamp=12395, commit_timezone=0, 823 author_timestamp=12395, author_timezone=0) 824 self.assertEqual([ 825 c1, 826 b'c27a2d21dd136312d7fa9e8baabb82561a1727d0'], 827 r[commit_sha].parents) 828 829 def test_commit_deleted(self): 830 r = self._repo 831 os.remove(os.path.join(r.path, 'a')) 832 r.stage(['a']) 833 commit_sha = r.do_commit( 834 b'deleted a', 835 committer=b'Test Committer <test@nodomain.com>', 836 author=b'Test Author <test@nodomain.com>', 837 commit_timestamp=12395, commit_timezone=0, 838 author_timestamp=12395, author_timezone=0) 839 self.assertEqual([self._root_commit], r[commit_sha].parents) 840 self.assertEqual([], list(r.open_index())) 841 tree = r[r[commit_sha].tree] 842 self.assertEqual([], list(tree.iteritems())) 843 844 def test_commit_follows(self): 845 r = self._repo 846 r.refs.set_symbolic_ref(b'HEAD', b'refs/heads/bla') 847 commit_sha = r.do_commit( 848 b'commit with strange character', 849 committer=b'Test Committer <test@nodomain.com>', 850 author=b'Test Author <test@nodomain.com>', 851 commit_timestamp=12395, commit_timezone=0, 852 author_timestamp=12395, author_timezone=0, 853 ref=b'HEAD') 854 self.assertEqual(commit_sha, r[b'refs/heads/bla'].id) 855 856 def test_commit_encoding(self): 857 r = self._repo 858 commit_sha = r.do_commit( 859 b'commit with strange character \xee', 860 committer=b'Test Committer <test@nodomain.com>', 861 author=b'Test Author <test@nodomain.com>', 862 commit_timestamp=12395, commit_timezone=0, 863 author_timestamp=12395, author_timezone=0, 864 encoding=b"iso8859-1") 865 self.assertEqual(b"iso8859-1", r[commit_sha].encoding) 866 867 def test_compression_level(self): 868 r = self._repo 869 c = r.get_config() 870 c.set(('core',), 'compression', '3') 871 c.set(('core',), 'looseCompression', '4') 872 c.write_to_path() 873 r = Repo(self._repo_dir) 874 self.assertEqual(r.object_store.loose_compression_level, 4) 875 876 def test_commit_encoding_from_config(self): 877 r = self._repo 878 c = r.get_config() 879 c.set(('i18n',), 'commitEncoding', 'iso8859-1') 880 c.write_to_path() 881 commit_sha = r.do_commit( 882 b'commit with strange character \xee', 883 committer=b'Test Committer <test@nodomain.com>', 884 author=b'Test Author <test@nodomain.com>', 885 commit_timestamp=12395, commit_timezone=0, 886 author_timestamp=12395, author_timezone=0) 887 self.assertEqual(b"iso8859-1", r[commit_sha].encoding) 888 889 def test_commit_config_identity(self): 890 # commit falls back to the users' identity if it wasn't specified 891 r = self._repo 892 c = r.get_config() 893 c.set((b"user", ), b"name", b"Jelmer") 894 c.set((b"user", ), b"email", b"jelmer@apache.org") 895 c.write_to_path() 896 commit_sha = r.do_commit(b'message') 897 self.assertEqual( 898 b"Jelmer <jelmer@apache.org>", 899 r[commit_sha].author) 900 self.assertEqual( 901 b"Jelmer <jelmer@apache.org>", 902 r[commit_sha].committer) 903 904 def test_commit_config_identity_strips_than(self): 905 # commit falls back to the users' identity if it wasn't specified, 906 # and strips superfluous <> 907 r = self._repo 908 c = r.get_config() 909 c.set((b"user", ), b"name", b"Jelmer") 910 c.set((b"user", ), b"email", b"<jelmer@apache.org>") 911 c.write_to_path() 912 commit_sha = r.do_commit(b'message') 913 self.assertEqual( 914 b"Jelmer <jelmer@apache.org>", 915 r[commit_sha].author) 916 self.assertEqual( 917 b"Jelmer <jelmer@apache.org>", 918 r[commit_sha].committer) 919 920 def test_commit_config_identity_in_memoryrepo(self): 921 # commit falls back to the users' identity if it wasn't specified 922 r = MemoryRepo.init_bare([], {}) 923 c = r.get_config() 924 c.set((b"user", ), b"name", b"Jelmer") 925 c.set((b"user", ), b"email", b"jelmer@apache.org") 926 927 commit_sha = r.do_commit(b'message', tree=objects.Tree().id) 928 self.assertEqual( 929 b"Jelmer <jelmer@apache.org>", 930 r[commit_sha].author) 931 self.assertEqual( 932 b"Jelmer <jelmer@apache.org>", 933 r[commit_sha].committer) 934 935 def overrideEnv(self, name, value): 936 def restore(): 937 if oldval is not None: 938 os.environ[name] = oldval 939 else: 940 del os.environ[name] 941 oldval = os.environ.get(name) 942 os.environ[name] = value 943 self.addCleanup(restore) 944 945 def test_commit_config_identity_from_env(self): 946 # commit falls back to the users' identity if it wasn't specified 947 self.overrideEnv('GIT_COMMITTER_NAME', 'joe') 948 self.overrideEnv('GIT_COMMITTER_EMAIL', 'joe@example.com') 949 r = self._repo 950 c = r.get_config() 951 c.set((b"user", ), b"name", b"Jelmer") 952 c.set((b"user", ), b"email", b"jelmer@apache.org") 953 c.write_to_path() 954 commit_sha = r.do_commit(b'message') 955 self.assertEqual( 956 b"Jelmer <jelmer@apache.org>", 957 r[commit_sha].author) 958 self.assertEqual( 959 b"joe <joe@example.com>", 960 r[commit_sha].committer) 961 962 def test_commit_fail_ref(self): 963 r = self._repo 964 965 def set_if_equals(name, old_ref, new_ref, **kwargs): 966 return False 967 r.refs.set_if_equals = set_if_equals 968 969 def add_if_new(name, new_ref, **kwargs): 970 self.fail('Unexpected call to add_if_new') 971 r.refs.add_if_new = add_if_new 972 973 old_shas = set(r.object_store) 974 self.assertRaises(errors.CommitError, r.do_commit, b'failed commit', 975 committer=b'Test Committer <test@nodomain.com>', 976 author=b'Test Author <test@nodomain.com>', 977 commit_timestamp=12345, commit_timezone=0, 978 author_timestamp=12345, author_timezone=0) 979 new_shas = set(r.object_store) - old_shas 980 self.assertEqual(1, len(new_shas)) 981 # Check that the new commit (now garbage) was added. 982 new_commit = r[new_shas.pop()] 983 self.assertEqual(r[self._root_commit].tree, new_commit.tree) 984 self.assertEqual(b'failed commit', new_commit.message) 985 986 def test_commit_branch(self): 987 r = self._repo 988 989 commit_sha = r.do_commit( 990 b'commit to branch', 991 committer=b'Test Committer <test@nodomain.com>', 992 author=b'Test Author <test@nodomain.com>', 993 commit_timestamp=12395, commit_timezone=0, 994 author_timestamp=12395, author_timezone=0, 995 ref=b"refs/heads/new_branch") 996 self.assertEqual(self._root_commit, r[b"HEAD"].id) 997 self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id) 998 self.assertEqual([], r[commit_sha].parents) 999 self.assertTrue(b"refs/heads/new_branch" in r) 1000 1001 new_branch_head = commit_sha 1002 1003 commit_sha = r.do_commit( 1004 b'commit to branch 2', 1005 committer=b'Test Committer <test@nodomain.com>', 1006 author=b'Test Author <test@nodomain.com>', 1007 commit_timestamp=12395, commit_timezone=0, 1008 author_timestamp=12395, author_timezone=0, 1009 ref=b"refs/heads/new_branch") 1010 self.assertEqual(self._root_commit, r[b"HEAD"].id) 1011 self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id) 1012 self.assertEqual([new_branch_head], r[commit_sha].parents) 1013 1014 def test_commit_merge_heads(self): 1015 r = self._repo 1016 merge_1 = r.do_commit( 1017 b'commit to branch 2', 1018 committer=b'Test Committer <test@nodomain.com>', 1019 author=b'Test Author <test@nodomain.com>', 1020 commit_timestamp=12395, commit_timezone=0, 1021 author_timestamp=12395, author_timezone=0, 1022 ref=b"refs/heads/new_branch") 1023 commit_sha = r.do_commit( 1024 b'commit with merge', 1025 committer=b'Test Committer <test@nodomain.com>', 1026 author=b'Test Author <test@nodomain.com>', 1027 commit_timestamp=12395, commit_timezone=0, 1028 author_timestamp=12395, author_timezone=0, 1029 merge_heads=[merge_1]) 1030 self.assertEqual( 1031 [self._root_commit, merge_1], 1032 r[commit_sha].parents) 1033 1034 def test_commit_dangling_commit(self): 1035 r = self._repo 1036 1037 old_shas = set(r.object_store) 1038 old_refs = r.get_refs() 1039 commit_sha = r.do_commit( 1040 b'commit with no ref', 1041 committer=b'Test Committer <test@nodomain.com>', 1042 author=b'Test Author <test@nodomain.com>', 1043 commit_timestamp=12395, commit_timezone=0, 1044 author_timestamp=12395, author_timezone=0, 1045 ref=None) 1046 new_shas = set(r.object_store) - old_shas 1047 1048 # New sha is added, but no new refs 1049 self.assertEqual(1, len(new_shas)) 1050 new_commit = r[new_shas.pop()] 1051 self.assertEqual(r[self._root_commit].tree, new_commit.tree) 1052 self.assertEqual([], r[commit_sha].parents) 1053 self.assertEqual(old_refs, r.get_refs()) 1054 1055 def test_commit_dangling_commit_with_parents(self): 1056 r = self._repo 1057 1058 old_shas = set(r.object_store) 1059 old_refs = r.get_refs() 1060 commit_sha = r.do_commit( 1061 b'commit with no ref', 1062 committer=b'Test Committer <test@nodomain.com>', 1063 author=b'Test Author <test@nodomain.com>', 1064 commit_timestamp=12395, commit_timezone=0, 1065 author_timestamp=12395, author_timezone=0, 1066 ref=None, merge_heads=[self._root_commit]) 1067 new_shas = set(r.object_store) - old_shas 1068 1069 # New sha is added, but no new refs 1070 self.assertEqual(1, len(new_shas)) 1071 new_commit = r[new_shas.pop()] 1072 self.assertEqual(r[self._root_commit].tree, new_commit.tree) 1073 self.assertEqual([self._root_commit], r[commit_sha].parents) 1074 self.assertEqual(old_refs, r.get_refs()) 1075 1076 def test_stage_absolute(self): 1077 r = self._repo 1078 os.remove(os.path.join(r.path, 'a')) 1079 self.assertRaises(ValueError, r.stage, [os.path.join(r.path, 'a')]) 1080 1081 def test_stage_deleted(self): 1082 r = self._repo 1083 os.remove(os.path.join(r.path, 'a')) 1084 r.stage(['a']) 1085 r.stage(['a']) # double-stage a deleted path 1086 1087 def test_stage_directory(self): 1088 r = self._repo 1089 os.mkdir(os.path.join(r.path, 'c')) 1090 r.stage(['c']) 1091 self.assertEqual([b'a'], list(r.open_index())) 1092 1093 @skipIf(sys.platform == 'win32' and sys.version_info[:2] >= (3, 6), 1094 'tries to implicitly decode as utf8') 1095 def test_commit_no_encode_decode(self): 1096 r = self._repo 1097 repo_path_bytes = r.path.encode(sys.getfilesystemencoding()) 1098 encodings = ('utf8', 'latin1') 1099 names = [u'À'.encode(encoding) for encoding in encodings] 1100 for name, encoding in zip(names, encodings): 1101 full_path = os.path.join(repo_path_bytes, name) 1102 with open(full_path, 'wb') as f: 1103 f.write(encoding.encode('ascii')) 1104 # These files are break tear_down_repo, so cleanup these files 1105 # ourselves. 1106 self.addCleanup(os.remove, full_path) 1107 1108 r.stage(names) 1109 commit_sha = r.do_commit( 1110 b'Files with different encodings', 1111 committer=b'Test Committer <test@nodomain.com>', 1112 author=b'Test Author <test@nodomain.com>', 1113 commit_timestamp=12395, commit_timezone=0, 1114 author_timestamp=12395, author_timezone=0, 1115 ref=None, merge_heads=[self._root_commit]) 1116 1117 for name, encoding in zip(names, encodings): 1118 mode, id = tree_lookup_path(r.get_object, r[commit_sha].tree, name) 1119 self.assertEqual(stat.S_IFREG | 0o644, mode) 1120 self.assertEqual(encoding.encode('ascii'), r[id].data) 1121 1122 def test_discover_intended(self): 1123 path = os.path.join(self._repo_dir, 'b/c') 1124 r = Repo.discover(path) 1125 self.assertEqual(r.head(), self._repo.head()) 1126 1127 def test_discover_isrepo(self): 1128 r = Repo.discover(self._repo_dir) 1129 self.assertEqual(r.head(), self._repo.head()) 1130 1131 def test_discover_notrepo(self): 1132 with self.assertRaises(NotGitRepository): 1133 Repo.discover('/') 1134 1135 1136class CheckUserIdentityTests(TestCase): 1137 1138 def test_valid(self): 1139 check_user_identity(b'Me <me@example.com>') 1140 1141 def test_invalid(self): 1142 self.assertRaises(InvalidUserIdentity, 1143 check_user_identity, b'No Email') 1144 self.assertRaises(InvalidUserIdentity, 1145 check_user_identity, b'Fullname <missing') 1146 self.assertRaises(InvalidUserIdentity, 1147 check_user_identity, b'Fullname missing>') 1148 self.assertRaises(InvalidUserIdentity, 1149 check_user_identity, b'Fullname >order<>') 1150