"""Regression tests for the :mod:`bz2` module.

Covers the three public entry points — BZ2File, BZ2Compressor and
BZ2Decompressor — plus the module-level compress()/decompress() helpers
and bz2.open().
"""
from test import support
from test.support import bigmemtest, _4G

import array
import unittest
from io import BytesIO, DEFAULT_BUFFER_SIZE
import os
import pickle
import glob
import tempfile
import pathlib
import random
import shutil
import subprocess
import threading
from test.support import import_helper
from test.support import threading_helper
from test.support.os_helper import unlink
import _compression
import sys


# Skip tests if the bz2 module doesn't exist.
bz2 = import_helper.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor

# Lazily-determined: whether a command-line bunzip2 binary is available.
has_cmdline_bunzip2 = None

def ext_decompress(data):
    # Decompress *data* with an implementation independent of the module
    # under test when possible: prefer the external bunzip2 tool, falling
    # back to bz2.decompress() if it is not installed.
    global has_cmdline_bunzip2
    if has_cmdline_bunzip2 is None:
        has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
    if has_cmdline_bunzip2:
        return subprocess.check_output(['bunzip2'], input=data)
    else:
        return bz2.decompress(data)

class BaseTest(unittest.TestCase):
    "Base for other testcases."

    TEXT_LINES = [
        b'root:x:0:0:root:/root:/bin/bash\n',
        b'bin:x:1:1:bin:/bin:\n',
        b'daemon:x:2:2:daemon:/sbin:\n',
        b'adm:x:3:4:adm:/var/adm:\n',
        b'lp:x:4:7:lp:/var/spool/lpd:\n',
        b'sync:x:5:0:sync:/sbin:/bin/sync\n',
        b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
        b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
        b'mail:x:8:12:mail:/var/spool/mail:\n',
        b'news:x:9:13:news:/var/spool/news:\n',
        b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
        b'operator:x:11:0:operator:/root:\n',
        b'games:x:12:100:games:/usr/games:\n',
        b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
        b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
        b'nobody:x:65534:65534:Nobody:/home:\n',
        b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
        b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
        b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
        b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
        b'www:x:103:104::/var/www:/bin/false\n',
        ]
    TEXT = b''.join(TEXT_LINES)
    # DATA is TEXT compressed with bzip2; EMPTY_DATA is a compressed empty
    # stream; BAD_DATA is deliberately not a bzip2 stream at all.
    DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
    EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
    BAD_DATA = b'this is not a valid bzip2 file'

    # Some tests need more than one block of uncompressed data. Since one block
    # is at least 100,000 bytes, we gather some data dynamically and compress it.
    # Note that this assumes that compression works correctly, so we cannot
    # simply use the bigger test data for all tests.
    test_size = 0
    BIG_TEXT = bytearray(128*1024)
    for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
        with open(fname, 'rb') as fh:
            test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
        if test_size > 128*1024:
            break
    BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)

    def setUp(self):
        # Each test gets a fresh, empty temporary file path in self.filename.
        fd, self.filename = tempfile.mkstemp()
        os.close(fd)

    def tearDown(self):
        unlink(self.filename)


