1# -*- coding: utf-8 -*- 2# test_repo.py 3# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 4# 5# This module is part of GitPython and is released under 6# the BSD License: http://www.opensource.org/licenses/bsd-license.php 7import glob 8import io 9from io import BytesIO 10import itertools 11import os 12import pickle 13import tempfile 14from unittest import skipIf, SkipTest 15 16try: 17 import pathlib 18except ImportError: 19 pathlib = None 20 21from git import ( 22 InvalidGitRepositoryError, 23 Repo, 24 NoSuchPathError, 25 Head, 26 Commit, 27 Object, 28 Tree, 29 IndexFile, 30 Git, 31 Reference, 32 GitDB, 33 Submodule, 34 GitCmdObjectDB, 35 Remote, 36 BadName, 37 GitCommandError 38) 39from git.compat import ( 40 PY3, 41 is_win, 42 string_types, 43 win_encode, 44) 45from git.exc import ( 46 BadObject, 47) 48from git.repo.fun import touch 49from git.test.lib import ( 50 patch, 51 TestBase, 52 with_rw_repo, 53 fixture, 54 assert_false, 55 assert_equal, 56 assert_true, 57 raises 58) 59from git.util import HIDE_WINDOWS_KNOWN_ERRORS, cygpath 60from git.test.lib import with_rw_directory 61from git.util import join_path_native, rmtree, rmfile, bin_to_hex 62 63import functools as fnt 64import os.path as osp 65 66 67def iter_flatten(lol): 68 for items in lol: 69 for item in items: 70 yield item 71 72 73def flatten(lol): 74 return list(iter_flatten(lol)) 75 76 77_tc_lock_fpaths = osp.join(osp.dirname(__file__), '../../.git/*.lock') 78 79 80def _rm_lock_files(): 81 for lfp in glob.glob(_tc_lock_fpaths): 82 rmfile(lfp) 83 84 85class TestRepo(TestBase): 86 87 def setUp(self): 88 _rm_lock_files() 89 90 def tearDown(self): 91 for lfp in glob.glob(_tc_lock_fpaths): 92 if osp.isfile(lfp): 93 raise AssertionError('Previous TC left hanging git-lock file: %s', lfp) 94 import gc 95 gc.collect() 96 97 @raises(InvalidGitRepositoryError) 98 def test_new_should_raise_on_invalid_repo_location(self): 99 Repo(tempfile.gettempdir()) 100 101 @raises(NoSuchPathError) 102 def test_new_should_raise_on_non_existent_path(self): 103 Repo("repos/foobar") 104 105 @with_rw_repo('0.3.2.1') 106 def test_repo_creation_from_different_paths(self, rw_repo): 107 r_from_gitdir = Repo(rw_repo.git_dir) 108 self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir) 109 assert r_from_gitdir.git_dir.endswith('.git') 110 assert not rw_repo.git.working_dir.endswith('.git') 111 self.assertEqual(r_from_gitdir.git.working_dir, rw_repo.git.working_dir) 112 113 @with_rw_repo('0.3.2.1') 114 def test_repo_creation_pathlib(self, rw_repo): 115 if pathlib is None: # pythons bellow 3.4 don't have pathlib 116 raise SkipTest("pathlib was introduced in 3.4") 117 118 r_from_gitdir = Repo(pathlib.Path(rw_repo.git_dir)) 119 self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir) 120 121 def test_description(self): 122 txt = "Test repository" 123 self.rorepo.description = txt 124 assert_equal(self.rorepo.description, txt) 125 126 def test_heads_should_return_array_of_head_objects(self): 127 for head in self.rorepo.heads: 128 assert_equal(Head, head.__class__) 129 130 def test_heads_should_populate_head_data(self): 131 for head in self.rorepo.heads: 132 assert head.name 133 self.assertIsInstance(head.commit, Commit) 134 # END for each head 135 136 self.assertIsInstance(self.rorepo.heads.master, Head) 137 self.assertIsInstance(self.rorepo.heads['master'], Head) 138 139 def test_tree_from_revision(self): 140 tree = self.rorepo.tree('0.1.6') 141 self.assertEqual(len(tree.hexsha), 40) 142 self.assertEqual(tree.type, "tree") 143 self.assertEqual(self.rorepo.tree(tree), tree) 144 145 # try from invalid revision that does not exist 146 self.failUnlessRaises(BadName, self.rorepo.tree, 'hello world') 147 148 def test_pickleable(self): 149 pickle.loads(pickle.dumps(self.rorepo)) 150 151 def test_commit_from_revision(self): 152 commit = self.rorepo.commit('0.1.4') 153 self.assertEqual(commit.type, 'commit') 154 self.assertEqual(self.rorepo.commit(commit), commit) 155 156 def test_commits(self): 157 mc = 10 158 commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) 159 self.assertEqual(len(commits), mc) 160 161 c = commits[0] 162 assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) 163 assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) 164 assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha) 165 assert_equal("Michael Trier", c.author.name) 166 assert_equal("mtrier@gmail.com", c.author.email) 167 assert_equal(1232829715, c.authored_date) 168 assert_equal(5 * 3600, c.author_tz_offset) 169 assert_equal("Michael Trier", c.committer.name) 170 assert_equal("mtrier@gmail.com", c.committer.email) 171 assert_equal(1232829715, c.committed_date) 172 assert_equal(5 * 3600, c.committer_tz_offset) 173 assert_equal("Bumped version 0.1.6\n", c.message) 174 175 c = commits[1] 176 self.assertIsInstance(c.parents, tuple) 177 178 def test_trees(self): 179 mc = 30 180 num_trees = 0 181 for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): 182 num_trees += 1 183 self.assertIsInstance(tree, Tree) 184 # END for each tree 185 self.assertEqual(num_trees, mc) 186 187 def _assert_empty_repo(self, repo): 188 # test all kinds of things with an empty, freshly initialized repo. 189 # It should throw good errors 190 191 # entries should be empty 192 self.assertEqual(len(repo.index.entries), 0) 193 194 # head is accessible 195 assert repo.head 196 assert repo.head.ref 197 assert not repo.head.is_valid() 198 199 # we can change the head to some other ref 200 head_ref = Head.from_path(repo, Head.to_full_path('some_head')) 201 assert not head_ref.is_valid() 202 repo.head.ref = head_ref 203 204 # is_dirty can handle all kwargs 205 for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): 206 assert not repo.is_dirty(*args) 207 # END for each arg 208 209 # we can add a file to the index ( if we are not bare ) 210 if not repo.bare: 211 pass 212 # END test repos with working tree 213 214 @with_rw_directory 215 def test_clone_from_keeps_env(self, rw_dir): 216 original_repo = Repo.init(osp.join(rw_dir, "repo")) 217 environment = {"entry1": "value", "another_entry": "10"} 218 219 cloned = Repo.clone_from(original_repo.git_dir, osp.join(rw_dir, "clone"), env=environment) 220 221 assert_equal(environment, cloned.git.environment()) 222 223 @with_rw_directory 224 def test_clone_from_pathlib(self, rw_dir): 225 if pathlib is None: # pythons bellow 3.4 don't have pathlib 226 raise SkipTest("pathlib was introduced in 3.4") 227 228 original_repo = Repo.init(osp.join(rw_dir, "repo")) 229 230 Repo.clone_from(original_repo.git_dir, pathlib.Path(rw_dir) / "clone_pathlib") 231 232 @with_rw_repo('HEAD') 233 def test_max_chunk_size(self, repo): 234 class TestOutputStream(object): 235 def __init__(self, max_chunk_size): 236 self.max_chunk_size = max_chunk_size 237 238 def write(self, b): 239 assert_true(len(b) <= self.max_chunk_size) 240 241 for chunk_size in [16, 128, 1024]: 242 repo.git.status(output_stream=TestOutputStream(chunk_size), max_chunk_size=chunk_size) 243 244 repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE), max_chunk_size=None) 245 repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE), max_chunk_size=-10) 246 repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE)) 247 248 def test_init(self): 249 prev_cwd = os.getcwd() 250 os.chdir(tempfile.gettempdir()) 251 git_dir_rela = "repos/foo/bar.git" 252 del_dir_abs = osp.abspath("repos") 253 git_dir_abs = osp.abspath(git_dir_rela) 254 try: 255 # with specific path 256 for path in (git_dir_rela, git_dir_abs): 257 r = Repo.init(path=path, bare=True) 258 self.assertIsInstance(r, Repo) 259 assert r.bare is True 260 assert not r.has_separate_working_tree() 261 assert osp.isdir(r.git_dir) 262 263 self._assert_empty_repo(r) 264 265 # test clone 266 clone_path = path + "_clone" 267 rc = r.clone(clone_path) 268 self._assert_empty_repo(rc) 269 270 try: 271 rmtree(clone_path) 272 except OSError: 273 # when relative paths are used, the clone may actually be inside 274 # of the parent directory 275 pass 276 # END exception handling 277 278 # try again, this time with the absolute version 279 rc = Repo.clone_from(r.git_dir, clone_path) 280 self._assert_empty_repo(rc) 281 282 rmtree(git_dir_abs) 283 try: 284 rmtree(clone_path) 285 except OSError: 286 # when relative paths are used, the clone may actually be inside 287 # of the parent directory 288 pass 289 # END exception handling 290 291 # END for each path 292 293 os.makedirs(git_dir_rela) 294 os.chdir(git_dir_rela) 295 r = Repo.init(bare=False) 296 assert r.bare is False 297 assert not r.has_separate_working_tree() 298 299 self._assert_empty_repo(r) 300 finally: 301 try: 302 rmtree(del_dir_abs) 303 except OSError: 304 pass 305 os.chdir(prev_cwd) 306 # END restore previous state 307 308 def test_bare_property(self): 309 self.rorepo.bare 310 311 def test_daemon_export(self): 312 orig_val = self.rorepo.daemon_export 313 self.rorepo.daemon_export = not orig_val 314 self.assertEqual(self.rorepo.daemon_export, (not orig_val)) 315 self.rorepo.daemon_export = orig_val 316 self.assertEqual(self.rorepo.daemon_export, orig_val) 317 318 def test_alternates(self): 319 cur_alternates = self.rorepo.alternates 320 # empty alternates 321 self.rorepo.alternates = [] 322 self.assertEqual(self.rorepo.alternates, []) 323 alts = ["other/location", "this/location"] 324 self.rorepo.alternates = alts 325 self.assertEqual(alts, self.rorepo.alternates) 326 self.rorepo.alternates = cur_alternates 327 328 def test_repr(self): 329 assert repr(self.rorepo).startswith('<git.Repo ') 330 331 def test_is_dirty_with_bare_repository(self): 332 orig_value = self.rorepo._bare 333 self.rorepo._bare = True 334 assert_false(self.rorepo.is_dirty()) 335 self.rorepo._bare = orig_value 336 337 def test_is_dirty(self): 338 self.rorepo._bare = False 339 for index in (0, 1): 340 for working_tree in (0, 1): 341 for untracked_files in (0, 1): 342 assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) 343 # END untracked files 344 # END working tree 345 # END index 346 orig_val = self.rorepo._bare 347 self.rorepo._bare = True 348 assert self.rorepo.is_dirty() is False 349 self.rorepo._bare = orig_val 350 351 @with_rw_repo('HEAD') 352 def test_is_dirty_with_path(self, rwrepo): 353 assert rwrepo.is_dirty(path="git") is False 354 355 with open(osp.join(rwrepo.working_dir, "git", "util.py"), "at") as f: 356 f.write("junk") 357 assert rwrepo.is_dirty(path="git") is True 358 assert rwrepo.is_dirty(path="doc") is False 359 360 rwrepo.git.add(Git.polish_url(osp.join("git", "util.py"))) 361 assert rwrepo.is_dirty(index=False, path="git") is False 362 assert rwrepo.is_dirty(path="git") is True 363 364 with open(osp.join(rwrepo.working_dir, "doc", "no-such-file.txt"), "wt") as f: 365 f.write("junk") 366 assert rwrepo.is_dirty(path="doc") is False 367 assert rwrepo.is_dirty(untracked_files=True, path="doc") is True 368 369 def test_head(self): 370 self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object) 371 372 def test_index(self): 373 index = self.rorepo.index 374 self.assertIsInstance(index, IndexFile) 375 376 def test_tag(self): 377 assert self.rorepo.tag('refs/tags/0.1.5').commit 378 379 def test_archive(self): 380 tmpfile = tempfile.mktemp(suffix='archive-test') 381 with open(tmpfile, 'wb') as stream: 382 self.rorepo.archive(stream, '0.1.6', path='doc') 383 assert stream.tell() 384 os.remove(tmpfile) 385 386 @patch.object(Git, '_call_process') 387 def test_should_display_blame_information(self, git): 388 git.return_value = fixture('blame') 389 b = self.rorepo.blame('master', 'lib/git.py') 390 assert_equal(13, len(b)) 391 assert_equal(2, len(b[0])) 392 # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) 393 assert_equal(hash(b[0][0]), hash(b[9][0])) 394 c = b[0][0] 395 assert_true(git.called) 396 397 assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha) 398 assert_equal('Tom Preston-Werner', c.author.name) 399 assert_equal('tom@mojombo.com', c.author.email) 400 assert_equal(1191997100, c.authored_date) 401 assert_equal('Tom Preston-Werner', c.committer.name) 402 assert_equal('tom@mojombo.com', c.committer.email) 403 assert_equal(1191997100, c.committed_date) 404 self.assertRaisesRegexp(ValueError, "634396b2f541a9f2d58b00be1a07f0c358b999b3 missing", lambda: c.message) 405 406 # test the 'lines per commit' entries 407 tlist = b[0][1] 408 assert_true(tlist) 409 assert_true(isinstance(tlist[0], string_types)) 410 assert_true(len(tlist) < sum(len(t) for t in tlist)) # test for single-char bug 411 412 # BINARY BLAME 413 git.return_value = fixture('blame_binary') 414 blames = self.rorepo.blame('master', 'rps') 415 self.assertEqual(len(blames), 2) 416 417 def test_blame_real(self): 418 c = 0 419 nml = 0 # amount of multi-lines per blame 420 for item in self.rorepo.head.commit.tree.traverse( 421 predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')): 422 c += 1 423 424 for b in self.rorepo.blame(self.rorepo.head, item.path): 425 nml += int(len(b[1]) > 1) 426 # END for each item to traverse 427 assert c, "Should have executed at least one blame command" 428 assert nml, "There should at least be one blame commit that contains multiple lines" 429 430 @patch.object(Git, '_call_process') 431 def test_blame_incremental(self, git): 432 # loop over two fixtures, create a test fixture for 2.11.1+ syntax 433 for git_fixture in ('blame_incremental', 'blame_incremental_2.11.1_plus'): 434 git.return_value = fixture(git_fixture) 435 blame_output = self.rorepo.blame_incremental('9debf6b0aafb6f7781ea9d1383c86939a1aacde3', 'AUTHORS') 436 blame_output = list(blame_output) 437 self.assertEqual(len(blame_output), 5) 438 439 # Check all outputted line numbers 440 ranges = flatten([entry.linenos for entry in blame_output]) 441 self.assertEqual(ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(3, 14), range(15, 17)])) 442 443 commits = [entry.commit.hexsha[:7] for entry in blame_output] 444 self.assertEqual(commits, ['82b8902', '82b8902', 'c76852d', 'c76852d', 'c76852d']) 445 446 # Original filenames 447 self.assertSequenceEqual([entry.orig_path for entry in blame_output], [u'AUTHORS'] * len(blame_output)) 448 449 # Original line numbers 450 orig_ranges = flatten([entry.orig_linenos for entry in blame_output]) 451 self.assertEqual(orig_ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(2, 13), range(13, 15)])) # noqa E501 452 453 @patch.object(Git, '_call_process') 454 def test_blame_complex_revision(self, git): 455 git.return_value = fixture('blame_complex_revision') 456 res = self.rorepo.blame("HEAD~10..HEAD", "README.md") 457 self.assertEqual(len(res), 1) 458 self.assertEqual(len(res[0][1]), 83, "Unexpected amount of parsed blame lines") 459 460 @skipIf(HIDE_WINDOWS_KNOWN_ERRORS and Git.is_cygwin(), 461 """FIXME: File "C:\\projects\\gitpython\\git\\cmd.py", line 671, in execute 462 raise GitCommandError(command, status, stderr_value, stdout_value) 463 GitCommandError: Cmd('git') failed due to: exit code(128) 464 cmdline: git add 1__��ava verb��ten 1_test _myfile 1_test_other_file 465 1_��ava-----verb��ten 466 stderr: 'fatal: pathspec '"1__çava verböten"' did not match any files' 467 """) 468 @with_rw_repo('HEAD', bare=False) 469 def test_untracked_files(self, rwrepo): 470 for run, (repo_add, is_invoking_git) in enumerate(( 471 (rwrepo.index.add, False), 472 (rwrepo.git.add, True), 473 )): 474 base = rwrepo.working_tree_dir 475 files = (join_path_native(base, u"%i_test _myfile" % run), 476 join_path_native(base, "%i_test_other_file" % run), 477 join_path_native(base, u"%i__çava verböten" % run), 478 join_path_native(base, u"%i_çava-----verböten" % run)) 479 480 num_recently_untracked = 0 481 for fpath in files: 482 with open(fpath, "wb"): 483 pass 484 untracked_files = rwrepo.untracked_files 485 num_recently_untracked = len(untracked_files) 486 487 # assure we have all names - they are relative to the git-dir 488 num_test_untracked = 0 489 for utfile in untracked_files: 490 num_test_untracked += join_path_native(base, utfile) in files 491 self.assertEqual(len(files), num_test_untracked) 492 493 if is_win and not PY3 and is_invoking_git: 494 ## On Windows, shell needed when passing unicode cmd-args. 495 # 496 repo_add = fnt.partial(repo_add, shell=True) 497 untracked_files = [win_encode(f) for f in untracked_files] 498 repo_add(untracked_files) 499 self.assertEqual(len(rwrepo.untracked_files), (num_recently_untracked - len(files))) 500 # end for each run 501 502 def test_config_reader(self): 503 reader = self.rorepo.config_reader() # all config files 504 assert reader.read_only 505 reader = self.rorepo.config_reader("repository") # single config file 506 assert reader.read_only 507 508 def test_config_writer(self): 509 for config_level in self.rorepo.config_level: 510 try: 511 with self.rorepo.config_writer(config_level) as writer: 512 self.assertFalse(writer.read_only) 513 except IOError: 514 # its okay not to get a writer for some configuration files if we 515 # have no permissions 516 pass 517 518 def test_config_level_paths(self): 519 for config_level in self.rorepo.config_level: 520 assert self.rorepo._get_config_path(config_level) 521 522 def test_creation_deletion(self): 523 # just a very quick test to assure it generally works. There are 524 # specialized cases in the test_refs module 525 head = self.rorepo.create_head("new_head", "HEAD~1") 526 self.rorepo.delete_head(head) 527 528 try: 529 tag = self.rorepo.create_tag("new_tag", "HEAD~2") 530 finally: 531 self.rorepo.delete_tag(tag) 532 with self.rorepo.config_writer(): 533 pass 534 try: 535 remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") 536 finally: 537 self.rorepo.delete_remote(remote) 538 539 def test_comparison_and_hash(self): 540 # this is only a preliminary test, more testing done in test_index 541 self.assertEqual(self.rorepo, self.rorepo) 542 self.assertFalse(self.rorepo != self.rorepo) 543 self.assertEqual(len({self.rorepo, self.rorepo}), 1) 544 545 @with_rw_directory 546 def test_tilde_and_env_vars_in_repo_path(self, rw_dir): 547 ph = os.environ.get('HOME') 548 try: 549 os.environ['HOME'] = rw_dir 550 Repo.init(osp.join('~', 'test.git'), bare=True) 551 552 os.environ['FOO'] = rw_dir 553 Repo.init(osp.join('$FOO', 'test.git'), bare=True) 554 finally: 555 if ph: 556 os.environ['HOME'] = ph 557 del os.environ['FOO'] 558 # end assure HOME gets reset to what it was 559 560 def test_git_cmd(self): 561 # test CatFileContentStream, just to be very sure we have no fencepost errors 562 # last \n is the terminating newline that it expects 563 l1 = b"0123456789\n" 564 l2 = b"abcdefghijklmnopqrstxy\n" 565 l3 = b"z\n" 566 d = l1 + l2 + l3 + b"\n" 567 568 l1p = l1[:5] 569 570 # full size 571 # size is without terminating newline 572 def mkfull(): 573 return Git.CatFileContentStream(len(d) - 1, BytesIO(d)) 574 575 ts = 5 576 577 def mktiny(): 578 return Git.CatFileContentStream(ts, BytesIO(d)) 579 580 # readlines no limit 581 s = mkfull() 582 lines = s.readlines() 583 self.assertEqual(len(lines), 3) 584 self.assertTrue(lines[-1].endswith(b'\n'), lines[-1]) 585 self.assertEqual(s._stream.tell(), len(d)) # must have scrubbed to the end 586 587 # realines line limit 588 s = mkfull() 589 lines = s.readlines(5) 590 self.assertEqual(len(lines), 1) 591 592 # readlines on tiny sections 593 s = mktiny() 594 lines = s.readlines() 595 self.assertEqual(len(lines), 1) 596 self.assertEqual(lines[0], l1p) 597 self.assertEqual(s._stream.tell(), ts + 1) 598 599 # readline no limit 600 s = mkfull() 601 self.assertEqual(s.readline(), l1) 602 self.assertEqual(s.readline(), l2) 603 self.assertEqual(s.readline(), l3) 604 self.assertEqual(s.readline(), b'') 605 self.assertEqual(s._stream.tell(), len(d)) 606 607 # readline limit 608 s = mkfull() 609 self.assertEqual(s.readline(5), l1p) 610 self.assertEqual(s.readline(), l1[5:]) 611 612 # readline on tiny section 613 s = mktiny() 614 self.assertEqual(s.readline(), l1p) 615 self.assertEqual(s.readline(), b'') 616 self.assertEqual(s._stream.tell(), ts + 1) 617 618 # read no limit 619 s = mkfull() 620 self.assertEqual(s.read(), d[:-1]) 621 self.assertEqual(s.read(), b'') 622 self.assertEqual(s._stream.tell(), len(d)) 623 624 # read limit 625 s = mkfull() 626 self.assertEqual(s.read(5), l1p) 627 self.assertEqual(s.read(6), l1[5:]) 628 self.assertEqual(s._stream.tell(), 5 + 6) # its not yet done 629 630 # read tiny 631 s = mktiny() 632 self.assertEqual(s.read(2), l1[:2]) 633 self.assertEqual(s._stream.tell(), 2) 634 self.assertEqual(s.read(), l1[2:ts]) 635 self.assertEqual(s._stream.tell(), ts + 1) 636 637 def _assert_rev_parse_types(self, name, rev_obj): 638 rev_parse = self.rorepo.rev_parse 639 640 if rev_obj.type == 'tag': 641 rev_obj = rev_obj.object 642 643 # tree and blob type 644 obj = rev_parse(name + '^{tree}') 645 self.assertEqual(obj, rev_obj.tree) 646 647 obj = rev_parse(name + ':CHANGES') 648 self.assertEqual(obj.type, 'blob') 649 self.assertEqual(obj.path, 'CHANGES') 650 self.assertEqual(rev_obj.tree['CHANGES'], obj) 651 652 def _assert_rev_parse(self, name): 653 """tries multiple different rev-parse syntaxes with the given name 654 :return: parsed object""" 655 rev_parse = self.rorepo.rev_parse 656 orig_obj = rev_parse(name) 657 if orig_obj.type == 'tag': 658 obj = orig_obj.object 659 else: 660 obj = orig_obj 661 # END deref tags by default 662 663 # try history 664 rev = name + "~" 665 obj2 = rev_parse(rev) 666 self.assertEqual(obj2, obj.parents[0]) 667 self._assert_rev_parse_types(rev, obj2) 668 669 # history with number 670 ni = 11 671 history = [obj.parents[0]] 672 for pn in range(ni): 673 history.append(history[-1].parents[0]) 674 # END get given amount of commits 675 676 for pn in range(11): 677 rev = name + "~%i" % (pn + 1) 678 obj2 = rev_parse(rev) 679 self.assertEqual(obj2, history[pn]) 680 self._assert_rev_parse_types(rev, obj2) 681 # END history check 682 683 # parent ( default ) 684 rev = name + "^" 685 obj2 = rev_parse(rev) 686 self.assertEqual(obj2, obj.parents[0]) 687 self._assert_rev_parse_types(rev, obj2) 688 689 # parent with number 690 for pn, parent in enumerate(obj.parents): 691 rev = name + "^%i" % (pn + 1) 692 self.assertEqual(rev_parse(rev), parent) 693 self._assert_rev_parse_types(rev, parent) 694 # END for each parent 695 696 return orig_obj 697 698 @with_rw_repo('HEAD', bare=False) 699 def test_rw_rev_parse(self, rwrepo): 700 # verify it does not confuse branches with hexsha ids 701 ahead = rwrepo.create_head('aaaaaaaa') 702 assert(rwrepo.rev_parse(str(ahead)) == ahead.commit) 703 704 def test_rev_parse(self): 705 rev_parse = self.rorepo.rev_parse 706 707 # try special case: This one failed at some point, make sure its fixed 708 self.assertEqual(rev_parse("33ebe").hexsha, "33ebe7acec14b25c5f84f35a664803fcab2f7781") 709 710 # start from reference 711 num_resolved = 0 712 713 for ref_no, ref in enumerate(Reference.iter_items(self.rorepo)): 714 path_tokens = ref.path.split("/") 715 for pt in range(len(path_tokens)): 716 path_section = '/'.join(path_tokens[-(pt + 1):]) 717 try: 718 obj = self._assert_rev_parse(path_section) 719 self.assertEqual(obj.type, ref.object.type) 720 num_resolved += 1 721 except (BadName, BadObject): 722 print("failed on %s" % path_section) 723 # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112 724 pass 725 # END exception handling 726 # END for each token 727 if ref_no == 3 - 1: 728 break 729 # END for each reference 730 assert num_resolved 731 732 # it works with tags ! 733 tag = self._assert_rev_parse('0.1.4') 734 self.assertEqual(tag.type, 'tag') 735 736 # try full sha directly ( including type conversion ) 737 self.assertEqual(tag.object, rev_parse(tag.object.hexsha)) 738 self._assert_rev_parse_types(tag.object.hexsha, tag.object) 739 740 # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES 741 rev = '0.1.4^{tree}^{tree}' 742 self.assertEqual(rev_parse(rev), tag.object.tree) 743 self.assertEqual(rev_parse(rev + ':CHANGES'), tag.object.tree['CHANGES']) 744 745 # try to get parents from first revision - it should fail as no such revision 746 # exists 747 first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" 748 commit = rev_parse(first_rev) 749 self.assertEqual(len(commit.parents), 0) 750 self.assertEqual(commit.hexsha, first_rev) 751 self.failUnlessRaises(BadName, rev_parse, first_rev + "~") 752 self.failUnlessRaises(BadName, rev_parse, first_rev + "^") 753 754 # short SHA1 755 commit2 = rev_parse(first_rev[:20]) 756 self.assertEqual(commit2, commit) 757 commit2 = rev_parse(first_rev[:5]) 758 self.assertEqual(commit2, commit) 759 760 # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one 761 # needs a tag which points to a blob 762 763 # ref^0 returns commit being pointed to, same with ref~0, and ^{} 764 tag = rev_parse('0.1.4') 765 for token in (('~0', '^0', '^{}')): 766 self.assertEqual(tag.object, rev_parse('0.1.4%s' % token)) 767 # END handle multiple tokens 768 769 # try partial parsing 770 max_items = 40 771 for i, binsha in enumerate(self.rorepo.odb.sha_iter()): 772 self.assertEqual(rev_parse(bin_to_hex(binsha)[:8 - (i % 2)].decode('ascii')).binsha, binsha) 773 if i > max_items: 774 # this is rather slow currently, as rev_parse returns an object 775 # which requires accessing packs, it has some additional overhead 776 break 777 # END for each binsha in repo 778 779 # missing closing brace commit^{tree 780 self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') 781 782 # missing starting brace 783 self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') 784 785 # REVLOG 786 ####### 787 head = self.rorepo.head 788 789 # need to specify a ref when using the @ syntax 790 self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha) 791 792 # uses HEAD.ref by default 793 self.assertEqual(rev_parse('@{0}'), head.commit) 794 if not head.is_detached: 795 refspec = '%s@{0}' % head.ref.name 796 self.assertEqual(rev_parse(refspec), head.ref.commit) 797 # all additional specs work as well 798 self.assertEqual(rev_parse(refspec + "^{tree}"), head.commit.tree) 799 self.assertEqual(rev_parse(refspec + ":CHANGES").type, 'blob') 800 # END operate on non-detached head 801 802 # position doesn't exist 803 self.failUnlessRaises(IndexError, rev_parse, '@{10000}') 804 805 # currently, nothing more is supported 806 self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}") 807 808 # the last position 809 assert rev_parse('@{1}') != head.commit 810 811 def test_repo_odbtype(self): 812 target_type = GitCmdObjectDB 813 self.assertIsInstance(self.rorepo.odb, target_type) 814 815 def test_submodules(self): 816 self.assertEqual(len(self.rorepo.submodules), 1) # non-recursive 817 self.assertGreaterEqual(len(list(self.rorepo.iter_submodules())), 2) 818 819 self.assertIsInstance(self.rorepo.submodule("gitdb"), Submodule) 820 self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") 821 822 @with_rw_repo('HEAD', bare=False) 823 def test_submodule_update(self, rwrepo): 824 # fails in bare mode 825 rwrepo._bare = True 826 self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) 827 rwrepo._bare = False 828 829 # test create submodule 830 sm = rwrepo.submodules[0] 831 sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) 832 self.assertIsInstance(sm, Submodule) 833 834 # note: the rest of this functionality is tested in test_submodule 835 836 @with_rw_repo('HEAD') 837 def test_git_file(self, rwrepo): 838 # Move the .git directory to another location and create the .git file. 839 real_path_abs = osp.abspath(join_path_native(rwrepo.working_tree_dir, '.real')) 840 os.rename(rwrepo.git_dir, real_path_abs) 841 git_file_path = join_path_native(rwrepo.working_tree_dir, '.git') 842 with open(git_file_path, 'wb') as fp: 843 fp.write(fixture('git_file')) 844 845 # Create a repo and make sure it's pointing to the relocated .git directory. 846 git_file_repo = Repo(rwrepo.working_tree_dir) 847 self.assertEqual(osp.abspath(git_file_repo.git_dir), real_path_abs) 848 849 # Test using an absolute gitdir path in the .git file. 850 with open(git_file_path, 'wb') as fp: 851 fp.write(('gitdir: %s\n' % real_path_abs).encode('ascii')) 852 git_file_repo = Repo(rwrepo.working_tree_dir) 853 self.assertEqual(osp.abspath(git_file_repo.git_dir), real_path_abs) 854 855 def test_file_handle_leaks(self): 856 def last_commit(repo, rev, path): 857 commit = next(repo.iter_commits(rev, path, max_count=1)) 858 commit.tree[path] 859 860 # This is based on this comment 861 # https://github.com/gitpython-developers/GitPython/issues/60#issuecomment-23558741 862 # And we expect to set max handles to a low value, like 64 863 # You should set ulimit -n X, see .travis.yml 864 # The loops below would easily create 500 handles if these would leak (4 pipes + multiple mapped files) 865 for _ in range(64): 866 for repo_type in (GitCmdObjectDB, GitDB): 867 repo = Repo(self.rorepo.working_tree_dir, odbt=repo_type) 868 last_commit(repo, 'master', 'git/test/test_base.py') 869 # end for each repository type 870 # end for each iteration 871 872 def test_remote_method(self): 873 self.failUnlessRaises(ValueError, self.rorepo.remote, 'foo-blue') 874 self.assertIsInstance(self.rorepo.remote(name='origin'), Remote) 875 876 @with_rw_directory 877 def test_empty_repo(self, rw_dir): 878 """Assure we can handle empty repositories""" 879 r = Repo.init(rw_dir, mkdir=False) 880 # It's ok not to be able to iterate a commit, as there is none 881 self.failUnlessRaises(ValueError, r.iter_commits) 882 self.assertEqual(r.active_branch.name, 'master') 883 assert not r.active_branch.is_valid(), "Branch is yet to be born" 884 885 # actually, when trying to create a new branch without a commit, git itself fails 886 # We should, however, not fail ungracefully 887 self.failUnlessRaises(BadName, r.create_head, 'foo') 888 self.failUnlessRaises(BadName, r.create_head, 'master') 889 # It's expected to not be able to access a tree 890 self.failUnlessRaises(ValueError, r.tree) 891 892 new_file_path = osp.join(rw_dir, "new_file.ext") 893 touch(new_file_path) 894 r.index.add([new_file_path]) 895 r.index.commit("initial commit\nBAD MESSAGE 1\n") 896 897 # Now a branch should be creatable 898 nb = r.create_head('foo') 899 assert nb.is_valid() 900 901 with open(new_file_path, 'w') as f: 902 f.write('Line 1\n') 903 904 r.index.add([new_file_path]) 905 r.index.commit("add line 1\nBAD MESSAGE 2\n") 906 907 with open('%s/.git/logs/refs/heads/master' % (rw_dir,), 'r') as f: 908 contents = f.read() 909 910 assert 'BAD MESSAGE' not in contents, 'log is corrupt' 911 912 def test_merge_base(self): 913 repo = self.rorepo 914 c1 = 'f6aa8d1' 915 c2 = repo.commit('d46e3fe') 916 c3 = '763ef75' 917 self.failUnlessRaises(ValueError, repo.merge_base) 918 self.failUnlessRaises(ValueError, repo.merge_base, 'foo') 919 920 # two commit merge-base 921 res = repo.merge_base(c1, c2) 922 self.assertIsInstance(res, list) 923 self.assertEqual(len(res), 1) 924 self.assertIsInstance(res[0], Commit) 925 self.assertTrue(res[0].hexsha.startswith('3936084')) 926 927 for kw in ('a', 'all'): 928 res = repo.merge_base(c1, c2, c3, **{kw: True}) 929 self.assertIsInstance(res, list) 930 self.assertEqual(len(res), 1) 931 # end for each keyword signalling all merge-bases to be returned 932 933 # Test for no merge base - can't do as we have 934 self.failUnlessRaises(GitCommandError, repo.merge_base, c1, 'ffffff') 935 936 def test_is_ancestor(self): 937 git = self.rorepo.git 938 if git.version_info[:3] < (1, 8, 0): 939 raise SkipTest("git merge-base --is-ancestor feature unsupported") 940 941 repo = self.rorepo 942 c1 = 'f6aa8d1' 943 c2 = '763ef75' 944 self.assertTrue(repo.is_ancestor(c1, c1)) 945 self.assertTrue(repo.is_ancestor("master", "master")) 946 self.assertTrue(repo.is_ancestor(c1, c2)) 947 self.assertTrue(repo.is_ancestor(c1, "master")) 948 self.assertFalse(repo.is_ancestor(c2, c1)) 949 self.assertFalse(repo.is_ancestor("master", c1)) 950 for i, j in itertools.permutations([c1, 'ffffff', ''], r=2): 951 self.assertRaises(GitCommandError, repo.is_ancestor, i, j) 952 953 @with_rw_directory 954 def test_git_work_tree_dotgit(self, rw_dir): 955 """Check that we find .git as a worktree file and find the worktree 956 based on it.""" 957 git = Git(rw_dir) 958 if git.version_info[:3] < (2, 5, 1): 959 raise SkipTest("worktree feature unsupported") 960 961 rw_master = self.rorepo.clone(join_path_native(rw_dir, 'master_repo')) 962 branch = rw_master.create_head('aaaaaaaa') 963 worktree_path = join_path_native(rw_dir, 'worktree_repo') 964 if Git.is_cygwin(): 965 worktree_path = cygpath(worktree_path) 966 rw_master.git.worktree('add', worktree_path, branch.name) 967 968 # this ensures that we can read the repo's gitdir correctly 969 repo = Repo(worktree_path) 970 self.assertIsInstance(repo, Repo) 971 972 # this ensures we're able to actually read the refs in the tree, which 973 # means we can read commondir correctly. 974 commit = repo.head.commit 975 self.assertIsInstance(commit, Object) 976 977 self.assertIsInstance(repo.heads['aaaaaaaa'], Head) 978 979 @with_rw_directory 980 def test_git_work_tree_env(self, rw_dir): 981 """Check that we yield to GIT_WORK_TREE""" 982 # clone a repo 983 # move .git directory to a subdirectory 984 # set GIT_DIR and GIT_WORK_TREE appropriately 985 # check that repo.working_tree_dir == rw_dir 986 self.rorepo.clone(join_path_native(rw_dir, 'master_repo')) 987 988 repo_dir = join_path_native(rw_dir, 'master_repo') 989 old_git_dir = join_path_native(repo_dir, '.git') 990 new_subdir = join_path_native(repo_dir, 'gitdir') 991 new_git_dir = join_path_native(new_subdir, 'git') 992 os.mkdir(new_subdir) 993 os.rename(old_git_dir, new_git_dir) 994 995 oldenv = os.environ.copy() 996 os.environ['GIT_DIR'] = new_git_dir 997 os.environ['GIT_WORK_TREE'] = repo_dir 998 999 try: 1000 r = Repo() 1001 self.assertEqual(r.working_tree_dir, repo_dir) 1002 self.assertEqual(r.working_dir, repo_dir) 1003 finally: 1004 os.environ = oldenv 1005