# $Id: gzip.py 23 2006-11-08 15:45:33Z dugsong $
# -*- coding: utf-8 -*-
"""GNU zip."""
from __future__ import print_function
from __future__ import absolute_import

import struct
import zlib

from . import dpkt


# RFC 1952
GZIP_MAGIC = b'\x1f\x8b'

# Compression methods
GZIP_MSTORED = 0
GZIP_MCOMPRESS = 1
GZIP_MPACKED = 2
GZIP_MLZHED = 3
GZIP_MDEFLATE = 8

# Flags
GZIP_FTEXT = 0x01
GZIP_FHCRC = 0x02
GZIP_FEXTRA = 0x04
GZIP_FNAME = 0x08
GZIP_FCOMMENT = 0x10
GZIP_FENCRYPT = 0x20
GZIP_FRESERVED = 0xC0

# OS
GZIP_OS_MSDOS = 0
GZIP_OS_AMIGA = 1
GZIP_OS_VMS = 2
GZIP_OS_UNIX = 3
GZIP_OS_VMCMS = 4
GZIP_OS_ATARI = 5
GZIP_OS_OS2 = 6
GZIP_OS_MACOS = 7
GZIP_OS_ZSYSTEM = 8
GZIP_OS_CPM = 9
GZIP_OS_TOPS20 = 10
GZIP_OS_WIN32 = 11
GZIP_OS_QDOS = 12
GZIP_OS_RISCOS = 13
GZIP_OS_UNKNOWN = 255

# Number of bytes skipped after the header when GZIP_FENCRYPT is set
GZIP_FENCRYPT_LEN = 12


class GzipExtra(dpkt.Packet):
    """Gzip "extra" field: a 2-byte id and a little-endian 16-bit length.

    Any bytes following these four header bytes are handled by dpkt.Packet.
    """
    __byte_order__ = '<'
    __hdr__ = (
        ('id', '2s', b''),
        ('len', 'H', 0)
    )


