1"""The module implements compression/decompression with snappy using
2Hadoop snappy format: https://github.com/kubo/snzip#hadoop-snappy-format
3
4Expected usage like:
5
6    import snappy
7
8    src = 'uncompressed'
9    dst = 'compressed'
10    dst2 = 'decompressed'
11
12    with open(src, 'rb') as fin, open(dst, 'wb') as fout:
13        snappy.hadoop_stream_compress(src, dst)
14
15    with open(dst, 'rb') as fin, open(dst2, 'wb') as fout:
16        snappy.hadoop_stream_decompress(fin, fout)
17
18    with open(src, 'rb') as fin1, open(dst2, 'rb') as fin2:
19        assert fin1.read() == fin2.read()
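
As implemented here, each compressed chunk is framed with two big-endian
four-byte integers followed by the raw snappy block; a rough sketch of the
layout (see the link above for the full format, which allows several
compressed subblocks per block):

    [uncompressed length][compressed length][snappy-compressed bytes]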

"""
from __future__ import absolute_import

import struct

from .snappy import (
    _compress, _uncompress,
    stream_compress as _stream_compress,
    stream_decompress as _stream_decompress,
    check_format as _check_format,
    UncompressError,
    _CHUNK_MAX)


SNAPPY_BUFFER_SIZE_DEFAULT = 256 * 1024
_STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX

_INT_SIZE = 4

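# The hadoop snappy framing stores all length headers as big-endian,
# unsigned, four-byte integers ('>I' in struct notation).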
def pack_int(num):
    big_endian_uint = struct.pack('>I', num)
    return big_endian_uint


def unpack_int(data):
    return struct.unpack('>I', data)[0]


class StreamCompressor(object):

    """This class implements the compressor side of the hadoop snappy
    format, as described at https://github.com/kubo/snzip#hadoop-snappy-format

    Keep in mind that this compressor object does no buffering of its own:
    it is up to the caller to size chunks appropriately. Every call to
    StreamCompressor.compress results in a unique call to the underlying
    snappy compression method.
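
    A typical file-to-file loop might look like this (an illustrative
    sketch; the file names are placeholders):

        compressor = StreamCompressor()
        with open('data.bin', 'rb') as fin, open('data.snappy', 'wb') as fout:
            while True:
                block = fin.read(SNAPPY_BUFFER_SIZE_DEFAULT)
                if not block:
                    break
                fout.write(compressor.compress(block))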
    """

    def __init__(self):
        pass

    def add_chunk(self, data):
        """Add a chunk containing 'data', returning the framed, compressed
        chunk as a byte string. The returned data should be concatenated to
        the tail end of an existing hadoop snappy stream. Because there is
        no internal buffering, no data is held back, and so, unlike
        zlib.compress, this method returns everything.
        """
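        # Emitted layout: 4-byte big-endian uncompressed length, 4-byte
        # big-endian compressed length, then the snappy-compressed block.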
        out = []
        uncompressed_length = len(data)
        out.append(pack_int(uncompressed_length))
        compressed_chunk = _compress(data)
        compressed_length = len(compressed_chunk)
        out.append(pack_int(compressed_length))
        out.append(compressed_chunk)
        return b"".join(out)

    def compress(self, data):
        """This method is simply an alias for compatibility with the zlib
        compressobj's compress method.
        """
        return self.add_chunk(data)

    def flush(self, mode=None):
        """This method does nothing and only exists for compatibility with
        the zlib compressobj.
        """
        pass

    def copy(self):
        """This method exists for compatibility with the zlib compressobj.
        """
        return StreamCompressor()


class StreamDecompressor(object):

    """This class implements the decompressor side of the hadoop snappy
    format.

    This class matches a subset of the interface found for the zlib module's
    decompression objects (see zlib.decompressobj). Specifically, it currently
    implements the decompress method without the max_length option, the flush
    method without the length option, and the copy method.
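
    Data can be fed incrementally; a chunk split across arbitrary byte
    boundaries still decompresses correctly (an illustrative sketch, using
    StreamCompressor from this module to produce the input):

        framed = StreamCompressor().compress(b'payload')

        decompressor = StreamDecompressor()
        out = decompressor.decompress(framed[:3])   # too short, buffered
        out += decompressor.decompress(framed[3:])  # rest of the chunk
        decompressor.flush()                        # raises if truncated
        assert out == b'payload'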
    """

    __slots__ = ["_buf", "_block_length", "_uncompressed_length"]

    def __init__(self):
        self._buf = b""
        # uncompressed length of the current block, as read from its header
        self._block_length = 0
        # uncompressed data produced so far for the current block
        self._uncompressed_length = 0

    @staticmethod
    def check_format(data):
        """Just checks that the given data block is long enough to hold the
        two big-endian four-byte integers that start a block in the hadoop
        snappy format: the uncompressed block length followed by the
        compressed subblock length.

        We can't actually be sure about the format beyond that: the
        assumption that the compressed length is less than the uncompressed
        length does not hold in general, so no relation between the two
        integers is checked.

        Raises UncompressError if the data is too short.
        :return: None
        """
        int_size = _INT_SIZE
        if len(data) < int_size * 2:
            raise UncompressError("Too short data length")

    def decompress(self, data):
        """Decompress 'data', returning a string containing the uncompressed
        data corresponding to at least part of the input. This output should
        be concatenated to the output produced by any preceding calls to the
        decompress() method. Some of the input data may be preserved in
        internal buffers for later processing.
        """
        int_size = _INT_SIZE
        self._buf += data
        uncompressed = []
        while True:
            if len(self._buf) < int_size:
                return b"".join(uncompressed)
            next_start = 0
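            # _block_length == 0 means we are at the start of a new block;
            # its first four bytes give the block's total uncompressed length.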
            if not self._block_length:
                self._block_length = unpack_int(self._buf[:int_size])
                self._buf = self._buf[int_size:]
                if len(self._buf) < int_size:
                    return b"".join(uncompressed)
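            # The next four bytes give the compressed length of the
            # following subblock.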
            compressed_length = unpack_int(
                self._buf[next_start:next_start + int_size]
            )
            next_start += int_size
            if len(self._buf) < compressed_length + next_start:
                return b"".join(uncompressed)
            chunk = self._buf[
                next_start:next_start + compressed_length
            ]
            self._buf = self._buf[next_start + compressed_length:]
            uncompressed_chunk = _uncompress(chunk)
            self._uncompressed_length += len(uncompressed_chunk)
            uncompressed.append(uncompressed_chunk)
            if self._uncompressed_length == self._block_length:
                # Here we have uncompressed all subblocks of the current block
                self._uncompressed_length = 0
                self._block_length = 0
                continue

    def flush(self):
        """Finishes processing the stream. If any incomplete chunk data is
        still pending in the internal buffer, UncompressError is raised;
        otherwise an empty byte string is returned. After calling flush(),
        the decompress() method cannot be called again; the only realistic
        action is to delete the object.
        """
        if self._buf != b"":
            raise UncompressError("chunk truncated")
        return b""

    def copy(self):
        """Returns a copy of the decompression object. This can be used to save
        the state of the decompressor midway through the data stream in order
        to speed up random seeks into the stream at a future point.
        """
        copy = StreamDecompressor()
        copy._buf = self._buf
        copy._block_length = self._block_length
        copy._uncompressed_length = self._uncompressed_length
        return copy


def stream_compress(src, dst, blocksize=SNAPPY_BUFFER_SIZE_DEFAULT):
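    """Reads data from the file-like object 'src' in pieces of 'blocksize'
    bytes, compresses it with the hadoop snappy StreamCompressor, and writes
    the result to the file-like object 'dst' (a thin wrapper around the
    shared stream_compress implementation).
    """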
    return _stream_compress(
        src, dst, blocksize=blocksize, compressor_cls=StreamCompressor
    )


def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
                      start_chunk=None):
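    """Reads hadoop snappy compressed data from the file-like object 'src'
    in pieces of 'blocksize' bytes and writes the uncompressed result to the
    file-like object 'dst' (a thin wrapper around the shared
    stream_decompress implementation). 'start_chunk', if given, is passed
    through to the shared implementation unchanged.
    """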
    return _stream_decompress(
        src, dst, blocksize=blocksize,
        decompressor_cls=StreamDecompressor,
        start_chunk=start_chunk
    )


def check_format(fin=None, chunk=None, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
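    """Thin wrapper around the shared check_format: checks whether 'fin' (a
    file-like object) or 'chunk' (a byte string) looks like the start of a
    hadoop snappy stream, using this module's StreamDecompressor.
    """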
    return _check_format(
        fin=fin, chunk=chunk, blocksize=blocksize,
        decompressor_cls=StreamDecompressor
    )