1# -*- coding: utf-8 -*-
2# test_repo.py
3# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
4#
5# This module is part of GitPython and is released under
6# the BSD License: http://www.opensource.org/licenses/bsd-license.php
7import glob
8import io
9from io import BytesIO
10import itertools
11import os
12import pickle
13import tempfile
14from unittest import skipIf, SkipTest
15
16try:
17    import pathlib
18except ImportError:
19    pathlib = None
20
21from git import (
22    InvalidGitRepositoryError,
23    Repo,
24    NoSuchPathError,
25    Head,
26    Commit,
27    Object,
28    Tree,
29    IndexFile,
30    Git,
31    Reference,
32    GitDB,
33    Submodule,
34    GitCmdObjectDB,
35    Remote,
36    BadName,
37    GitCommandError
38)
39from git.compat import (
40    PY3,
41    is_win,
42    string_types,
43    win_encode,
44)
45from git.exc import (
46    BadObject,
47)
48from git.repo.fun import touch
49from git.test.lib import (
50    patch,
51    TestBase,
52    with_rw_repo,
53    fixture,
54    assert_false,
55    assert_equal,
56    assert_true,
57    raises
58)
59from git.util import HIDE_WINDOWS_KNOWN_ERRORS, cygpath
60from git.test.lib import with_rw_directory
61from git.util import join_path_native, rmtree, rmfile, bin_to_hex
62
63import functools as fnt
64import os.path as osp
65
66
def iter_flatten(lol):
    """Lazily yield each element of each iterable contained in ``lol``.

    :param lol: iterable of iterables ("list of lists")
    :return: generator producing the inner elements in order, one level deep"""
    for element in itertools.chain.from_iterable(lol):
        yield element
71
72
def flatten(lol):
    """Return a list holding the elements of all iterables in ``lol``, one level deep.

    :param lol: iterable of iterables ("list of lists")"""
    return [item for items in lol for item in items]
75
76
# Glob pattern for '*.lock' files inside the '.git' directory located two levels
# above the directory containing this file.
_tc_lock_fpaths = osp.join(osp.dirname(__file__), '../../.git/*.lock')
78
79
def _rm_lock_files():
    """Remove every file matching the ``_tc_lock_fpaths`` glob pattern."""
    lock_paths = glob.glob(_tc_lock_fpaths)
    for lock_path in lock_paths:
        rmfile(lock_path)
