1"""The module implements compression/decompression with snappy using 2Hadoop snappy format: https://github.com/kubo/snzip#hadoop-snappy-format 3 4Expected usage like: 5 6 import snappy 7 8 src = 'uncompressed' 9 dst = 'compressed' 10 dst2 = 'decompressed' 11 12 with open(src, 'rb') as fin, open(dst, 'wb') as fout: 13 snappy.hadoop_stream_compress(src, dst) 14 15 with open(dst, 'rb') as fin, open(dst2, 'wb') as fout: 16 snappy.hadoop_stream_decompress(fin, fout) 17 18 with open(src, 'rb') as fin1, open(dst2, 'rb') as fin2: 19 assert fin1.read() == fin2.read() 20 21""" 22from __future__ import absolute_import 23 24import struct 25 26from .snappy import ( 27 _compress, _uncompress, 28 stream_compress as _stream_compress, 29 stream_decompress as _stream_decompress, 30 check_format as _check_format, 31 UncompressError, 32 _CHUNK_MAX) 33 34 35SNAPPY_BUFFER_SIZE_DEFAULT = 256 * 1024 36_STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX 37 38_INT_SIZE = 4 39 40 41def pack_int(num): 42 big_endian_uint = struct.pack('>I', num) 43 return big_endian_uint 44 45 46def unpack_int(data): 47 return struct.unpack('>I', data)[0] 48 49 50class StreamCompressor(object): 51 52 """This class implements the compressor-side of the hadoop snappy 53 format, taken from https://github.com/kubo/snzip#hadoop-snappy-format 54 55 Keep in mind that this compressor object does no buffering for you to 56 appropriately size chunks. Every call to StreamCompressor.compress results 57 in a unique call to the underlying snappy compression method. 58 """ 59 60 def __init__(self): 61 pass 62 63 def add_chunk(self, data): 64 """Add a chunk containing 'data', returning a string that is 65 compressed. This data should be concatenated to 66 the tail end of an existing Snappy stream. In the absence of any 67 internal buffering, no data is left in any internal buffers, and so 68 unlike zlib.compress, this method returns everything. 69 """ 70 out = [] 71 uncompressed_length = len(data) 72 out.append(pack_int(uncompressed_length)) 73 compressed_chunk = _compress(data) 74 compressed_length = len(compressed_chunk) 75 out.append(pack_int(compressed_length)) 76 out.append(compressed_chunk) 77 return b"".join(out) 78 79 def compress(self, data): 80 """This method is simply an alias for compatibility with zlib 81 compressobj's compress method. 82 """ 83 return self.add_chunk(data) 84 85 def flush(self, mode=None): 86 """This method does nothing and only exists for compatibility with 87 the zlib compressobj 88 """ 89 pass 90 91 def copy(self): 92 """This method exists for compatibility with the zlib compressobj. 93 """ 94 return StreamCompressor() 95 96 97class StreamDecompressor(object): 98 99 """This class implements the decompressor-side of the hadoop snappy 100 format. 101 102 This class matches a subset of the interface found for the zlib module's 103 decompression objects (see zlib.decompressobj). Specifically, it currently 104 implements the decompress method without the max_length option, the flush 105 method without the length option, and the copy method. 106 """ 107 108 __slots__ = ["_buf", "_block_length", "_uncompressed_length"] 109 110 def __init__(self): 111 self._buf = b"" 112 # current block length 113 self._block_length = 0 114 # total uncompressed data length of the current block 115 self._uncompressed_length = 0 116 117 @staticmethod 118 def check_format(data): 119 """Just checks that first two integers (big endian four-bytes int) 120 in the given data block comply to: first int >= second int. 
        self._buf = b""
        # current block length
        self._block_length = 0
        # total uncompressed data length of the current block
        self._uncompressed_length = 0

    @staticmethod
    def check_format(data):
        """Check that the given data block starts with the two big-endian
        four-byte integers that open a block in hadoop snappy format: the
        uncompressed block length followed by the compressed subblock
        length.
        Raises UncompressError if the data is too short to contain them.
        :return: None
        """
        int_size = _INT_SIZE
        if len(data) < int_size * 2:
            raise UncompressError("Too short data length")
        # We can't actually be sure about the format here: the assumption
        # that the compressed data length is less than the uncompressed
        # one is not true in general, so nothing beyond the length is
        # checked.
        return

    def decompress(self, data):
        """Decompress 'data', returning a string containing the
        uncompressed data corresponding to at least part of the input.
        This data should be concatenated to the output produced by any
        preceding calls to the decompress() method. Some of the input data
        may be preserved in internal buffers for later processing.
        """
        int_size = _INT_SIZE
        self._buf += data
        uncompressed = []
        while True:
            if len(self._buf) < int_size:
                return b"".join(uncompressed)
            next_start = 0
            if not self._block_length:
                # Start of a new block: the first integer holds the
                # uncompressed length of the whole block.
                self._block_length = unpack_int(self._buf[:int_size])
                self._buf = self._buf[int_size:]
                if len(self._buf) < int_size:
                    return b"".join(uncompressed)
            # The next integer holds the length of the next compressed
            # subblock.
            compressed_length = unpack_int(
                self._buf[next_start:next_start + int_size]
            )
            next_start += int_size
            if len(self._buf) < compressed_length + next_start:
                # The subblock is incomplete: keep it buffered for later.
                return b"".join(uncompressed)
            chunk = self._buf[
                next_start:next_start + compressed_length
            ]
            self._buf = self._buf[next_start + compressed_length:]
            uncompressed_chunk = _uncompress(chunk)
            self._uncompressed_length += len(uncompressed_chunk)
            uncompressed.append(uncompressed_chunk)
            if self._uncompressed_length == self._block_length:
                # Here we have uncompressed all subblocks of the current block
                self._uncompressed_length = 0
                self._block_length = 0
                continue

    def flush(self):
        """All pending input is processed, and a string containing the
        remaining uncompressed output is returned. After calling flush(),
        the decompress() method cannot be called again; the only realistic
        action is to delete the object.
        """
        if self._buf != b"":
            raise UncompressError("chunk truncated")
        return b""

    def copy(self):
        """Returns a copy of the decompression object. This can be used to
        save the state of the decompressor midway through the data stream
        in order to speed up random seeks into the stream at a future
        point.
        """
        copy = StreamDecompressor()
        copy._buf = self._buf
        copy._block_length = self._block_length
        copy._uncompressed_length = self._uncompressed_length
        return copy


def stream_compress(src, dst, blocksize=SNAPPY_BUFFER_SIZE_DEFAULT):
    return _stream_compress(
        src, dst, blocksize=blocksize, compressor_cls=StreamCompressor
    )


def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
                      start_chunk=None):
    return _stream_decompress(
        src, dst, blocksize=blocksize,
        decompressor_cls=StreamDecompressor,
        start_chunk=start_chunk
    )


def check_format(fin=None, chunk=None, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
    return _check_format(
        fin=fin, chunk=chunk, blocksize=blocksize,
        decompressor_cls=StreamDecompressor
    )
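
# Illustrative round-trip sketch (an addition for clarity, not part of the
# library API). It assumes the module is executed with ``python -m`` from
# inside the package, so the relative import above resolves. It frames one
# payload with StreamCompressor, then feeds the result back through
# StreamDecompressor one byte at a time to exercise the internal buffering.
# Each frame produced by add_chunk is laid out as:
#     pack_int(uncompressed_length) + pack_int(compressed_length)
#     + raw snappy data
if __name__ == "__main__":
    payload = b"hello hadoop snappy " * 64

    framed = StreamCompressor().add_chunk(payload)
    # The first four bytes hold the uncompressed block length.
    assert unpack_int(framed[:_INT_SIZE]) == len(payload)
    StreamDecompressor.check_format(framed)

    decompressor = StreamDecompressor()
    # Single-byte feeding shows that partial frames stay buffered until a
    # whole compressed subblock is available.
    out = b"".join(
        decompressor.decompress(framed[i:i + 1]) for i in range(len(framed))
    )
    decompressor.flush()  # would raise UncompressError on truncated input
    assert out == payload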