import contextlib
import io
import itertools
import os
import importlib.util
import pathlib
import posixpath
import time
import struct
import zipfile
import unittest


from tempfile import TemporaryFile
from random import randint, random, getrandbits

from test.support import script_helper
from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd,
                          requires_zlib, requires_bz2, requires_lzma,
                          captured_stdout)

TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
FIXEDTEST_SIZE = 1000
DATAFILES_DIR = 'zipfile_datafiles'

SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]


def getrandbytes(size):
    """Return `size` random bytes built from a single getrandbits() call."""
    return getrandbits(8 * size).to_bytes(size, 'little')


def get_files(test):
    """Yield the archive targets each test is run against.

    Yields, in order: the TESTFN2 path string, an open TemporaryFile and an
    io.BytesIO.  After the consumer is done with each file object, assert on
    `test` that the consumer did not close it.
    """
    yield TESTFN2
    with TemporaryFile() as f:
        yield f
        test.assertFalse(f.closed)
    with io.BytesIO() as f:
        yield f
        test.assertFalse(f.closed)


class AbstractTestsWithSourceFile:
    # Mixin: concrete subclasses supply a `compression` class attribute and
    # also inherit from unittest.TestCase.

    @classmethod
    def setUpClass(cls):
        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
                              (i, random()), "ascii")
                        for i in range(FIXEDTEST_SIZE)]
        cls.data = b''.join(cls.line_gen)

    def setUp(self):
        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def make_test_archive(self, f, compression, compresslevel=None):
        """Write a four-member archive to `f`; every member holds self.data.

        The members are created through the three write paths: ZipFile.write
        (twice), ZipFile.writestr and ZipFile.open(..., mode='w').
        """
        kwargs = {'compression': compression, 'compresslevel': compresslevel}
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)
            # Use a distinct name so the `f` parameter is not rebound.
            with zipfp.open('written-open-w', mode='w') as member:
                for line in self.line_gen:
                    member.write(line)

    def zip_test(self, f, compression, compresslevel=None):
        """Build the test archive in `f` and check contents and metadata."""
        self.make_test_archive(f, compression, compresslevel)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = io.StringIO()
            zipfp.printdir(file=fp)
            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 5) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 4)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            self.assertIn("written-open-w", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 4)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            self.assertIn("written-open-w", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip thinks the archive is ok: it reports the
            # first bad member by returning its name (not by raising), so the
            # return value has to be checked.
            self.assertIsNone(zipfp.testzip())

    def test_basic(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def zip_open_test(self, f, compression):
        """Read two members back in fixed 256-byte chunks via ZipFile.open."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            # iter(callable, sentinel) keeps reading until read() returns b''.
            with zipfp.open(TESTFN) as zipopen1:
                zipdata1 = list(iter(lambda: zipopen1.read(256), b''))

            with zipfp.open("another.name") as zipopen2:
                zipdata2 = list(iter(lambda: zipopen2.read(256), b''))

            self.assertEqual(b''.join(zipdata1), self.data)
            self.assertEqual(b''.join(zipdata2), self.data)

    def test_open(self):
        for f in get_files(self):
            self.zip_open_test(f, self.compression)

    def test_open_with_pathlike(self):
        path = pathlib.Path(TESTFN2)
        self.zip_open_test(path, self.compression)
        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
            self.assertIsInstance(zipfp.filename, str)

    def zip_random_open_test(self, f, compression):
        """Read one member back in randomly sized chunks (1..1024 bytes)."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as zipopen1:
                zipdata1 = list(
                    iter(lambda: zipopen1.read(randint(1, 1024)), b''))

            self.assertEqual(b''.join(zipdata1), self.data)
174 175 def test_random_open(self): 176 for f in get_files(self): 177 self.zip_random_open_test(f, self.compression) 178 179 def zip_read1_test(self, f, compression): 180 self.make_test_archive(f, compression) 181 182 # Read the ZIP archive 183 with zipfile.ZipFile(f, "r") as zipfp, \ 184 zipfp.open(TESTFN) as zipopen: 185 zipdata = [] 186 while True: 187 read_data = zipopen.read1(-1) 188 if not read_data: 189 break 190 zipdata.append(read_data) 191 192 self.assertEqual(b''.join(zipdata), self.data) 193 194 def test_read1(self): 195 for f in get_files(self): 196 self.zip_read1_test(f, self.compression) 197 198 def zip_read1_10_test(self, f, compression): 199 self.make_test_archive(f, compression) 200 201 # Read the ZIP archive 202 with zipfile.ZipFile(f, "r") as zipfp, \ 203 zipfp.open(TESTFN) as zipopen: 204 zipdata = [] 205 while True: 206 read_data = zipopen.read1(10) 207 self.assertLessEqual(len(read_data), 10) 208 if not read_data: 209 break 210 zipdata.append(read_data) 211 212 self.assertEqual(b''.join(zipdata), self.data) 213 214 def test_read1_10(self): 215 for f in get_files(self): 216 self.zip_read1_10_test(f, self.compression) 217 218 def zip_readline_read_test(self, f, compression): 219 self.make_test_archive(f, compression) 220 221 # Read the ZIP archive 222 with zipfile.ZipFile(f, "r") as zipfp, \ 223 zipfp.open(TESTFN) as zipopen: 224 data = b'' 225 while True: 226 read = zipopen.readline() 227 if not read: 228 break 229 data += read 230 231 read = zipopen.read(100) 232 if not read: 233 break 234 data += read 235 236 self.assertEqual(data, self.data) 237 238 def test_readline_read(self): 239 # Issue #7610: calls to readline() interleaved with calls to read(). 
240 for f in get_files(self): 241 self.zip_readline_read_test(f, self.compression) 242 243 def zip_readline_test(self, f, compression): 244 self.make_test_archive(f, compression) 245 246 # Read the ZIP archive 247 with zipfile.ZipFile(f, "r") as zipfp: 248 with zipfp.open(TESTFN) as zipopen: 249 for line in self.line_gen: 250 linedata = zipopen.readline() 251 self.assertEqual(linedata, line) 252 253 def test_readline(self): 254 for f in get_files(self): 255 self.zip_readline_test(f, self.compression) 256 257 def zip_readlines_test(self, f, compression): 258 self.make_test_archive(f, compression) 259 260 # Read the ZIP archive 261 with zipfile.ZipFile(f, "r") as zipfp: 262 with zipfp.open(TESTFN) as zipopen: 263 ziplines = zipopen.readlines() 264 for line, zipline in zip(self.line_gen, ziplines): 265 self.assertEqual(zipline, line) 266 267 def test_readlines(self): 268 for f in get_files(self): 269 self.zip_readlines_test(f, self.compression) 270 271 def zip_iterlines_test(self, f, compression): 272 self.make_test_archive(f, compression) 273 274 # Read the ZIP archive 275 with zipfile.ZipFile(f, "r") as zipfp: 276 with zipfp.open(TESTFN) as zipopen: 277 for line, zipline in zip(self.line_gen, zipopen): 278 self.assertEqual(zipline, line) 279 280 def test_iterlines(self): 281 for f in get_files(self): 282 self.zip_iterlines_test(f, self.compression) 283 284 def test_low_compression(self): 285 """Check for cases where compressed data is larger than original.""" 286 # Create the ZIP archive 287 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 288 zipfp.writestr("strfile", '12') 289 290 # Get an open object for strfile 291 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 292 with zipfp.open("strfile") as openobj: 293 self.assertEqual(openobj.read(1), b'1') 294 self.assertEqual(openobj.read(1), b'2') 295 296 def test_writestr_compression(self): 297 zipfp = zipfile.ZipFile(TESTFN2, "w") 298 zipfp.writestr("b.txt", "hello world", 
compress_type=self.compression) 299 info = zipfp.getinfo('b.txt') 300 self.assertEqual(info.compress_type, self.compression) 301 302 def test_writestr_compresslevel(self): 303 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 304 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 305 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 306 compresslevel=2) 307 308 # Compression level follows the constructor. 309 a_info = zipfp.getinfo('a.txt') 310 self.assertEqual(a_info.compress_type, self.compression) 311 self.assertEqual(a_info._compresslevel, 1) 312 313 # Compression level is overridden. 314 b_info = zipfp.getinfo('b.txt') 315 self.assertEqual(b_info.compress_type, self.compression) 316 self.assertEqual(b_info._compresslevel, 2) 317 318 def test_read_return_size(self): 319 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 320 # than requested. 321 for test_size in (1, 4095, 4096, 4097, 16384): 322 file_size = test_size + 1 323 junk = getrandbytes(file_size) 324 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 325 zipf.writestr('foo', junk) 326 with zipf.open('foo', 'r') as fp: 327 buf = fp.read(test_size) 328 self.assertEqual(len(buf), test_size) 329 330 def test_truncated_zipfile(self): 331 fp = io.BytesIO() 332 with zipfile.ZipFile(fp, mode='w') as zipf: 333 zipf.writestr('strfile', self.data, compress_type=self.compression) 334 end_offset = fp.tell() 335 zipfiledata = fp.getvalue() 336 337 fp = io.BytesIO(zipfiledata) 338 with zipfile.ZipFile(fp) as zipf: 339 with zipf.open('strfile') as zipopen: 340 fp.truncate(end_offset - 20) 341 with self.assertRaises(EOFError): 342 zipopen.read() 343 344 fp = io.BytesIO(zipfiledata) 345 with zipfile.ZipFile(fp) as zipf: 346 with zipf.open('strfile') as zipopen: 347 fp.truncate(end_offset - 20) 348 with self.assertRaises(EOFError): 349 while zipopen.read(100): 350 pass 351 352 fp = io.BytesIO(zipfiledata) 353 with zipfile.ZipFile(fp) as zipf: 354 
with zipf.open('strfile') as zipopen: 355 fp.truncate(end_offset - 20) 356 with self.assertRaises(EOFError): 357 while zipopen.read1(100): 358 pass 359 360 def test_repr(self): 361 fname = 'file.name' 362 for f in get_files(self): 363 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 364 zipfp.write(TESTFN, fname) 365 r = repr(zipfp) 366 self.assertIn("mode='w'", r) 367 368 with zipfile.ZipFile(f, 'r') as zipfp: 369 r = repr(zipfp) 370 if isinstance(f, str): 371 self.assertIn('filename=%r' % f, r) 372 else: 373 self.assertIn('file=%r' % f, r) 374 self.assertIn("mode='r'", r) 375 r = repr(zipfp.getinfo(fname)) 376 self.assertIn('filename=%r' % fname, r) 377 self.assertIn('filemode=', r) 378 self.assertIn('file_size=', r) 379 if self.compression != zipfile.ZIP_STORED: 380 self.assertIn('compress_type=', r) 381 self.assertIn('compress_size=', r) 382 with zipfp.open(fname) as zipopen: 383 r = repr(zipopen) 384 self.assertIn('name=%r' % fname, r) 385 self.assertIn("mode='r'", r) 386 if self.compression != zipfile.ZIP_STORED: 387 self.assertIn('compress_type=', r) 388 self.assertIn('[closed]', repr(zipopen)) 389 self.assertIn('[closed]', repr(zipfp)) 390 391 def test_compresslevel_basic(self): 392 for f in get_files(self): 393 self.zip_test(f, self.compression, compresslevel=9) 394 395 def test_per_file_compresslevel(self): 396 """Check that files within a Zip archive can have different 397 compression levels.""" 398 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 399 zipfp.write(TESTFN, 'compress_1') 400 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 401 one_info = zipfp.getinfo('compress_1') 402 nine_info = zipfp.getinfo('compress_9') 403 self.assertEqual(one_info._compresslevel, 1) 404 self.assertEqual(nine_info._compresslevel, 9) 405 406 def test_writing_errors(self): 407 class BrokenFile(io.BytesIO): 408 def write(self, data): 409 nonlocal count 410 if count is not None: 411 if count == stop: 412 raise OSError 413 count += 1 414 
super().write(data) 415 416 stop = 0 417 while True: 418 testfile = BrokenFile() 419 count = None 420 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 421 with zipfp.open('file1', 'w') as f: 422 f.write(b'data1') 423 count = 0 424 try: 425 with zipfp.open('file2', 'w') as f: 426 f.write(b'data2') 427 except OSError: 428 stop += 1 429 else: 430 break 431 finally: 432 count = None 433 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 434 self.assertEqual(zipfp.namelist(), ['file1']) 435 self.assertEqual(zipfp.read('file1'), b'data1') 436 437 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 438 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 439 self.assertEqual(zipfp.read('file1'), b'data1') 440 self.assertEqual(zipfp.read('file2'), b'data2') 441 442 443 def tearDown(self): 444 unlink(TESTFN) 445 unlink(TESTFN2) 446 447 448class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 449 unittest.TestCase): 450 compression = zipfile.ZIP_STORED 451 test_low_compression = None 452 453 def zip_test_writestr_permissions(self, f, compression): 454 # Make sure that writestr and open(... mode='w') create files with 455 # mode 0600, when they are passed a name rather than a ZipInfo 456 # instance. 
457 458 self.make_test_archive(f, compression) 459 with zipfile.ZipFile(f, "r") as zipfp: 460 zinfo = zipfp.getinfo('strfile') 461 self.assertEqual(zinfo.external_attr, 0o600 << 16) 462 463 zinfo2 = zipfp.getinfo('written-open-w') 464 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 465 466 def test_writestr_permissions(self): 467 for f in get_files(self): 468 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 469 470 def test_absolute_arcnames(self): 471 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 472 zipfp.write(TESTFN, "/absolute") 473 474 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 475 self.assertEqual(zipfp.namelist(), ["absolute"]) 476 477 def test_append_to_zip_file(self): 478 """Test appending to an existing zipfile.""" 479 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 480 zipfp.write(TESTFN, TESTFN) 481 482 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 483 zipfp.writestr("strfile", self.data) 484 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 485 486 def test_append_to_non_zip_file(self): 487 """Test appending to an existing file that is not a zipfile.""" 488 # NOTE: this test fails if len(d) < 22 because of the first 489 # line "fpin.seek(-22, 2)" in _EndRecData 490 data = b'I am not a ZipFile!'*10 491 with open(TESTFN2, 'wb') as f: 492 f.write(data) 493 494 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 495 zipfp.write(TESTFN, TESTFN) 496 497 with open(TESTFN2, 'rb') as f: 498 f.seek(len(data)) 499 with zipfile.ZipFile(f, "r") as zipfp: 500 self.assertEqual(zipfp.namelist(), [TESTFN]) 501 self.assertEqual(zipfp.read(TESTFN), self.data) 502 with open(TESTFN2, 'rb') as f: 503 self.assertEqual(f.read(len(data)), data) 504 zipfiledata = f.read() 505 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 506 self.assertEqual(zipfp.namelist(), [TESTFN]) 507 self.assertEqual(zipfp.read(TESTFN), self.data) 508 509 def 
test_read_concatenated_zip_file(self): 510 with io.BytesIO() as bio: 511 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 512 zipfp.write(TESTFN, TESTFN) 513 zipfiledata = bio.getvalue() 514 data = b'I am not a ZipFile!'*10 515 with open(TESTFN2, 'wb') as f: 516 f.write(data) 517 f.write(zipfiledata) 518 519 with zipfile.ZipFile(TESTFN2) as zipfp: 520 self.assertEqual(zipfp.namelist(), [TESTFN]) 521 self.assertEqual(zipfp.read(TESTFN), self.data) 522 523 def test_append_to_concatenated_zip_file(self): 524 with io.BytesIO() as bio: 525 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 526 zipfp.write(TESTFN, TESTFN) 527 zipfiledata = bio.getvalue() 528 data = b'I am not a ZipFile!'*1000000 529 with open(TESTFN2, 'wb') as f: 530 f.write(data) 531 f.write(zipfiledata) 532 533 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 534 self.assertEqual(zipfp.namelist(), [TESTFN]) 535 zipfp.writestr('strfile', self.data) 536 537 with open(TESTFN2, 'rb') as f: 538 self.assertEqual(f.read(len(data)), data) 539 zipfiledata = f.read() 540 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 541 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 542 self.assertEqual(zipfp.read(TESTFN), self.data) 543 self.assertEqual(zipfp.read('strfile'), self.data) 544 545 def test_ignores_newline_at_end(self): 546 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 547 zipfp.write(TESTFN, TESTFN) 548 with open(TESTFN2, 'a') as f: 549 f.write("\r\n\00\00\00") 550 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 551 self.assertIsInstance(zipfp, zipfile.ZipFile) 552 553 def test_ignores_stuff_appended_past_comments(self): 554 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 555 zipfp.comment = b"this is a comment" 556 zipfp.write(TESTFN, TESTFN) 557 with open(TESTFN2, 'a') as f: 558 f.write("abcdef\r\n") 559 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 560 self.assertIsInstance(zipfp, zipfile.ZipFile) 561 
self.assertEqual(zipfp.comment, b"this is a comment") 562 563 def test_write_default_name(self): 564 """Check that calling ZipFile.write without arcname specified 565 produces the expected result.""" 566 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 567 zipfp.write(TESTFN) 568 with open(TESTFN, "rb") as f: 569 self.assertEqual(zipfp.read(TESTFN), f.read()) 570 571 def test_write_to_readonly(self): 572 """Check that trying to call write() on a readonly ZipFile object 573 raises a ValueError.""" 574 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 575 zipfp.writestr("somefile.txt", "bogus") 576 577 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 578 self.assertRaises(ValueError, zipfp.write, TESTFN) 579 580 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 581 with self.assertRaises(ValueError): 582 zipfp.open(TESTFN, mode='w') 583 584 def test_add_file_before_1980(self): 585 # Set atime and mtime to 1970-01-01 586 os.utime(TESTFN, (0, 0)) 587 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 588 self.assertRaises(ValueError, zipfp.write, TESTFN) 589 590 591@requires_zlib 592class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 593 unittest.TestCase): 594 compression = zipfile.ZIP_DEFLATED 595 596 def test_per_file_compression(self): 597 """Check that files within a Zip archive can have different 598 compression options.""" 599 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 600 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 601 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 602 sinfo = zipfp.getinfo('storeme') 603 dinfo = zipfp.getinfo('deflateme') 604 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 605 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 606 607@requires_bz2 608class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 609 unittest.TestCase): 610 compression = zipfile.ZIP_BZIP2 611 612@requires_lzma 613class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 614 unittest.TestCase): 615 compression = 
zipfile.ZIP_LZMA 616 617 618class AbstractTestZip64InSmallFiles: 619 # These tests test the ZIP64 functionality without using large files, 620 # see test_zipfile64 for proper tests. 621 622 @classmethod 623 def setUpClass(cls): 624 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 625 for i in range(0, FIXEDTEST_SIZE)) 626 cls.data = b'\n'.join(line_gen) 627 628 def setUp(self): 629 self._limit = zipfile.ZIP64_LIMIT 630 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 631 zipfile.ZIP64_LIMIT = 1000 632 zipfile.ZIP_FILECOUNT_LIMIT = 9 633 634 # Make a source file with some lines 635 with open(TESTFN, "wb") as fp: 636 fp.write(self.data) 637 638 def zip_test(self, f, compression): 639 # Create the ZIP archive 640 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 641 zipfp.write(TESTFN, "another.name") 642 zipfp.write(TESTFN, TESTFN) 643 zipfp.writestr("strfile", self.data) 644 645 # Read the ZIP archive 646 with zipfile.ZipFile(f, "r", compression) as zipfp: 647 self.assertEqual(zipfp.read(TESTFN), self.data) 648 self.assertEqual(zipfp.read("another.name"), self.data) 649 self.assertEqual(zipfp.read("strfile"), self.data) 650 651 # Print the ZIP directory 652 fp = io.StringIO() 653 zipfp.printdir(fp) 654 655 directory = fp.getvalue() 656 lines = directory.splitlines() 657 self.assertEqual(len(lines), 4) # Number of files + header 658 659 self.assertIn('File Name', lines[0]) 660 self.assertIn('Modified', lines[0]) 661 self.assertIn('Size', lines[0]) 662 663 fn, date, time_, size = lines[1].split() 664 self.assertEqual(fn, 'another.name') 665 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 666 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 667 self.assertEqual(size, str(len(self.data))) 668 669 # Check the namelist 670 names = zipfp.namelist() 671 self.assertEqual(len(names), 3) 672 self.assertIn(TESTFN, names) 673 self.assertIn("another.name", names) 674 self.assertIn("strfile", names) 675 676 # Check infolist 677 infos = 
zipfp.infolist() 678 names = [i.filename for i in infos] 679 self.assertEqual(len(names), 3) 680 self.assertIn(TESTFN, names) 681 self.assertIn("another.name", names) 682 self.assertIn("strfile", names) 683 for i in infos: 684 self.assertEqual(i.file_size, len(self.data)) 685 686 # check getinfo 687 for nm in (TESTFN, "another.name", "strfile"): 688 info = zipfp.getinfo(nm) 689 self.assertEqual(info.filename, nm) 690 self.assertEqual(info.file_size, len(self.data)) 691 692 # Check that testzip doesn't raise an exception 693 zipfp.testzip() 694 695 def test_basic(self): 696 for f in get_files(self): 697 self.zip_test(f, self.compression) 698 699 def test_too_many_files(self): 700 # This test checks that more than 64k files can be added to an archive, 701 # and that the resulting archive can be read properly by ZipFile 702 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 703 allowZip64=True) 704 zipf.debug = 100 705 numfiles = 15 706 for i in range(numfiles): 707 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 708 self.assertEqual(len(zipf.namelist()), numfiles) 709 zipf.close() 710 711 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 712 self.assertEqual(len(zipf2.namelist()), numfiles) 713 for i in range(numfiles): 714 content = zipf2.read("foo%08d" % i).decode('ascii') 715 self.assertEqual(content, "%d" % (i**3 % 57)) 716 zipf2.close() 717 718 def test_too_many_files_append(self): 719 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 720 allowZip64=False) 721 zipf.debug = 100 722 numfiles = 9 723 for i in range(numfiles): 724 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 725 self.assertEqual(len(zipf.namelist()), numfiles) 726 with self.assertRaises(zipfile.LargeZipFile): 727 zipf.writestr("foo%08d" % numfiles, b'') 728 self.assertEqual(len(zipf.namelist()), numfiles) 729 zipf.close() 730 731 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 732 allowZip64=False) 733 zipf.debug = 100 734 self.assertEqual(len(zipf.namelist()), numfiles) 735 
with self.assertRaises(zipfile.LargeZipFile): 736 zipf.writestr("foo%08d" % numfiles, b'') 737 self.assertEqual(len(zipf.namelist()), numfiles) 738 zipf.close() 739 740 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 741 allowZip64=True) 742 zipf.debug = 100 743 self.assertEqual(len(zipf.namelist()), numfiles) 744 numfiles2 = 15 745 for i in range(numfiles, numfiles2): 746 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 747 self.assertEqual(len(zipf.namelist()), numfiles2) 748 zipf.close() 749 750 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 751 self.assertEqual(len(zipf2.namelist()), numfiles2) 752 for i in range(numfiles2): 753 content = zipf2.read("foo%08d" % i).decode('ascii') 754 self.assertEqual(content, "%d" % (i**3 % 57)) 755 zipf2.close() 756 757 def tearDown(self): 758 zipfile.ZIP64_LIMIT = self._limit 759 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 760 unlink(TESTFN) 761 unlink(TESTFN2) 762 763 764class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 765 unittest.TestCase): 766 compression = zipfile.ZIP_STORED 767 768 def large_file_exception_test(self, f, compression): 769 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 770 self.assertRaises(zipfile.LargeZipFile, 771 zipfp.write, TESTFN, "another.name") 772 773 def large_file_exception_test2(self, f, compression): 774 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 775 self.assertRaises(zipfile.LargeZipFile, 776 zipfp.writestr, "another.name", self.data) 777 778 def test_large_file_exception(self): 779 for f in get_files(self): 780 self.large_file_exception_test(f, zipfile.ZIP_STORED) 781 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 782 783 def test_absolute_arcnames(self): 784 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 785 allowZip64=True) as zipfp: 786 zipfp.write(TESTFN, "/absolute") 787 788 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 789 self.assertEqual(zipfp.namelist(), 
["absolute"]) 790 791 def test_append(self): 792 # Test that appending to the Zip64 archive doesn't change 793 # extra fields of existing entries. 794 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 795 zipfp.writestr("strfile", self.data) 796 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 797 zinfo = zipfp.getinfo("strfile") 798 extra = zinfo.extra 799 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 800 zipfp.writestr("strfile2", self.data) 801 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 802 zinfo = zipfp.getinfo("strfile") 803 self.assertEqual(zinfo.extra, extra) 804 805 def make_zip64_file( 806 self, file_size_64_set=False, file_size_extra=False, 807 compress_size_64_set=False, compress_size_extra=False, 808 header_offset_64_set=False, header_offset_extra=False, 809 ): 810 """Generate bytes sequence for a zip with (incomplete) zip64 data. 811 812 The actual values (not the zip 64 0xffffffff values) stored in the file 813 are: 814 file_size: 8 815 compress_size: 8 816 header_offset: 0 817 """ 818 actual_size = 8 819 actual_header_offset = 0 820 local_zip64_fields = [] 821 central_zip64_fields = [] 822 823 file_size = actual_size 824 if file_size_64_set: 825 file_size = 0xffffffff 826 if file_size_extra: 827 local_zip64_fields.append(actual_size) 828 central_zip64_fields.append(actual_size) 829 file_size = struct.pack("<L", file_size) 830 831 compress_size = actual_size 832 if compress_size_64_set: 833 compress_size = 0xffffffff 834 if compress_size_extra: 835 local_zip64_fields.append(actual_size) 836 central_zip64_fields.append(actual_size) 837 compress_size = struct.pack("<L", compress_size) 838 839 header_offset = actual_header_offset 840 if header_offset_64_set: 841 header_offset = 0xffffffff 842 if header_offset_extra: 843 central_zip64_fields.append(actual_header_offset) 844 header_offset = struct.pack("<L", header_offset) 845 846 local_extra = struct.pack( 847 '<HH' + 'Q'*len(local_zip64_fields), 
848 0x0001, 849 8*len(local_zip64_fields), 850 *local_zip64_fields 851 ) 852 853 central_extra = struct.pack( 854 '<HH' + 'Q'*len(central_zip64_fields), 855 0x0001, 856 8*len(central_zip64_fields), 857 *central_zip64_fields 858 ) 859 860 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 861 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 862 863 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 864 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 865 866 filename = b"test.txt" 867 content = b"test1234" 868 filename_length = struct.pack("<H", len(filename)) 869 zip64_contents = ( 870 # Local file header 871 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 872 + compress_size 873 + file_size 874 + filename_length 875 + local_extra_length 876 + filename 877 + local_extra 878 + content 879 # Central directory: 880 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 881 + compress_size 882 + file_size 883 + filename_length 884 + central_extra_length 885 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 886 + header_offset 887 + filename 888 + central_extra 889 # Zip64 end of central directory 890 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 891 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 892 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 893 + central_dir_size 894 + offset_to_central_dir 895 # Zip64 end of central directory locator 896 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 897 + b"\x00\x00\x00" 898 # end of central directory 899 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 900 + b"\x00\x00\x00\x00" 901 ) 902 return zip64_contents 903 904 def test_bad_zip64_extra(self): 905 """Missing zip64 extra records raises an exception. 906 907 There are 4 fields that the zip64 format handles (the disk number is 908 not used in this module and so is ignored here). 
According to the zip 909 spec: 910 The order of the fields in the zip64 extended 911 information record is fixed, but the fields MUST 912 only appear if the corresponding Local or Central 913 directory record field is set to 0xFFFF or 0xFFFFFFFF. 914 915 If the zip64 extra content doesn't contain enough entries for the 916 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 917 This test mismatches the length of the zip64 extra field and the number 918 of fields set to indicate the presence of zip64 data. 919 """ 920 # zip64 file size present, no fields in extra, expecting one, equals 921 # missing file size. 922 missing_file_size_extra = self.make_zip64_file( 923 file_size_64_set=True, 924 ) 925 with self.assertRaises(zipfile.BadZipFile) as e: 926 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 927 self.assertIn('file size', str(e.exception).lower()) 928 929 # zip64 file size present, zip64 compress size present, one field in 930 # extra, expecting two, equals missing compress size. 931 missing_compress_size_extra = self.make_zip64_file( 932 file_size_64_set=True, 933 file_size_extra=True, 934 compress_size_64_set=True, 935 ) 936 with self.assertRaises(zipfile.BadZipFile) as e: 937 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 938 self.assertIn('compress size', str(e.exception).lower()) 939 940 # zip64 compress size present, no fields in extra, expecting one, 941 # equals missing compress size. 
942 missing_compress_size_extra = self.make_zip64_file( 943 compress_size_64_set=True, 944 ) 945 with self.assertRaises(zipfile.BadZipFile) as e: 946 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 947 self.assertIn('compress size', str(e.exception).lower()) 948 949 # zip64 file size present, zip64 compress size present, zip64 header 950 # offset present, two fields in extra, expecting three, equals missing 951 # header offset 952 missing_header_offset_extra = self.make_zip64_file( 953 file_size_64_set=True, 954 file_size_extra=True, 955 compress_size_64_set=True, 956 compress_size_extra=True, 957 header_offset_64_set=True, 958 ) 959 with self.assertRaises(zipfile.BadZipFile) as e: 960 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 961 self.assertIn('header offset', str(e.exception).lower()) 962 963 # zip64 compress size present, zip64 header offset present, one field 964 # in extra, expecting two, equals missing header offset 965 missing_header_offset_extra = self.make_zip64_file( 966 file_size_64_set=False, 967 compress_size_64_set=True, 968 compress_size_extra=True, 969 header_offset_64_set=True, 970 ) 971 with self.assertRaises(zipfile.BadZipFile) as e: 972 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 973 self.assertIn('header offset', str(e.exception).lower()) 974 975 # zip64 file size present, zip64 header offset present, one field in 976 # extra, expecting two, equals missing header offset 977 missing_header_offset_extra = self.make_zip64_file( 978 file_size_64_set=True, 979 file_size_extra=True, 980 compress_size_64_set=False, 981 header_offset_64_set=True, 982 ) 983 with self.assertRaises(zipfile.BadZipFile) as e: 984 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 985 self.assertIn('header offset', str(e.exception).lower()) 986 987 # zip64 header offset present, no fields in extra, expecting one, 988 # equals missing header offset 989 missing_header_offset_extra = self.make_zip64_file( 990 file_size_64_set=False, 
def test_generated_valid_zip64_extra(self):
    """Check that every valid zip64 mask/extra-field combination is readable.

    Each entry of ``params`` switches on one zip64 marker together with its
    matching extra field.  All non-empty combinations of those entries are
    passed to ``self.make_zip64_file()`` and the resulting archive must
    report the expected sizes, header offset and content for its first
    member.
    """
    # These values are what is set in the make_zip64_file method.
    expected_file_size = 8
    expected_compress_size = 8
    expected_header_offset = 0
    expected_content = b"test1234"

    # Loop through the various valid combinations of zip64 masks
    # present and extra fields present.
    params = (
        {"file_size_64_set": True, "file_size_extra": True},
        {"compress_size_64_set": True, "compress_size_extra": True},
        {"header_offset_64_set": True, "header_offset_extra": True},
    )

    for r in range(1, len(params) + 1):
        for combo in itertools.combinations(params, r):
            kwargs = {key: value
                      for flags in combo
                      for key, value in flags.items()}
            # One sub-test per combination: a failure names the exact set
            # of flags involved and does not hide the remaining ones.
            with self.subTest(**kwargs):
                archive = io.BytesIO(self.make_zip64_file(**kwargs))
                with zipfile.ZipFile(archive) as zf:
                    zinfo = zf.infolist()[0]
                    self.assertEqual(zinfo.file_size, expected_file_size)
                    self.assertEqual(zinfo.compress_size,
                                     expected_compress_size)
                    self.assertEqual(zinfo.header_offset,
                                     expected_header_offset)
                    self.assertEqual(zf.read(zinfo), expected_content)
def assertCompiledIn(self, name, namelist):
    """Assert that the compiled form of source file *name* is in *namelist*.

    *name* is the archive name of a ``.py`` source file; the entry looked
    for is that name with ``'c'`` appended (``mod.py`` -> ``mod.pyc``).

    The former escape hatch for ``.pyo`` entries is gone: ``.pyo`` files
    were eliminated by PEP 488, and the optimisation test in this file
    already expects ``.pyc`` even when ``optimize`` is set.  Keeping it
    could only hide a missing ``.pyc``.
    """
    self.assertIn(name + 'c', namelist)
def test_write_filtered_python_package(self):
    """Check the *filterfunc* argument of PyZipFile.writepy().

    The ``test`` package is written three times: unfiltered (the captured
    output must mention ``SyntaxError``), with a filter rejecting
    everything, and with a filter rejecting only paths whose base name
    starts with ``"bad"``.  Neither filtered run may report a
    ``SyntaxError``.
    """
    import test
    packagedir = os.path.dirname(test.__file__)
    self.requiresWriteAccess(packagedir)

    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:

        # first make sure that the test folder gives error messages
        # (on the badsyntax_... files)
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir)
        reportStr = reportSIO.getvalue()
        # assertIn/assertNotIn show the captured report on failure, which
        # assertTrue('...' in reportStr) does not.
        self.assertIn('SyntaxError', reportStr)

        # then check that the filter works on the whole package
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
        reportStr = reportSIO.getvalue()
        self.assertNotIn('SyntaxError', reportStr)

        # then check that the filter works on individual files
        # (named so that it does not shadow the builtin filter())
        def skip_bad_files(path):
            return not os.path.basename(path).startswith("bad")
        with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
            zipfp.writepy(packagedir, filterfunc=skip_bad_files)
        reportStr = reportSIO.getvalue()
        if reportStr:
            # Re-emit whatever was captured so it is not lost.
            print(reportStr)
        self.assertNotIn('SyntaxError', reportStr)
