1# $Id: gzip.py 23 2006-11-08 15:45:33Z dugsong $
2# -*- coding: utf-8 -*-
3"""GNU zip."""
4from __future__ import print_function
5from __future__ import absolute_import
6
7import struct
8import zlib
9
10from . import dpkt
11
12
# Constants for the gzip file format (RFC 1952).
# Default value of the two-byte `magic` header field of Gzip.
GZIP_MAGIC = b'\x1f\x8b'

# Compression methods: values of the `method` header field
# (Gzip defaults to GZIP_MDEFLATE).
GZIP_MSTORED = 0
GZIP_MCOMPRESS = 1
GZIP_MPACKED = 2
GZIP_MLZHED = 3
GZIP_MDEFLATE = 8

# Flags: bit masks tested against the `flags` header field.  In Gzip.unpack()
# FEXTRA, FNAME, FCOMMENT, FENCRYPT and FHCRC each announce an optional
# section that follows the fixed header.
GZIP_FTEXT = 0x01
GZIP_FHCRC = 0x02
GZIP_FEXTRA = 0x04
GZIP_FNAME = 0x08
GZIP_FCOMMENT = 0x10
GZIP_FENCRYPT = 0x20
GZIP_FRESERVED = 0xC0  # mask covering the two highest bits of the flags byte

# OS: values of the `os` header field (Gzip defaults to GZIP_OS_UNIX).
GZIP_OS_MSDOS = 0
GZIP_OS_AMIGA = 1
GZIP_OS_VMS = 2
GZIP_OS_UNIX = 3
GZIP_OS_VMCMS = 4
GZIP_OS_ATARI = 5
GZIP_OS_OS2 = 6
GZIP_OS_MACOS = 7
GZIP_OS_ZSYSTEM = 8
GZIP_OS_CPM = 9
GZIP_OS_TOPS20 = 10
GZIP_OS_WIN32 = 11
GZIP_OS_QDOS = 12
GZIP_OS_RISCOS = 13
GZIP_OS_UNKNOWN = 255

# Number of bytes Gzip.unpack() skips when GZIP_FENCRYPT is set.
GZIP_FENCRYPT_LEN = 12
50
51
class GzipExtra(dpkt.Packet):
    """Extra field found in a gzip header when GZIP_FEXTRA is set.

    The header is little-endian: a two-byte ``id`` followed by an unsigned
    16-bit ``len``.
    """

    __byte_order__ = '<'
    __hdr__ = (
        ('id', '2s', b''),  # two raw bytes, empty by default
        ('len', 'H', 0),    # unsigned 16-bit value, zero by default
    )
