"""Tests for the zlib module.

Covers the checksum functions (crc32/adler32), error handling for bad
arguments, one-shot compress()/decompress(), and the streaming
compressobj()/decompressobj() APIs (flush modes, max_length handling,
preset dictionaries, copying, pickling and the wbits variants).
"""

import unittest
from test import support
import binascii
import copy
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

try:
    # Newer Pythons expose import_module() from test.support.import_helper.
    from test.support.import_helper import import_module
except ImportError:
    # Older Pythons expose it directly on test.support.
    from test.support import import_module

zlib = import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


def _zlib_runtime_version_tuple(zlib_version=None):
    """Return the zlib runtime version as a tuple of ints.

    *zlib_version* defaults to zlib.ZLIB_RUNTIME_VERSION.  Any "-suffix" is
    dropped, a three-component version is padded with a trailing 0, and a
    non-numeric last component is replaced by 0, so that "1.2.3",
    "1.2.0-linux", "1.2.0.f" and "1.2.0.f-linux" all parse.
    """
    if zlib_version is None:
        zlib_version = zlib.ZLIB_RUNTIME_VERSION
    v = zlib_version.split('-', 1)[0].split('.')
    if len(v) < 4:
        v.append('0')
    elif not v[-1].isnumeric():
        v[-1] = '0'
    return tuple(map(int, v))


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"),
                         zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test that the results are unsigned (some of the
        # expected values below exceed 2**31)
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    """Mixin with the big-buffer checks shared by the one-shot and
    object-based compression test cases."""

    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                         for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            # second flush should not work
            self.assertRaises(zlib.error, co.flush)
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail,
                             "(A) uct should be b'': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            # Without flush(), keep feeding empty input until no more
            # output is produced.
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail,
                         "(B) uct should be b'': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        # Use the shared parser so that suffixed runtime versions such as
        # "1.2.0-linux" do not break the int() conversion.
        ver = _zlib_runtime_version_tuple()
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                except Exception:
                    # Report which combination failed, then let the
                    # original error propagate.
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream from a private, seeded RNG so the
        # test is deterministic
        gen = random.Random()
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0+data0)
            self.assertEqual(zlib.decompress(s1), data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        supports_wbits_0 = _zlib_runtime_version_tuple() >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    If *seed* is not None, *generator* is re-seeded with it first.  The
    bytes are drawn one at a time with generator.randint(0, 255).  The
    final block is truncated so that exactly *length* bytes are returned;
    a non-positive *length* yields b''.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # Nothing to generate; also avoids a zero step in range() below.
        return b''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    return b''.join(
        bytes(randint(0, 255) for _ in range(min(step, length - start)))
        for start in range(0, length, step))


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source.

    *source* may be str or bytes-like; it is split on newlines of the
    matching type.  If *seed* is not None, *generator* is re-seeded first.
    """
    if seed is not None:
        generator.seed(seed)
    # Split with a separator of the same type as the source so that bytes
    # input (such as HAMLET_SCENE) works as well as str.
    newline = b'\n' if isinstance(source, (bytes, bytearray)) else '\n'
    sources = source.split(newline)
    return [generator.choice(sources) for n in range(number)]


# NOTE(review): the exact indentation of the speech lines could not be
# recovered from the collapsed source and was reconstructed; the tests only
# round-trip this text and require len(HAMLET_SCENE * 10) > zlib.DEF_BUF_SIZE.
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
    """Integer-like object (via __index__) used to check that size
    arguments accept non-int objects; it always converts to 100."""

    def __index__(self):
        return 100


if __name__ == "__main__":
    unittest.main()