def test_write_non_pyfile(self):
    """Check that writepy() raises RuntimeError for a non-Python file."""
    # Register the removal first: previously the file was only unlinked
    # after a passing assertion, so a failure left TESTFN behind (no
    # tearDown is visible in this test class).
    self.addCleanup(unlink, TESTFN)
    with open(TESTFN, 'w') as f:
        f.write('most definitely not a python file')
    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
        self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
def _test_extract_with_target(self, target):
    """Extract every SMALL_TEST_DATA member into *target* and verify it.

    *target* is handed unchanged to ZipFile.extract().  Each extracted
    file must be the same file as ``target/<member path>`` and must hold
    the member's encoded text; it is deleted again once checked.
    """
    try:
        self.make_test_file()
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                writtenfile = zipfp.extract(fpath, target)

                # make sure it was written to the right place
                correctfile = os.path.normpath(os.path.join(target, fpath))
                self.assertTrue(os.path.samefile(writtenfile, correctfile),
                                (writtenfile, target))

                # make sure correct data is in correct file
                with open(writtenfile, "rb") as f:
                    self.assertEqual(fdata.encode(), f.read())

                unlink(writtenfile)
    finally:
        # Remove the archive even when an assertion above fails; no other
        # cleanup for it is visible in this test class.
        unlink(TESTFN2)