class Gzip(dpkt.Packet):
    """Gzip member header followed by its (normally deflated) payload.

    Besides the fixed header fields declared in ``__hdr__``, instances carry:

    * ``extra``    -- a GzipExtra, or None when no extra field is present
    * ``filename`` -- the original file name as str, or None
    * ``comment``  -- the comment as bytes, or None
    """
    __byte_order__ = '<'
    __hdr__ = (
        ('magic', '2s', GZIP_MAGIC),
        ('method', 'B', GZIP_MDEFLATE),
        ('flags', 'B', 0),
        ('mtime', 'I', 0),
        ('xflags', 'B', 0),
        ('os', 'B', GZIP_OS_UNIX),
    )

    def __init__(self, *args, **kwargs):
        # The optional fields must exist before the base class runs, because
        # its constructor may call unpack() or assign them from kwargs.
        self.extra = None
        self.filename = None
        self.comment = None
        super(Gzip, self).__init__(*args, **kwargs)

    def unpack(self, buf):
        """Parse the fixed header, then each optional field the flags announce.

        Optional fields are consumed from the front of ``self.data`` in this
        order: extra, file name, comment, encryption header, header CRC.
        The encryption header and header CRC are skipped and not stored.

        Raises:
            dpkt.NeedData: if ``buf`` ends before an announced field does.
        """
        super(Gzip, self).unpack(buf)
        if self.flags & GZIP_FEXTRA:
            if len(self.data) < 2:
                raise dpkt.NeedData('Gzip extra')
            # little-endian 16-bit length of the extra field that follows
            n = struct.unpack('<H', self.data[:2])[0]
            if len(self.data) < 2 + n:
                raise dpkt.NeedData('Gzip extra')
            self.extra = GzipExtra(self.data[2:2 + n])
            self.data = self.data[2 + n:]
        if self.flags & GZIP_FNAME:
            # the file name is NUL-terminated
            n = self.data.find(b'\x00')
            if n == -1:
                raise dpkt.NeedData('Gzip end of file name not found')
            self.filename = self.data[:n].decode('utf-8')
            self.data = self.data[n + 1:]
        if self.flags & GZIP_FCOMMENT:
            # the comment is NUL-terminated and kept as raw bytes
            n = self.data.find(b'\x00')
            if n == -1:
                raise dpkt.NeedData('Gzip end of comment not found')
            self.comment = self.data[:n]
            self.data = self.data[n + 1:]
        if self.flags & GZIP_FENCRYPT:
            if len(self.data) < GZIP_FENCRYPT_LEN:
                raise dpkt.NeedData('Gzip encrypt')
            self.data = self.data[GZIP_FENCRYPT_LEN:]  # XXX - skip
        if self.flags & GZIP_FHCRC:
            if len(self.data) < 2:
                raise dpkt.NeedData('Gzip hcrc')
            self.data = self.data[2:]  # XXX - skip

    def pack_hdr(self):
        """Return the fixed header followed by the optional fields that are set.

        As a side effect the matching bit in ``self.flags`` is switched on for
        every optional field that gets written.
        """
        l_ = []
        if self.extra:
            self.flags |= GZIP_FEXTRA
            s = bytes(self.extra)
            l_.append(struct.pack('<H', len(s)))
            l_.append(s)
        # Compare against None rather than relying on truthiness: an empty
        # file name or comment parsed from the wire still owns a NUL
        # terminator, and dropping it while its flag stays set would corrupt
        # the re-packed header.
        if self.filename is not None:
            self.flags |= GZIP_FNAME
            l_.append(self.filename.encode('utf-8'))
            l_.append(b'\x00')
        if self.comment is not None:
            self.flags |= GZIP_FCOMMENT
            l_.append(self.comment)
            l_.append(b'\x00')
        # the fixed header is packed last so it carries the updated flags
        l_.insert(0, super(Gzip, self).pack_hdr())
        return b''.join(l_)

    def compress(self):
        """Compress self.data in place as a raw deflate stream."""
        c = zlib.compressobj(
            zlib.Z_BEST_COMPRESSION,
            zlib.DEFLATED,
            -zlib.MAX_WBITS,  # negative wbits: no zlib header or trailer
            zlib.DEF_MEM_LEVEL,
            zlib.Z_DEFAULT_STRATEGY,
        )
        # compress() returns nothing for small inputs, but it can emit part of
        # the stream once zlib's internal buffers fill up, so its return value
        # has to be kept together with whatever flush() produces.
        self.data = c.compress(self.data) + c.flush()

    def decompress(self):
        """Return decompressed payload."""
        d = zlib.decompressobj(-zlib.MAX_WBITS)
        return d.decompress(self.data)


class TestGzip(object):
    """This data is created with the gzip command line tool"""

    @classmethod
    def setup_class(cls):
        from binascii import unhexlify
        cls.data = unhexlify(
            b'1F8B'  # magic
            b'080880C185560003'  # header
            b'68656C6C6F2E74787400'  # filename
            b'F348CDC9C95728CF2FCA4951E40200'  # data
            b'41E4A9B20D000000'  # checksum
        )
        cls.p = Gzip(cls.data)

    def test_method(self):
        assert (self.p.method == GZIP_MDEFLATE)

    def test_flags(self):
        assert (self.p.flags == GZIP_FNAME)

    def test_mtime(self):
        # Fri Jan 01 00:00:00 2016 UTC
        assert (self.p.mtime == 0x5685c180)

    def test_xflags(self):
        assert (self.p.xflags == 0)

    def test_os(self):
        assert (self.p.os == GZIP_OS_UNIX)

    def test_filename(self):
        assert (self.p.filename == "hello.txt")  # always str (utf-8)

    def test_decompress(self):
        assert (self.p.decompress() == b"Hello world!\n")  # always bytes


def test_flags_extra():
    import pytest
    from binascii import unhexlify

    buf = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '04'        # flags (GZIP_FEXTRA)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os
    )

    # not enough data to extract
    with pytest.raises(dpkt.NeedData, match='Gzip extra'):
        Gzip(buf)

    buf += unhexlify('0400')  # append the length of the fextra
    # not enough data to extract in extra section
    with pytest.raises(dpkt.NeedData, match='Gzip extra'):
        Gzip(buf)

    buf += unhexlify('494401000102')

    gzip = Gzip(buf)
    assert gzip.extra.id == b'ID'
    assert gzip.extra.len == 1
    assert gzip.data == unhexlify('0102')
    assert bytes(gzip) == buf