class BZ2FileTest(BaseTest):
    "Test the BZ2File class."

    def createTempFile(self, streams=1, suffix=b""):
        # Write `streams` concatenated copies of DATA (a multi-stream bzip2
        # file), optionally followed by arbitrary trailing bytes.
        with open(self.filename, "wb") as f:
            f.write(self.DATA * streams)
            f.write(suffix)

    def testBadArgs(self):
        self.assertRaises(TypeError, BZ2File, 123.456)
        self.assertRaises(ValueError, BZ2File, os.devnull, "z")
        self.assertRaises(ValueError, BZ2File, os.devnull, "rx")
        self.assertRaises(ValueError, BZ2File, os.devnull, "rbt")
        self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=0)
        self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=10)

        # compresslevel is keyword-only
        self.assertRaises(TypeError, BZ2File, os.devnull, "r", 3)

    def testRead(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadBadFile(self):
        self.createTempFile(streams=0, suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(OSError, bz2f.read)

    def testReadMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(), self.TEXT * 5)

    def testReadMonkeyMultiStream(self):
        # Test BZ2File.read() on a multi-stream archive where a stream
        # boundary coincides with the end of the raw read buffer.
        buffer_size = _compression.BUFFER_SIZE
        _compression.BUFFER_SIZE = len(self.DATA)
        try:
            self.createTempFile(streams=5)
            with BZ2File(self.filename) as bz2f:
                self.assertRaises(TypeError, bz2f.read, float())
                self.assertEqual(bz2f.read(), self.TEXT * 5)
        finally:
            _compression.BUFFER_SIZE = buffer_size

    def testReadTrailingJunk(self):
        self.createTempFile(suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadMultiStreamTrailingJunk(self):
        self.createTempFile(streams=5, suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT * 5)

    def testRead0(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(0), b"")

    def testReadChunk10(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            text = b''
            while True:
                str = bz2f.read(10)
                if not str:
                    break
                text += str
            self.assertEqual(text, self.TEXT)

    def testReadChunk10MultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            text = b''
            while True:
                str = bz2f.read(10)
                if not str:
                    break
                text += str
            self.assertEqual(text, self.TEXT * 5)

    def testRead100(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(100), self.TEXT[:100])

    def testPeek(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            pdata = bz2f.peek()
            self.assertNotEqual(len(pdata), 0)
            self.assertTrue(self.TEXT.startswith(pdata))
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadInto(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            n = 128
            b = bytearray(n)
            self.assertEqual(bz2f.readinto(b), n)
            self.assertEqual(b, self.TEXT[:n])
            n = len(self.TEXT) - n
            b = bytearray(len(self.TEXT))
            self.assertEqual(bz2f.readinto(b), n)
            self.assertEqual(b[:n], self.TEXT[-n:])

    def testReadLine(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            for line in self.TEXT_LINES:
                self.assertEqual(bz2f.readline(), line)

    def testReadLineMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            for line in self.TEXT_LINES * 5:
                self.assertEqual(bz2f.readline(), line)

    def testReadLines(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            self.assertEqual(bz2f.readlines(), self.TEXT_LINES)

    def testReadLinesMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)

    def testIterator(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)

    def testIteratorMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)

    def testClosedIteratorDeadlock(self):
        # Issue #3309: Iteration on a closed BZ2File should release the lock.
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.close()
        self.assertRaises(ValueError, next, bz2f)
        # This call will deadlock if the above call failed to release the lock.
        self.assertRaises(ValueError, bz2f.readlines)

    def testWrite(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteChunks10(self):
        with BZ2File(self.filename, "w") as bz2f:
            n = 0
            while True:
                str = self.TEXT[n*10:(n+1)*10]
                if not str:
                    break
                bz2f.write(str)
                n += 1
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteNonDefaultCompressLevel(self):
        expected = bz2.compress(self.TEXT, compresslevel=5)
        with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
            bz2f.write(self.TEXT)
        with open(self.filename, "rb") as f:
            self.assertEqual(f.read(), expected)

    def testWriteLines(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.writelines)
            bz2f.writelines(self.TEXT_LINES)
        # Issue #1535500: Calling writelines() on a closed BZ2File
        # should raise an exception.
        self.assertRaises(ValueError, bz2f.writelines, ["a"])
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteMethodsOnReadOnlyFile(self):
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write(b"abc")

        with BZ2File(self.filename, "r") as bz2f:
            self.assertRaises(OSError, bz2f.write, b"a")
            self.assertRaises(OSError, bz2f.writelines, [b"a"])

    def testAppend(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with BZ2File(self.filename, "a") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)

    def testSeekForward(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekForwardAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(len(self.TEXT) + 150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekBackwards(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.read(500)
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[500-150:])

    def testSeekBackwardsAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            readto = len(self.TEXT) + 100
            while readto > 0:
                readto -= len(bz2f.read(readto))
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)

    def testSeekBackwardsFromEnd(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150, 2)
            self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])

    def testSeekBackwardsFromEndAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-1000, 2)
            self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])

    def testSeekPostEnd(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndTwice(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndTwiceMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
            self.assertEqual(bz2f.read(), b"")

    def testSeekPreStart(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT)

    def testSeekPreStartMultiStream(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT * 2)

    def testFileno(self):
        self.createTempFile()
        with open(self.filename, 'rb') as rawf:
            bz2f = BZ2File(rawf)
            try:
                self.assertEqual(bz2f.fileno(), rawf.fileno())
            finally:
                bz2f.close()
        self.assertRaises(ValueError, bz2f.fileno)

    def testSeekable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertTrue(bz2f.seekable())
            bz2f.read()
            self.assertTrue(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertFalse(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

        src = BytesIO(self.DATA)
        src.seekable = lambda: False
        bz2f = BZ2File(src)
        try:
            self.assertFalse(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

    def testReadable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertTrue(bz2f.readable())
            bz2f.read()
            self.assertTrue(bz2f.readable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.readable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertFalse(bz2f.readable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.readable)

    def testWritable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertFalse(bz2f.writable())
            bz2f.read()
            self.assertFalse(bz2f.writable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.writable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertTrue(bz2f.writable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.writable)

    def testOpenDel(self):
        # Repeated open-and-discard must not leak or crash.
        self.createTempFile()
        for i in range(10000):
            o = BZ2File(self.filename)
            del o

    def testOpenNonexistent(self):
        self.assertRaises(OSError, BZ2File, "/non/existent")

    def testReadlinesNoNewline(self):
        # Issue #1191043: readlines() fails on a file containing no newline.
        data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
        with open(self.filename, "wb") as f:
            f.write(data)
        with BZ2File(self.filename) as bz2f:
            lines = bz2f.readlines()
        self.assertEqual(lines, [b'Test'])
        with BZ2File(self.filename) as bz2f:
            xlines = list(bz2f.readlines())
        self.assertEqual(xlines, [b'Test'])

    def testContextProtocol(self):
        f = None
        with BZ2File(self.filename, "wb") as f:
            f.write(b"xxx")
        f = BZ2File(self.filename, "rb")
        f.close()
        try:
            with f:
                pass
        except ValueError:
            pass
        else:
            self.fail("__enter__ on a closed file didn't raise an exception")
        try:
            with BZ2File(self.filename, "wb") as f:
                1/0
        except ZeroDivisionError:
            pass
        else:
            self.fail("1/0 didn't raise an exception")

    def testThreading(self):
        # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
        data = b"1" * 2**20
        nthreads = 10
        with BZ2File(self.filename, 'wb') as f:
            def comp():
                for i in range(5):
                    f.write(data)
            threads = [threading.Thread(target=comp) for i in range(nthreads)]
            with threading_helper.start_threads(threads):
                pass

    def testMixedIterationAndReads(self):
        self.createTempFile()
        linelen = len(self.TEXT_LINES[0])
        halflen = linelen // 2
        with BZ2File(self.filename) as bz2f:
            bz2f.read(halflen)
            self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
            self.assertEqual(bz2f.read(), self.TEXT[linelen:])
        with BZ2File(self.filename) as bz2f:
            bz2f.readline()
            self.assertEqual(next(bz2f), self.TEXT_LINES[1])
            self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
        with BZ2File(self.filename) as bz2f:
            bz2f.readlines()
            self.assertRaises(StopIteration, next, bz2f)
            self.assertEqual(bz2f.readlines(), [])

    def testMultiStreamOrdering(self):
        # Test the ordering of streams when reading a multi-stream archive.
        data1 = b"foo" * 1000
        data2 = b"bar" * 1000
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write(data1)
        with BZ2File(self.filename, "a") as bz2f:
            bz2f.write(data2)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), data1 + data2)

    def testOpenBytesFilename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with BZ2File(bytes_filename, "wb") as f:
            f.write(self.DATA)
        with BZ2File(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)
        # Sanity check that we are actually operating on the right file.
        with BZ2File(str_filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)

    def testOpenPathLikeFilename(self):
        filename = pathlib.Path(self.filename)
        with BZ2File(filename, "wb") as f:
            f.write(self.DATA)
        with BZ2File(filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)

    def testDecompressLimited(self):
        """Decompressed data buffering should be limited"""
        bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), _compression.BUFFER_SIZE)

        decomp = BZ2File(BytesIO(bomb))
        self.assertEqual(decomp.read(1), b'\0')
        max_decomp = 1 + DEFAULT_BUFFER_SIZE
        # NOTE: pokes at the private _buffer to measure how much was
        # actually decompressed.
        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
            "Excessive amount of data was decompressed")


    # Tests for a BZ2File wrapping another file object:

    def testReadBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                self.assertRaises(TypeError, bz2f.read, float())
                self.assertEqual(bz2f.read(), self.TEXT)
            self.assertFalse(bio.closed)

    def testPeekBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                pdata = bz2f.peek()
                self.assertNotEqual(len(pdata), 0)
                self.assertTrue(self.TEXT.startswith(pdata))
                self.assertEqual(bz2f.read(), self.TEXT)

    def testWriteBytesIO(self):
        with BytesIO() as bio:
            with BZ2File(bio, "w") as bz2f:
                self.assertRaises(TypeError, bz2f.write)
                bz2f.write(self.TEXT)
            self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
            self.assertFalse(bio.closed)

    def testSeekForwardBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                self.assertRaises(TypeError, bz2f.seek)
                bz2f.seek(150)
                self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekBackwardsBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                bz2f.read(500)
                bz2f.seek(-150, 1)
                self.assertEqual(bz2f.read(), self.TEXT[500-150:])

    def test_read_truncated(self):
        # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
        truncated = self.DATA[:-10]
        with BZ2File(BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with BZ2File(BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 4-byte file header, and block header of at least 146 bits.
        for i in range(22):
            with BZ2File(BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_issue44439(self):
        # Writing a multi-byte-itemsize buffer must report the byte length.
        q = array.array('Q', [1, 2, 3, 4, 5])
        LENGTH = len(q) * q.itemsize

        with BZ2File(BytesIO(), 'w') as f:
            self.assertEqual(f.write(q), LENGTH)
            self.assertEqual(f.tell(), LENGTH)


class BZ2CompressorTest(BaseTest):
    def testCompress(self):
        bz2c = BZ2Compressor()
        self.assertRaises(TypeError, bz2c.compress)
        data = bz2c.compress(self.TEXT)
        data += bz2c.flush()
        self.assertEqual(ext_decompress(data), self.TEXT)

    def testCompressEmptyString(self):
        bz2c = BZ2Compressor()
        data = bz2c.compress(b'')
        data += bz2c.flush()
        self.assertEqual(data, self.EMPTY_DATA)

    def testCompressChunks10(self):
        bz2c = BZ2Compressor()
        n = 0
        data = b''
        while True:
            str = self.TEXT[n*10:(n+1)*10]
            if not str:
                break
            data += bz2c.compress(str)
            n += 1
        data += bz2c.flush()
        self.assertEqual(ext_decompress(data), self.TEXT)

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=2)
    def testCompress4G(self, size):
        # "Test BZ2Compressor.compress()/flush() with >4GiB input"
        bz2c = BZ2Compressor()
        data = b"x" * size
        try:
            compressed = bz2c.compress(data)
            compressed += bz2c.flush()
        finally:
            data = None  # Release memory
        data = bz2.decompress(compressed)
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b"x")), 0)
        finally:
            data = None

    def testPickle(self):
        # Compressor objects are stateful and must refuse to pickle.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(BZ2Compressor(), proto)


class BZ2DecompressorTest(BaseTest):
    def test_Constructor(self):
        self.assertRaises(TypeError, BZ2Decompressor, 42)

    def testDecompress(self):
        bz2d = BZ2Decompressor()
        self.assertRaises(TypeError, bz2d.decompress)
        text = bz2d.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressChunks10(self):
        bz2d = BZ2Decompressor()
        text = b''
        n = 0
        while True:
            str = self.DATA[n*10:(n+1)*10]
            if not str:
                break
            text += bz2d.decompress(str)
            n += 1
        self.assertEqual(text, self.TEXT)

    def testDecompressUnusedData(self):
        bz2d = BZ2Decompressor()
        unused_data = b"this is unused data"
        text = bz2d.decompress(self.DATA+unused_data)
        self.assertEqual(text, self.TEXT)
        self.assertEqual(bz2d.unused_data, unused_data)

    def testEOFError(self):
        bz2d = BZ2Decompressor()
        text = bz2d.decompress(self.DATA)
        self.assertRaises(EOFError, bz2d.decompress, b"anything")
        self.assertRaises(EOFError, bz2d.decompress, b"")

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=3.3)
    def testDecompress4G(self, size):
        # "Test BZ2Decompressor.decompress() with >4GiB input"
        blocksize = 10 * 1024 * 1024
        block = random.randbytes(blocksize)
        try:
            data = block * (size // blocksize + 1)
            compressed = bz2.compress(data)
            bz2d = BZ2Decompressor()
            decompressed = bz2d.decompress(compressed)
            self.assertTrue(decompressed == data)
        finally:
            data = None
            compressed = None
            decompressed = None

    def testPickle(self):
        # Decompressor objects are stateful and must refuse to pickle.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(BZ2Decompressor(), proto)

    def testDecompressorChunksMaxsize(self):
        bzd = BZ2Decompressor()
        max_length = 100
        out = []

        # Feed some input
        len_ = len(self.BIG_DATA) - 64
        out.append(bzd.decompress(self.BIG_DATA[:len_],
                                  max_length=max_length))
        self.assertFalse(bzd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data without providing more input
        out.append(bzd.decompress(b'', max_length=max_length))
        self.assertFalse(bzd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data while providing more input
        out.append(bzd.decompress(self.BIG_DATA[len_:],
                                  max_length=max_length))
        self.assertLessEqual(len(out[-1]), max_length)

        # Retrieve remaining uncompressed data
        while not bzd.eof:
            out.append(bzd.decompress(b'', max_length=max_length))
            self.assertLessEqual(len(out[-1]), max_length)

        out = b"".join(out)
        self.assertEqual(out, self.BIG_TEXT)
        self.assertEqual(bzd.unused_data, b"")

    def test_decompressor_inputbuf_1(self):
        # Test reusing input buffer after moving existing
        # contents to beginning
        bzd = BZ2Decompressor()
        out = []

        # Create input buffer and fill it
        self.assertEqual(bzd.decompress(self.DATA[:100],
                                        max_length=0), b'')

        # Retrieve some results, freeing capacity at beginning
        # of input buffer
        out.append(bzd.decompress(b'', 2))

        # Add more data that fits into input buffer after
        # moving existing data to beginning
        out.append(bzd.decompress(self.DATA[100:105], 15))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[105:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_2(self):
        # Test reusing input buffer by appending data at the
        # end right away
        bzd = BZ2Decompressor()
        out = []

        # Create input buffer and empty it
        self.assertEqual(bzd.decompress(self.DATA[:200],
                                        max_length=0), b'')
        out.append(bzd.decompress(b''))

        # Fill buffer with new data
        out.append(bzd.decompress(self.DATA[200:280], 2))

        # Append some more data, not enough to require resize
        out.append(bzd.decompress(self.DATA[280:300], 2))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_3(self):
        # Test reusing input buffer after extending it

        bzd = BZ2Decompressor()
        out = []

        # Create almost full input buffer
        out.append(bzd.decompress(self.DATA[:200], 5))

        # Add even more data to it, requiring resize
        out.append(bzd.decompress(self.DATA[200:300], 5))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_failure(self):
        bzd = BZ2Decompressor()
        self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)
        # Previously, a second call could crash due to internal inconsistency
        self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)

    @support.refcount_test
    def test_refleaks_in___init__(self):
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        bzd = BZ2Decompressor()
        refs_before = gettotalrefcount()
        for i in range(100):
            bzd.__init__()
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)


class CompressDecompressTest(BaseTest):
    def testCompress(self):
        data = bz2.compress(self.TEXT)
        self.assertEqual(ext_decompress(data), self.TEXT)

    def testCompressEmptyString(self):
        text = bz2.compress(b'')
        self.assertEqual(text, self.EMPTY_DATA)

    def testDecompress(self):
        text = bz2.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressEmpty(self):
        text = bz2.decompress(b"")
        self.assertEqual(text, b"")

    def testDecompressToEmptyString(self):
        text = bz2.decompress(self.EMPTY_DATA)
        self.assertEqual(text, b'')

    def testDecompressIncomplete(self):
        self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])

    def testDecompressBadData(self):
        self.assertRaises(OSError, bz2.decompress, self.BAD_DATA)

    def testDecompressMultiStream(self):
        text = bz2.decompress(self.DATA * 5)
        self.assertEqual(text, self.TEXT * 5)

    def testDecompressTrailingJunk(self):
        text = bz2.decompress(self.DATA + self.BAD_DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressMultiStreamTrailingJunk(self):
        text = bz2.decompress(self.DATA * 5 + self.BAD_DATA)
        self.assertEqual(text, self.TEXT * 5)


class OpenTest(BaseTest):
    "Test the open function."

    def open(self, *args, **kwargs):
        return bz2.open(*args, **kwargs)

    def test_binary_modes(self):
        for mode in ("wb", "xb"):
            if mode == "xb":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT)
            with self.open(self.filename, "rb") as f:
                self.assertEqual(f.read(), self.TEXT)
            with self.open(self.filename, "ab") as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT * 2)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        for mode in ("w", "x"):
            if mode == "x":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT)
            with self.open(self.filename, "r") as f:
                self.assertEqual(f.read(), self.TEXT)
            with self.open(self.filename, "a") as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT * 2)

    def test_text_modes(self):
        text = self.TEXT.decode("ascii")
        text_native_eol = text.replace("\n", os.linesep)
        for mode in ("wt", "xt"):
            if mode == "xt":
                unlink(self.filename)
            with self.open(self.filename, mode, encoding="ascii") as f:
                f.write(text)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read()).decode("ascii")
                self.assertEqual(file_data, text_native_eol)
            with self.open(self.filename, "rt", encoding="ascii") as f:
                self.assertEqual(f.read(), text)
            with self.open(self.filename, "at", encoding="ascii") as f:
                f.write(text)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read()).decode("ascii")
                self.assertEqual(file_data, text_native_eol * 2)

    def test_x_mode(self):
        for mode in ("x", "xb", "xt"):
            unlink(self.filename)
            encoding = "utf-8" if "t" in mode else None
            with self.open(self.filename, mode, encoding=encoding) as f:
                pass
            with self.assertRaises(FileExistsError):
                with self.open(self.filename, mode) as f:
                    pass

    def test_fileobj(self):
        with self.open(BytesIO(self.DATA), "r") as f:
            self.assertEqual(f.read(), self.TEXT)
        with self.open(BytesIO(self.DATA), "rb") as f:
            self.assertEqual(f.read(), self.TEXT)
        text = self.TEXT.decode("ascii")
        with self.open(BytesIO(self.DATA), "rt", encoding="utf-8") as f:
            self.assertEqual(f.read(), text)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        self.assertRaises(ValueError,
                          self.open, self.filename, "wbt")
        self.assertRaises(ValueError,
                          self.open, self.filename, "xbt")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", encoding="utf-8")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", errors="ignore")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        text = self.TEXT.decode("ascii")
        text_native_eol = text.replace("\n", os.linesep)
        with self.open(self.filename, "wt", encoding="utf-16-le") as f:
            f.write(text)
        with open(self.filename, "rb") as f:
            file_data = ext_decompress(f.read()).decode("utf-16-le")
            self.assertEqual(file_data, text_native_eol)
        with self.open(self.filename, "rt", encoding="utf-16-le") as f:
            self.assertEqual(f.read(), text)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with self.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        text = self.TEXT.decode("ascii")
        with self.open(self.filename, "wt", encoding="utf-8", newline="\n") as f:
            f.write(text)
        with self.open(self.filename, "rt", encoding="utf-8", newline="\r") as f:
            self.assertEqual(f.readlines(), [text])


def tearDownModule():
    support.reap_children()


if __name__ == '__main__':
    unittest.main()