import unittest
from test import support
import binascii
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

zlib = support.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
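        # 4 GiB + 4 bytes of data, so the total length does not fit in an
        # unsigned 32-bit integer.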
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # Specifying a compression level out of range causes an error
        # (-1 is Z_DEFAULT_COMPRESSION and zlib accepts 0 as well, so the
        # test uses 10).
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB of random data and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj( level )
                    a = obj.compress( data[:3000] )
                    b = obj.flush( sync )
                    c = obj.compress( data[3000:] )
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"") # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = b''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = b''.join(bufs1)

        self.assertEqual(zlib.decompress(s0),data0+data0)
        self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = b''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = b''.join(bufs1)

        self.assertEqual(s0,s1)
        self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a compression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 is only supported since zlib v1.2.3.5.
        # Normalize the runtime version so that "1.2.3" compares as "1.2.3.0",
        # and handle suffixed forms such as "1.2.0-linux", "1.2.0.f" and
        # "1.2.0.f-linux".
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

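        # wbits semantics: 9..15 select the window size for zlib-wrapped
        # streams, negative values select raw deflate without any header,
        # 16+N writes/reads gzip framing, and 32+N auto-detects zlib or gzip
        # when decompressing.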
        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = bytes()
    for i in range(0, length, step):
        blocks += bytes(randint(0, 255) for x in range(step))
    return blocks



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


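# Integer-like object (it only defines __int__()); used in the tests above to
# check that the bufsize, max_length and length arguments accept such objects.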
class CustomInt:
    def __int__(self):
        return 100


if __name__ == "__main__":
    unittest.main()