def check_file(self, filename, content):
    """Assert that *filename* is a regular file containing exactly *content*.

    *content* is compared against the file's raw bytes.
    """
    # Name the offending path; the bare assertTrue() only reported
    # "False is not true".
    self.assertTrue(os.path.isfile(filename),
                    msg='%r is not a regular file' % (filename,))
    with open(filename, 'rb') as f:
        self.assertEqual(f.read(), content)
def test_extract_hackers_arcnames_common_cases(self):
    """Run the hostile-name extraction check on platform-neutral names.

    Every name below is paired with the expected fixed name ``foo/bar``.
    """
    hostile_names = (
        '../foo/bar',
        'foo/../bar',
        'foo/../../bar',
        'foo/bar/..',
        './../foo/bar',
        '/foo/bar',
        '/foo/../bar',
        '/foo/../../bar',
    )
    cases = [(hostile, 'foo/bar') for hostile in hostile_names]
    self._test_extract_hackers_arcnames(cases)
def _test_extract_hackers_arcnames(self, hacknames):
    """Check that hostile member names are extracted to sanitised paths.

    *hacknames* yields ``(arcname, fixedname)`` pairs.  For each pair an
    archive holding one member named *arcname* is written to TESTFN2.  The
    member is then extracted four ways: extract() and extractall(), each
    into an explicit target directory and into the current directory.
    Every extraction must produce the file at *fixedname* (``/``-separated,
    relative to the extraction root) with the original content.
    """
    # Loop-invariant: every case extracts below the same nested directory.
    targetpath = os.path.join('target', 'subdir', 'subsub')
    try:
        for arcname, fixedname in hacknames:
            content = b'foobar' + arcname.encode()
            # Mode 'w' recreates the archive, so no unlink is needed
            # between iterations.
            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile.ZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            arcname = arcname.replace(os.sep, "/")
            fixedparts = fixedname.split('/')
            correctfile = os.path.join(targetpath, *fixedparts)

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg='extract %r: %r != %r' %
                                 (arcname, writtenfile, correctfile))
            self.check_file(correctfile, content)
            rmtree('target')

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            rmtree('target')

            correctfile = os.path.join(os.getcwd(), *fixedparts)

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            rmtree(fixedparts[0])

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            rmtree(fixedparts[0])
    finally:
        # Previously the archive was only removed at the end of a fully
        # successful iteration, so any failed assertion left it behind.
        unlink(TESTFN2)
