# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""

from gitdb.test.lib import (
    TestBase,
    DummyStream,
    make_bytes,
    make_object,
    fixture_path
)

from gitdb import (
    DecompressMemMapReader,
    FDCompressedSha1Writer,
    LooseObjectDB,
    Sha1Writer,
    MemoryDB,
    IStream,
)
from gitdb.util import hex_to_bin

import zlib
from gitdb.typ import (
    str_blob_type
)

import tempfile
import os
from io import BytesIO


class TestStream(TestBase):

    """Test stream classes"""

    data_sizes = (15, 10000, 1000 * 1024 + 512)

    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Run the stream tests - the given stream is seekable, allowing it to be
        rewound and reused
        :param cdata: the data we expect to read from the stream, i.e. its contents
        :param rewind_stream: function called to rewind the stream so it is ready
            for reuse"""
        ns = 10
        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))

        # read in small steps
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk
        # END for each step
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
        # END handle rest

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

        rewind_stream(stream)

        # read everything at once
        rdata = stream.read()
        assert rdata == cdata

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

    def test_decompress_reader(self):
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)

                    # zdata = zipped actual data
                    # cdata = original content data

                    # create reader
                    if with_size:
                        # need object data
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # even if we don't set the size, it will be set automatically on first read
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)
                    else:
                        # here we need content data
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
                    # END get reader

                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))

                    # put in a dummy stream for closing
                    dummy = DummyStream()
                    reader._m = dummy

                    assert not dummy.closed
                    del reader
                    assert dummy.closed == close_on_deletion
                # END for each data size
            # END whether size should be used
        # END whether stream should be closed when deleted

    def test_sha_writer(self):
        writer = Sha1Writer()
        assert 2 == writer.write("hi".encode("ascii"))
        assert len(writer.sha(as_hex=True)) == 40
        assert len(writer.sha(as_hex=False)) == 20

        # make sure it does something ;)
        prev_sha = writer.sha()
        writer.write("hi again".encode("ascii"))
        assert writer.sha() != prev_sha

    def test_compressed_writer(self):
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            ostream = FDCompressedSha1Writer(fd)
            data = make_bytes(ds, randomize=False)

            # for now, just a single write, the code doesn't care about chunking
            assert len(data) == ostream.write(data)
            ostream.close()

            # it's closed already
            self.assertRaises(OSError, os.close, fd)

            # read everything back, compare to the data we zipped
            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
            assert written_data == zlib.compress(data, 1)  # best speed

            os.remove(path)
        # END for each data size

    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))

            # if there is a bug, we will be missing exactly one byte!
            data = ostream.read()
            assert len(data) == ostream.size

            # putting it back in should yield nothing new - the sha must not change
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha
        # END for each loose object sha to test
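
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original suite: a minimal, standalone
# version of the round trip exercised in test_decompress_reader above. It
# compresses a blob-style object and reads it back through
# DecompressMemMapReader, using only names already imported in this module.
# It is deliberately not named test_* so test collectors skip it; call it
# manually if desired.
def _decompress_reader_roundtrip_sketch():
    payload = make_bytes(1000, randomize=False)
    zdata = zlib.compress(make_object(str_blob_type, payload))

    # new() parses the object header and hands back type, size and the reader
    typ, size, reader = DecompressMemMapReader.new(zdata, False)
    assert typ == str_blob_type
    assert size == len(payload)

    # reading everything must reproduce the original payload exactly
    assert reader.read() == payload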