83
84
85class TestRepo(TestBase):
86
87    def setUp(self):
88        _rm_lock_files()
89
90    def tearDown(self):
91        for lfp in glob.glob(_tc_lock_fpaths):
92            if osp.isfile(lfp):
93                raise AssertionError('Previous TC left hanging git-lock file: %s', lfp)
94        import gc
95        gc.collect()
96
97    @raises(InvalidGitRepositoryError)
98    def test_new_should_raise_on_invalid_repo_location(self):
99        Repo(tempfile.gettempdir())
100
101    @raises(NoSuchPathError)
102    def test_new_should_raise_on_non_existent_path(self):
103        Repo("repos/foobar")
104
105    @with_rw_repo('0.3.2.1')
106    def test_repo_creation_from_different_paths(self, rw_repo):
107        r_from_gitdir = Repo(rw_repo.git_dir)
108        self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir)
109        assert r_from_gitdir.git_dir.endswith('.git')
110        assert not rw_repo.git.working_dir.endswith('.git')
111        self.assertEqual(r_from_gitdir.git.working_dir, rw_repo.git.working_dir)
112
113    @with_rw_repo('0.3.2.1')
114    def test_repo_creation_pathlib(self, rw_repo):
115        if pathlib is None:  # pythons bellow 3.4 don't have pathlib
116            raise SkipTest("pathlib was introduced in 3.4")
117
118        r_from_gitdir = Repo(pathlib.Path(rw_repo.git_dir))
119        self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir)
120
121    def test_description(self):
122        txt = "Test repository"
123        self.rorepo.description = txt
124        assert_equal(self.rorepo.description, txt)
125
126    def test_heads_should_return_array_of_head_objects(self):
127        for head in self.rorepo.heads:
128            assert_equal(Head, head.__class__)
129
130    def test_heads_should_populate_head_data(self):
131        for head in self.rorepo.heads:
132            assert head.name
133            self.assertIsInstance(head.commit, Commit)
134        # END for each head
135
136        self.assertIsInstance(self.rorepo.heads.master, Head)
137        self.assertIsInstance(self.rorepo.heads['master'], Head)
138
139    def test_tree_from_revision(self):
140        tree = self.rorepo.tree('0.1.6')
141        self.assertEqual(len(tree.hexsha), 40)
142        self.assertEqual(tree.type, "tree")
143        self.assertEqual(self.rorepo.tree(tree), tree)
144
145        # try from invalid revision that does not exist
146        self.failUnlessRaises(BadName, self.rorepo.tree, 'hello world')
147
148    def test_pickleable(self):
149        pickle.loads(pickle.dumps(self.rorepo))
150
151    def test_commit_from_revision(self):
152        commit = self.rorepo.commit('0.1.4')
153        self.assertEqual(commit.type, 'commit')
154        self.assertEqual(self.rorepo.commit(commit), commit)
155
156    def test_commits(self):
157        mc = 10
158        commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
159        self.assertEqual(len(commits), mc)
160
161        c = commits[0]
162        assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
163        assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
164        assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha)
165        assert_equal("Michael Trier", c.author.name)
166        assert_equal("mtrier@gmail.com", c.author.email)
167        assert_equal(1232829715, c.authored_date)
168        assert_equal(5 * 3600, c.author_tz_offset)
169        assert_equal("Michael Trier", c.committer.name)
170        assert_equal("mtrier@gmail.com", c.committer.email)
171        assert_equal(1232829715, c.committed_date)
172        assert_equal(5 * 3600, c.committer_tz_offset)
173        assert_equal("Bumped version 0.1.6\n", c.message)
174
175        c = commits[1]
176        self.assertIsInstance(c.parents, tuple)
177
178    def test_trees(self):
179        mc = 30
180        num_trees = 0
181        for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
182            num_trees += 1
183            self.assertIsInstance(tree, Tree)
184        # END for each tree
185        self.assertEqual(num_trees, mc)
186
187    def _assert_empty_repo(self, repo):
188        # test all kinds of things with an empty, freshly initialized repo.
189        # It should throw good errors
190
191        # entries should be empty
192        self.assertEqual(len(repo.index.entries), 0)
193
194        # head is accessible
195        assert repo.head
196        assert repo.head.ref
197        assert not repo.head.is_valid()
198
199        # we can change the head to some other ref
200        head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
201        assert not head_ref.is_valid()
202        repo.head.ref = head_ref
203
204        # is_dirty can handle all kwargs
205        for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
206            assert not repo.is_dirty(*args)
207        # END for each arg
208
209        # we can add a file to the index ( if we are not bare )
210        if not repo.bare:
211            pass
212        # END test repos with working tree
213
214    @with_rw_directory
215    def test_clone_from_keeps_env(self, rw_dir):
216        original_repo = Repo.init(osp.join(rw_dir, "repo"))
217        environment = {"entry1": "value", "another_entry": "10"}
218
219        cloned = Repo.clone_from(original_repo.git_dir, osp.join(rw_dir, "clone"), env=environment)
220
221        assert_equal(environment, cloned.git.environment())
222
223    @with_rw_directory
224    def test_clone_from_pathlib(self, rw_dir):
225        if pathlib is None:  # pythons bellow 3.4 don't have pathlib
226            raise SkipTest("pathlib was introduced in 3.4")
227
228        original_repo = Repo.init(osp.join(rw_dir, "repo"))
229
230        Repo.clone_from(original_repo.git_dir, pathlib.Path(rw_dir) / "clone_pathlib")
231
232    @with_rw_repo('HEAD')
233    def test_max_chunk_size(self, repo):
234        class TestOutputStream(object):
235            def __init__(self, max_chunk_size):
236                self.max_chunk_size = max_chunk_size
237
238            def write(self, b):
239                assert_true(len(b) <= self.max_chunk_size)
240
241        for chunk_size in [16, 128, 1024]:
242            repo.git.status(output_stream=TestOutputStream(chunk_size), max_chunk_size=chunk_size)
243
244        repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE), max_chunk_size=None)
245        repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE), max_chunk_size=-10)
246        repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE))
247
248    def test_init(self):
249        prev_cwd = os.getcwd()
250        os.chdir(tempfile.gettempdir())
251        git_dir_rela = "repos/foo/bar.git"
252        del_dir_abs = osp.abspath("repos")
253        git_dir_abs = osp.abspath(git_dir_rela)
254        try:
255            # with specific path
256            for path in (git_dir_rela, git_dir_abs):
257                r = Repo.init(path=path, bare=True)
258                self.assertIsInstance(r, Repo)
259                assert r.bare is True
260                assert not r.has_separate_working_tree()
261                assert osp.isdir(r.git_dir)
262
263                self._assert_empty_repo(r)
264
265                # test clone
266                clone_path = path + "_clone"
267                rc = r.clone(clone_path)
268                self._assert_empty_repo(rc)
269
270                try:
271                    rmtree(clone_path)
272                except OSError:
273                    # when relative paths are used, the clone may actually be inside
274                    # of the parent directory
275                    pass
276                # END exception handling
277
278                # try again, this time with the absolute version
279                rc = Repo.clone_from(r.git_dir, clone_path)
280                self._assert_empty_repo(rc)
281
282                rmtree(git_dir_abs)
283                try:
284                    rmtree(clone_path)
285                except OSError:
286                    # when relative paths are used, the clone may actually be inside
287                    # of the parent directory
288                    pass
289                # END exception handling
290
291            # END for each path
292
293            os.makedirs(git_dir_rela)
294            os.chdir(git_dir_rela)
295            r = Repo.init(bare=False)
296            assert r.bare is False
297            assert not r.has_separate_working_tree()
298
299            self._assert_empty_repo(r)
300        finally:
301            try:
302                rmtree(del_dir_abs)
303            except OSError:
304                pass
305            os.chdir(prev_cwd)
306        # END restore previous state
307
308    def test_bare_property(self):
309        self.rorepo.bare
310
311    def test_daemon_export(self):
312        orig_val = self.rorepo.daemon_export
313        self.rorepo.daemon_export = not orig_val
314        self.assertEqual(self.rorepo.daemon_export, (not orig_val))
315        self.rorepo.daemon_export = orig_val
316        self.assertEqual(self.rorepo.daemon_export, orig_val)
317
318    def test_alternates(self):
319        cur_alternates = self.rorepo.alternates
320        # empty alternates
321        self.rorepo.alternates = []
322        self.assertEqual(self.rorepo.alternates, [])
323        alts = ["other/location", "this/location"]
324        self.rorepo.alternates = alts
325        self.assertEqual(alts, self.rorepo.alternates)
326        self.rorepo.alternates = cur_alternates
327
328    def test_repr(self):
329        assert repr(self.rorepo).startswith('<git.Repo ')
330
331    def test_is_dirty_with_bare_repository(self):
332        orig_value = self.rorepo._bare
333        self.rorepo._bare = True
334        assert_false(self.rorepo.is_dirty())
335        self.rorepo._bare = orig_value
336
337    def test_is_dirty(self):
338        self.rorepo._bare = False
339        for index in (0, 1):
340            for working_tree in (0, 1):
341                for untracked_files in (0, 1):
342                    assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
343                # END untracked files
344            # END working tree
345        # END index
346        orig_val = self.rorepo._bare
347        self.rorepo._bare = True
348        assert self.rorepo.is_dirty() is False
349        self.rorepo._bare = orig_val
350
351    @with_rw_repo('HEAD')
352    def test_is_dirty_with_path(self, rwrepo):
353        assert rwrepo.is_dirty(path="git") is False
354
355        with open(osp.join(rwrepo.working_dir, "git", "util.py"), "at") as f:
356            f.write("junk")
357        assert rwrepo.is_dirty(path="git") is True
358        assert rwrepo.is_dirty(path="doc") is False
359
360        rwrepo.git.add(Git.polish_url(osp.join("git", "util.py")))
361        assert rwrepo.is_dirty(index=False, path="git") is False
362        assert rwrepo.is_dirty(path="git") is True
363
364        with open(osp.join(rwrepo.working_dir, "doc", "no-such-file.txt"), "wt") as f:
365            f.write("junk")
366        assert rwrepo.is_dirty(path="doc") is False
367        assert rwrepo.is_dirty(untracked_files=True, path="doc") is True
368
369    def test_head(self):
370        self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object)
371
372    def test_index(self):
373        index = self.rorepo.index
374        self.assertIsInstance(index, IndexFile)
375
376    def test_tag(self):
377        assert self.rorepo.tag('refs/tags/0.1.5').commit
378
379    def test_archive(self):
380        tmpfile = tempfile.mktemp(suffix='archive-test')
381        with open(tmpfile, 'wb') as stream:
382            self.rorepo.archive(stream, '0.1.6', path='doc')
383            assert stream.tell()
384        os.remove(tmpfile)
385
386    @patch.object(Git, '_call_process')
387    def test_should_display_blame_information(self, git):
388        git.return_value = fixture('blame')
389        b = self.rorepo.blame('master', 'lib/git.py')
390        assert_equal(13, len(b))
391        assert_equal(2, len(b[0]))
392        # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
393        assert_equal(hash(b[0][0]), hash(b[9][0]))
394        c = b[0][0]
395        assert_true(git.called)
396
397        assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
398        assert_equal('Tom Preston-Werner', c.author.name)
399        assert_equal('tom@mojombo.com', c.author.email)
400        assert_equal(1191997100, c.authored_date)
401        assert_equal('Tom Preston-Werner', c.committer.name)
402        assert_equal('tom@mojombo.com', c.committer.email)
403        assert_equal(1191997100, c.committed_date)
404        self.assertRaisesRegexp(ValueError, "634396b2f541a9f2d58b00be1a07f0c358b999b3 missing", lambda: c.message)
405
406        # test the 'lines per commit' entries
407        tlist = b[0][1]
408        assert_true(tlist)
409        assert_true(isinstance(tlist[0], string_types))
410        assert_true(len(tlist) < sum(len(t) for t in tlist))               # test for single-char bug
411
412        # BINARY BLAME
413        git.return_value = fixture('blame_binary')
414        blames = self.rorepo.blame('master', 'rps')
415        self.assertEqual(len(blames), 2)
416
417    def test_blame_real(self):
418        c = 0
419        nml = 0   # amount of multi-lines per blame
420        for item in self.rorepo.head.commit.tree.traverse(
421                predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
422            c += 1
423
424            for b in self.rorepo.blame(self.rorepo.head, item.path):
425                nml += int(len(b[1]) > 1)
426        # END for each item to traverse
427        assert c, "Should have executed at least one blame command"
428        assert nml, "There should at least be one blame commit that contains multiple lines"
429
430    @patch.object(Git, '_call_process')
431    def test_blame_incremental(self, git):
432        # loop over two fixtures, create a test fixture for 2.11.1+ syntax
433        for git_fixture in ('blame_incremental', 'blame_incremental_2.11.1_plus'):
434            git.return_value = fixture(git_fixture)
435            blame_output = self.rorepo.blame_incremental('9debf6b0aafb6f7781ea9d1383c86939a1aacde3', 'AUTHORS')
436            blame_output = list(blame_output)
437            self.assertEqual(len(blame_output), 5)
438
439            # Check all outputted line numbers
440            ranges = flatten([entry.linenos for entry in blame_output])
441            self.assertEqual(ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(3, 14), range(15, 17)]))
442
443            commits = [entry.commit.hexsha[:7] for entry in blame_output]
444            self.assertEqual(commits, ['82b8902', '82b8902', 'c76852d', 'c76852d', 'c76852d'])
445
446            # Original filenames
447            self.assertSequenceEqual([entry.orig_path for entry in blame_output], [u'AUTHORS'] * len(blame_output))
448
449            # Original line numbers
450            orig_ranges = flatten([entry.orig_linenos for entry in blame_output])
451            self.assertEqual(orig_ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(2, 13), range(13, 15)]))   # noqa E501
452
453    @patch.object(Git, '_call_process')
454    def test_blame_complex_revision(self, git):
455        git.return_value = fixture('blame_complex_revision')
456        res = self.rorepo.blame("HEAD~10..HEAD", "README.md")
457        self.assertEqual(len(res), 1)
458        self.assertEqual(len(res[0][1]), 83, "Unexpected amount of parsed blame lines")
459
460    @skipIf(HIDE_WINDOWS_KNOWN_ERRORS and Git.is_cygwin(),
461            """FIXME: File "C:\\projects\\gitpython\\git\\cmd.py", line 671, in execute
462                    raise GitCommandError(command, status, stderr_value, stdout_value)
463                GitCommandError: Cmd('git') failed due to: exit code(128)
464                  cmdline: git add 1__��ava verb��ten 1_test _myfile 1_test_other_file
465                          1_��ava-----verb��ten
466                  stderr: 'fatal: pathspec '"1__çava verböten"' did not match any files'
467                """)
468    @with_rw_repo('HEAD', bare=False)
469    def test_untracked_files(self, rwrepo):
470        for run, (repo_add, is_invoking_git) in enumerate((
471                (rwrepo.index.add, False),
472                (rwrepo.git.add, True),
473        )):
474            base = rwrepo.working_tree_dir
475            files = (join_path_native(base, u"%i_test _myfile" % run),
476                     join_path_native(base, "%i_test_other_file" % run),
477                     join_path_native(base, u"%i__çava verböten" % run),
478                     join_path_native(base, u"%i_çava-----verböten" % run))
479
480            num_recently_untracked = 0
481            for fpath in files:
482                with open(fpath, "wb"):
483                    pass
484            untracked_files = rwrepo.untracked_files
485            num_recently_untracked = len(untracked_files)
486
487            # assure we have all names - they are relative to the git-dir
488            num_test_untracked = 0
489            for utfile in untracked_files:
490                num_test_untracked += join_path_native(base, utfile) in files
491            self.assertEqual(len(files), num_test_untracked)
492
493            if is_win and not PY3 and is_invoking_git:
494                ## On Windows, shell needed when passing unicode cmd-args.
495                #
496                repo_add = fnt.partial(repo_add, shell=True)
497                untracked_files = [win_encode(f) for f in untracked_files]
498            repo_add(untracked_files)
499            self.assertEqual(len(rwrepo.untracked_files), (num_recently_untracked - len(files)))
500        # end for each run
501
502    def test_config_reader(self):
503        reader = self.rorepo.config_reader()                # all config files
504        assert reader.read_only
505        reader = self.rorepo.config_reader("repository")    # single config file
506        assert reader.read_only
507
508    def test_config_writer(self):
509        for config_level in self.rorepo.config_level:
510            try:
511                with self.rorepo.config_writer(config_level) as writer:
512                    self.assertFalse(writer.read_only)
513            except IOError:
514                # its okay not to get a writer for some configuration files if we
515                # have no permissions
516                pass
517
518    def test_config_level_paths(self):
519        for config_level in self.rorepo.config_level:
520            assert self.rorepo._get_config_path(config_level)
521
522    def test_creation_deletion(self):
523        # just a very quick test to assure it generally works. There are
524        # specialized cases in the test_refs module
525        head = self.rorepo.create_head("new_head", "HEAD~1")
526        self.rorepo.delete_head(head)
527
528        try:
529            tag = self.rorepo.create_tag("new_tag", "HEAD~2")
530        finally:
531            self.rorepo.delete_tag(tag)
532        with self.rorepo.config_writer():
533            pass
534        try:
535            remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
536        finally:
537            self.rorepo.delete_remote(remote)
538
539    def test_comparison_and_hash(self):
540        # this is only a preliminary test, more testing done in test_index
541        self.assertEqual(self.rorepo, self.rorepo)
542        self.assertFalse(self.rorepo != self.rorepo)
543        self.assertEqual(len({self.rorepo, self.rorepo}), 1)
544
545    @with_rw_directory
546    def test_tilde_and_env_vars_in_repo_path(self, rw_dir):
547        ph = os.environ.get('HOME')
548        try:
549            os.environ['HOME'] = rw_dir
550            Repo.init(osp.join('~', 'test.git'), bare=True)
551
552            os.environ['FOO'] = rw_dir
553            Repo.init(osp.join('$FOO', 'test.git'), bare=True)
554        finally:
555            if ph:
556                os.environ['HOME'] = ph
557                del os.environ['FOO']
558        # end assure HOME gets reset to what it was
559
    def test_git_cmd(self):
        """Exercise ``Git.CatFileContentStream`` reading from in-memory data.

        ``readlines``, ``readline`` and ``read`` are each checked without a
        limit, with a limit, and on a stream whose size (5) is smaller than the
        first line. After each group, the position of the underlying stream is
        compared against the expected offset."""
        # test CatFileContentStream, just to be very sure we have no fencepost errors
        # last \n is the terminating newline that it expects
        l1 = b"0123456789\n"
        l2 = b"abcdefghijklmnopqrstxy\n"
        l3 = b"z\n"
        d = l1 + l2 + l3 + b"\n"

        # the first 5 bytes of the first line - what a tiny stream can provide
        l1p = l1[:5]

        # full size
        # size is without terminating newline
        def mkfull():
            return Git.CatFileContentStream(len(d) - 1, BytesIO(d))

        # size of the tiny stream
        ts = 5

        def mktiny():
            return Git.CatFileContentStream(ts, BytesIO(d))

        # readlines no limit
        s = mkfull()
        lines = s.readlines()
        self.assertEqual(len(lines), 3)
        self.assertTrue(lines[-1].endswith(b'\n'), lines[-1])
        self.assertEqual(s._stream.tell(), len(d))  # must have scrubbed to the end

        # readlines line limit
        s = mkfull()
        lines = s.readlines(5)
        self.assertEqual(len(lines), 1)

        # readlines on tiny sections
        s = mktiny()
        lines = s.readlines()
        self.assertEqual(len(lines), 1)
        self.assertEqual(lines[0], l1p)
        self.assertEqual(s._stream.tell(), ts + 1)

        # readline no limit
        s = mkfull()
        self.assertEqual(s.readline(), l1)
        self.assertEqual(s.readline(), l2)
        self.assertEqual(s.readline(), l3)
        self.assertEqual(s.readline(), b'')
        self.assertEqual(s._stream.tell(), len(d))

        # readline limit
        s = mkfull()
        self.assertEqual(s.readline(5), l1p)
        self.assertEqual(s.readline(), l1[5:])

        # readline on tiny section
        s = mktiny()
        self.assertEqual(s.readline(), l1p)
        self.assertEqual(s.readline(), b'')
        self.assertEqual(s._stream.tell(), ts + 1)

        # read no limit
        s = mkfull()
        self.assertEqual(s.read(), d[:-1])
        self.assertEqual(s.read(), b'')
        self.assertEqual(s._stream.tell(), len(d))

        # read limit
        s = mkfull()
        self.assertEqual(s.read(5), l1p)
        self.assertEqual(s.read(6), l1[5:])
        self.assertEqual(s._stream.tell(), 5 + 6)  # its not yet done

        # read tiny
        s = mktiny()
        self.assertEqual(s.read(2), l1[:2])
        self.assertEqual(s._stream.tell(), 2)
        self.assertEqual(s.read(), l1[2:ts])
        self.assertEqual(s._stream.tell(), ts + 1)