def test_flags_filename():
    import pytest
    from binascii import unhexlify

    buf = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '08'        # flags (GZIP_FNAME)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os

        '68656C6C6F2E747874'  # filename
    )
    # no trailing null character so unpacking fails
    with pytest.raises(dpkt.NeedData, match='Gzip end of file name not found'):
        Gzip(buf)

    buf += unhexlify('00')
    gzip = Gzip(buf)
    assert gzip.filename == 'hello.txt'
    assert gzip.data == b''
    assert bytes(gzip) == buf


def test_flags_filename_empty():
    from binascii import unhexlify

    buf = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '08'        # flags (GZIP_FNAME)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os

        '00'        # empty filename: just the terminator
    )
    gzip = Gzip(buf)
    assert gzip.filename == ''
    assert gzip.data == b''
    # the terminator must survive re-packing
    assert bytes(gzip) == buf


def test_flags_comment():
    import pytest
    from binascii import unhexlify

    buf = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '10'        # flags (GZIP_FCOMMENT)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os

        '68656C6C6F2E747874'  # comment
    )
    # no trailing null character so unpacking fails
    with pytest.raises(dpkt.NeedData, match='Gzip end of comment not found'):
        Gzip(buf)

    buf += unhexlify('00')

    gzip = Gzip(buf)
    assert gzip.comment == b'hello.txt'
    assert gzip.data == b''
    assert bytes(gzip) == buf


def test_flags_comment_empty():
    from binascii import unhexlify

    buf = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '10'        # flags (GZIP_FCOMMENT)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os

        '00'        # empty comment: just the terminator
    )
    gzip = Gzip(buf)
    assert gzip.comment == b''
    assert gzip.data == b''
    # the terminator must survive re-packing
    assert bytes(gzip) == buf


def test_flags_encrypt():
    import pytest
    from binascii import unhexlify

    buf_header = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '20'        # flags (GZIP_FENCRYPT)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os
    )
    # not enough data
    with pytest.raises(dpkt.NeedData, match='Gzip encrypt'):
        Gzip(buf_header)

    encrypted_buffer = unhexlify('0102030405060708090a0b0c')
    data = unhexlify('0123456789abcdef')

    gzip = Gzip(buf_header + encrypted_buffer + data)
    assert gzip.data == data
    assert bytes(gzip) == buf_header + data


def test_flags_hcrc():
    import pytest
    from binascii import unhexlify

    buf_header = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '02'        # flags (GZIP_FHCRC)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os
    )
    # not enough data
    with pytest.raises(dpkt.NeedData, match='Gzip hcrc'):
        Gzip(buf_header)

    hcrc = unhexlify('0102')
    data = unhexlify('0123456789abcdef')
    gzip = Gzip(buf_header + hcrc + data)

    assert gzip.data == data
    assert bytes(gzip) == buf_header + data


def test_compress():
    from binascii import unhexlify

    buf_header = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '00'        # flags (NONE)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os
    )

    plain_text = b'Hello world!\n'
    compressed_text = unhexlify('F348CDC9C95728CF2FCA4951E40200')

    gzip = Gzip(buf_header + plain_text)
    assert gzip.data == plain_text

    gzip.compress()
    assert gzip.data == compressed_text
    assert bytes(gzip) == buf_header + compressed_text

    assert gzip.decompress() == plain_text


def test_compress_large():
    from binascii import unhexlify

    buf_header = unhexlify(
        '1F8B'      # magic
        '08'        # method
        '00'        # flags (NONE)
        '80C18556'  # mtime
        '00'        # xflags
        '03'        # os
    )

    # large enough that zlib hands back output before flush() is called
    plain_text = bytes(bytearray(range(256))) * 1024

    gzip = Gzip(buf_header + plain_text)
    assert gzip.data == plain_text

    gzip.compress()
    assert gzip.decompress() == plain_text