def test_close_on_exception(self):
    """Check that the zipfile is closed if an exception is raised in the
    'with' block."""
    with zipfile.ZipFile(TESTFN2, "w") as zipfp:
        for fpath, fdata in SMALL_TEST_DATA:
            zipfp.writestr(fpath, fdata)

    # assertRaises also verifies that the exception leaves the 'with'
    # block.  With the former try/except, a swallowed exception skipped
    # the only assertion and the test passed vacuously.
    with self.assertRaises(zipfile.BadZipFile):
        with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
            raise zipfile.BadZipFile()
    self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
def test_write_unicode_filenames(self):
    """Write an ASCII and a non-ASCII member name and read both back.

    While the archive is open for writing, the first entry's filename
    must be a ``str``; after reopening, both names must come back
    unchanged and in insertion order.
    """
    payload = "Test for unicode filename"
    plain_name = "foo.txt"
    accented_name = "\xf6.txt"

    with zipfile.ZipFile(TESTFN, "w") as writer:
        for member in (plain_name, accented_name):
            writer.writestr(member, payload)
        first_entry = writer.infolist()[0]
        self.assertIsInstance(first_entry.filename, str)

    with zipfile.ZipFile(TESTFN, "r") as reader:
        entries = reader.filelist
        self.assertEqual(entries[0].filename, plain_name)
        self.assertEqual(entries[1].filename, accented_name)
