1# test_object_store.py -- tests for object_store.py 2# Copyright (C) 2008 Jelmer Vernooij <jelmer@jelmer.uk> 3# 4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 5# General Public License as public by the Free Software Foundation; version 2.0 6# or (at your option) any later version. You can redistribute it and/or 7# modify it under the terms of either of these two licenses. 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# 15# You should have received a copy of the licenses; if not, see 16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 18# License, Version 2.0. 19# 20 21"""Tests for the object store interface.""" 22 23 24from contextlib import closing 25from io import BytesIO 26import os 27import shutil 28import stat 29import tempfile 30 31from dulwich.index import ( 32 commit_tree, 33 ) 34from dulwich.errors import ( 35 NotTreeError, 36 EmptyFileException, 37 ) 38from dulwich.objects import ( 39 sha_to_hex, 40 Blob, 41 Tree, 42 TreeEntry, 43 ) 44from dulwich.object_store import ( 45 DiskObjectStore, 46 MemoryObjectStore, 47 OverlayObjectStore, 48 ObjectStoreGraphWalker, 49 commit_tree_changes, 50 read_packs_file, 51 tree_lookup_path, 52 ) 53from dulwich.pack import ( 54 REF_DELTA, 55 write_pack_objects, 56 ) 57from dulwich.tests import ( 58 TestCase, 59 ) 60from dulwich.tests.utils import ( 61 make_object, 62 make_tag, 63 build_pack, 64 ) 65 66 67testobject = make_object(Blob, data=b"yummy data") 68 69 70class ObjectStoreTests(object): 71 72 def test_determine_wants_all(self): 73 self.assertEqual( 74 [b"1" * 40], 75 self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40})) 76 77 def test_determine_wants_all_zero(self): 78 self.assertEqual( 79 [], self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40})) 80 81 def test_iter(self): 82 self.assertEqual([], list(self.store)) 83 84 def test_get_nonexistant(self): 85 self.assertRaises(KeyError, lambda: self.store[b"a" * 40]) 86 87 def test_contains_nonexistant(self): 88 self.assertFalse((b"a" * 40) in self.store) 89 90 def test_add_objects_empty(self): 91 self.store.add_objects([]) 92 93 def test_add_commit(self): 94 # TODO: Argh, no way to construct Git commit objects without 95 # access to a serialized form. 96 self.store.add_objects([]) 97 98 def test_store_resilience(self): 99 """Test if updating an existing stored object doesn't erase the 100 object from the store. 101 """ 102 test_object = make_object(Blob, data=b'data') 103 104 self.store.add_object(test_object) 105 test_object_id = test_object.id 106 test_object.data = test_object.data + b'update' 107 stored_test_object = self.store[test_object_id] 108 109 self.assertNotEqual(test_object.id, stored_test_object.id) 110 self.assertEqual(stored_test_object.id, test_object_id) 111 112 def test_add_object(self): 113 self.store.add_object(testobject) 114 self.assertEqual(set([testobject.id]), set(self.store)) 115 self.assertTrue(testobject.id in self.store) 116 r = self.store[testobject.id] 117 self.assertEqual(r, testobject) 118 119 def test_add_objects(self): 120 data = [(testobject, "mypath")] 121 self.store.add_objects(data) 122 self.assertEqual(set([testobject.id]), set(self.store)) 123 self.assertTrue(testobject.id in self.store) 124 r = self.store[testobject.id] 125 self.assertEqual(r, testobject) 126 127 def test_tree_changes(self): 128 blob_a1 = make_object(Blob, data=b'a1') 129 blob_a2 = make_object(Blob, data=b'a2') 130 blob_b = make_object(Blob, data=b'b') 131 for blob in [blob_a1, blob_a2, blob_b]: 132 self.store.add_object(blob) 133 134 blobs_1 = [(b'a', blob_a1.id, 0o100644), (b'b', blob_b.id, 0o100644)] 135 tree1_id = commit_tree(self.store, blobs_1) 136 blobs_2 = [(b'a', blob_a2.id, 0o100644), (b'b', blob_b.id, 0o100644)] 137 tree2_id = commit_tree(self.store, blobs_2) 138 change_a = ((b'a', b'a'), (0o100644, 0o100644), 139 (blob_a1.id, blob_a2.id)) 140 self.assertEqual([change_a], 141 list(self.store.tree_changes(tree1_id, tree2_id))) 142 self.assertEqual( 143 [change_a, ((b'b', b'b'), (0o100644, 0o100644), 144 (blob_b.id, blob_b.id))], 145 list(self.store.tree_changes(tree1_id, tree2_id, 146 want_unchanged=True))) 147 148 def test_iter_tree_contents(self): 149 blob_a = make_object(Blob, data=b'a') 150 blob_b = make_object(Blob, data=b'b') 151 blob_c = make_object(Blob, data=b'c') 152 for blob in [blob_a, blob_b, blob_c]: 153 self.store.add_object(blob) 154 155 blobs = [ 156 (b'a', blob_a.id, 0o100644), 157 (b'ad/b', blob_b.id, 0o100644), 158 (b'ad/bd/c', blob_c.id, 0o100755), 159 (b'ad/c', blob_c.id, 0o100644), 160 (b'c', blob_c.id, 0o100644), 161 ] 162 tree_id = commit_tree(self.store, blobs) 163 self.assertEqual([TreeEntry(p, m, h) for (p, h, m) in blobs], 164 list(self.store.iter_tree_contents(tree_id))) 165 166 def test_iter_tree_contents_include_trees(self): 167 blob_a = make_object(Blob, data=b'a') 168 blob_b = make_object(Blob, data=b'b') 169 blob_c = make_object(Blob, data=b'c') 170 for blob in [blob_a, blob_b, blob_c]: 171 self.store.add_object(blob) 172 173 blobs = [ 174 (b'a', blob_a.id, 0o100644), 175 (b'ad/b', blob_b.id, 0o100644), 176 (b'ad/bd/c', blob_c.id, 0o100755), 177 ] 178 tree_id = commit_tree(self.store, blobs) 179 tree = self.store[tree_id] 180 tree_ad = self.store[tree[b'ad'][1]] 181 tree_bd = self.store[tree_ad[b'bd'][1]] 182 183 expected = [ 184 TreeEntry(b'', 0o040000, tree_id), 185 TreeEntry(b'a', 0o100644, blob_a.id), 186 TreeEntry(b'ad', 0o040000, tree_ad.id), 187 TreeEntry(b'ad/b', 0o100644, blob_b.id), 188 TreeEntry(b'ad/bd', 0o040000, tree_bd.id), 189 TreeEntry(b'ad/bd/c', 0o100755, blob_c.id), 190 ] 191 actual = self.store.iter_tree_contents(tree_id, include_trees=True) 192 self.assertEqual(expected, list(actual)) 193 194 def make_tag(self, name, obj): 195 tag = make_tag(obj, name=name) 196 self.store.add_object(tag) 197 return tag 198 199 def test_peel_sha(self): 200 self.store.add_object(testobject) 201 tag1 = self.make_tag(b'1', testobject) 202 tag2 = self.make_tag(b'2', testobject) 203 tag3 = self.make_tag(b'3', testobject) 204 for obj in [testobject, tag1, tag2, tag3]: 205 self.assertEqual(testobject, self.store.peel_sha(obj.id)) 206 207 def test_get_raw(self): 208 self.store.add_object(testobject) 209 self.assertEqual((Blob.type_num, b'yummy data'), 210 self.store.get_raw(testobject.id)) 211 212 def test_close(self): 213 # For now, just check that close doesn't barf. 214 self.store.add_object(testobject) 215 self.store.close() 216 217 218class OverlayObjectStoreTests(ObjectStoreTests, TestCase): 219 220 def setUp(self): 221 TestCase.setUp(self) 222 self.bases = [MemoryObjectStore(), MemoryObjectStore()] 223 self.store = OverlayObjectStore(self.bases, self.bases[0]) 224 225 226class MemoryObjectStoreTests(ObjectStoreTests, TestCase): 227 228 def setUp(self): 229 TestCase.setUp(self) 230 self.store = MemoryObjectStore() 231 232 def test_add_pack(self): 233 o = MemoryObjectStore() 234 f, commit, abort = o.add_pack() 235 try: 236 b = make_object(Blob, data=b"more yummy data") 237 write_pack_objects(f, [(b, None)]) 238 except BaseException: 239 abort() 240 raise 241 else: 242 commit() 243 244 def test_add_pack_emtpy(self): 245 o = MemoryObjectStore() 246 f, commit, abort = o.add_pack() 247 commit() 248 249 def test_add_thin_pack(self): 250 o = MemoryObjectStore() 251 blob = make_object(Blob, data=b'yummy data') 252 o.add_object(blob) 253 254 f = BytesIO() 255 entries = build_pack(f, [ 256 (REF_DELTA, (blob.id, b'more yummy data')), 257 ], store=o) 258 o.add_thin_pack(f.read, None) 259 packed_blob_sha = sha_to_hex(entries[0][3]) 260 self.assertEqual((Blob.type_num, b'more yummy data'), 261 o.get_raw(packed_blob_sha)) 262 263 def test_add_thin_pack_empty(self): 264 o = MemoryObjectStore() 265 266 f = BytesIO() 267 entries = build_pack(f, [], store=o) 268 self.assertEqual([], entries) 269 o.add_thin_pack(f.read, None) 270 271 272class PackBasedObjectStoreTests(ObjectStoreTests): 273 274 def tearDown(self): 275 for pack in self.store.packs: 276 pack.close() 277 278 def test_empty_packs(self): 279 self.assertEqual([], list(self.store.packs)) 280 281 def test_pack_loose_objects(self): 282 b1 = make_object(Blob, data=b"yummy data") 283 self.store.add_object(b1) 284 b2 = make_object(Blob, data=b"more yummy data") 285 self.store.add_object(b2) 286 b3 = make_object(Blob, data=b"even more yummy data") 287 b4 = make_object(Blob, data=b"and more yummy data") 288 self.store.add_objects([(b3, None), (b4, None)]) 289 self.assertEqual({b1.id, b2.id, b3.id, b4.id}, set(self.store)) 290 self.assertEqual(1, len(self.store.packs)) 291 self.assertEqual(2, self.store.pack_loose_objects()) 292 self.assertNotEqual([], list(self.store.packs)) 293 self.assertEqual(0, self.store.pack_loose_objects()) 294 295 def test_repack(self): 296 b1 = make_object(Blob, data=b"yummy data") 297 self.store.add_object(b1) 298 b2 = make_object(Blob, data=b"more yummy data") 299 self.store.add_object(b2) 300 b3 = make_object(Blob, data=b"even more yummy data") 301 b4 = make_object(Blob, data=b"and more yummy data") 302 self.store.add_objects([(b3, None), (b4, None)]) 303 b5 = make_object(Blob, data=b"and more data") 304 b6 = make_object(Blob, data=b"and some more data") 305 self.store.add_objects([(b5, None), (b6, None)]) 306 self.assertEqual({b1.id, b2.id, b3.id, b4.id, b5.id, b6.id}, 307 set(self.store)) 308 self.assertEqual(2, len(self.store.packs)) 309 self.assertEqual(6, self.store.repack()) 310 self.assertEqual(1, len(self.store.packs)) 311 self.assertEqual(0, self.store.pack_loose_objects()) 312 313 def test_repack_existing(self): 314 b1 = make_object(Blob, data=b"yummy data") 315 self.store.add_object(b1) 316 b2 = make_object(Blob, data=b"more yummy data") 317 self.store.add_object(b2) 318 self.store.add_objects([(b1, None), (b2, None)]) 319 self.store.add_objects([(b2, None)]) 320 self.assertEqual({b1.id, b2.id}, set(self.store)) 321 self.assertEqual(2, len(self.store.packs)) 322 self.assertEqual(2, self.store.repack()) 323 self.assertEqual(1, len(self.store.packs)) 324 self.assertEqual(0, self.store.pack_loose_objects()) 325 326 self.assertEqual({b1.id, b2.id}, set(self.store)) 327 self.assertEqual(1, len(self.store.packs)) 328 self.assertEqual(2, self.store.repack()) 329 self.assertEqual(1, len(self.store.packs)) 330 self.assertEqual(0, self.store.pack_loose_objects()) 331 332 333class DiskObjectStoreTests(PackBasedObjectStoreTests, TestCase): 334 335 def setUp(self): 336 TestCase.setUp(self) 337 self.store_dir = tempfile.mkdtemp() 338 self.addCleanup(shutil.rmtree, self.store_dir) 339 self.store = DiskObjectStore.init(self.store_dir) 340 341 def tearDown(self): 342 TestCase.tearDown(self) 343 PackBasedObjectStoreTests.tearDown(self) 344 345 def test_loose_compression_level(self): 346 alternate_dir = tempfile.mkdtemp() 347 self.addCleanup(shutil.rmtree, alternate_dir) 348 alternate_store = DiskObjectStore( 349 alternate_dir, loose_compression_level=6) 350 b2 = make_object(Blob, data=b"yummy data") 351 alternate_store.add_object(b2) 352 353 def test_alternates(self): 354 alternate_dir = tempfile.mkdtemp() 355 self.addCleanup(shutil.rmtree, alternate_dir) 356 alternate_store = DiskObjectStore(alternate_dir) 357 b2 = make_object(Blob, data=b"yummy data") 358 alternate_store.add_object(b2) 359 store = DiskObjectStore(self.store_dir) 360 self.assertRaises(KeyError, store.__getitem__, b2.id) 361 store.add_alternate_path(alternate_dir) 362 self.assertIn(b2.id, store) 363 self.assertEqual(b2, store[b2.id]) 364 365 def test_corrupted_object_raise_exception(self): 366 """Corrupted sha1 disk file should raise specific exception""" 367 self.store.add_object(testobject) 368 self.assertEqual((Blob.type_num, b'yummy data'), 369 self.store.get_raw(testobject.id)) 370 self.assertTrue(self.store.contains_loose(testobject.id)) 371 self.assertIsNotNone(self.store._get_loose_object(testobject.id)) 372 373 path = self.store._get_shafile_path(testobject.id) 374 with open(path, 'wb') as f: # corrupt the file 375 f.write(b'') 376 377 expected_error_msg = 'Corrupted empty file detected' 378 try: 379 self.store.contains_loose(testobject.id) 380 except EmptyFileException as e: 381 self.assertEqual(str(e), expected_error_msg) 382 383 try: 384 self.store._get_loose_object(testobject.id) 385 except EmptyFileException as e: 386 self.assertEqual(str(e), expected_error_msg) 387 388 # this does not change iteration on loose objects though 389 self.assertEqual([testobject.id], 390 list(self.store._iter_loose_objects())) 391 392 def test_add_alternate_path(self): 393 store = DiskObjectStore(self.store_dir) 394 self.assertEqual([], list(store._read_alternate_paths())) 395 store.add_alternate_path("/foo/path") 396 self.assertEqual(["/foo/path"], list(store._read_alternate_paths())) 397 store.add_alternate_path("/bar/path") 398 self.assertEqual( 399 ["/foo/path", "/bar/path"], 400 list(store._read_alternate_paths())) 401 402 def test_rel_alternative_path(self): 403 alternate_dir = tempfile.mkdtemp() 404 self.addCleanup(shutil.rmtree, alternate_dir) 405 alternate_store = DiskObjectStore(alternate_dir) 406 b2 = make_object(Blob, data=b"yummy data") 407 alternate_store.add_object(b2) 408 store = DiskObjectStore(self.store_dir) 409 self.assertRaises(KeyError, store.__getitem__, b2.id) 410 store.add_alternate_path( 411 os.path.relpath(alternate_dir, self.store_dir)) 412 self.assertEqual(list(alternate_store), list(store.alternates[0])) 413 self.assertIn(b2.id, store) 414 self.assertEqual(b2, store[b2.id]) 415 416 def test_pack_dir(self): 417 o = DiskObjectStore(self.store_dir) 418 self.assertEqual(os.path.join(self.store_dir, "pack"), o.pack_dir) 419 420 def test_add_pack(self): 421 o = DiskObjectStore(self.store_dir) 422 f, commit, abort = o.add_pack() 423 try: 424 b = make_object(Blob, data=b"more yummy data") 425 write_pack_objects(f, [(b, None)]) 426 except BaseException: 427 abort() 428 raise 429 else: 430 commit() 431 432 def test_add_thin_pack(self): 433 o = DiskObjectStore(self.store_dir) 434 try: 435 blob = make_object(Blob, data=b'yummy data') 436 o.add_object(blob) 437 438 f = BytesIO() 439 entries = build_pack(f, [ 440 (REF_DELTA, (blob.id, b'more yummy data')), 441 ], store=o) 442 443 with o.add_thin_pack(f.read, None) as pack: 444 packed_blob_sha = sha_to_hex(entries[0][3]) 445 pack.check_length_and_checksum() 446 self.assertEqual( 447 sorted([blob.id, packed_blob_sha]), list(pack)) 448 self.assertTrue(o.contains_packed(packed_blob_sha)) 449 self.assertTrue(o.contains_packed(blob.id)) 450 self.assertEqual((Blob.type_num, b'more yummy data'), 451 o.get_raw(packed_blob_sha)) 452 finally: 453 o.close() 454 455 def test_add_thin_pack_empty(self): 456 with closing(DiskObjectStore(self.store_dir)) as o: 457 f = BytesIO() 458 entries = build_pack(f, [], store=o) 459 self.assertEqual([], entries) 460 o.add_thin_pack(f.read, None) 461 462 463class TreeLookupPathTests(TestCase): 464 465 def setUp(self): 466 TestCase.setUp(self) 467 self.store = MemoryObjectStore() 468 blob_a = make_object(Blob, data=b'a') 469 blob_b = make_object(Blob, data=b'b') 470 blob_c = make_object(Blob, data=b'c') 471 for blob in [blob_a, blob_b, blob_c]: 472 self.store.add_object(blob) 473 474 blobs = [ 475 (b'a', blob_a.id, 0o100644), 476 (b'ad/b', blob_b.id, 0o100644), 477 (b'ad/bd/c', blob_c.id, 0o100755), 478 (b'ad/c', blob_c.id, 0o100644), 479 (b'c', blob_c.id, 0o100644), 480 ] 481 self.tree_id = commit_tree(self.store, blobs) 482 483 def get_object(self, sha): 484 return self.store[sha] 485 486 def test_lookup_blob(self): 487 o_id = tree_lookup_path(self.get_object, self.tree_id, b'a')[1] 488 self.assertTrue(isinstance(self.store[o_id], Blob)) 489 490 def test_lookup_tree(self): 491 o_id = tree_lookup_path(self.get_object, self.tree_id, b'ad')[1] 492 self.assertTrue(isinstance(self.store[o_id], Tree)) 493 o_id = tree_lookup_path(self.get_object, self.tree_id, b'ad/bd')[1] 494 self.assertTrue(isinstance(self.store[o_id], Tree)) 495 o_id = tree_lookup_path(self.get_object, self.tree_id, b'ad/bd/')[1] 496 self.assertTrue(isinstance(self.store[o_id], Tree)) 497 498 def test_lookup_nonexistent(self): 499 self.assertRaises( 500 KeyError, tree_lookup_path, self.get_object, self.tree_id, b'j') 501 502 def test_lookup_not_tree(self): 503 self.assertRaises( 504 NotTreeError, tree_lookup_path, self.get_object, self.tree_id, 505 b'ad/b/j') 506 507 508class ObjectStoreGraphWalkerTests(TestCase): 509 510 def get_walker(self, heads, parent_map): 511 new_parent_map = dict( 512 [(k * 40, [(p * 40) for p in ps]) 513 for (k, ps) in parent_map.items()]) 514 return ObjectStoreGraphWalker([x * 40 for x in heads], 515 new_parent_map.__getitem__) 516 517 def test_ack_invalid_value(self): 518 gw = self.get_walker([], {}) 519 self.assertRaises(ValueError, gw.ack, "tooshort") 520 521 def test_empty(self): 522 gw = self.get_walker([], {}) 523 self.assertIs(None, next(gw)) 524 gw.ack(b"a" * 40) 525 self.assertIs(None, next(gw)) 526 527 def test_descends(self): 528 gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []}) 529 self.assertEqual(b"a" * 40, next(gw)) 530 self.assertEqual(b"b" * 40, next(gw)) 531 532 def test_present(self): 533 gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []}) 534 gw.ack(b"a" * 40) 535 self.assertIs(None, next(gw)) 536 537 def test_parent_present(self): 538 gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []}) 539 self.assertEqual(b"a" * 40, next(gw)) 540 gw.ack(b"a" * 40) 541 self.assertIs(None, next(gw)) 542 543 def test_child_ack_later(self): 544 gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": [b"c"], b"c": []}) 545 self.assertEqual(b"a" * 40, next(gw)) 546 self.assertEqual(b"b" * 40, next(gw)) 547 gw.ack(b"a" * 40) 548 self.assertIs(None, next(gw)) 549 550 def test_only_once(self): 551 # a b 552 # | | 553 # c d 554 # \ / 555 # e 556 gw = self.get_walker([b"a", b"b"], { 557 b"a": [b"c"], 558 b"b": [b"d"], 559 b"c": [b"e"], 560 b"d": [b"e"], 561 b"e": [], 562 }) 563 walk = [] 564 acked = False 565 walk.append(next(gw)) 566 walk.append(next(gw)) 567 # A branch (a, c) or (b, d) may be done after 2 steps or 3 depending on 568 # the order walked: 3-step walks include (a, b, c) and (b, a, d), etc. 569 if walk == [b"a" * 40, b"c" * 40] or walk == [b"b" * 40, b"d" * 40]: 570 gw.ack(walk[0]) 571 acked = True 572 573 walk.append(next(gw)) 574 if not acked and walk[2] == b"c" * 40: 575 gw.ack(b"a" * 40) 576 elif not acked and walk[2] == b"d" * 40: 577 gw.ack(b"b" * 40) 578 walk.append(next(gw)) 579 self.assertIs(None, next(gw)) 580 581 self.assertEqual([b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 40], 582 sorted(walk)) 583 self.assertLess(walk.index(b"a" * 40), walk.index(b"c" * 40)) 584 self.assertLess(walk.index(b"b" * 40), walk.index(b"d" * 40)) 585 586 587class CommitTreeChangesTests(TestCase): 588 589 def setUp(self): 590 super(CommitTreeChangesTests, self).setUp() 591 self.store = MemoryObjectStore() 592 self.blob_a = make_object(Blob, data=b'a') 593 self.blob_b = make_object(Blob, data=b'b') 594 self.blob_c = make_object(Blob, data=b'c') 595 for blob in [self.blob_a, self.blob_b, self.blob_c]: 596 self.store.add_object(blob) 597 598 blobs = [ 599 (b'a', self.blob_a.id, 0o100644), 600 (b'ad/b', self.blob_b.id, 0o100644), 601 (b'ad/bd/c', self.blob_c.id, 0o100755), 602 (b'ad/c', self.blob_c.id, 0o100644), 603 (b'c', self.blob_c.id, 0o100644), 604 ] 605 self.tree_id = commit_tree(self.store, blobs) 606 607 def test_no_changes(self): 608 self.assertEqual( 609 self.store[self.tree_id], 610 commit_tree_changes(self.store, self.store[self.tree_id], [])) 611 612 def test_add_blob(self): 613 blob_d = make_object(Blob, data=b'd') 614 new_tree = commit_tree_changes( 615 self.store, self.store[self.tree_id], [ 616 (b'd', 0o100644, blob_d.id)]) 617 self.assertEqual( 618 new_tree[b'd'], 619 (33188, b'c59d9b6344f1af00e504ba698129f07a34bbed8d')) 620 621 def test_add_blob_in_dir(self): 622 blob_d = make_object(Blob, data=b'd') 623 new_tree = commit_tree_changes( 624 self.store, self.store[self.tree_id], [ 625 (b'e/f/d', 0o100644, blob_d.id)]) 626 self.assertEqual( 627 new_tree.items(), [ 628 TreeEntry(path=b'a', mode=stat.S_IFREG | 0o100644, 629 sha=self.blob_a.id), 630 TreeEntry(path=b'ad', mode=stat.S_IFDIR, 631 sha=b'0e2ce2cd7725ff4817791be31ccd6e627e801f4a'), 632 TreeEntry(path=b'c', mode=stat.S_IFREG | 0o100644, 633 sha=self.blob_c.id), 634 TreeEntry(path=b'e', mode=stat.S_IFDIR, 635 sha=b'6ab344e288724ac2fb38704728b8896e367ed108') 636 ]) 637 e_tree = self.store[new_tree[b'e'][1]] 638 self.assertEqual( 639 e_tree.items(), [ 640 TreeEntry(path=b'f', mode=stat.S_IFDIR, 641 sha=b'24d2c94d8af232b15a0978c006bf61ef4479a0a5') 642 ]) 643 f_tree = self.store[e_tree[b'f'][1]] 644 self.assertEqual( 645 f_tree.items(), [ 646 TreeEntry(path=b'd', mode=stat.S_IFREG | 0o100644, 647 sha=blob_d.id) 648 ]) 649 650 def test_delete_blob(self): 651 new_tree = commit_tree_changes( 652 self.store, self.store[self.tree_id], [ 653 (b'ad/bd/c', None, None)]) 654 self.assertEqual(set(new_tree), {b'a', b'ad', b'c'}) 655 ad_tree = self.store[new_tree[b'ad'][1]] 656 self.assertEqual(set(ad_tree), {b'b', b'c'}) 657 658 659class TestReadPacksFile(TestCase): 660 661 def test_read_packs(self): 662 self.assertEqual(["pack-1.pack"], list(read_packs_file(BytesIO(b"""P pack-1.pack 663""")))) 664