1# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
2#
3# This module is part of GitDB and is released under
4# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
5"""Test for object db"""
6
7from gitdb.test.lib import (
8    TestBase,
9    DummyStream,
10    make_bytes,
11    make_object,
12    fixture_path
13)
14
15from gitdb import (
16    DecompressMemMapReader,
17    FDCompressedSha1Writer,
18    LooseObjectDB,
19    Sha1Writer,
20    MemoryDB,
21    IStream,
22)
23from gitdb.util import hex_to_bin
24
25import zlib
26from gitdb.typ import (
27    str_blob_type
28)
29
30import tempfile
31import os
32from io import BytesIO
33
34
class TestStream(TestBase):

    """Test stream classes"""

    # Payload sizes exercised by the round-trip tests: tiny, medium, and
    # just over one MiB (forces several internal read chunks).
    data_sizes = (15, 10000, 1000 * 1024 + 512)

    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Make stream tests - the orig_stream is seekable, allowing it to be
        rewound and reused

        :param stream: readable stream object under test
        :param cdata: the data we expect to read from stream, the contents
        :param rewind_stream: function called to rewind the stream to make it ready
            for reuse"""
        ns = 10
        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))

        # read in small steps and verify each chunk against the expected slice
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk
        # END for each step
        # integer division may leave a remainder - it must match the data tail
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
        # END handle rest

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

        rewind_stream(stream)

        # read everything in one go after the rewind
        rdata = stream.read()
        assert rdata == cdata

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

    def test_decompress_reader(self):
        """Round-trip zlib data through DecompressMemMapReader, with and without
        an embedded object header, and verify close-on-deletion semantics."""
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)

                    # zdata = zipped actual data
                    # cdata = original content data

                    # create reader
                    if with_size:
                        # need object data - the header carries type and size
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # even if we don't set the size, it will be set automatically on first read
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)
                    else:
                        # here we need content data - size is passed in explicitly
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
                    # END get reader

                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))

                    # put in a dummy stream for closing
                    dummy = DummyStream()
                    reader._m = dummy

                    # deleting the reader must close the stream exactly when
                    # close_on_deletion was requested
                    assert not dummy.closed
                    del reader
                    assert dummy.closed == close_on_deletion
                # END for each datasize
            # END whether size should be used
        # END whether stream should be closed when deleted

    def test_sha_writer(self):
        """Sha1Writer must report bytes written and update its digest per write."""
        writer = Sha1Writer()
        assert 2 == writer.write("hi".encode("ascii"))
        assert len(writer.sha(as_hex=1)) == 40
        assert len(writer.sha(as_hex=0)) == 20

        # make sure it does something ;)
        prev_sha = writer.sha()
        writer.write("hi again".encode("ascii"))
        assert writer.sha() != prev_sha

    def test_compressed_writer(self):
        """FDCompressedSha1Writer must write zlib-compressed data to the given
        descriptor and close it on close()."""
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            try:
                ostream = FDCompressedSha1Writer(fd)
                data = make_bytes(ds, randomize=False)

                # for now, just a single write, code doesn't care about chunking
                assert len(data) == ostream.write(data)
                ostream.close()

                # its closed already - closing again must fail
                # NOTE: failUnlessRaises was removed in Python 3.12, use assertRaises
                self.assertRaises(OSError, os.close, fd)

                # read everything back, compare to data we zip
                fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
                written_data = os.read(fd, os.path.getsize(path))
                assert len(written_data) == os.path.getsize(path)
                os.close(fd)
                assert written_data == zlib.compress(data, 1)   # best speed
            finally:
                # always remove the temp file, even if an assertion failed
                os.remove(path)
        # END for each os

    def test_decompress_reader_special_case(self):
        """Regression test: certain loose objects used to come back one byte short."""
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))

            # if there is a bug, we will be missing one byte exactly !
            data = ostream.read()
            assert len(data) == ostream.size

            # Putting it back in should yield nothing new - after all, we have
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha
        # end for each loose object sha to test
165