def test_close_erroneous_file(self):
    """Check that a failing ZipFile constructor closes the file it opened."""
    # This test checks that the ZipFile constructor closes the file object
    # it opens if there's an error in the file.  If it doesn't, the
    # traceback holds a reference to the ZipFile object and, indirectly,
    # the file object.
    # On Windows, this causes the os.unlink() call to fail because the
    # underlying file is still open.  This is SF bug #412214.
    #
    with open(TESTFN, "w") as fp:
        fp.write("this is not a legal zip file\n")
    # Deliberately a plain try/except rather than assertRaises, so the
    # exception is handled exactly as before.
    try:
        zf = zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        pass
    else:
        # Previously an unexpectedly successful open passed silently and
        # leaked the handle.
        zf.close()
        self.fail("ZipFile() accepted a file that is not a ZIP archive")
def test_is_zip_valid_file(self):
    """Check that is_zipfile() correctly identifies zip files."""
    # - passing a filename
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.writestr("foo.txt", b"O, for a Muse of Fire!")

    self.assertTrue(zipfile.is_zipfile(TESTFN))
    # - passing a path-like object (mirrors the check made for invalid
    #   files in test_is_zip_erroneous_file)
    self.assertTrue(zipfile.is_zipfile(pathlib.Path(TESTFN)))
    # - passing a file object
    with open(TESTFN, "rb") as fp:
        self.assertTrue(zipfile.is_zipfile(fp))
        fp.seek(0, 0)
        zip_contents = fp.read()
    # - passing a file-like object; checked with the position at the end
    #   of the data and again after rewinding
    fp = io.BytesIO()
    fp.write(zip_contents)
    self.assertTrue(zipfile.is_zipfile(fp))
    fp.seek(0, 0)
    self.assertTrue(zipfile.is_zipfile(fp))