58
59
class Gzip(dpkt.Packet):
    """gzip member (RFC 1952): fixed header, optional sections, then payload.

    The fixed little-endian header consists of ``magic``, ``method``,
    ``flags``, ``mtime``, ``xflags`` and ``os`` (10 bytes in total).  Optional
    sections announced by ``flags`` are parsed into the attributes below and
    whatever follows them is left in ``data``.

    Attributes:
        extra: GzipExtra parsed from the FEXTRA section, or None.
        filename: File name as ``str`` (decoded as UTF-8), or None.
        comment: Comment as raw ``bytes``, or None.
    """

    __byte_order__ = '<'
    __hdr__ = (
        ('magic', '2s', GZIP_MAGIC),
        ('method', 'B', GZIP_MDEFLATE),
        ('flags', 'B', 0),
        ('mtime', 'I', 0),
        ('xflags', 'B', 0),
        ('os', 'B', GZIP_OS_UNIX),
    )

    def __init__(self, *args, **kwargs):
        """Create the packet; arguments are passed through to dpkt.Packet."""
        # Defaults are assigned *before* the base constructor runs so that
        # values filled in while it processes its arguments (e.g. by unpack())
        # are not overwritten afterwards.
        self.extra = None
        self.filename = None
        self.comment = None
        super(Gzip, self).__init__(*args, **kwargs)

    def unpack(self, buf):
        """Parse ``buf`` into the fixed header, optional sections and payload.

        Each optional section is consumed from the front of ``self.data`` in
        the order extra, file name, comment, encryption header, header CRC.

        Raises:
            dpkt.NeedData: if a section announced by ``flags`` is truncated
                or a NUL terminator is missing.
        """
        super(Gzip, self).unpack(buf)
        if self.flags & GZIP_FEXTRA:
            if len(self.data) < 2:
                raise dpkt.NeedData('Gzip extra')
            # 16-bit little-endian length of the extra section.
            n = struct.unpack('<H', self.data[:2])[0]
            if len(self.data) < 2 + n:
                raise dpkt.NeedData('Gzip extra')
            self.extra = GzipExtra(self.data[2:2 + n])
            self.data = self.data[2 + n:]
        if self.flags & GZIP_FNAME:
            # NUL-terminated file name.
            n = self.data.find(b'\x00')
            if n == -1:
                raise dpkt.NeedData('Gzip end of file name not found')
            self.filename = self.data[:n].decode('utf-8')
            self.data = self.data[n + 1:]
        if self.flags & GZIP_FCOMMENT:
            # NUL-terminated comment, kept as raw bytes.
            n = self.data.find(b'\x00')
            if n == -1:
                raise dpkt.NeedData('Gzip end of comment not found')
            self.comment = self.data[:n]
            self.data = self.data[n + 1:]
        if self.flags & GZIP_FENCRYPT:
            if len(self.data) < GZIP_FENCRYPT_LEN:
                raise dpkt.NeedData('Gzip encrypt')
            self.data = self.data[GZIP_FENCRYPT_LEN:]  # XXX - skip
        if self.flags & GZIP_FHCRC:
            if len(self.data) < 2:
                raise dpkt.NeedData('Gzip hcrc')
            self.data = self.data[2:]  # XXX - skip

    def pack_hdr(self):
        """Return the serialized header: fixed part plus optional sections.

        Sets the matching bit in ``self.flags`` for every optional attribute
        that is present.  The encryption header and header CRC skipped by
        unpack() are not stored and therefore not emitted here.
        """
        parts = []
        if self.extra:
            self.flags |= GZIP_FEXTRA
            s = bytes(self.extra)
            parts.append(struct.pack('<H', len(s)))
            parts.append(s)
        # Compare against None rather than truthiness: a parsed zero-length
        # name/comment must still emit its NUL terminator, otherwise the flag
        # bit stays set while the section is missing from the output.
        if self.filename is not None:
            self.flags |= GZIP_FNAME
            parts.append(self.filename.encode('utf-8'))
            parts.append(b'\x00')
        if self.comment is not None:
            self.flags |= GZIP_FCOMMENT
            parts.append(self.comment)
            parts.append(b'\x00')
        # The fixed header is packed last (and placed first) so that the flag
        # bits set above are included in it.
        parts.insert(0, super(Gzip, self).pack_hdr())
        return b''.join(parts)

    def compress(self):
        """Compress self.data in place as a raw deflate stream."""
        c = zlib.compressobj(
            zlib.Z_BEST_COMPRESSION,
            zlib.DEFLATED,
            -zlib.MAX_WBITS,  # negative wbits: no zlib header/trailer
            zlib.DEF_MEM_LEVEL,
            zlib.Z_DEFAULT_STRATEGY,
        )
        # compress() may already return part of the stream for larger inputs
        # (it returns nothing only while everything still fits in zlib's
        # internal buffers), so its output must be kept ahead of flush().
        self.data = c.compress(self.data) + c.flush()

    def decompress(self):
        """Return decompressed payload.

        self.data is read as a raw deflate stream and is left unchanged.
        """
        d = zlib.decompressobj(-zlib.MAX_WBITS)
        return d.decompress(self.data)
144
145
class TestGzip(object):
    """Field-by-field checks on a member produced by the gzip command line tool."""

    @classmethod
    def setup_class(cls):
        """Assemble the sample member once and parse it for all tests."""
        from binascii import unhexlify

        sections = (
            b'1F8B',  # magic
            b'080880C185560003',  # header
            b'68656C6C6F2E74787400',  # filename
            b'F348CDC9C95728CF2FCA4951E40200',  # data
            b'41E4A9B20D000000',  # checksum
        )
        cls.data = unhexlify(b''.join(sections))
        cls.p = Gzip(cls.data)

    def test_method(self):
        assert self.p.method == GZIP_MDEFLATE

    def test_flags(self):
        # only the file name flag is set
        assert self.p.flags == GZIP_FNAME

    def test_mtime(self):
        # Fri Jan 01 00:00:00 2016 UTC
        assert self.p.mtime == 0x5685c180

    def test_xflags(self):
        assert self.p.xflags == 0

    def test_os(self):
        assert self.p.os == GZIP_OS_UNIX

    def test_filename(self):
        # the file name is always exposed as str (utf-8)
        assert self.p.filename == "hello.txt"

    def test_decompress(self):
        # the decompressed payload is always bytes
        assert self.p.decompress() == b"Hello world!\n"
