import unittest
from test import support
import binascii
import copy
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

zlib = support.import_module('zlib')
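# support.import_module() raises unittest.SkipTest if the extension module is
# missing, so the whole test module is skipped on builds without zlib.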

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')
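# The copy() methods on compressor and decompressor objects are optional, so
# the tests that exercise them are skipped when the methods are unavailable.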


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # Specifying a compression level out of range causes an error.
        # (But -1 is Z_DEFAULT_COMPRESSION, and zlib apparently accepts
        # 0 too.)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
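    # Mixin providing big-buffer helpers shared by CompressTestCase (one-shot
    # API) and CompressObjectTestCase (compression objects) below.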
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random data and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread-out redundancy.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"") # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: the decompressor accesses the input to the last
        # decompress() call in flush(), even if that object has been freed
        # in the meantime.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 is only supported since zlib v1.2.3.5.
        # Normalize the runtime version so that "1.2.3" is treated as
        # "1.2.3.0", and suffixed forms such as "1.2.0-linux", "1.2.0.f"
        # or "1.2.0.f-linux" compare correctly.
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = bytes()
    for i in range(0, length, step):
        blocks += bytes(randint(0, 255) for x in range(step))
    return blocks



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


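# Sample text used as compressible input data throughout these tests.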
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
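    # Integer-like object whose __index__() returns 100; used to check that
    # bufsize, max_length and length arguments accept objects implementing
    # __index__().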
    def __index__(self):
        return 100


if __name__ == "__main__":
    unittest.main()