1from io import BytesIO 2from stat import S_IFDIR, S_IFREG, S_IFLNK 3from os import stat 4import os.path as osp 5from unittest import skipIf, SkipTest 6 7from git import Git 8from git.compat import PY3 9from git.index import IndexFile 10from git.index.fun import ( 11 aggressive_tree_merge 12) 13from git.objects.fun import ( 14 traverse_tree_recursive, 15 traverse_trees_recursive, 16 tree_to_stream, 17 tree_entries_from_data, 18) 19from git.repo.fun import ( 20 find_worktree_git_dir 21) 22from git.test.lib import ( 23 assert_true, 24 TestBase, 25 with_rw_repo, 26 with_rw_directory 27) 28from git.util import bin_to_hex, cygpath, join_path_native 29from gitdb.base import IStream 30from gitdb.typ import str_tree_type 31 32 33class TestFun(TestBase): 34 35 def _assert_index_entries(self, entries, trees): 36 index = IndexFile.from_tree(self.rorepo, *[self.rorepo.tree(bin_to_hex(t).decode('ascii')) for t in trees]) 37 assert entries 38 assert len(index.entries) == len(entries) 39 for entry in entries: 40 assert (entry.path, entry.stage) in index.entries 41 # END assert entry matches fully 42 43 def test_aggressive_tree_merge(self): 44 # head tree with additions, removals and modification compared to its predecessor 45 odb = self.rorepo.odb 46 HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c") 47 H = HC.tree 48 B = HC.parents[0].tree 49 50 # entries from single tree 51 trees = [H.binsha] 52 self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) 53 54 # from multiple trees 55 trees = [B.binsha, H.binsha] 56 self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) 57 58 # three way, no conflict 59 tree = self.rorepo.tree 60 B = tree("35a09c0534e89b2d43ec4101a5fb54576b577905") 61 H = tree("4fe5cfa0e063a8d51a1eb6f014e2aaa994e5e7d4") 62 M = tree("1f2b19de3301e76ab3a6187a49c9c93ff78bafbd") 63 trees = [B.binsha, H.binsha, M.binsha] 64 self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) 65 66 # three-way, conflict in at least one file, both modified 67 B = tree("a7a4388eeaa4b6b94192dce67257a34c4a6cbd26") 68 H = tree("f9cec00938d9059882bb8eabdaf2f775943e00e5") 69 M = tree("44a601a068f4f543f73fd9c49e264c931b1e1652") 70 trees = [B.binsha, H.binsha, M.binsha] 71 self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) 72 73 # too many trees 74 self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees * 2) 75 76 def mktree(self, odb, entries): 77 """create a tree from the given tree entries and safe it to the database""" 78 sio = BytesIO() 79 tree_to_stream(entries, sio.write) 80 sio.seek(0) 81 istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) 82 return istream.binsha 83 84 @with_rw_repo('0.1.6') 85 def test_three_way_merge(self, rwrepo): 86 def mkfile(name, sha, executable=0): 87 return (sha, S_IFREG | 0o644 | executable * 0o111, name) 88 89 def mkcommit(name, sha): 90 return (sha, S_IFDIR | S_IFLNK, name) 91 92 def assert_entries(entries, num_entries, has_conflict=False): 93 assert len(entries) == num_entries 94 assert has_conflict == (len([e for e in entries if e.stage != 0]) > 0) 95 mktree = self.mktree 96 97 shaa = b"\1" * 20 98 shab = b"\2" * 20 99 shac = b"\3" * 20 100 101 odb = rwrepo.odb 102 103 # base tree 104 bfn = 'basefile' 105 fbase = mkfile(bfn, shaa) 106 tb = mktree(odb, [fbase]) 107 108 # non-conflicting new files, same data 109 fa = mkfile('1', shab) 110 th = mktree(odb, [fbase, fa]) 111 fb = mkfile('2', shac) 112 tm = mktree(odb, [fbase, fb]) 113 114 # two new files, same base file 115 trees = [tb, th, tm] 116 assert_entries(aggressive_tree_merge(odb, trees), 3) 117 118 # both delete same file, add own one 119 fa = mkfile('1', shab) 120 th = mktree(odb, [fa]) 121 fb = mkfile('2', shac) 122 tm = mktree(odb, [fb]) 123 124 # two new files 125 trees = [tb, th, tm] 126 assert_entries(aggressive_tree_merge(odb, trees), 2) 127 128 # same file added in both, differently 129 fa = mkfile('1', shab) 130 th = mktree(odb, [fa]) 131 fb = mkfile('1', shac) 132 tm = mktree(odb, [fb]) 133 134 # expect conflict 135 trees = [tb, th, tm] 136 assert_entries(aggressive_tree_merge(odb, trees), 2, True) 137 138 # same file added, different mode 139 fa = mkfile('1', shab) 140 th = mktree(odb, [fa]) 141 fb = mkcommit('1', shab) 142 tm = mktree(odb, [fb]) 143 144 # expect conflict 145 trees = [tb, th, tm] 146 assert_entries(aggressive_tree_merge(odb, trees), 2, True) 147 148 # same file added in both 149 fa = mkfile('1', shab) 150 th = mktree(odb, [fa]) 151 fb = mkfile('1', shab) 152 tm = mktree(odb, [fb]) 153 154 # expect conflict 155 trees = [tb, th, tm] 156 assert_entries(aggressive_tree_merge(odb, trees), 1) 157 158 # modify same base file, differently 159 fa = mkfile(bfn, shab) 160 th = mktree(odb, [fa]) 161 fb = mkfile(bfn, shac) 162 tm = mktree(odb, [fb]) 163 164 # conflict, 3 versions on 3 stages 165 trees = [tb, th, tm] 166 assert_entries(aggressive_tree_merge(odb, trees), 3, True) 167 168 # change mode on same base file, by making one a commit, the other executable 169 # no content change ( this is totally unlikely to happen in the real world ) 170 fa = mkcommit(bfn, shaa) 171 th = mktree(odb, [fa]) 172 fb = mkfile(bfn, shaa, executable=1) 173 tm = mktree(odb, [fb]) 174 175 # conflict, 3 versions on 3 stages, because of different mode 176 trees = [tb, th, tm] 177 assert_entries(aggressive_tree_merge(odb, trees), 3, True) 178 179 for is_them in range(2): 180 # only we/they change contents 181 fa = mkfile(bfn, shab) 182 th = mktree(odb, [fa]) 183 184 trees = [tb, th, tb] 185 if is_them: 186 trees = [tb, tb, th] 187 entries = aggressive_tree_merge(odb, trees) 188 assert len(entries) == 1 and entries[0].binsha == shab 189 190 # only we/they change the mode 191 fa = mkcommit(bfn, shaa) 192 th = mktree(odb, [fa]) 193 194 trees = [tb, th, tb] 195 if is_them: 196 trees = [tb, tb, th] 197 entries = aggressive_tree_merge(odb, trees) 198 assert len(entries) == 1 and entries[0].binsha == shaa and entries[0].mode == fa[1] 199 200 # one side deletes, the other changes = conflict 201 fa = mkfile(bfn, shab) 202 th = mktree(odb, [fa]) 203 tm = mktree(odb, []) 204 trees = [tb, th, tm] 205 if is_them: 206 trees = [tb, tm, th] 207 # as one is deleted, there are only 2 entries 208 assert_entries(aggressive_tree_merge(odb, trees), 2, True) 209 # END handle ours, theirs 210 211 def _assert_tree_entries(self, entries, num_trees): 212 for entry in entries: 213 assert len(entry) == num_trees 214 paths = {e[2] for e in entry if e} 215 216 # only one path per set of entries 217 assert len(paths) == 1 218 # END verify entry 219 220 def test_tree_traversal(self): 221 # low level tree tarversal 222 odb = self.rorepo.odb 223 H = self.rorepo.tree('29eb123beb1c55e5db4aa652d843adccbd09ae18') # head tree 224 M = self.rorepo.tree('e14e3f143e7260de9581aee27e5a9b2645db72de') # merge tree 225 B = self.rorepo.tree('f606937a7a21237c866efafcad33675e6539c103') # base tree 226 B_old = self.rorepo.tree('1f66cfbbce58b4b552b041707a12d437cc5f400a') # old base tree 227 228 # two very different trees 229 entries = traverse_trees_recursive(odb, [B_old.binsha, H.binsha], '') 230 self._assert_tree_entries(entries, 2) 231 232 oentries = traverse_trees_recursive(odb, [H.binsha, B_old.binsha], '') 233 assert len(oentries) == len(entries) 234 self._assert_tree_entries(oentries, 2) 235 236 # single tree 237 is_no_tree = lambda i, d: i.type != 'tree' 238 entries = traverse_trees_recursive(odb, [B.binsha], '') 239 assert len(entries) == len(list(B.traverse(predicate=is_no_tree))) 240 self._assert_tree_entries(entries, 1) 241 242 # two trees 243 entries = traverse_trees_recursive(odb, [B.binsha, H.binsha], '') 244 self._assert_tree_entries(entries, 2) 245 246 # tree trees 247 entries = traverse_trees_recursive(odb, [B.binsha, H.binsha, M.binsha], '') 248 self._assert_tree_entries(entries, 3) 249 250 def test_tree_traversal_single(self): 251 max_count = 50 252 count = 0 253 odb = self.rorepo.odb 254 for commit in self.rorepo.commit("29eb123beb1c55e5db4aa652d843adccbd09ae18").traverse(): 255 if count >= max_count: 256 break 257 count += 1 258 entries = traverse_tree_recursive(odb, commit.tree.binsha, '') 259 assert entries 260 # END for each commit 261 262 @with_rw_directory 263 def test_linked_worktree_traversal(self, rw_dir): 264 """Check that we can identify a linked worktree based on a .git file""" 265 git = Git(rw_dir) 266 if git.version_info[:3] < (2, 5, 1): 267 raise SkipTest("worktree feature unsupported") 268 269 rw_master = self.rorepo.clone(join_path_native(rw_dir, 'master_repo')) 270 branch = rw_master.create_head('aaaaaaaa') 271 worktree_path = join_path_native(rw_dir, 'worktree_repo') 272 if Git.is_cygwin(): 273 worktree_path = cygpath(worktree_path) 274 rw_master.git.worktree('add', worktree_path, branch.name) 275 276 dotgit = osp.join(worktree_path, ".git") 277 statbuf = stat(dotgit) 278 assert_true(statbuf.st_mode & S_IFREG) 279 280 gitdir = find_worktree_git_dir(dotgit) 281 self.assertIsNotNone(gitdir) 282 statbuf = stat(gitdir) 283 assert_true(statbuf.st_mode & S_IFDIR) 284 285 @skipIf(PY3, 'odd types returned ... maybe figure it out one day') 286 def test_tree_entries_from_data_with_failing_name_decode_py2(self): 287 r = tree_entries_from_data(b'100644 \x9f\0aaa') 288 assert r == [('aaa', 33188, u'\udc9f')], r 289 290 @skipIf(not PY3, 'odd types returned ... maybe figure it out one day') 291 def test_tree_entries_from_data_with_failing_name_decode_py3(self): 292 r = tree_entries_from_data(b'100644 \x9f\0aaa') 293 assert r == [(b'aaa', 33188, '\udc9f')], r 294