182
183
def test_flags_extra():
    """FEXTRA: truncated input raises NeedData, complete input round-trips."""
    import pytest
    from binascii import unhexlify

    fixed_header = unhexlify(''.join([
        '1F8B',      # magic
        '08',        # method
        '04',        # flags (GZIP_FEXTRA)
        '80C18556',  # mtime
        '00',        # xflags
        '03',        # os
    ]))

    # the extra length field itself is missing
    with pytest.raises(dpkt.NeedData, match='Gzip extra'):
        Gzip(fixed_header)

    # length field present (4 bytes announced) but the section is missing
    with_length = fixed_header + unhexlify('0400')
    with pytest.raises(dpkt.NeedData, match='Gzip extra'):
        Gzip(with_length)

    complete = with_length + unhexlify('494401000102')
    parsed = Gzip(complete)
    extra = parsed.extra

    assert extra.id == b'ID'
    assert extra.len == 1
    assert parsed.data == unhexlify('0102')
    assert bytes(parsed) == complete
213
214
def test_flags_filename():
    """FNAME: a missing NUL terminator raises NeedData, a terminated name parses."""
    import pytest
    from binascii import unhexlify

    unterminated = unhexlify(''.join([
        '1F8B',      # magic
        '08',        # method
        '08',        # flags (GZIP_FNAME)
        '80C18556',  # mtime
        '00',        # xflags
        '03',        # os
        '68656C6C6F2E747874',  # filename
    ]))

    # without the trailing null character the name cannot be delimited
    with pytest.raises(dpkt.NeedData, match='Gzip end of file name not found'):
        Gzip(unterminated)

    terminated = unterminated + unhexlify('00')
    parsed = Gzip(terminated)

    assert parsed.filename == 'hello.txt'
    assert parsed.data == b''
    assert bytes(parsed) == terminated
238
239
def test_flags_comment():
    """FCOMMENT: a missing NUL terminator raises NeedData, a terminated comment parses."""
    import pytest
    from binascii import unhexlify

    unterminated = unhexlify(''.join([
        '1F8B',      # magic
        '08',        # method
        '10',        # flags (GZIP_FCOMMENT)
        '80C18556',  # mtime
        '00',        # xflags
        '03',        # os
        '68656C6C6F2E747874',  # comment
    ]))

    # without the trailing null character the comment cannot be delimited
    with pytest.raises(dpkt.NeedData, match='Gzip end of comment not found'):
        Gzip(unterminated)

    terminated = unterminated + unhexlify('00')
    parsed = Gzip(terminated)

    assert parsed.comment == b'hello.txt'
    assert parsed.data == b''
    assert bytes(parsed) == terminated
264
265
def test_flags_encrypt():
    """FENCRYPT: the 12-byte encryption header is required and then skipped."""
    import pytest
    from binascii import unhexlify

    header_only = unhexlify(''.join([
        '1F8B',      # magic
        '08',        # method
        '20',        # flags (GZIP_FENCRYPT)
        '80C18556',  # mtime
        '00',        # xflags
        '03',        # os
    ]))

    # the encryption header is missing entirely
    with pytest.raises(dpkt.NeedData, match='Gzip encrypt'):
        Gzip(header_only)

    encryption_header = unhexlify('0102030405060708090a0b0c')
    payload = unhexlify('0123456789abcdef')
    parsed = Gzip(header_only + encryption_header + payload)

    assert parsed.data == payload
    # the skipped encryption header is not written back when packing
    assert bytes(parsed) == header_only + payload
288
289
def test_flags_hcrc():
    """FHCRC: the two-byte header CRC is required and then skipped."""
    import pytest
    from binascii import unhexlify

    header_only = unhexlify(''.join([
        '1F8B',      # magic
        '08',        # method
        '02',        # flags (GZIP_FHCRC)
        '80C18556',  # mtime
        '00',        # xflags
        '03',        # os
    ]))

    # the header CRC is missing entirely
    with pytest.raises(dpkt.NeedData, match='Gzip hcrc'):
        Gzip(header_only)

    header_crc = unhexlify('0102')
    payload = unhexlify('0123456789abcdef')
    parsed = Gzip(header_only + header_crc + payload)

    assert parsed.data == payload
    # the skipped header CRC is not written back when packing
    assert bytes(parsed) == header_only + payload
312
313
def test_compress():
    """compress() deflates the payload in place and decompress() restores it."""
    from binascii import unhexlify

    header = unhexlify(''.join([
        '1F8B',      # magic
        '08',        # method
        '00',        # flags (NONE)
        '80C18556',  # mtime
        '00',        # xflags
        '03',        # os
    ]))
    original = b'Hello world!\n'
    deflated = unhexlify('F348CDC9C95728CF2FCA4951E40200')

    packet = Gzip(header + original)
    # before compressing, the payload is exactly what followed the header
    assert packet.data == original

    packet.compress()
    assert packet.data == deflated
    assert bytes(packet) == header + deflated

    # decompressing gives the plain text back
    assert packet.decompress() == original
337