from test import support
from test.support import bigmemtest, _4G

import unittest
from io import BytesIO, DEFAULT_BUFFER_SIZE
import os
import pickle
import glob
import tempfile
import pathlib
import random
import shutil
import subprocess
import threading
from test.support import unlink
import _compression
import sys


# Skip tests if the bz2 module doesn't exist.
bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor

has_cmdline_bunzip2 = None

def ext_decompress(data):
    global has_cmdline_bunzip2
    if has_cmdline_bunzip2 is None:
        has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
    if has_cmdline_bunzip2:
        return subprocess.check_output(['bunzip2'], input=data)
    else:
        return bz2.decompress(data)

class BaseTest(unittest.TestCase):
    "Base for other testcases."

    TEXT_LINES = [
        b'root:x:0:0:root:/root:/bin/bash\n',
        b'bin:x:1:1:bin:/bin:\n',
        b'daemon:x:2:2:daemon:/sbin:\n',
        b'adm:x:3:4:adm:/var/adm:\n',
        b'lp:x:4:7:lp:/var/spool/lpd:\n',
        b'sync:x:5:0:sync:/sbin:/bin/sync\n',
        b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
        b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
        b'mail:x:8:12:mail:/var/spool/mail:\n',
        b'news:x:9:13:news:/var/spool/news:\n',
        b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
        b'operator:x:11:0:operator:/root:\n',
        b'games:x:12:100:games:/usr/games:\n',
        b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
        b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
        b'nobody:x:65534:65534:Nobody:/home:\n',
        b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
        b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
        b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
        b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
        b'www:x:103:104::/var/www:/bin/false\n',
        ]
    TEXT = b''.join(TEXT_LINES)
    DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
    EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
    BAD_DATA = b'this is not a valid bzip2 file'

    # Some tests need more than one block of uncompressed data. Since one block
    # is at least 100,000 bytes, we gather some data dynamically and compress it.
    # Note that this assumes that compression works correctly, so we cannot
    # simply use the bigger test data for all tests.
    test_size = 0
    BIG_TEXT = bytearray(128*1024)
    for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
        with open(fname, 'rb') as fh:
            test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
        if test_size > 128*1024:
            break
    BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)

    def setUp(self):
        fd, self.filename = tempfile.mkstemp()
        os.close(fd)

    def tearDown(self):
        unlink(self.filename)


class BZ2FileTest(BaseTest):
    "Test the BZ2File class."

    def createTempFile(self, streams=1, suffix=b""):
        with open(self.filename, "wb") as f:
            f.write(self.DATA * streams)
            f.write(suffix)

    def testBadArgs(self):
        self.assertRaises(TypeError, BZ2File, 123.456)
        self.assertRaises(ValueError, BZ2File, os.devnull, "z")
        self.assertRaises(ValueError, BZ2File, os.devnull, "rx")
        self.assertRaises(ValueError, BZ2File, os.devnull, "rbt")
        self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=0)
        self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=10)

    def testRead(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadBadFile(self):
        self.createTempFile(streams=0, suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(OSError, bz2f.read)

    def testReadMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(), self.TEXT * 5)

    def testReadMonkeyMultiStream(self):
        # Test BZ2File.read() on a multi-stream archive where a stream
        # boundary coincides with the end of the raw read buffer.
        buffer_size = _compression.BUFFER_SIZE
        _compression.BUFFER_SIZE = len(self.DATA)
        try:
            self.createTempFile(streams=5)
            with BZ2File(self.filename) as bz2f:
                self.assertRaises(TypeError, bz2f.read, float())
                self.assertEqual(bz2f.read(), self.TEXT * 5)
        finally:
            _compression.BUFFER_SIZE = buffer_size

    def testReadTrailingJunk(self):
        self.createTempFile(suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadMultiStreamTrailingJunk(self):
        self.createTempFile(streams=5, suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT * 5)

    def testRead0(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(0), b"")

    def testReadChunk10(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            text = b''
            while True:
                str = bz2f.read(10)
                if not str:
                    break
                text += str
            self.assertEqual(text, self.TEXT)

    def testReadChunk10MultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            text = b''
            while True:
                str = bz2f.read(10)
                if not str:
                    break
                text += str
            self.assertEqual(text, self.TEXT * 5)

    def testRead100(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(100), self.TEXT[:100])

    def testPeek(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            pdata = bz2f.peek()
            self.assertNotEqual(len(pdata), 0)
            self.assertTrue(self.TEXT.startswith(pdata))
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadInto(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            n = 128
            b = bytearray(n)
            self.assertEqual(bz2f.readinto(b), n)
            self.assertEqual(b, self.TEXT[:n])
            n = len(self.TEXT) - n
            b = bytearray(len(self.TEXT))
            self.assertEqual(bz2f.readinto(b), n)
            self.assertEqual(b[:n], self.TEXT[-n:])

    def testReadLine(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            for line in self.TEXT_LINES:
                self.assertEqual(bz2f.readline(), line)

    def testReadLineMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            for line in self.TEXT_LINES * 5:
                self.assertEqual(bz2f.readline(), line)

    def testReadLines(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            self.assertEqual(bz2f.readlines(), self.TEXT_LINES)

    def testReadLinesMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)

    def testIterator(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)

    def testIteratorMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)

    def testClosedIteratorDeadlock(self):
        # Issue #3309: Iteration on a closed BZ2File should release the lock.
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.close()
        self.assertRaises(ValueError, next, bz2f)
        # This call will deadlock if the above call failed to release the lock.
        self.assertRaises(ValueError, bz2f.readlines)

    def testWrite(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteChunks10(self):
        with BZ2File(self.filename, "w") as bz2f:
            n = 0
            while True:
                str = self.TEXT[n*10:(n+1)*10]
                if not str:
                    break
                bz2f.write(str)
                n += 1
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteNonDefaultCompressLevel(self):
        expected = bz2.compress(self.TEXT, compresslevel=5)
        with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
            bz2f.write(self.TEXT)
        with open(self.filename, "rb") as f:
            self.assertEqual(f.read(), expected)

    def testWriteLines(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.writelines)
            bz2f.writelines(self.TEXT_LINES)
        # Issue #1535500: Calling writelines() on a closed BZ2File
        # should raise an exception.
        self.assertRaises(ValueError, bz2f.writelines, ["a"])
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteMethodsOnReadOnlyFile(self):
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write(b"abc")

        with BZ2File(self.filename, "r") as bz2f:
            self.assertRaises(OSError, bz2f.write, b"a")
            self.assertRaises(OSError, bz2f.writelines, [b"a"])

    def testAppend(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with BZ2File(self.filename, "a") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)

    def testSeekForward(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekForwardAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(len(self.TEXT) + 150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekBackwards(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.read(500)
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[500-150:])

    def testSeekBackwardsAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            readto = len(self.TEXT) + 100
            while readto > 0:
                readto -= len(bz2f.read(readto))
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)

    def testSeekBackwardsFromEnd(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150, 2)
            self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])

    def testSeekBackwardsFromEndAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-1000, 2)
            self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])

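    # The tests below check that seeking past the end of the decompressed data
    # leaves the position at EOF: tell() reports the full uncompressed length
    # and any further read() returns b"".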
    def testSeekPostEnd(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndTwice(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndTwiceMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
            self.assertEqual(bz2f.read(), b"")

    def testSeekPreStart(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT)

    def testSeekPreStartMultiStream(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT * 2)

    def testFileno(self):
        self.createTempFile()
        with open(self.filename, 'rb') as rawf:
            bz2f = BZ2File(rawf)
            try:
                self.assertEqual(bz2f.fileno(), rawf.fileno())
            finally:
                bz2f.close()
        self.assertRaises(ValueError, bz2f.fileno)

    def testSeekable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertTrue(bz2f.seekable())
            bz2f.read()
            self.assertTrue(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertFalse(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

        src = BytesIO(self.DATA)
        src.seekable = lambda: False
        bz2f = BZ2File(src)
        try:
            self.assertFalse(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

    def testReadable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertTrue(bz2f.readable())
            bz2f.read()
            self.assertTrue(bz2f.readable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.readable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertFalse(bz2f.readable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.readable)

    def testWritable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertFalse(bz2f.writable())
            bz2f.read()
            self.assertFalse(bz2f.writable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.writable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertTrue(bz2f.writable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.writable)

    def testOpenDel(self):
        self.createTempFile()
        for i in range(10000):
            o = BZ2File(self.filename)
            del o

    def testOpenNonexistent(self):
        self.assertRaises(OSError, BZ2File, "/non/existent")

    def testReadlinesNoNewline(self):
        # Issue #1191043: readlines() fails on a file containing no newline.
        data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
        with open(self.filename, "wb") as f:
            f.write(data)
        with BZ2File(self.filename) as bz2f:
            lines = bz2f.readlines()
        self.assertEqual(lines, [b'Test'])
        with BZ2File(self.filename) as bz2f:
            xlines = list(bz2f.readlines())
        self.assertEqual(xlines, [b'Test'])

    def testContextProtocol(self):
        f = None
        with BZ2File(self.filename, "wb") as f:
            f.write(b"xxx")
        f = BZ2File(self.filename, "rb")
        f.close()
        try:
            with f:
                pass
        except ValueError:
            pass
        else:
            self.fail("__enter__ on a closed file didn't raise an exception")
        try:
            with BZ2File(self.filename, "wb") as f:
                1/0
        except ZeroDivisionError:
            pass
        else:
            self.fail("1/0 didn't raise an exception")

    def testThreading(self):
        # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
        data = b"1" * 2**20
        nthreads = 10
        with BZ2File(self.filename, 'wb') as f:
            def comp():
                for i in range(5):
                    f.write(data)
            threads = [threading.Thread(target=comp) for i in range(nthreads)]
            with support.start_threads(threads):
                pass

    def testMixedIterationAndReads(self):
        self.createTempFile()
        linelen = len(self.TEXT_LINES[0])
        halflen = linelen // 2
        with BZ2File(self.filename) as bz2f:
            bz2f.read(halflen)
            self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
            self.assertEqual(bz2f.read(), self.TEXT[linelen:])
        with BZ2File(self.filename) as bz2f:
            bz2f.readline()
            self.assertEqual(next(bz2f), self.TEXT_LINES[1])
            self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
        with BZ2File(self.filename) as bz2f:
            bz2f.readlines()
            self.assertRaises(StopIteration, next, bz2f)
            self.assertEqual(bz2f.readlines(), [])

    def testMultiStreamOrdering(self):
        # Test the ordering of streams when reading a multi-stream archive.
        data1 = b"foo" * 1000
        data2 = b"bar" * 1000
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write(data1)
        with BZ2File(self.filename, "a") as bz2f:
            bz2f.write(data2)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), data1 + data2)

    def testOpenBytesFilename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with BZ2File(bytes_filename, "wb") as f:
            f.write(self.DATA)
        with BZ2File(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)
        # Sanity check that we are actually operating on the right file.
        with BZ2File(str_filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)

    def testOpenPathLikeFilename(self):
        filename = pathlib.Path(self.filename)
        with BZ2File(filename, "wb") as f:
            f.write(self.DATA)
        with BZ2File(filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)

    def testDecompressLimited(self):
        """Decompressed data buffering should be limited"""
        bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), _compression.BUFFER_SIZE)

        decomp = BZ2File(BytesIO(bomb))
        self.assertEqual(decomp.read(1), b'\0')
        max_decomp = 1 + DEFAULT_BUFFER_SIZE
        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                             "Excessive amount of data was decompressed")


    # Tests for a BZ2File wrapping another file object:

    def testReadBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                self.assertRaises(TypeError, bz2f.read, float())
                self.assertEqual(bz2f.read(), self.TEXT)
            self.assertFalse(bio.closed)

    def testPeekBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                pdata = bz2f.peek()
                self.assertNotEqual(len(pdata), 0)
                self.assertTrue(self.TEXT.startswith(pdata))
                self.assertEqual(bz2f.read(), self.TEXT)

    def testWriteBytesIO(self):
        with BytesIO() as bio:
            with BZ2File(bio, "w") as bz2f:
                self.assertRaises(TypeError, bz2f.write)
                bz2f.write(self.TEXT)
            self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
            self.assertFalse(bio.closed)

    def testSeekForwardBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                self.assertRaises(TypeError, bz2f.seek)
                bz2f.seek(150)
                self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekBackwardsBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                bz2f.read(500)
                bz2f.seek(-150, 1)
                self.assertEqual(bz2f.read(), self.TEXT[500-150:])

    def test_read_truncated(self):
        # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
        truncated = self.DATA[:-10]
        with BZ2File(BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with BZ2File(BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 4-byte file header, and block header of at least 146 bits.
        for i in range(22):
            with BZ2File(BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)


class BZ2CompressorTest(BaseTest):
    def testCompress(self):
        bz2c = BZ2Compressor()
        self.assertRaises(TypeError, bz2c.compress)
        data = bz2c.compress(self.TEXT)
        data += bz2c.flush()
        self.assertEqual(ext_decompress(data), self.TEXT)

    def testCompressEmptyString(self):
        bz2c = BZ2Compressor()
        data = bz2c.compress(b'')
        data += bz2c.flush()
        self.assertEqual(data, self.EMPTY_DATA)

    def testCompressChunks10(self):
        bz2c = BZ2Compressor()
        n = 0
        data = b''
        while True:
            str = self.TEXT[n*10:(n+1)*10]
            if not str:
                break
            data += bz2c.compress(str)
            n += 1
        data += bz2c.flush()
        self.assertEqual(ext_decompress(data), self.TEXT)

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=2)
    def testCompress4G(self, size):
        # "Test BZ2Compressor.compress()/flush() with >4GiB input"
        bz2c = BZ2Compressor()
        data = b"x" * size
        try:
            compressed = bz2c.compress(data)
            compressed += bz2c.flush()
        finally:
            data = None  # Release memory
        data = bz2.decompress(compressed)
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b"x")), 0)
        finally:
            data = None

    def testPickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(BZ2Compressor(), proto)


class BZ2DecompressorTest(BaseTest):
    def test_Constructor(self):
        self.assertRaises(TypeError, BZ2Decompressor, 42)

    def testDecompress(self):
        bz2d = BZ2Decompressor()
        self.assertRaises(TypeError, bz2d.decompress)
        text = bz2d.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressChunks10(self):
        bz2d = BZ2Decompressor()
        text = b''
        n = 0
        while True:
            str = self.DATA[n*10:(n+1)*10]
            if not str:
                break
            text += bz2d.decompress(str)
            n += 1
        self.assertEqual(text, self.TEXT)

    def testDecompressUnusedData(self):
        bz2d = BZ2Decompressor()
        unused_data = b"this is unused data"
        text = bz2d.decompress(self.DATA+unused_data)
        self.assertEqual(text, self.TEXT)
        self.assertEqual(bz2d.unused_data, unused_data)

    def testEOFError(self):
        bz2d = BZ2Decompressor()
        text = bz2d.decompress(self.DATA)
        self.assertRaises(EOFError, bz2d.decompress, b"anything")
        self.assertRaises(EOFError, bz2d.decompress, b"")

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=3.3)
    def testDecompress4G(self, size):
        # "Test BZ2Decompressor.decompress() with >4GiB input"
        blocksize = 10 * 1024 * 1024
        block = random.getrandbits(blocksize * 8).to_bytes(blocksize, 'little')
        try:
            data = block * (size // blocksize + 1)
            compressed = bz2.compress(data)
            bz2d = BZ2Decompressor()
            decompressed = bz2d.decompress(compressed)
            self.assertTrue(decompressed == data)
        finally:
            data = None
            compressed = None
            decompressed = None

    def testPickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(BZ2Decompressor(), proto)

    def testDecompressorChunksMaxsize(self):
        bzd = BZ2Decompressor()
        max_length = 100
        out = []

        # Feed some input
        len_ = len(self.BIG_DATA) - 64
        out.append(bzd.decompress(self.BIG_DATA[:len_],
                                  max_length=max_length))
        self.assertFalse(bzd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data without providing more input
        out.append(bzd.decompress(b'', max_length=max_length))
        self.assertFalse(bzd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data while providing more input
        out.append(bzd.decompress(self.BIG_DATA[len_:],
                                  max_length=max_length))
        self.assertLessEqual(len(out[-1]), max_length)

        # Retrieve remaining uncompressed data
        while not bzd.eof:
            out.append(bzd.decompress(b'', max_length=max_length))
            self.assertLessEqual(len(out[-1]), max_length)

        out = b"".join(out)
        self.assertEqual(out, self.BIG_TEXT)
        self.assertEqual(bzd.unused_data, b"")

    def test_decompressor_inputbuf_1(self):
        # Test reusing input buffer after moving existing
        # contents to beginning
        bzd = BZ2Decompressor()
        out = []

        # Create input buffer and fill it
        self.assertEqual(bzd.decompress(self.DATA[:100],
                                        max_length=0), b'')

        # Retrieve some results, freeing capacity at beginning
        # of input buffer
        out.append(bzd.decompress(b'', 2))

        # Add more data that fits into input buffer after
        # moving existing data to beginning
        out.append(bzd.decompress(self.DATA[100:105], 15))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[105:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_2(self):
        # Test reusing input buffer by appending data at the
        # end right away
        bzd = BZ2Decompressor()
        out = []

        # Create input buffer and empty it
        self.assertEqual(bzd.decompress(self.DATA[:200],
                                        max_length=0), b'')
        out.append(bzd.decompress(b''))

        # Fill buffer with new data
        out.append(bzd.decompress(self.DATA[200:280], 2))

        # Append some more data, not enough to require resize
        out.append(bzd.decompress(self.DATA[280:300], 2))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_3(self):
        # Test reusing input buffer after extending it

        bzd = BZ2Decompressor()
        out = []

        # Create almost full input buffer
        out.append(bzd.decompress(self.DATA[:200], 5))

        # Add even more data to it, requiring resize
        out.append(bzd.decompress(self.DATA[200:300], 5))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_failure(self):
        bzd = BZ2Decompressor()
        self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)
        # Previously, a second call could crash due to internal inconsistency
        self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)

    @support.refcount_test
    def test_refleaks_in___init__(self):
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        bzd = BZ2Decompressor()
        refs_before = gettotalrefcount()
        for i in range(100):
            bzd.__init__()
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)


class CompressDecompressTest(BaseTest):
    def testCompress(self):
        data = bz2.compress(self.TEXT)
        self.assertEqual(ext_decompress(data), self.TEXT)

    def testCompressEmptyString(self):
        text = bz2.compress(b'')
        self.assertEqual(text, self.EMPTY_DATA)

    def testDecompress(self):
        text = bz2.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressEmpty(self):
        text = bz2.decompress(b"")
        self.assertEqual(text, b"")

    def testDecompressToEmptyString(self):
        text = bz2.decompress(self.EMPTY_DATA)
        self.assertEqual(text, b'')

    def testDecompressIncomplete(self):
        self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])

    def testDecompressBadData(self):
        self.assertRaises(OSError, bz2.decompress, self.BAD_DATA)

    def testDecompressMultiStream(self):
        text = bz2.decompress(self.DATA * 5)
        self.assertEqual(text, self.TEXT * 5)

    def testDecompressTrailingJunk(self):
        text = bz2.decompress(self.DATA + self.BAD_DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressMultiStreamTrailingJunk(self):
        text = bz2.decompress(self.DATA * 5 + self.BAD_DATA)
        self.assertEqual(text, self.TEXT * 5)


class OpenTest(BaseTest):
    "Test the open function."

    def open(self, *args, **kwargs):
        return bz2.open(*args, **kwargs)

    def test_binary_modes(self):
        for mode in ("wb", "xb"):
            if mode == "xb":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT)
            with self.open(self.filename, "rb") as f:
                self.assertEqual(f.read(), self.TEXT)
            with self.open(self.filename, "ab") as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT * 2)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        for mode in ("w", "x"):
            if mode == "x":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT)
            with self.open(self.filename, "r") as f:
                self.assertEqual(f.read(), self.TEXT)
            with self.open(self.filename, "a") as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT * 2)

    def test_text_modes(self):
        text = self.TEXT.decode("ascii")
        text_native_eol = text.replace("\n", os.linesep)
        for mode in ("wt", "xt"):
            if mode == "xt":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(text)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read()).decode("ascii")
                self.assertEqual(file_data, text_native_eol)
            with self.open(self.filename, "rt") as f:
                self.assertEqual(f.read(), text)
            with self.open(self.filename, "at") as f:
                f.write(text)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read()).decode("ascii")
                self.assertEqual(file_data, text_native_eol * 2)

    def test_x_mode(self):
        for mode in ("x", "xb", "xt"):
            unlink(self.filename)
            with self.open(self.filename, mode) as f:
                pass
            with self.assertRaises(FileExistsError):
                with self.open(self.filename, mode) as f:
                    pass

    def test_fileobj(self):
        with self.open(BytesIO(self.DATA), "r") as f:
            self.assertEqual(f.read(), self.TEXT)
        with self.open(BytesIO(self.DATA), "rb") as f:
            self.assertEqual(f.read(), self.TEXT)
        text = self.TEXT.decode("ascii")
        with self.open(BytesIO(self.DATA), "rt") as f:
            self.assertEqual(f.read(), text)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        self.assertRaises(ValueError,
                          self.open, self.filename, "wbt")
        self.assertRaises(ValueError,
                          self.open, self.filename, "xbt")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", encoding="utf-8")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", errors="ignore")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        text = self.TEXT.decode("ascii")
        text_native_eol = text.replace("\n", os.linesep)
        with self.open(self.filename, "wt", encoding="utf-16-le") as f:
            f.write(text)
        with open(self.filename, "rb") as f:
            file_data = ext_decompress(f.read()).decode("utf-16-le")
            self.assertEqual(file_data, text_native_eol)
        with self.open(self.filename, "rt", encoding="utf-16-le") as f:
            self.assertEqual(f.read(), text)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with self.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        text = self.TEXT.decode("ascii")
        with self.open(self.filename, "wt", newline="\n") as f:
            f.write(text)
        with self.open(self.filename, "rt", newline="\r") as f:
            self.assertEqual(f.readlines(), [text])


def test_main():
    support.run_unittest(
        BZ2FileTest,
        BZ2CompressorTest,
        BZ2DecompressorTest,
        CompressDecompressTest,
        OpenTest,
    )
    support.reap_children()

if __name__ == '__main__':
    test_main()