def test_empty_file_raises_BadZipFile(self):
    """Opening an empty file, then a short non-ZIP file, must raise
    BadZipFile."""
    for text in ("", "short file"):
        # Writing "" leaves the file empty, matching the plain
        # open-and-close of the first case.
        with open(TESTFN, 'w') as fp:
            fp.write(text)
        with self.assertRaises(zipfile.BadZipFile):
            zipfile.ZipFile(TESTFN)
def test_open_non_existent_item(self):
    """Check that attempting to call open() for an item that doesn't
    exist in the archive raises a KeyError."""
    # The archive is opened for writing and nothing is added to it, so
    # "foo.txt" is guaranteed to be absent.
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertRaises(KeyError, zipf.open, "foo.txt", "r")

def test_bad_compression_mode(self):
    """Check that bad compression methods passed to the ZipFile
    constructor are caught."""
    # -1 is given as the third positional constructor argument, the slot
    # other tests in this file use for the compression method.
    self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
def test_comments(self):
    """Check that comments on the archive are handled properly.

    Covers the default (empty) comment, a short comment, a comment of the
    maximum length, truncation of an over-long comment (with a
    UserWarning), and replacing a comment in append mode with a different
    or shorter one.
    """

    # check default comment is empty
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertEqual(zipf.comment, b'')
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b'')

    # check a simple short comment
    comment = b'Bravely taking to his feet, he beat a very brave retreat.'
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        # Assert on the archive that was read back (zipfr).  The previous
        # code compared the writer object (zipf), which proved nothing
        # about what was stored on disk.
        self.assertEqual(zipfr.comment, comment)

    # check a comment of max length
    comment2 = ''.join('%d' % (i**3 % 10) for i in range((1 << 16)-1))
    comment2 = comment2.encode("ascii")
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment2
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check a comment that is too long is truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        with self.assertWarns(UserWarning):
            zipf.comment = comment2 + b'oops'
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check that comments are correctly modified in append mode
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"an updated comment"
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"an updated comment")

    # check that comments are correctly shortened in append mode
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment that's longer"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"shorter comment"
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"shorter comment")

def test_empty_zipfile(self):
    """Check that an archive closed without members is a valid ZIP file."""
    # Check that creating a file in 'w' or 'a' mode and closing without
    # adding any files to the archives creates a valid empty ZIP file
    for mode in ("w", "a"):
        zipfile.ZipFile(TESTFN, mode=mode).close()
        # Only BadZipFile means "not a valid archive".  The old bare
        # 'except:' in the 'a' branch relabelled every other error.
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in %r mode" % mode)
        else:
            # Close the reader: it used to be left open.
            zipf.close()