636
637    def _assert_rev_parse_types(self, name, rev_obj):
638        rev_parse = self.rorepo.rev_parse
639
640        if rev_obj.type == 'tag':
641            rev_obj = rev_obj.object
642
643        # tree and blob type
644        obj = rev_parse(name + '^{tree}')
645        self.assertEqual(obj, rev_obj.tree)
646
647        obj = rev_parse(name + ':CHANGES')
648        self.assertEqual(obj.type, 'blob')
649        self.assertEqual(obj.path, 'CHANGES')
650        self.assertEqual(rev_obj.tree['CHANGES'], obj)
651
652    def _assert_rev_parse(self, name):
653        """tries multiple different rev-parse syntaxes with the given name
654        :return: parsed object"""
655        rev_parse = self.rorepo.rev_parse
656        orig_obj = rev_parse(name)
657        if orig_obj.type == 'tag':
658            obj = orig_obj.object
659        else:
660            obj = orig_obj
661        # END deref tags by default
662
663        # try history
664        rev = name + "~"
665        obj2 = rev_parse(rev)
666        self.assertEqual(obj2, obj.parents[0])
667        self._assert_rev_parse_types(rev, obj2)
668
669        # history with number
670        ni = 11
671        history = [obj.parents[0]]
672        for pn in range(ni):
673            history.append(history[-1].parents[0])
674        # END get given amount of commits
675
676        for pn in range(11):
677            rev = name + "~%i" % (pn + 1)
678            obj2 = rev_parse(rev)
679            self.assertEqual(obj2, history[pn])
680            self._assert_rev_parse_types(rev, obj2)
681        # END history check
682
683        # parent ( default )
684        rev = name + "^"
685        obj2 = rev_parse(rev)
686        self.assertEqual(obj2, obj.parents[0])
687        self._assert_rev_parse_types(rev, obj2)
688
689        # parent with number
690        for pn, parent in enumerate(obj.parents):
691            rev = name + "^%i" % (pn + 1)
692            self.assertEqual(rev_parse(rev), parent)
693            self._assert_rev_parse_types(rev, parent)
694        # END for each parent
695
696        return orig_obj
697
698    @with_rw_repo('HEAD', bare=False)
699    def test_rw_rev_parse(self, rwrepo):
700        # verify it does not confuse branches with hexsha ids
701        ahead = rwrepo.create_head('aaaaaaaa')
702        assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
703
704    def test_rev_parse(self):
705        rev_parse = self.rorepo.rev_parse
706
707        # try special case: This one failed at some point, make sure its fixed
708        self.assertEqual(rev_parse("33ebe").hexsha, "33ebe7acec14b25c5f84f35a664803fcab2f7781")
709
710        # start from reference
711        num_resolved = 0
712
713        for ref_no, ref in enumerate(Reference.iter_items(self.rorepo)):
714            path_tokens = ref.path.split("/")
715            for pt in range(len(path_tokens)):
716                path_section = '/'.join(path_tokens[-(pt + 1):])
717                try:
718                    obj = self._assert_rev_parse(path_section)
719                    self.assertEqual(obj.type, ref.object.type)
720                    num_resolved += 1
721                except (BadName, BadObject):
722                    print("failed on %s" % path_section)
723                    # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112
724                    pass
725                # END exception handling
726            # END for each token
727            if ref_no == 3 - 1:
728                break
729        # END for each reference
730        assert num_resolved
731
732        # it works with tags !
733        tag = self._assert_rev_parse('0.1.4')
734        self.assertEqual(tag.type, 'tag')
735
736        # try full sha directly ( including type conversion )
737        self.assertEqual(tag.object, rev_parse(tag.object.hexsha))
738        self._assert_rev_parse_types(tag.object.hexsha, tag.object)
739
740        # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
741        rev = '0.1.4^{tree}^{tree}'
742        self.assertEqual(rev_parse(rev), tag.object.tree)
743        self.assertEqual(rev_parse(rev + ':CHANGES'), tag.object.tree['CHANGES'])
744
745        # try to get parents from first revision - it should fail as no such revision
746        # exists
747        first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
748        commit = rev_parse(first_rev)
749        self.assertEqual(len(commit.parents), 0)
750        self.assertEqual(commit.hexsha, first_rev)
751        self.failUnlessRaises(BadName, rev_parse, first_rev + "~")
752        self.failUnlessRaises(BadName, rev_parse, first_rev + "^")
753
754        # short SHA1
755        commit2 = rev_parse(first_rev[:20])
756        self.assertEqual(commit2, commit)
757        commit2 = rev_parse(first_rev[:5])
758        self.assertEqual(commit2, commit)
759
760        # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
761        # needs a tag which points to a blob
762
763        # ref^0 returns commit being pointed to, same with ref~0, and ^{}
764        tag = rev_parse('0.1.4')
765        for token in (('~0', '^0', '^{}')):
766            self.assertEqual(tag.object, rev_parse('0.1.4%s' % token))
767        # END handle multiple tokens
768
769        # try partial parsing
770        max_items = 40
771        for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
772            self.assertEqual(rev_parse(bin_to_hex(binsha)[:8 - (i % 2)].decode('ascii')).binsha, binsha)
773            if i > max_items:
774                # this is rather slow currently, as rev_parse returns an object
775                # which requires accessing packs, it has some additional overhead
776                break
777        # END for each binsha in repo
778
779        # missing closing brace commit^{tree
780        self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree')
781
782        # missing starting brace
783        self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}')
784
785        # REVLOG
786        #######
787        head = self.rorepo.head
788
789        # need to specify a ref when using the @ syntax
790        self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
791
792        # uses HEAD.ref by default
793        self.assertEqual(rev_parse('@{0}'), head.commit)
794        if not head.is_detached:
795            refspec = '%s@{0}' % head.ref.name
796            self.assertEqual(rev_parse(refspec), head.ref.commit)
797            # all additional specs work as well
798            self.assertEqual(rev_parse(refspec + "^{tree}"), head.commit.tree)
799            self.assertEqual(rev_parse(refspec + ":CHANGES").type, 'blob')
800        # END operate on non-detached head
801
802        # position doesn't exist
803        self.failUnlessRaises(IndexError, rev_parse, '@{10000}')
804
805        # currently, nothing more is supported
806        self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}")
807
808        # the last position
809        assert rev_parse('@{1}') != head.commit
810
811    def test_repo_odbtype(self):
812        target_type = GitCmdObjectDB
813        self.assertIsInstance(self.rorepo.odb, target_type)
814
815    def test_submodules(self):
816        self.assertEqual(len(self.rorepo.submodules), 1)  # non-recursive
817        self.assertGreaterEqual(len(list(self.rorepo.iter_submodules())), 2)
818
819        self.assertIsInstance(self.rorepo.submodule("gitdb"), Submodule)
820        self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
821
822    @with_rw_repo('HEAD', bare=False)
823    def test_submodule_update(self, rwrepo):
824        # fails in bare mode
825        rwrepo._bare = True
826        self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update)
827        rwrepo._bare = False
828
829        # test create submodule
830        sm = rwrepo.submodules[0]
831        sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
832        self.assertIsInstance(sm, Submodule)
833
834        # note: the rest of this functionality is tested in test_submodule
835
836    @with_rw_repo('HEAD')
837    def test_git_file(self, rwrepo):
838        # Move the .git directory to another location and create the .git file.
839        real_path_abs = osp.abspath(join_path_native(rwrepo.working_tree_dir, '.real'))
840        os.rename(rwrepo.git_dir, real_path_abs)
841        git_file_path = join_path_native(rwrepo.working_tree_dir, '.git')
842        with open(git_file_path, 'wb') as fp:
843            fp.write(fixture('git_file'))
844
845        # Create a repo and make sure it's pointing to the relocated .git directory.
846        git_file_repo = Repo(rwrepo.working_tree_dir)
847        self.assertEqual(osp.abspath(git_file_repo.git_dir), real_path_abs)
848
849        # Test using an absolute gitdir path in the .git file.
850        with open(git_file_path, 'wb') as fp:
851            fp.write(('gitdir: %s\n' % real_path_abs).encode('ascii'))
852        git_file_repo = Repo(rwrepo.working_tree_dir)
853        self.assertEqual(osp.abspath(git_file_repo.git_dir), real_path_abs)
854
855    def test_file_handle_leaks(self):
856        def last_commit(repo, rev, path):
857            commit = next(repo.iter_commits(rev, path, max_count=1))
858            commit.tree[path]
859
860        # This is based on this comment
861        # https://github.com/gitpython-developers/GitPython/issues/60#issuecomment-23558741
862        # And we expect to set max handles to a low value, like 64
863        # You should set ulimit -n X, see .travis.yml
864        # The loops below would easily create 500 handles if these would leak (4 pipes + multiple mapped files)
865        for _ in range(64):
866            for repo_type in (GitCmdObjectDB, GitDB):
867                repo = Repo(self.rorepo.working_tree_dir, odbt=repo_type)
868                last_commit(repo, 'master', 'git/test/test_base.py')
869            # end for each repository type
870        # end for each iteration
871
872    def test_remote_method(self):
873        self.failUnlessRaises(ValueError, self.rorepo.remote, 'foo-blue')
874        self.assertIsInstance(self.rorepo.remote(name='origin'), Remote)
875
876    @with_rw_directory
877    def test_empty_repo(self, rw_dir):
878        """Assure we can handle empty repositories"""
879        r = Repo.init(rw_dir, mkdir=False)
880        # It's ok not to be able to iterate a commit, as there is none
881        self.failUnlessRaises(ValueError, r.iter_commits)
882        self.assertEqual(r.active_branch.name, 'master')
883        assert not r.active_branch.is_valid(), "Branch is yet to be born"
884
885        # actually, when trying to create a new branch without a commit, git itself fails
886        # We should, however, not fail ungracefully
887        self.failUnlessRaises(BadName, r.create_head, 'foo')
888        self.failUnlessRaises(BadName, r.create_head, 'master')
889        # It's expected to not be able to access a tree
890        self.failUnlessRaises(ValueError, r.tree)
891
892        new_file_path = osp.join(rw_dir, "new_file.ext")
893        touch(new_file_path)
894        r.index.add([new_file_path])
895        r.index.commit("initial commit\nBAD MESSAGE 1\n")
896
897        # Now a branch should be creatable
898        nb = r.create_head('foo')
899        assert nb.is_valid()
900
901        with open(new_file_path, 'w') as f:
902            f.write('Line 1\n')
903
904        r.index.add([new_file_path])
905        r.index.commit("add line 1\nBAD MESSAGE 2\n")
906
907        with open('%s/.git/logs/refs/heads/master' % (rw_dir,), 'r') as f:
908            contents = f.read()
909
910        assert 'BAD MESSAGE' not in contents, 'log is corrupt'
911
912    def test_merge_base(self):
913        repo = self.rorepo
914        c1 = 'f6aa8d1'
915        c2 = repo.commit('d46e3fe')
916        c3 = '763ef75'
917        self.failUnlessRaises(ValueError, repo.merge_base)
918        self.failUnlessRaises(ValueError, repo.merge_base, 'foo')
919
920        # two commit merge-base
921        res = repo.merge_base(c1, c2)
922        self.assertIsInstance(res, list)
923        self.assertEqual(len(res), 1)
924        self.assertIsInstance(res[0], Commit)
925        self.assertTrue(res[0].hexsha.startswith('3936084'))
926
927        for kw in ('a', 'all'):
928            res = repo.merge_base(c1, c2, c3, **{kw: True})
929            self.assertIsInstance(res, list)
930            self.assertEqual(len(res), 1)
931        # end for each keyword signalling all merge-bases to be returned
932
933        # Test for no merge base - can't do as we have
934        self.failUnlessRaises(GitCommandError, repo.merge_base, c1, 'ffffff')
935
936    def test_is_ancestor(self):
937        git = self.rorepo.git
938        if git.version_info[:3] < (1, 8, 0):
939            raise SkipTest("git merge-base --is-ancestor feature unsupported")
940
941        repo = self.rorepo
942        c1 = 'f6aa8d1'
943        c2 = '763ef75'
944        self.assertTrue(repo.is_ancestor(c1, c1))
945        self.assertTrue(repo.is_ancestor("master", "master"))
946        self.assertTrue(repo.is_ancestor(c1, c2))
947        self.assertTrue(repo.is_ancestor(c1, "master"))
948        self.assertFalse(repo.is_ancestor(c2, c1))
949        self.assertFalse(repo.is_ancestor("master", c1))
950        for i, j in itertools.permutations([c1, 'ffffff', ''], r=2):
951            self.assertRaises(GitCommandError, repo.is_ancestor, i, j)
952
953    @with_rw_directory
954    def test_git_work_tree_dotgit(self, rw_dir):
955        """Check that we find .git as a worktree file and find the worktree
956        based on it."""
957        git = Git(rw_dir)
958        if git.version_info[:3] < (2, 5, 1):
959            raise SkipTest("worktree feature unsupported")
960
961        rw_master = self.rorepo.clone(join_path_native(rw_dir, 'master_repo'))
962        branch = rw_master.create_head('aaaaaaaa')
963        worktree_path = join_path_native(rw_dir, 'worktree_repo')
964        if Git.is_cygwin():
965            worktree_path = cygpath(worktree_path)
966        rw_master.git.worktree('add', worktree_path, branch.name)
967
968        # this ensures that we can read the repo's gitdir correctly
969        repo = Repo(worktree_path)
970        self.assertIsInstance(repo, Repo)
971
972        # this ensures we're able to actually read the refs in the tree, which
973        # means we can read commondir correctly.
974        commit = repo.head.commit
975        self.assertIsInstance(commit, Object)
976
977        self.assertIsInstance(repo.heads['aaaaaaaa'], Head)
978
979    @with_rw_directory
980    def test_git_work_tree_env(self, rw_dir):
981        """Check that we yield to GIT_WORK_TREE"""
982        # clone a repo
983        # move .git directory to a subdirectory
984        # set GIT_DIR and GIT_WORK_TREE appropriately
985        # check that repo.working_tree_dir == rw_dir
986        self.rorepo.clone(join_path_native(rw_dir, 'master_repo'))
987
988        repo_dir = join_path_native(rw_dir, 'master_repo')
989        old_git_dir = join_path_native(repo_dir, '.git')
990        new_subdir = join_path_native(repo_dir, 'gitdir')
991        new_git_dir = join_path_native(new_subdir, 'git')
992        os.mkdir(new_subdir)
993        os.rename(old_git_dir, new_git_dir)
994
995        oldenv = os.environ.copy()
996        os.environ['GIT_DIR'] = new_git_dir
997        os.environ['GIT_WORK_TREE'] = repo_dir
998
999        try:
1000            r = Repo()
1001            self.assertEqual(r.working_tree_dir, repo_dir)
1002            self.assertEqual(r.working_dir, repo_dir)
1003        finally:
1004            os.environ = oldenv
1005