1878 msg2 = b"And sail the wide accountant sea" 1879 msg3 = b"To find, explore the funds offshore" 1880 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1881 with zipf.open('foo', mode='w') as w2: 1882 w2.write(msg1) 1883 with zipf.open('bar', mode='w') as w1: 1884 with self.assertRaises(ValueError): 1885 zipf.open('handle', mode='w') 1886 with self.assertRaises(ValueError): 1887 zipf.open('foo', mode='r') 1888 with self.assertRaises(ValueError): 1889 zipf.writestr('str', 'abcde') 1890 with self.assertRaises(ValueError): 1891 zipf.write(__file__, 'file') 1892 with self.assertRaises(ValueError): 1893 zipf.close() 1894 w1.write(msg2) 1895 with zipf.open('baz', mode='w') as w2: 1896 w2.write(msg3) 1897 1898 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 1899 self.assertEqual(zipf.read('foo'), msg1) 1900 self.assertEqual(zipf.read('bar'), msg2) 1901 self.assertEqual(zipf.read('baz'), msg3) 1902 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 1903 1904 def test_seek_tell(self): 1905 # Test seek functionality 1906 txt = b"Where's Bruce?" 
1907 bloc = txt.find(b"Bruce") 1908 # Check seek on a file 1909 with zipfile.ZipFile(TESTFN, "w") as zipf: 1910 zipf.writestr("foo.txt", txt) 1911 with zipfile.ZipFile(TESTFN, "r") as zipf: 1912 with zipf.open("foo.txt", "r") as fp: 1913 fp.seek(bloc, os.SEEK_SET) 1914 self.assertEqual(fp.tell(), bloc) 1915 fp.seek(-bloc, os.SEEK_CUR) 1916 self.assertEqual(fp.tell(), 0) 1917 fp.seek(bloc, os.SEEK_CUR) 1918 self.assertEqual(fp.tell(), bloc) 1919 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1920 fp.seek(0, os.SEEK_END) 1921 self.assertEqual(fp.tell(), len(txt)) 1922 fp.seek(0, os.SEEK_SET) 1923 self.assertEqual(fp.tell(), 0) 1924 # Check seek on memory file 1925 data = io.BytesIO() 1926 with zipfile.ZipFile(data, mode="w") as zipf: 1927 zipf.writestr("foo.txt", txt) 1928 with zipfile.ZipFile(data, mode="r") as zipf: 1929 with zipf.open("foo.txt", "r") as fp: 1930 fp.seek(bloc, os.SEEK_SET) 1931 self.assertEqual(fp.tell(), bloc) 1932 fp.seek(-bloc, os.SEEK_CUR) 1933 self.assertEqual(fp.tell(), 0) 1934 fp.seek(bloc, os.SEEK_CUR) 1935 self.assertEqual(fp.tell(), bloc) 1936 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1937 fp.seek(0, os.SEEK_END) 1938 self.assertEqual(fp.tell(), len(txt)) 1939 fp.seek(0, os.SEEK_SET) 1940 self.assertEqual(fp.tell(), 0) 1941 1942 def tearDown(self): 1943 unlink(TESTFN) 1944 unlink(TESTFN2) 1945 1946 1947class AbstractBadCrcTests: 1948 def test_testzip_with_bad_crc(self): 1949 """Tests that files with bad CRCs return their name from testzip.""" 1950 zipdata = self.zip_with_bad_crc 1951 1952 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1953 # testzip returns the name of the first corrupt file, or None 1954 self.assertEqual('afile', zipf.testzip()) 1955 1956 def test_read_with_bad_crc(self): 1957 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 1958 zipdata = self.zip_with_bad_crc 1959 1960 # Using ZipFile.read() 1961 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1962 
self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 1963 1964 # Using ZipExtFile.read() 1965 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1966 with zipf.open('afile', 'r') as corrupt_file: 1967 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 1968 1969 # Same with small reads (in order to exercise the buffering logic) 1970 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1971 with zipf.open('afile', 'r') as corrupt_file: 1972 corrupt_file.MIN_READ_SIZE = 2 1973 with self.assertRaises(zipfile.BadZipFile): 1974 while corrupt_file.read(2): 1975 pass 1976 1977 1978class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 1979 compression = zipfile.ZIP_STORED 1980 zip_with_bad_crc = ( 1981 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 1982 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 1983 b'ilehello,AworldP' 1984 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 1985 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 1986 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 1987 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 1988 b'\0\0/\0\0\0\0\0') 1989 1990@requires_zlib 1991class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 1992 compression = zipfile.ZIP_DEFLATED 1993 zip_with_bad_crc = ( 1994 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 1995 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 1996 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 1997 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 1998 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 1999 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2000 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2001 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2002 2003@requires_bz2 2004class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2005 compression = zipfile.ZIP_BZIP2 2006 zip_with_bad_crc = ( 2007 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2008 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2009 b'ileBZh91AY&SY\xd4\xa8\xca' 2010 
b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2011 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2012 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2013 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2014 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2015 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2016 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2017 b'\x00\x00\x00\x00') 2018 2019@requires_lzma 2020class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2021 compression = zipfile.ZIP_LZMA 2022 zip_with_bad_crc = ( 2023 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2024 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2025 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2026 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2027 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2028 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2029 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2030 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2031 b'\x00>\x00\x00\x00\x00\x00') 2032 2033 2034class DecryptionTests(unittest.TestCase): 2035 """Check that ZIP decryption works. 
Since the library does not 2036 support encryption at the moment, we use a pre-generated encrypted 2037 ZIP file.""" 2038 2039 data = ( 2040 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2041 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2042 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2043 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2044 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2045 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2046 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2047 data2 = ( 2048 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2049 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2050 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2051 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2052 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2053 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2054 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2055 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2056 2057 plain = b'zipfile.py encryption test' 2058 plain2 = b'\x00'*512 2059 2060 def setUp(self): 2061 with open(TESTFN, "wb") as fp: 2062 fp.write(self.data) 2063 self.zip = zipfile.ZipFile(TESTFN, "r") 2064 with open(TESTFN2, "wb") as fp: 2065 fp.write(self.data2) 2066 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2067 2068 def tearDown(self): 2069 self.zip.close() 2070 os.unlink(TESTFN) 2071 self.zip2.close() 2072 os.unlink(TESTFN2) 2073 2074 def test_no_password(self): 2075 # Reading the encrypted file without password 2076 # must generate a RunTime exception 2077 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2078 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2079 2080 def 
test_bad_password(self): 2081 self.zip.setpassword(b"perl") 2082 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2083 self.zip2.setpassword(b"perl") 2084 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2085 2086 @requires_zlib 2087 def test_good_password(self): 2088 self.zip.setpassword(b"python") 2089 self.assertEqual(self.zip.read("test.txt"), self.plain) 2090 self.zip2.setpassword(b"12345") 2091 self.assertEqual(self.zip2.read("zero"), self.plain2) 2092 2093 def test_unicode_password(self): 2094 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 2095 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 2096 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 2097 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 2098 2099 def test_seek_tell(self): 2100 self.zip.setpassword(b"python") 2101 txt = self.plain 2102 test_word = b'encryption' 2103 bloc = txt.find(test_word) 2104 bloc_len = len(test_word) 2105 with self.zip.open("test.txt", "r") as fp: 2106 fp.seek(bloc, os.SEEK_SET) 2107 self.assertEqual(fp.tell(), bloc) 2108 fp.seek(-bloc, os.SEEK_CUR) 2109 self.assertEqual(fp.tell(), 0) 2110 fp.seek(bloc, os.SEEK_CUR) 2111 self.assertEqual(fp.tell(), bloc) 2112 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2113 2114 # Make sure that the second read after seeking back beyond 2115 # _readbuffer returns the same content (ie. rewind to the start of 2116 # the file to read forward to the required position). 
2117 old_read_size = fp.MIN_READ_SIZE 2118 fp.MIN_READ_SIZE = 1 2119 fp._readbuffer = b'' 2120 fp._offset = 0 2121 fp.seek(0, os.SEEK_SET) 2122 self.assertEqual(fp.tell(), 0) 2123 fp.seek(bloc, os.SEEK_CUR) 2124 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2125 fp.MIN_READ_SIZE = old_read_size 2126 2127 fp.seek(0, os.SEEK_END) 2128 self.assertEqual(fp.tell(), len(txt)) 2129 fp.seek(0, os.SEEK_SET) 2130 self.assertEqual(fp.tell(), 0) 2131 2132 # Read the file completely to definitely call any eof integrity 2133 # checks (crc) and make sure they still pass. 2134 fp.read() 2135 2136 2137class AbstractTestsWithRandomBinaryFiles: 2138 @classmethod 2139 def setUpClass(cls): 2140 datacount = randint(16, 64)*1024 + randint(1, 1024) 2141 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2142 for i in range(datacount)) 2143 2144 def setUp(self): 2145 # Make a source file with some lines 2146 with open(TESTFN, "wb") as fp: 2147 fp.write(self.data) 2148 2149 def tearDown(self): 2150 unlink(TESTFN) 2151 unlink(TESTFN2) 2152 2153 def make_test_archive(self, f, compression): 2154 # Create the ZIP archive 2155 with zipfile.ZipFile(f, "w", compression) as zipfp: 2156 zipfp.write(TESTFN, "another.name") 2157 zipfp.write(TESTFN, TESTFN) 2158 2159 def zip_test(self, f, compression): 2160 self.make_test_archive(f, compression) 2161 2162 # Read the ZIP archive 2163 with zipfile.ZipFile(f, "r", compression) as zipfp: 2164 testdata = zipfp.read(TESTFN) 2165 self.assertEqual(len(testdata), len(self.data)) 2166 self.assertEqual(testdata, self.data) 2167 self.assertEqual(zipfp.read("another.name"), self.data) 2168 2169 def test_read(self): 2170 for f in get_files(self): 2171 self.zip_test(f, self.compression) 2172 2173 def zip_open_test(self, f, compression): 2174 self.make_test_archive(f, compression) 2175 2176 # Read the ZIP archive 2177 with zipfile.ZipFile(f, "r", compression) as zipfp: 2178 zipdata1 = [] 2179 with zipfp.open(TESTFN) as zipopen1: 2180 
while True: 2181 read_data = zipopen1.read(256) 2182 if not read_data: 2183 break 2184 zipdata1.append(read_data) 2185 2186 zipdata2 = [] 2187 with zipfp.open("another.name") as zipopen2: 2188 while True: 2189 read_data = zipopen2.read(256) 2190 if not read_data: 2191 break 2192 zipdata2.append(read_data) 2193 2194 testdata1 = b''.join(zipdata1) 2195 self.assertEqual(len(testdata1), len(self.data)) 2196 self.assertEqual(testdata1, self.data) 2197 2198 testdata2 = b''.join(zipdata2) 2199 self.assertEqual(len(testdata2), len(self.data)) 2200 self.assertEqual(testdata2, self.data) 2201 2202 def test_open(self): 2203 for f in get_files(self): 2204 self.zip_open_test(f, self.compression) 2205 2206 def zip_random_open_test(self, f, compression): 2207 self.make_test_archive(f, compression) 2208 2209 # Read the ZIP archive 2210 with zipfile.ZipFile(f, "r", compression) as zipfp: 2211 zipdata1 = [] 2212 with zipfp.open(TESTFN) as zipopen1: 2213 while True: 2214 read_data = zipopen1.read(randint(1, 1024)) 2215 if not read_data: 2216 break 2217 zipdata1.append(read_data) 2218 2219 testdata = b''.join(zipdata1) 2220 self.assertEqual(len(testdata), len(self.data)) 2221 self.assertEqual(testdata, self.data) 2222 2223 def test_random_open(self): 2224 for f in get_files(self): 2225 self.zip_random_open_test(f, self.compression) 2226 2227 2228class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2229 unittest.TestCase): 2230 compression = zipfile.ZIP_STORED 2231 2232@requires_zlib 2233class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2234 unittest.TestCase): 2235 compression = zipfile.ZIP_DEFLATED 2236 2237@requires_bz2 2238class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2239 unittest.TestCase): 2240 compression = zipfile.ZIP_BZIP2 2241 2242@requires_lzma 2243class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2244 unittest.TestCase): 2245 compression = zipfile.ZIP_LZMA 2246 2247 2248# 
# Provide the tell() method but not seek()
class Tellable:
    """Write-only wrapper around *fp* exposing write(), tell() and flush()."""

    def __init__(self, fp):
        self.fp = fp
        # Running total of the counts returned by fp.write().
        self.offset = 0

    def write(self, data):
        n = self.fp.write(data)
        self.offset += n
        return n

    def tell(self):
        return self.offset

    def flush(self):
        self.fp.flush()

class Unseekable:
    """Write-only wrapper around *fp* exposing only write() and flush()."""

    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        return self.fp.write(data)

    def flush(self):
        self.fp.flush()

class UnseekableTests(unittest.TestCase):
    def test_writestr(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                f = io.BytesIO()
                # Pre-existing bytes: the archive must start after them.
                f.write(b'abc')
                bf = io.BufferedWriter(f)
                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
                    zipfp.writestr('ones', b'111')
                    zipfp.writestr('twos', b'222')
                self.assertEqual(f.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(f, mode='r') as zipf:
                    with zipf.open('ones') as zopen:
                        self.assertEqual(zopen.read(), b'111')
                    with zipf.open('twos') as zopen:
                        self.assertEqual(zopen.read(), b'222')

    def test_write(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                f = io.BytesIO()
                f.write(b'abc')
                bf = io.BufferedWriter(f)
                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
                    self.addCleanup(unlink, TESTFN)
                    with open(TESTFN, 'wb') as f2:
                        f2.write(b'111')
                    zipfp.write(TESTFN, 'ones')
                    with open(TESTFN, 'wb') as f2:
                        f2.write(b'222')
                    zipfp.write(TESTFN, 'twos')
                self.assertEqual(f.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(f, mode='r') as zipf:
                    with zipf.open('ones') as zopen:
                        self.assertEqual(zopen.read(), b'111')
                    with zipf.open('twos') as zopen:
                        self.assertEqual(zopen.read(), b'222')

    def test_open_write(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                f = io.BytesIO()
                f.write(b'abc')
                bf = io.BufferedWriter(f)
                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
                    with zipf.open('ones', 'w') as zopen:
                        zopen.write(b'111')
                    with zipf.open('twos', 'w') as zopen:
                        zopen.write(b'222')
                self.assertEqual(f.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(f) as zipf:
                    self.assertEqual(zipf.read('ones'), b'111')
                    self.assertEqual(zipf.read('twos'), b'222')


@requires_zlib
class TestsWithMultipleOpens(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Two distinguishable payloads: a fixed 3-byte tag plus random bytes.
        cls.data1 = b'111' + getrandbytes(10000)
        cls.data2 = b'222' + getrandbytes(10000)

    def make_test_archive(self, f):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', self.data1)
            zipfp.writestr('twos', self.data2)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, data2)
                self.assertEqual(data1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
                    with zipf.open('twos') as zopen2:
                        data2 = zopen2.read(500)
                        data1 += zopen1.read()
                        data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_read_after_close(self):
        for f in get_files(self):
            self.make_test_archive(f)
            with contextlib.ExitStack() as stack:
                with zipfile.ZipFile(f, 'r') as zipf:
                    zopen1 = stack.enter_context(zipf.open('ones'))
                    zopen2 = stack.enter_context(zipf.open('twos'))
                # The ZipFile is closed here; the member handles are still
                # held open by the ExitStack.
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read()
                data2 += zopen2.read()
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_read_after_write(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                zipf.writestr('twos', self.data2)
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
            self.assertEqual(data1, self.data1[:500])
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_write_after_read(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                with zipf.open('ones') as zopen1:
                    zopen1.read(500)
                    zipf.writestr('twos', self.data2)
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            for x in range(100):
                zipf.read('ones')
                with zipf.open('ones') as zopen1:
                    pass
        with open(os.devnull) as f:
            self.assertLess(f.fileno(), 100)

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
            with zipf.open('ones', 'r') as r1:
                data1 = r1.read(500)
                with zipf.open('twos', 'w') as w1:
                    w1.write(self.data2)
                data1 += r1.read()
        self.assertEqual(data1, self.data1)
        with zipfile.ZipFile(TESTFN2) as zipf:
            self.assertEqual(zipf.read('twos'), self.data2)

    def tearDown(self):
        unlink(TESTFN2)


class TestWithDirectory(unittest.TestCase):
    def setUp(self):
        # TESTFN2 is used as a scratch directory by every test in this class.
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: assertTrue(a, b) treats b as the
            # failure message and would pass for any non-empty filename.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)


class ZipInfoTests(unittest.TestCase):
    def test_from_file(self):
        zi = zipfile.ZipInfo.from_file(__file__)
        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_pathlike(self):
        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_bytes(self):
        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
        self.assertEqual(posixpath.basename(zi.filename), 'test')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_fileno(self):
        with open(__file__, 'rb') as f:
            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
            self.assertEqual(posixpath.basename(zi.filename), 'test')
            self.assertFalse(zi.is_dir())
            self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_dir(self):
        dirpath = os.path.dirname(os.path.abspath(__file__))
        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
        self.assertEqual(zi.filename, 'stdlib_tests/')
        self.assertTrue(zi.is_dir())
        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
        self.assertEqual(zi.file_size, 0)


class CommandLineTest(unittest.TestCase):

    def zipfilecmd(self, *args, **kwargs):
        # Run "python -m zipfile <args>" expecting success; return its stdout
        # with the platform line separator replaced by b'\n'.
        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                      **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def zipfilecmd_failure(self, *args):
        # Run "python -m zipfile <args>" expecting failure; return the
        # helper's result, which callers unpack as (rc, out, err).
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        rc, out, err = self.zipfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        rc, out, err = self.zipfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        zip_name = findfile('zipdir.zip')
        for opt in '-t', '--test':
            out = self.zipfilecmd(opt, zip_name)
            self.assertEqual(out.rstrip(), b'Done testing')
        zip_name = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
        self.assertEqual(out, b'')

    def test_list_command(self):
        zip_name = findfile('zipdir.zip')
        t = io.StringIO()
        with zipfile.ZipFile(zip_name, 'r') as tf:
            tf.printdir(t)
        expected = t.getvalue().encode('ascii', 'backslashreplace')
        for opt in '-l', '--list':
            out = self.zipfilecmd(opt, zip_name,
                                  PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(out, expected)

    @requires_zlib
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as f:
            f.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f:
            f.write('test 2')
        files = [TESTFN, TESTFNDIR]
        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for opt in '-c', '--create':
            try:
                out = self.zipfilecmd(opt, TESTFN2, *files)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), namelist)
                    self.assertEqual(zf.read(namelist[0]), b'test 1')
                    self.assertEqual(zf.read(namelist[2]), b'test 2')
            finally:
                unlink(TESTFN2)

    def test_extract_command(self):
        zip_name = findfile('zipdir.zip')
        for opt in '-e', '--extract':
            with temp_dir() as extdir:
                out = self.zipfilecmd(opt, zip_name, extdir)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(zip_name) as zf:
                    for zi in zf.infolist():
                        path = os.path.join(extdir,
                                    zi.filename.replace('/', os.sep))
                        if zi.is_dir():
                            self.assertTrue(os.path.isdir(path))
                        else:
                            self.assertTrue(os.path.isfile(path))
                            with open(path, 'rb') as f:
                                self.assertEqual(f.read(), zf.read(zi))

if __name__ == "__main__":
    unittest.main()