1import contextlib 2import importlib.util 3import io 4import itertools 5import os 6import pathlib 7import posixpath 8import string 9import struct 10import subprocess 11import sys 12import time 13import unittest 14import unittest.mock as mock 15import zipfile 16 17 18from tempfile import TemporaryFile 19from random import randint, random, getrandbits 20 21from test.support import script_helper 22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd, 23 requires_zlib, requires_bz2, requires_lzma, 24 captured_stdout) 25 26TESTFN2 = TESTFN + "2" 27TESTFNDIR = TESTFN + "d" 28FIXEDTEST_SIZE = 1000 29DATAFILES_DIR = 'zipfile_datafiles' 30 31SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 32 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 33 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 34 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 35 36def getrandbytes(size): 37 return getrandbits(8 * size).to_bytes(size, 'little') 38 39def get_files(test): 40 yield TESTFN2 41 with TemporaryFile() as f: 42 yield f 43 test.assertFalse(f.closed) 44 with io.BytesIO() as f: 45 yield f 46 test.assertFalse(f.closed) 47 48class AbstractTestsWithSourceFile: 49 @classmethod 50 def setUpClass(cls): 51 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 52 (i, random()), "ascii") 53 for i in range(FIXEDTEST_SIZE)] 54 cls.data = b''.join(cls.line_gen) 55 56 def setUp(self): 57 # Make a source file with some lines 58 with open(TESTFN, "wb") as fp: 59 fp.write(self.data) 60 61 def make_test_archive(self, f, compression, compresslevel=None): 62 kwargs = {'compression': compression, 'compresslevel': compresslevel} 63 # Create the ZIP archive 64 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 65 zipfp.write(TESTFN, "another.name") 66 zipfp.write(TESTFN, TESTFN) 67 zipfp.writestr("strfile", self.data) 68 with zipfp.open('written-open-w', mode='w') as f: 69 for line in self.line_gen: 70 f.write(line) 71 72 def zip_test(self, f, compression, compresslevel=None): 73 self.make_test_archive(f, compression, compresslevel) 74 75 # Read the ZIP archive 76 with zipfile.ZipFile(f, "r", compression) as zipfp: 77 self.assertEqual(zipfp.read(TESTFN), self.data) 78 self.assertEqual(zipfp.read("another.name"), self.data) 79 self.assertEqual(zipfp.read("strfile"), self.data) 80 81 # Print the ZIP directory 82 fp = io.StringIO() 83 zipfp.printdir(file=fp) 84 directory = fp.getvalue() 85 lines = directory.splitlines() 86 self.assertEqual(len(lines), 5) # Number of files + header 87 88 self.assertIn('File Name', lines[0]) 89 self.assertIn('Modified', lines[0]) 90 self.assertIn('Size', lines[0]) 91 92 fn, date, time_, size = lines[1].split() 93 self.assertEqual(fn, 'another.name') 94 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 95 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 96 self.assertEqual(size, str(len(self.data))) 97 98 # Check the namelist 99 names = zipfp.namelist() 100 self.assertEqual(len(names), 4) 101 self.assertIn(TESTFN, names) 102 self.assertIn("another.name", names) 103 self.assertIn("strfile", names) 104 self.assertIn("written-open-w", names) 105 106 # Check infolist 107 infos = zipfp.infolist() 108 names = [i.filename for i in infos] 109 self.assertEqual(len(names), 4) 110 self.assertIn(TESTFN, names) 111 self.assertIn("another.name", names) 112 self.assertIn("strfile", names) 113 self.assertIn("written-open-w", names) 114 for i in infos: 115 self.assertEqual(i.file_size, len(self.data)) 116 117 # check getinfo 118 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 119 info = zipfp.getinfo(nm) 120 self.assertEqual(info.filename, nm) 121 self.assertEqual(info.file_size, len(self.data)) 122 123 # Check that testzip doesn't raise an exception 124 zipfp.testzip() 125 126 def test_basic(self): 127 for f in get_files(self): 128 self.zip_test(f, self.compression) 129 130 def zip_open_test(self, f, compression): 131 self.make_test_archive(f, compression) 132 133 # Read the ZIP archive 134 with zipfile.ZipFile(f, "r", compression) as zipfp: 135 zipdata1 = [] 136 with zipfp.open(TESTFN) as zipopen1: 137 while True: 138 read_data = zipopen1.read(256) 139 if not read_data: 140 break 141 zipdata1.append(read_data) 142 143 zipdata2 = [] 144 with zipfp.open("another.name") as zipopen2: 145 while True: 146 read_data = zipopen2.read(256) 147 if not read_data: 148 break 149 zipdata2.append(read_data) 150 151 self.assertEqual(b''.join(zipdata1), self.data) 152 self.assertEqual(b''.join(zipdata2), self.data) 153 154 def test_open(self): 155 for f in get_files(self): 156 self.zip_open_test(f, self.compression) 157 158 def test_open_with_pathlike(self): 159 path = pathlib.Path(TESTFN2) 160 self.zip_open_test(path, self.compression) 161 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 162 self.assertIsInstance(zipfp.filename, str) 163 164 def zip_random_open_test(self, f, compression): 165 self.make_test_archive(f, compression) 166 167 # Read the ZIP archive 168 with zipfile.ZipFile(f, "r", compression) as zipfp: 169 zipdata1 = [] 170 with zipfp.open(TESTFN) as zipopen1: 171 while True: 172 read_data = zipopen1.read(randint(1, 1024)) 173 if not read_data: 174 break 175 zipdata1.append(read_data) 176 177 self.assertEqual(b''.join(zipdata1), self.data) 178 179 def test_random_open(self): 180 for f in get_files(self): 181 self.zip_random_open_test(f, self.compression) 182 183 def zip_read1_test(self, f, compression): 184 self.make_test_archive(f, compression) 185 186 # Read the ZIP archive 187 with zipfile.ZipFile(f, "r") as zipfp, \ 188 zipfp.open(TESTFN) as zipopen: 189 zipdata = [] 190 while True: 191 read_data = zipopen.read1(-1) 192 if not read_data: 193 break 194 zipdata.append(read_data) 195 196 self.assertEqual(b''.join(zipdata), self.data) 197 198 def test_read1(self): 199 for f in get_files(self): 200 self.zip_read1_test(f, self.compression) 201 202 def zip_read1_10_test(self, f, compression): 203 self.make_test_archive(f, compression) 204 205 # Read the ZIP archive 206 with zipfile.ZipFile(f, "r") as zipfp, \ 207 zipfp.open(TESTFN) as zipopen: 208 zipdata = [] 209 while True: 210 read_data = zipopen.read1(10) 211 self.assertLessEqual(len(read_data), 10) 212 if not read_data: 213 break 214 zipdata.append(read_data) 215 216 self.assertEqual(b''.join(zipdata), self.data) 217 218 def test_read1_10(self): 219 for f in get_files(self): 220 self.zip_read1_10_test(f, self.compression) 221 222 def zip_readline_read_test(self, f, compression): 223 self.make_test_archive(f, compression) 224 225 # Read the ZIP archive 226 with zipfile.ZipFile(f, "r") as zipfp, \ 227 zipfp.open(TESTFN) as zipopen: 228 data = b'' 229 while True: 230 read = zipopen.readline() 231 if not read: 232 break 233 data += read 234 235 read = zipopen.read(100) 236 if not read: 237 break 238 data += read 239 240 self.assertEqual(data, self.data) 241 242 def test_readline_read(self): 243 # Issue #7610: calls to readline() interleaved with calls to read(). 244 for f in get_files(self): 245 self.zip_readline_read_test(f, self.compression) 246 247 def zip_readline_test(self, f, compression): 248 self.make_test_archive(f, compression) 249 250 # Read the ZIP archive 251 with zipfile.ZipFile(f, "r") as zipfp: 252 with zipfp.open(TESTFN) as zipopen: 253 for line in self.line_gen: 254 linedata = zipopen.readline() 255 self.assertEqual(linedata, line) 256 257 def test_readline(self): 258 for f in get_files(self): 259 self.zip_readline_test(f, self.compression) 260 261 def zip_readlines_test(self, f, compression): 262 self.make_test_archive(f, compression) 263 264 # Read the ZIP archive 265 with zipfile.ZipFile(f, "r") as zipfp: 266 with zipfp.open(TESTFN) as zipopen: 267 ziplines = zipopen.readlines() 268 for line, zipline in zip(self.line_gen, ziplines): 269 self.assertEqual(zipline, line) 270 271 def test_readlines(self): 272 for f in get_files(self): 273 self.zip_readlines_test(f, self.compression) 274 275 def zip_iterlines_test(self, f, compression): 276 self.make_test_archive(f, compression) 277 278 # Read the ZIP archive 279 with zipfile.ZipFile(f, "r") as zipfp: 280 with zipfp.open(TESTFN) as zipopen: 281 for line, zipline in zip(self.line_gen, zipopen): 282 self.assertEqual(zipline, line) 283 284 def test_iterlines(self): 285 for f in get_files(self): 286 self.zip_iterlines_test(f, self.compression) 287 288 def test_low_compression(self): 289 """Check for cases where compressed data is larger than original.""" 290 # Create the ZIP archive 291 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 292 zipfp.writestr("strfile", '12') 293 294 # Get an open object for strfile 295 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 296 with zipfp.open("strfile") as openobj: 297 self.assertEqual(openobj.read(1), b'1') 298 self.assertEqual(openobj.read(1), b'2') 299 300 def test_writestr_compression(self): 301 zipfp = zipfile.ZipFile(TESTFN2, "w") 302 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 303 info = zipfp.getinfo('b.txt') 304 self.assertEqual(info.compress_type, self.compression) 305 306 def test_writestr_compresslevel(self): 307 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 308 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 309 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 310 compresslevel=2) 311 312 # Compression level follows the constructor. 313 a_info = zipfp.getinfo('a.txt') 314 self.assertEqual(a_info.compress_type, self.compression) 315 self.assertEqual(a_info._compresslevel, 1) 316 317 # Compression level is overridden. 318 b_info = zipfp.getinfo('b.txt') 319 self.assertEqual(b_info.compress_type, self.compression) 320 self.assertEqual(b_info._compresslevel, 2) 321 322 def test_read_return_size(self): 323 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 324 # than requested. 325 for test_size in (1, 4095, 4096, 4097, 16384): 326 file_size = test_size + 1 327 junk = getrandbytes(file_size) 328 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 329 zipf.writestr('foo', junk) 330 with zipf.open('foo', 'r') as fp: 331 buf = fp.read(test_size) 332 self.assertEqual(len(buf), test_size) 333 334 def test_truncated_zipfile(self): 335 fp = io.BytesIO() 336 with zipfile.ZipFile(fp, mode='w') as zipf: 337 zipf.writestr('strfile', self.data, compress_type=self.compression) 338 end_offset = fp.tell() 339 zipfiledata = fp.getvalue() 340 341 fp = io.BytesIO(zipfiledata) 342 with zipfile.ZipFile(fp) as zipf: 343 with zipf.open('strfile') as zipopen: 344 fp.truncate(end_offset - 20) 345 with self.assertRaises(EOFError): 346 zipopen.read() 347 348 fp = io.BytesIO(zipfiledata) 349 with zipfile.ZipFile(fp) as zipf: 350 with zipf.open('strfile') as zipopen: 351 fp.truncate(end_offset - 20) 352 with self.assertRaises(EOFError): 353 while zipopen.read(100): 354 pass 355 356 fp = io.BytesIO(zipfiledata) 357 with zipfile.ZipFile(fp) as zipf: 358 with zipf.open('strfile') as zipopen: 359 fp.truncate(end_offset - 20) 360 with self.assertRaises(EOFError): 361 while zipopen.read1(100): 362 pass 363 364 def test_repr(self): 365 fname = 'file.name' 366 for f in get_files(self): 367 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 368 zipfp.write(TESTFN, fname) 369 r = repr(zipfp) 370 self.assertIn("mode='w'", r) 371 372 with zipfile.ZipFile(f, 'r') as zipfp: 373 r = repr(zipfp) 374 if isinstance(f, str): 375 self.assertIn('filename=%r' % f, r) 376 else: 377 self.assertIn('file=%r' % f, r) 378 self.assertIn("mode='r'", r) 379 r = repr(zipfp.getinfo(fname)) 380 self.assertIn('filename=%r' % fname, r) 381 self.assertIn('filemode=', r) 382 self.assertIn('file_size=', r) 383 if self.compression != zipfile.ZIP_STORED: 384 self.assertIn('compress_type=', r) 385 self.assertIn('compress_size=', r) 386 with zipfp.open(fname) as zipopen: 387 r = repr(zipopen) 388 self.assertIn('name=%r' % fname, r) 389 self.assertIn("mode='r'", r) 390 if self.compression != zipfile.ZIP_STORED: 391 self.assertIn('compress_type=', r) 392 self.assertIn('[closed]', repr(zipopen)) 393 self.assertIn('[closed]', repr(zipfp)) 394 395 def test_compresslevel_basic(self): 396 for f in get_files(self): 397 self.zip_test(f, self.compression, compresslevel=9) 398 399 def test_per_file_compresslevel(self): 400 """Check that files within a Zip archive can have different 401 compression levels.""" 402 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 403 zipfp.write(TESTFN, 'compress_1') 404 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 405 one_info = zipfp.getinfo('compress_1') 406 nine_info = zipfp.getinfo('compress_9') 407 self.assertEqual(one_info._compresslevel, 1) 408 self.assertEqual(nine_info._compresslevel, 9) 409 410 def test_writing_errors(self): 411 class BrokenFile(io.BytesIO): 412 def write(self, data): 413 nonlocal count 414 if count is not None: 415 if count == stop: 416 raise OSError 417 count += 1 418 super().write(data) 419 420 stop = 0 421 while True: 422 testfile = BrokenFile() 423 count = None 424 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 425 with zipfp.open('file1', 'w') as f: 426 f.write(b'data1') 427 count = 0 428 try: 429 with zipfp.open('file2', 'w') as f: 430 f.write(b'data2') 431 except OSError: 432 stop += 1 433 else: 434 break 435 finally: 436 count = None 437 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 438 self.assertEqual(zipfp.namelist(), ['file1']) 439 self.assertEqual(zipfp.read('file1'), b'data1') 440 441 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 442 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 443 self.assertEqual(zipfp.read('file1'), b'data1') 444 self.assertEqual(zipfp.read('file2'), b'data2') 445 446 447 def tearDown(self): 448 unlink(TESTFN) 449 unlink(TESTFN2) 450 451 452class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 453 unittest.TestCase): 454 compression = zipfile.ZIP_STORED 455 test_low_compression = None 456 457 def zip_test_writestr_permissions(self, f, compression): 458 # Make sure that writestr and open(... mode='w') create files with 459 # mode 0600, when they are passed a name rather than a ZipInfo 460 # instance. 461 462 self.make_test_archive(f, compression) 463 with zipfile.ZipFile(f, "r") as zipfp: 464 zinfo = zipfp.getinfo('strfile') 465 self.assertEqual(zinfo.external_attr, 0o600 << 16) 466 467 zinfo2 = zipfp.getinfo('written-open-w') 468 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 469 470 def test_writestr_permissions(self): 471 for f in get_files(self): 472 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 473 474 def test_absolute_arcnames(self): 475 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 476 zipfp.write(TESTFN, "/absolute") 477 478 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 479 self.assertEqual(zipfp.namelist(), ["absolute"]) 480 481 def test_append_to_zip_file(self): 482 """Test appending to an existing zipfile.""" 483 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 484 zipfp.write(TESTFN, TESTFN) 485 486 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 487 zipfp.writestr("strfile", self.data) 488 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 489 490 def test_append_to_non_zip_file(self): 491 """Test appending to an existing file that is not a zipfile.""" 492 # NOTE: this test fails if len(d) < 22 because of the first 493 # line "fpin.seek(-22, 2)" in _EndRecData 494 data = b'I am not a ZipFile!'*10 495 with open(TESTFN2, 'wb') as f: 496 f.write(data) 497 498 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 499 zipfp.write(TESTFN, TESTFN) 500 501 with open(TESTFN2, 'rb') as f: 502 f.seek(len(data)) 503 with zipfile.ZipFile(f, "r") as zipfp: 504 self.assertEqual(zipfp.namelist(), [TESTFN]) 505 self.assertEqual(zipfp.read(TESTFN), self.data) 506 with open(TESTFN2, 'rb') as f: 507 self.assertEqual(f.read(len(data)), data) 508 zipfiledata = f.read() 509 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 510 self.assertEqual(zipfp.namelist(), [TESTFN]) 511 self.assertEqual(zipfp.read(TESTFN), self.data) 512 513 def test_read_concatenated_zip_file(self): 514 with io.BytesIO() as bio: 515 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 516 zipfp.write(TESTFN, TESTFN) 517 zipfiledata = bio.getvalue() 518 data = b'I am not a ZipFile!'*10 519 with open(TESTFN2, 'wb') as f: 520 f.write(data) 521 f.write(zipfiledata) 522 523 with zipfile.ZipFile(TESTFN2) as zipfp: 524 self.assertEqual(zipfp.namelist(), [TESTFN]) 525 self.assertEqual(zipfp.read(TESTFN), self.data) 526 527 def test_append_to_concatenated_zip_file(self): 528 with io.BytesIO() as bio: 529 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 530 zipfp.write(TESTFN, TESTFN) 531 zipfiledata = bio.getvalue() 532 data = b'I am not a ZipFile!'*1000000 533 with open(TESTFN2, 'wb') as f: 534 f.write(data) 535 f.write(zipfiledata) 536 537 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 538 self.assertEqual(zipfp.namelist(), [TESTFN]) 539 zipfp.writestr('strfile', self.data) 540 541 with open(TESTFN2, 'rb') as f: 542 self.assertEqual(f.read(len(data)), data) 543 zipfiledata = f.read() 544 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 545 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 546 self.assertEqual(zipfp.read(TESTFN), self.data) 547 self.assertEqual(zipfp.read('strfile'), self.data) 548 549 def test_ignores_newline_at_end(self): 550 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 551 zipfp.write(TESTFN, TESTFN) 552 with open(TESTFN2, 'a') as f: 553 f.write("\r\n\00\00\00") 554 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 555 self.assertIsInstance(zipfp, zipfile.ZipFile) 556 557 def test_ignores_stuff_appended_past_comments(self): 558 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 559 zipfp.comment = b"this is a comment" 560 zipfp.write(TESTFN, TESTFN) 561 with open(TESTFN2, 'a') as f: 562 f.write("abcdef\r\n") 563 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 564 self.assertIsInstance(zipfp, zipfile.ZipFile) 565 self.assertEqual(zipfp.comment, b"this is a comment") 566 567 def test_write_default_name(self): 568 """Check that calling ZipFile.write without arcname specified 569 produces the expected result.""" 570 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 571 zipfp.write(TESTFN) 572 with open(TESTFN, "rb") as f: 573 self.assertEqual(zipfp.read(TESTFN), f.read()) 574 575 def test_write_to_readonly(self): 576 """Check that trying to call write() on a readonly ZipFile object 577 raises a ValueError.""" 578 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 579 zipfp.writestr("somefile.txt", "bogus") 580 581 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 582 self.assertRaises(ValueError, zipfp.write, TESTFN) 583 584 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 585 with self.assertRaises(ValueError): 586 zipfp.open(TESTFN, mode='w') 587 588 def test_add_file_before_1980(self): 589 # Set atime and mtime to 1970-01-01 590 os.utime(TESTFN, (0, 0)) 591 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 592 self.assertRaises(ValueError, zipfp.write, TESTFN) 593 594 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 595 zipfp.write(TESTFN) 596 zinfo = zipfp.getinfo(TESTFN) 597 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 598 599 def test_add_file_after_2107(self): 600 # Set atime and mtime to 2108-12-30 601 ts = 4386268800 602 try: 603 time.localtime(ts) 604 except OverflowError: 605 self.skipTest(f'time.localtime({ts}) raises OverflowError') 606 try: 607 os.utime(TESTFN, (ts, ts)) 608 except OverflowError: 609 self.skipTest('Host fs cannot set timestamp to required value.') 610 611 mtime_ns = os.stat(TESTFN).st_mtime_ns 612 if mtime_ns != (4386268800 * 10**9): 613 # XFS filesystem is limited to 32-bit timestamp, but the syscall 614 # didn't fail. Moreover, there is a VFS bug which returns 615 # a cached timestamp which is different than the value on disk. 616 # 617 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 618 # 619 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 620 # https://bugs.python.org/issue39460#msg360952 621 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 622 623 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 624 self.assertRaises(struct.error, zipfp.write, TESTFN) 625 626 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 627 zipfp.write(TESTFN) 628 zinfo = zipfp.getinfo(TESTFN) 629 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 630 631 632@requires_zlib 633class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 634 unittest.TestCase): 635 compression = zipfile.ZIP_DEFLATED 636 637 def test_per_file_compression(self): 638 """Check that files within a Zip archive can have different 639 compression options.""" 640 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 641 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 642 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 643 sinfo = zipfp.getinfo('storeme') 644 dinfo = zipfp.getinfo('deflateme') 645 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 646 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 647 648@requires_bz2 649class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 650 unittest.TestCase): 651 compression = zipfile.ZIP_BZIP2 652 653@requires_lzma 654class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 655 unittest.TestCase): 656 compression = zipfile.ZIP_LZMA 657 658 659class AbstractTestZip64InSmallFiles: 660 # These tests test the ZIP64 functionality without using large files, 661 # see test_zipfile64 for proper tests. 662 663 @classmethod 664 def setUpClass(cls): 665 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 666 for i in range(0, FIXEDTEST_SIZE)) 667 cls.data = b'\n'.join(line_gen) 668 669 def setUp(self): 670 self._limit = zipfile.ZIP64_LIMIT 671 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 672 zipfile.ZIP64_LIMIT = 1000 673 zipfile.ZIP_FILECOUNT_LIMIT = 9 674 675 # Make a source file with some lines 676 with open(TESTFN, "wb") as fp: 677 fp.write(self.data) 678 679 def zip_test(self, f, compression): 680 # Create the ZIP archive 681 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 682 zipfp.write(TESTFN, "another.name") 683 zipfp.write(TESTFN, TESTFN) 684 zipfp.writestr("strfile", self.data) 685 686 # Read the ZIP archive 687 with zipfile.ZipFile(f, "r", compression) as zipfp: 688 self.assertEqual(zipfp.read(TESTFN), self.data) 689 self.assertEqual(zipfp.read("another.name"), self.data) 690 self.assertEqual(zipfp.read("strfile"), self.data) 691 692 # Print the ZIP directory 693 fp = io.StringIO() 694 zipfp.printdir(fp) 695 696 directory = fp.getvalue() 697 lines = directory.splitlines() 698 self.assertEqual(len(lines), 4) # Number of files + header 699 700 self.assertIn('File Name', lines[0]) 701 self.assertIn('Modified', lines[0]) 702 self.assertIn('Size', lines[0]) 703 704 fn, date, time_, size = lines[1].split() 705 self.assertEqual(fn, 'another.name') 706 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 707 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 708 self.assertEqual(size, str(len(self.data))) 709 710 # Check the namelist 711 names = zipfp.namelist() 712 self.assertEqual(len(names), 3) 713 self.assertIn(TESTFN, names) 714 self.assertIn("another.name", names) 715 self.assertIn("strfile", names) 716 717 # Check infolist 718 infos = zipfp.infolist() 719 names = [i.filename for i in infos] 720 self.assertEqual(len(names), 3) 721 self.assertIn(TESTFN, names) 722 self.assertIn("another.name", names) 723 self.assertIn("strfile", names) 724 for i in infos: 725 self.assertEqual(i.file_size, len(self.data)) 726 727 # check getinfo 728 for nm in (TESTFN, "another.name", "strfile"): 729 info = zipfp.getinfo(nm) 730 self.assertEqual(info.filename, nm) 731 self.assertEqual(info.file_size, len(self.data)) 732 733 # Check that testzip doesn't raise an exception 734 zipfp.testzip() 735 736 def test_basic(self): 737 for f in get_files(self): 738 self.zip_test(f, self.compression) 739 740 def test_too_many_files(self): 741 # This test checks that more than 64k files can be added to an archive, 742 # and that the resulting archive can be read properly by ZipFile 743 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 744 allowZip64=True) 745 zipf.debug = 100 746 numfiles = 15 747 for i in range(numfiles): 748 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 749 self.assertEqual(len(zipf.namelist()), numfiles) 750 zipf.close() 751 752 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 753 self.assertEqual(len(zipf2.namelist()), numfiles) 754 for i in range(numfiles): 755 content = zipf2.read("foo%08d" % i).decode('ascii') 756 self.assertEqual(content, "%d" % (i**3 % 57)) 757 zipf2.close() 758 759 def test_too_many_files_append(self): 760 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 761 allowZip64=False) 762 zipf.debug = 100 763 numfiles = 9 764 for i in range(numfiles): 765 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 766 self.assertEqual(len(zipf.namelist()), numfiles) 767 with self.assertRaises(zipfile.LargeZipFile): 768 zipf.writestr("foo%08d" % numfiles, b'') 769 self.assertEqual(len(zipf.namelist()), numfiles) 770 zipf.close() 771 772 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 773 allowZip64=False) 774 zipf.debug = 100 775 self.assertEqual(len(zipf.namelist()), numfiles) 776 with self.assertRaises(zipfile.LargeZipFile): 777 zipf.writestr("foo%08d" % numfiles, b'') 778 self.assertEqual(len(zipf.namelist()), numfiles) 779 zipf.close() 780 781 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 782 allowZip64=True) 783 zipf.debug = 100 784 self.assertEqual(len(zipf.namelist()), numfiles) 785 numfiles2 = 15 786 for i in range(numfiles, numfiles2): 787 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 788 self.assertEqual(len(zipf.namelist()), numfiles2) 789 zipf.close() 790 791 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 792 self.assertEqual(len(zipf2.namelist()), numfiles2) 793 for i in range(numfiles2): 794 content = zipf2.read("foo%08d" % i).decode('ascii') 795 self.assertEqual(content, "%d" % (i**3 % 57)) 796 zipf2.close() 797 798 def tearDown(self): 799 zipfile.ZIP64_LIMIT = self._limit 800 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 801 unlink(TESTFN) 802 unlink(TESTFN2) 803 804 805class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 806 unittest.TestCase): 807 compression = zipfile.ZIP_STORED 808 809 def large_file_exception_test(self, f, compression): 810 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 811 self.assertRaises(zipfile.LargeZipFile, 812 zipfp.write, TESTFN, "another.name") 813 814 def large_file_exception_test2(self, f, compression): 815 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 816 self.assertRaises(zipfile.LargeZipFile, 817 zipfp.writestr, "another.name", self.data) 818 819 def test_large_file_exception(self): 820 for f in get_files(self): 821 self.large_file_exception_test(f, zipfile.ZIP_STORED) 822 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 823 824 def test_absolute_arcnames(self): 825 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 826 allowZip64=True) as zipfp: 827 zipfp.write(TESTFN, "/absolute") 828 829 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 830 self.assertEqual(zipfp.namelist(), ["absolute"]) 831 832 def test_append(self): 833 # Test that appending to the Zip64 archive doesn't change 834 # extra fields of existing entries. 835 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 836 zipfp.writestr("strfile", self.data) 837 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 838 zinfo = zipfp.getinfo("strfile") 839 extra = zinfo.extra 840 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 841 zipfp.writestr("strfile2", self.data) 842 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 843 zinfo = zipfp.getinfo("strfile") 844 self.assertEqual(zinfo.extra, extra) 845 846 def make_zip64_file( 847 self, file_size_64_set=False, file_size_extra=False, 848 compress_size_64_set=False, compress_size_extra=False, 849 header_offset_64_set=False, header_offset_extra=False, 850 ): 851 """Generate bytes sequence for a zip with (incomplete) zip64 data. 852 853 The actual values (not the zip 64 0xffffffff values) stored in the file 854 are: 855 file_size: 8 856 compress_size: 8 857 header_offset: 0 858 """ 859 actual_size = 8 860 actual_header_offset = 0 861 local_zip64_fields = [] 862 central_zip64_fields = [] 863 864 file_size = actual_size 865 if file_size_64_set: 866 file_size = 0xffffffff 867 if file_size_extra: 868 local_zip64_fields.append(actual_size) 869 central_zip64_fields.append(actual_size) 870 file_size = struct.pack("<L", file_size) 871 872 compress_size = actual_size 873 if compress_size_64_set: 874 compress_size = 0xffffffff 875 if compress_size_extra: 876 local_zip64_fields.append(actual_size) 877 central_zip64_fields.append(actual_size) 878 compress_size = struct.pack("<L", compress_size) 879 880 header_offset = actual_header_offset 881 if header_offset_64_set: 882 header_offset = 0xffffffff 883 if header_offset_extra: 884 central_zip64_fields.append(actual_header_offset) 885 header_offset = struct.pack("<L", header_offset) 886 887 local_extra = struct.pack( 888 '<HH' + 'Q'*len(local_zip64_fields), 889 0x0001, 890 8*len(local_zip64_fields), 891 *local_zip64_fields 892 ) 893 894 central_extra = struct.pack( 895 '<HH' + 'Q'*len(central_zip64_fields), 896 0x0001, 897 8*len(central_zip64_fields), 898 *central_zip64_fields 899 ) 900 901 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 902 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 903 904 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 905 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 906 907 filename = b"test.txt" 908 content = b"test1234" 909 filename_length = struct.pack("<H", len(filename)) 910 zip64_contents = ( 911 # Local file header 912 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 913 + compress_size 914 + file_size 915 + filename_length 916 + local_extra_length 917 + filename 918 + local_extra 919 + content 920 # Central directory: 921 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 922 + compress_size 923 + file_size 924 + filename_length 925 + central_extra_length 926 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 927 + header_offset 928 + filename 929 + central_extra 930 # Zip64 end of central directory 931 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 932 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 933 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 934 + central_dir_size 935 + offset_to_central_dir 936 # Zip64 end of central directory locator 937 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 938 + b"\x00\x00\x00" 939 # end of central directory 940 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 941 + b"\x00\x00\x00\x00" 942 ) 943 return zip64_contents 944 945 def test_bad_zip64_extra(self): 946 """Missing zip64 extra records raises an exception. 947 948 There are 4 fields that the zip64 format handles (the disk number is 949 not used in this module and so is ignored here). According to the zip 950 spec: 951 The order of the fields in the zip64 extended 952 information record is fixed, but the fields MUST 953 only appear if the corresponding Local or Central 954 directory record field is set to 0xFFFF or 0xFFFFFFFF. 955 956 If the zip64 extra content doesn't contain enough entries for the 957 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 958 This test mismatches the length of the zip64 extra field and the number 959 of fields set to indicate the presence of zip64 data. 960 """ 961 # zip64 file size present, no fields in extra, expecting one, equals 962 # missing file size. 963 missing_file_size_extra = self.make_zip64_file( 964 file_size_64_set=True, 965 ) 966 with self.assertRaises(zipfile.BadZipFile) as e: 967 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 968 self.assertIn('file size', str(e.exception).lower()) 969 970 # zip64 file size present, zip64 compress size present, one field in 971 # extra, expecting two, equals missing compress size. 972 missing_compress_size_extra = self.make_zip64_file( 973 file_size_64_set=True, 974 file_size_extra=True, 975 compress_size_64_set=True, 976 ) 977 with self.assertRaises(zipfile.BadZipFile) as e: 978 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 979 self.assertIn('compress size', str(e.exception).lower()) 980 981 # zip64 compress size present, no fields in extra, expecting one, 982 # equals missing compress size. 983 missing_compress_size_extra = self.make_zip64_file( 984 compress_size_64_set=True, 985 ) 986 with self.assertRaises(zipfile.BadZipFile) as e: 987 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 988 self.assertIn('compress size', str(e.exception).lower()) 989 990 # zip64 file size present, zip64 compress size present, zip64 header 991 # offset present, two fields in extra, expecting three, equals missing 992 # header offset 993 missing_header_offset_extra = self.make_zip64_file( 994 file_size_64_set=True, 995 file_size_extra=True, 996 compress_size_64_set=True, 997 compress_size_extra=True, 998 header_offset_64_set=True, 999 ) 1000 with self.assertRaises(zipfile.BadZipFile) as e: 1001 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1002 self.assertIn('header offset', str(e.exception).lower()) 1003 1004 # zip64 compress size present, zip64 header offset present, one field 1005 # in extra, expecting two, equals missing header offset 1006 missing_header_offset_extra = self.make_zip64_file( 1007 file_size_64_set=False, 1008 compress_size_64_set=True, 1009 compress_size_extra=True, 1010 header_offset_64_set=True, 1011 ) 1012 with self.assertRaises(zipfile.BadZipFile) as e: 1013 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1014 self.assertIn('header offset', str(e.exception).lower()) 1015 1016 # zip64 file size present, zip64 header offset present, one field in 1017 # extra, expecting two, equals missing header offset 1018 missing_header_offset_extra = self.make_zip64_file( 1019 file_size_64_set=True, 1020 file_size_extra=True, 1021 compress_size_64_set=False, 1022 header_offset_64_set=True, 1023 ) 1024 with self.assertRaises(zipfile.BadZipFile) as e: 1025 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1026 self.assertIn('header offset', str(e.exception).lower()) 1027 1028 # zip64 header offset present, no fields in extra, expecting one, 1029 # equals missing header offset 1030 missing_header_offset_extra = self.make_zip64_file( 1031 file_size_64_set=False, 1032 compress_size_64_set=False, 1033 header_offset_64_set=True, 1034 ) 1035 with self.assertRaises(zipfile.BadZipFile) as e: 1036 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1037 self.assertIn('header offset', str(e.exception).lower()) 1038 1039 def test_generated_valid_zip64_extra(self): 1040 # These values are what is set in the make_zip64_file method. 1041 expected_file_size = 8 1042 expected_compress_size = 8 1043 expected_header_offset = 0 1044 expected_content = b"test1234" 1045 1046 # Loop through the various valid combinations of zip64 masks 1047 # present and extra fields present. 1048 params = ( 1049 {"file_size_64_set": True, "file_size_extra": True}, 1050 {"compress_size_64_set": True, "compress_size_extra": True}, 1051 {"header_offset_64_set": True, "header_offset_extra": True}, 1052 ) 1053 1054 for r in range(1, len(params) + 1): 1055 for combo in itertools.combinations(params, r): 1056 kwargs = {} 1057 for c in combo: 1058 kwargs.update(c) 1059 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1060 zinfo = zf.infolist()[0] 1061 self.assertEqual(zinfo.file_size, expected_file_size) 1062 self.assertEqual(zinfo.compress_size, expected_compress_size) 1063 self.assertEqual(zinfo.header_offset, expected_header_offset) 1064 self.assertEqual(zf.read(zinfo), expected_content) 1065 1066 1067@requires_zlib 1068class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1069 unittest.TestCase): 1070 compression = zipfile.ZIP_DEFLATED 1071 1072@requires_bz2 1073class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1074 unittest.TestCase): 1075 compression = zipfile.ZIP_BZIP2 1076 1077@requires_lzma 1078class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1079 unittest.TestCase): 1080 compression = zipfile.ZIP_LZMA 1081 1082 1083class AbstractWriterTests: 1084 1085 def tearDown(self): 1086 unlink(TESTFN2) 1087 1088 def test_close_after_close(self): 1089 data = b'content' 1090 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1091 w = zipf.open('test', 'w') 1092 w.write(data) 1093 w.close() 1094 self.assertTrue(w.closed) 1095 w.close() 1096 self.assertTrue(w.closed) 1097 self.assertEqual(zipf.read('test'), data) 1098 1099 def test_write_after_close(self): 1100 data = b'content' 1101 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1102 w = zipf.open('test', 'w') 1103 w.write(data) 1104 w.close() 1105 self.assertTrue(w.closed) 1106 self.assertRaises(ValueError, w.write, b'') 1107 self.assertEqual(zipf.read('test'), data) 1108 1109class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1110 compression = zipfile.ZIP_STORED 1111 1112@requires_zlib 1113class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1114 compression = zipfile.ZIP_DEFLATED 1115 1116@requires_bz2 1117class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1118 compression = zipfile.ZIP_BZIP2 1119 1120@requires_lzma 1121class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1122 compression = zipfile.ZIP_LZMA 1123 1124 1125class PyZipFileTests(unittest.TestCase): 1126 def assertCompiledIn(self, name, namelist): 1127 if name + 'o' not in namelist: 1128 self.assertIn(name + 'c', namelist) 1129 1130 def requiresWriteAccess(self, path): 1131 # effective_ids unavailable on windows 1132 if not os.access(path, os.W_OK, 1133 effective_ids=os.access in os.supports_effective_ids): 1134 self.skipTest('requires write access to the installed location') 1135 filename = os.path.join(path, 'test_zipfile.try') 1136 try: 1137 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 1138 os.close(fd) 1139 except Exception: 1140 self.skipTest('requires write access to the installed location') 1141 unlink(filename) 1142 1143 def test_write_pyfile(self): 1144 self.requiresWriteAccess(os.path.dirname(__file__)) 1145 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1146 fn = __file__ 1147 if fn.endswith('.pyc'): 1148 path_split = fn.split(os.sep) 1149 if os.altsep is not None: 1150 path_split.extend(fn.split(os.altsep)) 1151 if '__pycache__' in path_split: 1152 fn = importlib.util.source_from_cache(fn) 1153 else: 1154 fn = fn[:-1] 1155 1156 zipfp.writepy(fn) 1157 1158 bn = os.path.basename(fn) 1159 self.assertNotIn(bn, zipfp.namelist()) 1160 self.assertCompiledIn(bn, zipfp.namelist()) 1161 1162 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1163 fn = __file__ 1164 if fn.endswith('.pyc'): 1165 fn = fn[:-1] 1166 1167 zipfp.writepy(fn, "testpackage") 1168 1169 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 1170 self.assertNotIn(bn, zipfp.namelist()) 1171 self.assertCompiledIn(bn, zipfp.namelist()) 1172 1173 def test_write_python_package(self): 1174 import email 1175 packagedir = os.path.dirname(email.__file__) 1176 self.requiresWriteAccess(packagedir) 1177 1178 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1179 zipfp.writepy(packagedir) 1180 1181 # Check for a couple of modules at different levels of the 1182 # hierarchy 1183 names = zipfp.namelist() 1184 self.assertCompiledIn('email/__init__.py', names) 1185 self.assertCompiledIn('email/mime/text.py', names) 1186 1187 def test_write_filtered_python_package(self): 1188 import test 1189 packagedir = os.path.dirname(test.__file__) 1190 self.requiresWriteAccess(packagedir) 1191 1192 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1193 1194 # first make sure that the test folder gives error messages 1195 # (on the badsyntax_... files) 1196 with captured_stdout() as reportSIO: 1197 zipfp.writepy(packagedir) 1198 reportStr = reportSIO.getvalue() 1199 self.assertTrue('SyntaxError' in reportStr) 1200 1201 # then check that the filter works on the whole package 1202 with captured_stdout() as reportSIO: 1203 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 1204 reportStr = reportSIO.getvalue() 1205 self.assertTrue('SyntaxError' not in reportStr) 1206 1207 # then check that the filter works on individual files 1208 def filter(path): 1209 return not os.path.basename(path).startswith("bad") 1210 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 1211 zipfp.writepy(packagedir, filterfunc=filter) 1212 reportStr = reportSIO.getvalue() 1213 if reportStr: 1214 print(reportStr) 1215 self.assertTrue('SyntaxError' not in reportStr) 1216 1217 def test_write_with_optimization(self): 1218 import email 1219 packagedir = os.path.dirname(email.__file__) 1220 self.requiresWriteAccess(packagedir) 1221 optlevel = 1 if __debug__ else 0 1222 ext = '.pyc' 1223 1224 with TemporaryFile() as t, \ 1225 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 1226 zipfp.writepy(packagedir) 1227 1228 names = zipfp.namelist() 1229 self.assertIn('email/__init__' + ext, names) 1230 self.assertIn('email/mime/text' + ext, names) 1231 1232 def test_write_python_directory(self): 1233 os.mkdir(TESTFN2) 1234 try: 1235 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1236 fp.write("print(42)\n") 1237 1238 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 1239 fp.write("print(42 * 42)\n") 1240 1241 with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: 1242 fp.write("bla bla bla\n") 1243 1244 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1245 zipfp.writepy(TESTFN2) 1246 1247 names = zipfp.namelist() 1248 self.assertCompiledIn('mod1.py', names) 1249 self.assertCompiledIn('mod2.py', names) 1250 self.assertNotIn('mod2.txt', names) 1251 1252 finally: 1253 rmtree(TESTFN2) 1254 1255 def test_write_python_directory_filtered(self): 1256 os.mkdir(TESTFN2) 1257 try: 1258 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1259 fp.write("print(42)\n") 1260 1261 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 1262 fp.write("print(42 * 42)\n") 1263 1264 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1265 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 1266 not fn.endswith('mod2.py')) 1267 1268 names = zipfp.namelist() 1269 self.assertCompiledIn('mod1.py', names) 1270 self.assertNotIn('mod2.py', names) 1271 1272 finally: 1273 rmtree(TESTFN2) 1274 1275 def test_write_non_pyfile(self): 1276 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1277 with open(TESTFN, 'w') as f: 1278 f.write('most definitely not a python file') 1279 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1280 unlink(TESTFN) 1281 1282 def test_write_pyfile_bad_syntax(self): 1283 os.mkdir(TESTFN2) 1284 try: 1285 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1286 fp.write("Bad syntax in python file\n") 1287 1288 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1289 # syntax errors are printed to stdout 1290 with captured_stdout() as s: 1291 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1292 1293 self.assertIn("SyntaxError", s.getvalue()) 1294 1295 # as it will not have compiled the python file, it will 1296 # include the .py file not .pyc 1297 names = zipfp.namelist() 1298 self.assertIn('mod1.py', names) 1299 self.assertNotIn('mod1.pyc', names) 1300 1301 finally: 1302 rmtree(TESTFN2) 1303 1304 def test_write_pathlike(self): 1305 os.mkdir(TESTFN2) 1306 try: 1307 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1308 fp.write("print(42)\n") 1309 1310 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1311 zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py") 1312 names = zipfp.namelist() 1313 self.assertCompiledIn('mod1.py', names) 1314 finally: 1315 rmtree(TESTFN2) 1316 1317 1318class ExtractTests(unittest.TestCase): 1319 1320 def make_test_file(self): 1321 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1322 for fpath, fdata in SMALL_TEST_DATA: 1323 zipfp.writestr(fpath, fdata) 1324 1325 def test_extract(self): 1326 with temp_cwd(): 1327 self.make_test_file() 1328 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1329 for fpath, fdata in SMALL_TEST_DATA: 1330 writtenfile = zipfp.extract(fpath) 1331 1332 # make sure it was written to the right place 1333 correctfile = os.path.join(os.getcwd(), fpath) 1334 correctfile = os.path.normpath(correctfile) 1335 1336 self.assertEqual(writtenfile, correctfile) 1337 1338 # make sure correct data is in correct file 1339 with open(writtenfile, "rb") as f: 1340 self.assertEqual(fdata.encode(), f.read()) 1341 1342 unlink(writtenfile) 1343 1344 def _test_extract_with_target(self, target): 1345 self.make_test_file() 1346 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1347 for fpath, fdata in SMALL_TEST_DATA: 1348 writtenfile = zipfp.extract(fpath, target) 1349 1350 # make sure it was written to the right place 1351 correctfile = os.path.join(target, fpath) 1352 correctfile = os.path.normpath(correctfile) 1353 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1354 1355 # make sure correct data is in correct file 1356 with open(writtenfile, "rb") as f: 1357 self.assertEqual(fdata.encode(), f.read()) 1358 1359 unlink(writtenfile) 1360 1361 unlink(TESTFN2) 1362 1363 def test_extract_with_target(self): 1364 with temp_dir() as extdir: 1365 self._test_extract_with_target(extdir) 1366 1367 def test_extract_with_target_pathlike(self): 1368 with temp_dir() as extdir: 1369 self._test_extract_with_target(pathlib.Path(extdir)) 1370 1371 def test_extract_all(self): 1372 with temp_cwd(): 1373 self.make_test_file() 1374 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1375 zipfp.extractall() 1376 for fpath, fdata in SMALL_TEST_DATA: 1377 outfile = os.path.join(os.getcwd(), fpath) 1378 1379 with open(outfile, "rb") as f: 1380 self.assertEqual(fdata.encode(), f.read()) 1381 1382 unlink(outfile) 1383 1384 def _test_extract_all_with_target(self, target): 1385 self.make_test_file() 1386 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1387 zipfp.extractall(target) 1388 for fpath, fdata in SMALL_TEST_DATA: 1389 outfile = os.path.join(target, fpath) 1390 1391 with open(outfile, "rb") as f: 1392 self.assertEqual(fdata.encode(), f.read()) 1393 1394 unlink(outfile) 1395 1396 unlink(TESTFN2) 1397 1398 def test_extract_all_with_target(self): 1399 with temp_dir() as extdir: 1400 self._test_extract_all_with_target(extdir) 1401 1402 def test_extract_all_with_target_pathlike(self): 1403 with temp_dir() as extdir: 1404 self._test_extract_all_with_target(pathlib.Path(extdir)) 1405 1406 def check_file(self, filename, content): 1407 self.assertTrue(os.path.isfile(filename)) 1408 with open(filename, 'rb') as f: 1409 self.assertEqual(f.read(), content) 1410 1411 def test_sanitize_windows_name(self): 1412 san = zipfile.ZipFile._sanitize_windows_name 1413 # Passing pathsep in allows this test to work regardless of platform. 1414 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1415 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1416 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1417 1418 def test_extract_hackers_arcnames_common_cases(self): 1419 common_hacknames = [ 1420 ('../foo/bar', 'foo/bar'), 1421 ('foo/../bar', 'foo/bar'), 1422 ('foo/../../bar', 'foo/bar'), 1423 ('foo/bar/..', 'foo/bar'), 1424 ('./../foo/bar', 'foo/bar'), 1425 ('/foo/bar', 'foo/bar'), 1426 ('/foo/../bar', 'foo/bar'), 1427 ('/foo/../../bar', 'foo/bar'), 1428 ] 1429 self._test_extract_hackers_arcnames(common_hacknames) 1430 1431 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1432 def test_extract_hackers_arcnames_windows_only(self): 1433 """Test combination of path fixing and windows name sanitization.""" 1434 windows_hacknames = [ 1435 (r'..\foo\bar', 'foo/bar'), 1436 (r'..\/foo\/bar', 'foo/bar'), 1437 (r'foo/\..\/bar', 'foo/bar'), 1438 (r'foo\/../\bar', 'foo/bar'), 1439 (r'C:foo/bar', 'foo/bar'), 1440 (r'C:/foo/bar', 'foo/bar'), 1441 (r'C://foo/bar', 'foo/bar'), 1442 (r'C:\foo\bar', 'foo/bar'), 1443 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1444 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1445 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1446 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1447 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1448 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1449 (r'//?/C:/foo/bar', 'foo/bar'), 1450 (r'\\?\C:\foo\bar', 'foo/bar'), 1451 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1452 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1453 ('../../foo../../ba..r', 'foo/ba..r'), 1454 ] 1455 self._test_extract_hackers_arcnames(windows_hacknames) 1456 1457 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1458 def test_extract_hackers_arcnames_posix_only(self): 1459 posix_hacknames = [ 1460 ('//foo/bar', 'foo/bar'), 1461 ('../../foo../../ba..r', 'foo../ba..r'), 1462 (r'foo/..\bar', r'foo/..\bar'), 1463 ] 1464 self._test_extract_hackers_arcnames(posix_hacknames) 1465 1466 def _test_extract_hackers_arcnames(self, hacknames): 1467 for arcname, fixedname in hacknames: 1468 content = b'foobar' + arcname.encode() 1469 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1470 zinfo = zipfile.ZipInfo() 1471 # preserve backslashes 1472 zinfo.filename = arcname 1473 zinfo.external_attr = 0o600 << 16 1474 zipfp.writestr(zinfo, content) 1475 1476 arcname = arcname.replace(os.sep, "/") 1477 targetpath = os.path.join('target', 'subdir', 'subsub') 1478 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1479 1480 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1481 writtenfile = zipfp.extract(arcname, targetpath) 1482 self.assertEqual(writtenfile, correctfile, 1483 msg='extract %r: %r != %r' % 1484 (arcname, writtenfile, correctfile)) 1485 self.check_file(correctfile, content) 1486 rmtree('target') 1487 1488 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1489 zipfp.extractall(targetpath) 1490 self.check_file(correctfile, content) 1491 rmtree('target') 1492 1493 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1494 1495 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1496 writtenfile = zipfp.extract(arcname) 1497 self.assertEqual(writtenfile, correctfile, 1498 msg="extract %r" % arcname) 1499 self.check_file(correctfile, content) 1500 rmtree(fixedname.split('/')[0]) 1501 1502 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1503 zipfp.extractall() 1504 self.check_file(correctfile, content) 1505 rmtree(fixedname.split('/')[0]) 1506 1507 unlink(TESTFN2) 1508 1509 1510class OtherTests(unittest.TestCase): 1511 def test_open_via_zip_info(self): 1512 # Create the ZIP archive 1513 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1514 zipfp.writestr("name", "foo") 1515 with self.assertWarns(UserWarning): 1516 zipfp.writestr("name", "bar") 1517 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1518 1519 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1520 infos = zipfp.infolist() 1521 data = b"" 1522 for info in infos: 1523 with zipfp.open(info) as zipopen: 1524 data += zipopen.read() 1525 self.assertIn(data, {b"foobar", b"barfoo"}) 1526 data = b"" 1527 for info in infos: 1528 data += zipfp.read(info) 1529 self.assertIn(data, {b"foobar", b"barfoo"}) 1530 1531 def test_writestr_extended_local_header_issue1202(self): 1532 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1533 for data in 'abcdefghijklmnop': 1534 zinfo = zipfile.ZipInfo(data) 1535 zinfo.flag_bits |= 0x08 # Include an extended local header. 1536 orig_zip.writestr(zinfo, data) 1537 1538 def test_close(self): 1539 """Check that the zipfile is closed after the 'with' block.""" 1540 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1541 for fpath, fdata in SMALL_TEST_DATA: 1542 zipfp.writestr(fpath, fdata) 1543 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1544 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1545 1546 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1547 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1548 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1549 1550 def test_close_on_exception(self): 1551 """Check that the zipfile is closed if an exception is raised in the 1552 'with' block.""" 1553 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1554 for fpath, fdata in SMALL_TEST_DATA: 1555 zipfp.writestr(fpath, fdata) 1556 1557 try: 1558 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1559 raise zipfile.BadZipFile() 1560 except zipfile.BadZipFile: 1561 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1562 1563 def test_unsupported_version(self): 1564 # File has an extract_version of 120 1565 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1566 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1567 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1568 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1569 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1570 1571 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1572 io.BytesIO(data), 'r') 1573 1574 @requires_zlib 1575 def test_read_unicode_filenames(self): 1576 # bug #10801 1577 fname = findfile('zip_cp437_header.zip') 1578 with zipfile.ZipFile(fname) as zipfp: 1579 for name in zipfp.namelist(): 1580 zipfp.open(name).close() 1581 1582 def test_write_unicode_filenames(self): 1583 with zipfile.ZipFile(TESTFN, "w") as zf: 1584 zf.writestr("foo.txt", "Test for unicode filename") 1585 zf.writestr("\xf6.txt", "Test for unicode filename") 1586 self.assertIsInstance(zf.infolist()[0].filename, str) 1587 1588 with zipfile.ZipFile(TESTFN, "r") as zf: 1589 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1590 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1591 1592 def test_read_after_write_unicode_filenames(self): 1593 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1594 zipfp.writestr('приклад', b'sample') 1595 self.assertEqual(zipfp.read('приклад'), b'sample') 1596 1597 def test_exclusive_create_zip_file(self): 1598 """Test exclusive creating a new zipfile.""" 1599 unlink(TESTFN2) 1600 filename = 'testfile.txt' 1601 content = b'hello, world. this is some content.' 1602 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1603 zipfp.writestr(filename, content) 1604 with self.assertRaises(FileExistsError): 1605 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1606 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1607 self.assertEqual(zipfp.namelist(), [filename]) 1608 self.assertEqual(zipfp.read(filename), content) 1609 1610 def test_create_non_existent_file_for_append(self): 1611 if os.path.exists(TESTFN): 1612 os.unlink(TESTFN) 1613 1614 filename = 'testfile.txt' 1615 content = b'hello, world. this is some content.' 1616 1617 try: 1618 with zipfile.ZipFile(TESTFN, 'a') as zf: 1619 zf.writestr(filename, content) 1620 except OSError: 1621 self.fail('Could not append data to a non-existent zip file.') 1622 1623 self.assertTrue(os.path.exists(TESTFN)) 1624 1625 with zipfile.ZipFile(TESTFN, 'r') as zf: 1626 self.assertEqual(zf.read(filename), content) 1627 1628 def test_close_erroneous_file(self): 1629 # This test checks that the ZipFile constructor closes the file object 1630 # it opens if there's an error in the file. If it doesn't, the 1631 # traceback holds a reference to the ZipFile object and, indirectly, 1632 # the file object. 1633 # On Windows, this causes the os.unlink() call to fail because the 1634 # underlying file is still open. This is SF bug #412214. 1635 # 1636 with open(TESTFN, "w") as fp: 1637 fp.write("this is not a legal zip file\n") 1638 try: 1639 zf = zipfile.ZipFile(TESTFN) 1640 except zipfile.BadZipFile: 1641 pass 1642 1643 def test_is_zip_erroneous_file(self): 1644 """Check that is_zipfile() correctly identifies non-zip files.""" 1645 # - passing a filename 1646 with open(TESTFN, "w") as fp: 1647 fp.write("this is not a legal zip file\n") 1648 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1649 # - passing a path-like object 1650 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1651 # - passing a file object 1652 with open(TESTFN, "rb") as fp: 1653 self.assertFalse(zipfile.is_zipfile(fp)) 1654 # - passing a file-like object 1655 fp = io.BytesIO() 1656 fp.write(b"this is not a legal zip file\n") 1657 self.assertFalse(zipfile.is_zipfile(fp)) 1658 fp.seek(0, 0) 1659 self.assertFalse(zipfile.is_zipfile(fp)) 1660 1661 def test_damaged_zipfile(self): 1662 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1663 # - Create a valid zip file 1664 fp = io.BytesIO() 1665 with zipfile.ZipFile(fp, mode="w") as zipf: 1666 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1667 zipfiledata = fp.getvalue() 1668 1669 # - Now create copies of it missing the last N bytes and make sure 1670 # a BadZipFile exception is raised when we try to open it 1671 for N in range(len(zipfiledata)): 1672 fp = io.BytesIO(zipfiledata[:N]) 1673 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1674 1675 def test_is_zip_valid_file(self): 1676 """Check that is_zipfile() correctly identifies zip files.""" 1677 # - passing a filename 1678 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1679 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1680 1681 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1682 # - passing a file object 1683 with open(TESTFN, "rb") as fp: 1684 self.assertTrue(zipfile.is_zipfile(fp)) 1685 fp.seek(0, 0) 1686 zip_contents = fp.read() 1687 # - passing a file-like object 1688 fp = io.BytesIO() 1689 fp.write(zip_contents) 1690 self.assertTrue(zipfile.is_zipfile(fp)) 1691 fp.seek(0, 0) 1692 self.assertTrue(zipfile.is_zipfile(fp)) 1693 1694 def test_non_existent_file_raises_OSError(self): 1695 # make sure we don't raise an AttributeError when a partially-constructed 1696 # ZipFile instance is finalized; this tests for regression on SF tracker 1697 # bug #403871. 1698 1699 # The bug we're testing for caused an AttributeError to be raised 1700 # when a ZipFile instance was created for a file that did not 1701 # exist; the .fp member was not initialized but was needed by the 1702 # __del__() method. Since the AttributeError is in the __del__(), 1703 # it is ignored, but the user should be sufficiently annoyed by 1704 # the message on the output that regression will be noticed 1705 # quickly. 1706 self.assertRaises(OSError, zipfile.ZipFile, TESTFN) 1707 1708 def test_empty_file_raises_BadZipFile(self): 1709 f = open(TESTFN, 'w') 1710 f.close() 1711 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1712 1713 with open(TESTFN, 'w') as fp: 1714 fp.write("short file") 1715 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1716 1717 def test_closed_zip_raises_ValueError(self): 1718 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 1719 data = io.BytesIO() 1720 with zipfile.ZipFile(data, mode="w") as zipf: 1721 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1722 1723 # This is correct; calling .read on a closed ZipFile should raise 1724 # a ValueError, and so should calling .testzip. An earlier 1725 # version of .testzip would swallow this exception (and any other) 1726 # and report that the first file in the archive was corrupt. 1727 self.assertRaises(ValueError, zipf.read, "foo.txt") 1728 self.assertRaises(ValueError, zipf.open, "foo.txt") 1729 self.assertRaises(ValueError, zipf.testzip) 1730 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 1731 with open(TESTFN, 'w') as f: 1732 f.write('zipfile test data') 1733 self.assertRaises(ValueError, zipf.write, TESTFN) 1734 1735 def test_bad_constructor_mode(self): 1736 """Check that bad modes passed to ZipFile constructor are caught.""" 1737 self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q") 1738 1739 def test_bad_open_mode(self): 1740 """Check that bad modes passed to ZipFile.open are caught.""" 1741 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1742 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1743 1744 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1745 # read the data to make sure the file is there 1746 zipf.read("foo.txt") 1747 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 1748 # universal newlines support is removed 1749 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 1750 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 1751 1752 def test_read0(self): 1753 """Check that calling read(0) on a ZipExtFile object returns an empty 1754 string and doesn't advance file pointer.""" 1755 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1756 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1757 # read the data to make sure the file is there 1758 with zipf.open("foo.txt") as f: 1759 for i in range(FIXEDTEST_SIZE): 1760 self.assertEqual(f.read(0), b'') 1761 1762 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 1763 1764 def test_open_non_existent_item(self): 1765 """Check that attempting to call open() for an item that doesn't 1766 exist in the archive raises a RuntimeError.""" 1767 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1768 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 1769 1770 def test_bad_compression_mode(self): 1771 """Check that bad compression methods passed to ZipFile.open are 1772 caught.""" 1773 self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1) 1774 1775 def test_unsupported_compression(self): 1776 # data is declared as shrunk, but actually deflated 1777 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 1778 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 1779 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 1780 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1781 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 1782 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 1783 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 1784 self.assertRaises(NotImplementedError, zipf.open, 'x') 1785 1786 def test_null_byte_in_filename(self): 1787 """Check that a filename containing a null byte is properly 1788 terminated.""" 1789 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1790 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 1791 self.assertEqual(zipf.namelist(), ['foo.txt']) 1792 1793 def test_struct_sizes(self): 1794 """Check that ZIP internal structure sizes are calculated correctly.""" 1795 self.assertEqual(zipfile.sizeEndCentDir, 22) 1796 self.assertEqual(zipfile.sizeCentralDir, 46) 1797 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1798 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1799 1800 def test_comments(self): 1801 """Check that comments on the archive are handled properly.""" 1802 1803 # check default comment is empty 1804 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1805 self.assertEqual(zipf.comment, b'') 1806 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1807 1808 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1809 self.assertEqual(zipfr.comment, b'') 1810 1811 # check a simple short comment 1812 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 1813 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1814 zipf.comment = comment 1815 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1816 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1817 self.assertEqual(zipf.comment, comment) 1818 1819 # check a comment of max length 1820 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 1821 comment2 = comment2.encode("ascii") 1822 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1823 zipf.comment = comment2 1824 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1825 1826 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1827 self.assertEqual(zipfr.comment, comment2) 1828 1829 # check a comment that is too long is truncated 1830 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1831 with self.assertWarns(UserWarning): 1832 zipf.comment = comment2 + b'oops' 1833 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1834 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1835 self.assertEqual(zipfr.comment, comment2) 1836 1837 # check that comments are correctly modified in append mode 1838 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1839 zipf.comment = b"original comment" 1840 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1841 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1842 zipf.comment = b"an updated comment" 1843 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1844 self.assertEqual(zipf.comment, b"an updated comment") 1845 1846 # check that comments are correctly shortened in append mode 1847 # and the file is indeed truncated 1848 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1849 zipf.comment = b"original comment that's longer" 1850 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1851 original_zip_size = os.path.getsize(TESTFN) 1852 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1853 zipf.comment = b"shorter comment" 1854 self.assertTrue(original_zip_size > os.path.getsize(TESTFN)) 1855 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1856 self.assertEqual(zipf.comment, b"shorter comment") 1857 1858 def test_unicode_comment(self): 1859 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1860 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1861 with self.assertRaises(TypeError): 1862 zipf.comment = "this is an error" 1863 1864 def test_change_comment_in_empty_archive(self): 1865 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1866 self.assertFalse(zipf.filelist) 1867 zipf.comment = b"this is a comment" 1868 with zipfile.ZipFile(TESTFN, "r") as zipf: 1869 self.assertEqual(zipf.comment, b"this is a comment") 1870 1871 def test_change_comment_in_nonempty_archive(self): 1872 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1873 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1874 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1875 self.assertTrue(zipf.filelist) 1876 zipf.comment = b"this is a comment" 1877 with zipfile.ZipFile(TESTFN, "r") as zipf: 1878 self.assertEqual(zipf.comment, b"this is a comment") 1879 1880 def test_empty_zipfile(self): 1881 # Check that creating a file in 'w' or 'a' mode and closing without 1882 # adding any files to the archives creates a valid empty ZIP file 1883 zipf = zipfile.ZipFile(TESTFN, mode="w") 1884 zipf.close() 1885 try: 1886 zipf = zipfile.ZipFile(TESTFN, mode="r") 1887 except zipfile.BadZipFile: 1888 self.fail("Unable to create empty ZIP file in 'w' mode") 1889 1890 zipf = zipfile.ZipFile(TESTFN, mode="a") 1891 zipf.close() 1892 try: 1893 zipf = zipfile.ZipFile(TESTFN, mode="r") 1894 except: 1895 self.fail("Unable to create empty ZIP file in 'a' mode") 1896 1897 def test_open_empty_file(self): 1898 # Issue 1710703: Check that opening a file with less than 22 bytes 1899 # raises a BadZipFile exception (rather than the previously unhelpful 1900 # OSError) 1901 f = open(TESTFN, 'w') 1902 f.close() 1903 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 1904 1905 def test_create_zipinfo_before_1980(self): 1906 self.assertRaises(ValueError, 1907 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1908 1909 def test_zipfile_with_short_extra_field(self): 1910 """If an extra field in the header is less than 4 bytes, skip it.""" 1911 zipdata = ( 1912 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 1913 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 1914 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 1915 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 1916 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 1917 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 1918 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 1919 ) 1920 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 1921 # testzip returns the name of the first corrupt file, or None 1922 self.assertIsNone(zipf.testzip()) 1923 1924 def test_open_conflicting_handles(self): 1925 # It's only possible to open one writable file handle at a time 1926 msg1 = b"It's fun to charter an accountant!" 1927 msg2 = b"And sail the wide accountant sea" 1928 msg3 = b"To find, explore the funds offshore" 1929 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1930 with zipf.open('foo', mode='w') as w2: 1931 w2.write(msg1) 1932 with zipf.open('bar', mode='w') as w1: 1933 with self.assertRaises(ValueError): 1934 zipf.open('handle', mode='w') 1935 with self.assertRaises(ValueError): 1936 zipf.open('foo', mode='r') 1937 with self.assertRaises(ValueError): 1938 zipf.writestr('str', 'abcde') 1939 with self.assertRaises(ValueError): 1940 zipf.write(__file__, 'file') 1941 with self.assertRaises(ValueError): 1942 zipf.close() 1943 w1.write(msg2) 1944 with zipf.open('baz', mode='w') as w2: 1945 w2.write(msg3) 1946 1947 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 1948 self.assertEqual(zipf.read('foo'), msg1) 1949 self.assertEqual(zipf.read('bar'), msg2) 1950 self.assertEqual(zipf.read('baz'), msg3) 1951 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 1952 1953 def test_seek_tell(self): 1954 # Test seek functionality 1955 txt = b"Where's Bruce?" 1956 bloc = txt.find(b"Bruce") 1957 # Check seek on a file 1958 with zipfile.ZipFile(TESTFN, "w") as zipf: 1959 zipf.writestr("foo.txt", txt) 1960 with zipfile.ZipFile(TESTFN, "r") as zipf: 1961 with zipf.open("foo.txt", "r") as fp: 1962 fp.seek(bloc, os.SEEK_SET) 1963 self.assertEqual(fp.tell(), bloc) 1964 fp.seek(-bloc, os.SEEK_CUR) 1965 self.assertEqual(fp.tell(), 0) 1966 fp.seek(bloc, os.SEEK_CUR) 1967 self.assertEqual(fp.tell(), bloc) 1968 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1969 fp.seek(0, os.SEEK_END) 1970 self.assertEqual(fp.tell(), len(txt)) 1971 fp.seek(0, os.SEEK_SET) 1972 self.assertEqual(fp.tell(), 0) 1973 # Check seek on memory file 1974 data = io.BytesIO() 1975 with zipfile.ZipFile(data, mode="w") as zipf: 1976 zipf.writestr("foo.txt", txt) 1977 with zipfile.ZipFile(data, mode="r") as zipf: 1978 with zipf.open("foo.txt", "r") as fp: 1979 fp.seek(bloc, os.SEEK_SET) 1980 self.assertEqual(fp.tell(), bloc) 1981 fp.seek(-bloc, os.SEEK_CUR) 1982 self.assertEqual(fp.tell(), 0) 1983 fp.seek(bloc, os.SEEK_CUR) 1984 self.assertEqual(fp.tell(), bloc) 1985 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1986 fp.seek(0, os.SEEK_END) 1987 self.assertEqual(fp.tell(), len(txt)) 1988 fp.seek(0, os.SEEK_SET) 1989 self.assertEqual(fp.tell(), 0) 1990 1991 @requires_bz2 1992 def test_decompress_without_3rd_party_library(self): 1993 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1994 zip_file = io.BytesIO(data) 1995 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 1996 zf.writestr('a.txt', b'a') 1997 with mock.patch('zipfile.bz2', None): 1998 with zipfile.ZipFile(zip_file) as zf: 1999 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 2000 2001 def tearDown(self): 2002 unlink(TESTFN) 2003 unlink(TESTFN2) 2004 2005 2006class AbstractBadCrcTests: 2007 def test_testzip_with_bad_crc(self): 2008 """Tests that files with bad CRCs return their name from testzip.""" 2009 zipdata = self.zip_with_bad_crc 2010 2011 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2012 # testzip returns the name of the first corrupt file, or None 2013 self.assertEqual('afile', zipf.testzip()) 2014 2015 def test_read_with_bad_crc(self): 2016 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2017 zipdata = self.zip_with_bad_crc 2018 2019 # Using ZipFile.read() 2020 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2021 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2022 2023 # Using ZipExtFile.read() 2024 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2025 with zipf.open('afile', 'r') as corrupt_file: 2026 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2027 2028 # Same with small reads (in order to exercise the buffering logic) 2029 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2030 with zipf.open('afile', 'r') as corrupt_file: 2031 corrupt_file.MIN_READ_SIZE = 2 2032 with self.assertRaises(zipfile.BadZipFile): 2033 while corrupt_file.read(2): 2034 pass 2035 2036 2037class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2038 compression = zipfile.ZIP_STORED 2039 zip_with_bad_crc = ( 2040 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2041 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2042 b'ilehello,AworldP' 2043 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2044 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2045 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2046 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2047 b'\0\0/\0\0\0\0\0') 2048 2049@requires_zlib 2050class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2051 compression = zipfile.ZIP_DEFLATED 2052 zip_with_bad_crc = ( 2053 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2054 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2055 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2056 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2057 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2058 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2059 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2060 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2061 2062@requires_bz2 2063class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2064 compression = zipfile.ZIP_BZIP2 2065 zip_with_bad_crc = ( 2066 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2067 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2068 b'ileBZh91AY&SY\xd4\xa8\xca' 2069 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2070 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2071 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2072 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2073 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2074 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2075 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2076 b'\x00\x00\x00\x00') 2077 2078@requires_lzma 2079class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2080 compression = zipfile.ZIP_LZMA 2081 zip_with_bad_crc = ( 2082 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2083 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2084 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2085 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2086 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2087 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2088 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2089 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2090 b'\x00>\x00\x00\x00\x00\x00') 2091 2092 2093class DecryptionTests(unittest.TestCase): 2094 """Check that ZIP decryption works. Since the library does not 2095 support encryption at the moment, we use a pre-generated encrypted 2096 ZIP file.""" 2097 2098 data = ( 2099 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2100 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2101 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2102 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2103 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2104 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2105 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2106 data2 = ( 2107 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2108 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2109 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2110 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2111 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2112 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2113 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2114 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2115 2116 plain = b'zipfile.py encryption test' 2117 plain2 = b'\x00'*512 2118 2119 def setUp(self): 2120 with open(TESTFN, "wb") as fp: 2121 fp.write(self.data) 2122 self.zip = zipfile.ZipFile(TESTFN, "r") 2123 with open(TESTFN2, "wb") as fp: 2124 fp.write(self.data2) 2125 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2126 2127 def tearDown(self): 2128 self.zip.close() 2129 os.unlink(TESTFN) 2130 self.zip2.close() 2131 os.unlink(TESTFN2) 2132 2133 def test_no_password(self): 2134 # Reading the encrypted file without password 2135 # must generate a RunTime exception 2136 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2137 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2138 2139 def test_bad_password(self): 2140 self.zip.setpassword(b"perl") 2141 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2142 self.zip2.setpassword(b"perl") 2143 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2144 2145 @requires_zlib 2146 def test_good_password(self): 2147 self.zip.setpassword(b"python") 2148 self.assertEqual(self.zip.read("test.txt"), self.plain) 2149 self.zip2.setpassword(b"12345") 2150 self.assertEqual(self.zip2.read("zero"), self.plain2) 2151 2152 def test_unicode_password(self): 2153 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 2154 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 2155 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 2156 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 2157 2158 def test_seek_tell(self): 2159 self.zip.setpassword(b"python") 2160 txt = self.plain 2161 test_word = b'encryption' 2162 bloc = txt.find(test_word) 2163 bloc_len = len(test_word) 2164 with self.zip.open("test.txt", "r") as fp: 2165 fp.seek(bloc, os.SEEK_SET) 2166 self.assertEqual(fp.tell(), bloc) 2167 fp.seek(-bloc, os.SEEK_CUR) 2168 self.assertEqual(fp.tell(), 0) 2169 fp.seek(bloc, os.SEEK_CUR) 2170 self.assertEqual(fp.tell(), bloc) 2171 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2172 2173 # Make sure that the second read after seeking back beyond 2174 # _readbuffer returns the same content (ie. rewind to the start of 2175 # the file to read forward to the required position). 2176 old_read_size = fp.MIN_READ_SIZE 2177 fp.MIN_READ_SIZE = 1 2178 fp._readbuffer = b'' 2179 fp._offset = 0 2180 fp.seek(0, os.SEEK_SET) 2181 self.assertEqual(fp.tell(), 0) 2182 fp.seek(bloc, os.SEEK_CUR) 2183 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2184 fp.MIN_READ_SIZE = old_read_size 2185 2186 fp.seek(0, os.SEEK_END) 2187 self.assertEqual(fp.tell(), len(txt)) 2188 fp.seek(0, os.SEEK_SET) 2189 self.assertEqual(fp.tell(), 0) 2190 2191 # Read the file completely to definitely call any eof integrity 2192 # checks (crc) and make sure they still pass. 2193 fp.read() 2194 2195 2196class AbstractTestsWithRandomBinaryFiles: 2197 @classmethod 2198 def setUpClass(cls): 2199 datacount = randint(16, 64)*1024 + randint(1, 1024) 2200 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2201 for i in range(datacount)) 2202 2203 def setUp(self): 2204 # Make a source file with some lines 2205 with open(TESTFN, "wb") as fp: 2206 fp.write(self.data) 2207 2208 def tearDown(self): 2209 unlink(TESTFN) 2210 unlink(TESTFN2) 2211 2212 def make_test_archive(self, f, compression): 2213 # Create the ZIP archive 2214 with zipfile.ZipFile(f, "w", compression) as zipfp: 2215 zipfp.write(TESTFN, "another.name") 2216 zipfp.write(TESTFN, TESTFN) 2217 2218 def zip_test(self, f, compression): 2219 self.make_test_archive(f, compression) 2220 2221 # Read the ZIP archive 2222 with zipfile.ZipFile(f, "r", compression) as zipfp: 2223 testdata = zipfp.read(TESTFN) 2224 self.assertEqual(len(testdata), len(self.data)) 2225 self.assertEqual(testdata, self.data) 2226 self.assertEqual(zipfp.read("another.name"), self.data) 2227 2228 def test_read(self): 2229 for f in get_files(self): 2230 self.zip_test(f, self.compression) 2231 2232 def zip_open_test(self, f, compression): 2233 self.make_test_archive(f, compression) 2234 2235 # Read the ZIP archive 2236 with zipfile.ZipFile(f, "r", compression) as zipfp: 2237 zipdata1 = [] 2238 with zipfp.open(TESTFN) as zipopen1: 2239 while True: 2240 read_data = zipopen1.read(256) 2241 if not read_data: 2242 break 2243 zipdata1.append(read_data) 2244 2245 zipdata2 = [] 2246 with zipfp.open("another.name") as zipopen2: 2247 while True: 2248 read_data = zipopen2.read(256) 2249 if not read_data: 2250 break 2251 zipdata2.append(read_data) 2252 2253 testdata1 = b''.join(zipdata1) 2254 self.assertEqual(len(testdata1), len(self.data)) 2255 self.assertEqual(testdata1, self.data) 2256 2257 testdata2 = b''.join(zipdata2) 2258 self.assertEqual(len(testdata2), len(self.data)) 2259 self.assertEqual(testdata2, self.data) 2260 2261 def test_open(self): 2262 for f in get_files(self): 2263 self.zip_open_test(f, self.compression) 2264 2265 def zip_random_open_test(self, f, compression): 2266 self.make_test_archive(f, compression) 2267 2268 # Read the ZIP archive 2269 with zipfile.ZipFile(f, "r", compression) as zipfp: 2270 zipdata1 = [] 2271 with zipfp.open(TESTFN) as zipopen1: 2272 while True: 2273 read_data = zipopen1.read(randint(1, 1024)) 2274 if not read_data: 2275 break 2276 zipdata1.append(read_data) 2277 2278 testdata = b''.join(zipdata1) 2279 self.assertEqual(len(testdata), len(self.data)) 2280 self.assertEqual(testdata, self.data) 2281 2282 def test_random_open(self): 2283 for f in get_files(self): 2284 self.zip_random_open_test(f, self.compression) 2285 2286 2287class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2288 unittest.TestCase): 2289 compression = zipfile.ZIP_STORED 2290 2291@requires_zlib 2292class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2293 unittest.TestCase): 2294 compression = zipfile.ZIP_DEFLATED 2295 2296@requires_bz2 2297class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2298 unittest.TestCase): 2299 compression = zipfile.ZIP_BZIP2 2300 2301@requires_lzma 2302class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2303 unittest.TestCase): 2304 compression = zipfile.ZIP_LZMA 2305 2306 2307# Provide the tell() method but not seek() 2308class Tellable: 2309 def __init__(self, fp): 2310 self.fp = fp 2311 self.offset = 0 2312 2313 def write(self, data): 2314 n = self.fp.write(data) 2315 self.offset += n 2316 return n 2317 2318 def tell(self): 2319 return self.offset 2320 2321 def flush(self): 2322 self.fp.flush() 2323 2324class Unseekable: 2325 def __init__(self, fp): 2326 self.fp = fp 2327 2328 def write(self, data): 2329 return self.fp.write(data) 2330 2331 def flush(self): 2332 self.fp.flush() 2333 2334class UnseekableTests(unittest.TestCase): 2335 def test_writestr(self): 2336 for wrapper in (lambda f: f), Tellable, Unseekable: 2337 with self.subTest(wrapper=wrapper): 2338 f = io.BytesIO() 2339 f.write(b'abc') 2340 bf = io.BufferedWriter(f) 2341 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2342 zipfp.writestr('ones', b'111') 2343 zipfp.writestr('twos', b'222') 2344 self.assertEqual(f.getvalue()[:5], b'abcPK') 2345 with zipfile.ZipFile(f, mode='r') as zipf: 2346 with zipf.open('ones') as zopen: 2347 self.assertEqual(zopen.read(), b'111') 2348 with zipf.open('twos') as zopen: 2349 self.assertEqual(zopen.read(), b'222') 2350 2351 def test_write(self): 2352 for wrapper in (lambda f: f), Tellable, Unseekable: 2353 with self.subTest(wrapper=wrapper): 2354 f = io.BytesIO() 2355 f.write(b'abc') 2356 bf = io.BufferedWriter(f) 2357 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2358 self.addCleanup(unlink, TESTFN) 2359 with open(TESTFN, 'wb') as f2: 2360 f2.write(b'111') 2361 zipfp.write(TESTFN, 'ones') 2362 with open(TESTFN, 'wb') as f2: 2363 f2.write(b'222') 2364 zipfp.write(TESTFN, 'twos') 2365 self.assertEqual(f.getvalue()[:5], b'abcPK') 2366 with zipfile.ZipFile(f, mode='r') as zipf: 2367 with zipf.open('ones') as zopen: 2368 self.assertEqual(zopen.read(), b'111') 2369 with zipf.open('twos') as zopen: 2370 self.assertEqual(zopen.read(), b'222') 2371 2372 def test_open_write(self): 2373 for wrapper in (lambda f: f), Tellable, Unseekable: 2374 with self.subTest(wrapper=wrapper): 2375 f = io.BytesIO() 2376 f.write(b'abc') 2377 bf = io.BufferedWriter(f) 2378 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2379 with zipf.open('ones', 'w') as zopen: 2380 zopen.write(b'111') 2381 with zipf.open('twos', 'w') as zopen: 2382 zopen.write(b'222') 2383 self.assertEqual(f.getvalue()[:5], b'abcPK') 2384 with zipfile.ZipFile(f) as zipf: 2385 self.assertEqual(zipf.read('ones'), b'111') 2386 self.assertEqual(zipf.read('twos'), b'222') 2387 2388 2389@requires_zlib 2390class TestsWithMultipleOpens(unittest.TestCase): 2391 @classmethod 2392 def setUpClass(cls): 2393 cls.data1 = b'111' + getrandbytes(10000) 2394 cls.data2 = b'222' + getrandbytes(10000) 2395 2396 def make_test_archive(self, f): 2397 # Create the ZIP archive 2398 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2399 zipfp.writestr('ones', self.data1) 2400 zipfp.writestr('twos', self.data2) 2401 2402 def test_same_file(self): 2403 # Verify that (when the ZipFile is in control of creating file objects) 2404 # multiple open() calls can be made without interfering with each other. 2405 for f in get_files(self): 2406 self.make_test_archive(f) 2407 with zipfile.ZipFile(f, mode="r") as zipf: 2408 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2409 data1 = zopen1.read(500) 2410 data2 = zopen2.read(500) 2411 data1 += zopen1.read() 2412 data2 += zopen2.read() 2413 self.assertEqual(data1, data2) 2414 self.assertEqual(data1, self.data1) 2415 2416 def test_different_file(self): 2417 # Verify that (when the ZipFile is in control of creating file objects) 2418 # multiple open() calls can be made without interfering with each other. 2419 for f in get_files(self): 2420 self.make_test_archive(f) 2421 with zipfile.ZipFile(f, mode="r") as zipf: 2422 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2423 data1 = zopen1.read(500) 2424 data2 = zopen2.read(500) 2425 data1 += zopen1.read() 2426 data2 += zopen2.read() 2427 self.assertEqual(data1, self.data1) 2428 self.assertEqual(data2, self.data2) 2429 2430 def test_interleaved(self): 2431 # Verify that (when the ZipFile is in control of creating file objects) 2432 # multiple open() calls can be made without interfering with each other. 2433 for f in get_files(self): 2434 self.make_test_archive(f) 2435 with zipfile.ZipFile(f, mode="r") as zipf: 2436 with zipf.open('ones') as zopen1: 2437 data1 = zopen1.read(500) 2438 with zipf.open('twos') as zopen2: 2439 data2 = zopen2.read(500) 2440 data1 += zopen1.read() 2441 data2 += zopen2.read() 2442 self.assertEqual(data1, self.data1) 2443 self.assertEqual(data2, self.data2) 2444 2445 def test_read_after_close(self): 2446 for f in get_files(self): 2447 self.make_test_archive(f) 2448 with contextlib.ExitStack() as stack: 2449 with zipfile.ZipFile(f, 'r') as zipf: 2450 zopen1 = stack.enter_context(zipf.open('ones')) 2451 zopen2 = stack.enter_context(zipf.open('twos')) 2452 data1 = zopen1.read(500) 2453 data2 = zopen2.read(500) 2454 data1 += zopen1.read() 2455 data2 += zopen2.read() 2456 self.assertEqual(data1, self.data1) 2457 self.assertEqual(data2, self.data2) 2458 2459 def test_read_after_write(self): 2460 for f in get_files(self): 2461 with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2462 zipf.writestr('ones', self.data1) 2463 zipf.writestr('twos', self.data2) 2464 with zipf.open('ones') as zopen1: 2465 data1 = zopen1.read(500) 2466 self.assertEqual(data1, self.data1[:500]) 2467 with zipfile.ZipFile(f, 'r') as zipf: 2468 data1 = zipf.read('ones') 2469 data2 = zipf.read('twos') 2470 self.assertEqual(data1, self.data1) 2471 self.assertEqual(data2, self.data2) 2472 2473 def test_write_after_read(self): 2474 for f in get_files(self): 2475 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2476 zipf.writestr('ones', self.data1) 2477 with zipf.open('ones') as zopen1: 2478 zopen1.read(500) 2479 zipf.writestr('twos', self.data2) 2480 with zipfile.ZipFile(f, 'r') as zipf: 2481 data1 = zipf.read('ones') 2482 data2 = zipf.read('twos') 2483 self.assertEqual(data1, self.data1) 2484 self.assertEqual(data2, self.data2) 2485 2486 def test_many_opens(self): 2487 # Verify that read() and open() promptly close the file descriptor, 2488 # and don't rely on the garbage collector to free resources. 2489 self.make_test_archive(TESTFN2) 2490 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 2491 for x in range(100): 2492 zipf.read('ones') 2493 with zipf.open('ones') as zopen1: 2494 pass 2495 with open(os.devnull) as f: 2496 self.assertLess(f.fileno(), 100) 2497 2498 def test_write_while_reading(self): 2499 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2500 zipf.writestr('ones', self.data1) 2501 with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2502 with zipf.open('ones', 'r') as r1: 2503 data1 = r1.read(500) 2504 with zipf.open('twos', 'w') as w1: 2505 w1.write(self.data2) 2506 data1 += r1.read() 2507 self.assertEqual(data1, self.data1) 2508 with zipfile.ZipFile(TESTFN2) as zipf: 2509 self.assertEqual(zipf.read('twos'), self.data2) 2510 2511 def tearDown(self): 2512 unlink(TESTFN2) 2513 2514 2515class TestWithDirectory(unittest.TestCase): 2516 def setUp(self): 2517 os.mkdir(TESTFN2) 2518 2519 def test_extract_dir(self): 2520 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 2521 zipf.extractall(TESTFN2) 2522 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2523 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2524 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2525 2526 def test_bug_6050(self): 2527 # Extraction should succeed if directories already exist 2528 os.mkdir(os.path.join(TESTFN2, "a")) 2529 self.test_extract_dir() 2530 2531 def test_write_dir(self): 2532 dirpath = os.path.join(TESTFN2, "x") 2533 os.mkdir(dirpath) 2534 mode = os.stat(dirpath).st_mode & 0xFFFF 2535 with zipfile.ZipFile(TESTFN, "w") as zipf: 2536 zipf.write(dirpath) 2537 zinfo = zipf.filelist[0] 2538 self.assertTrue(zinfo.filename.endswith("/x/")) 2539 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2540 zipf.write(dirpath, "y") 2541 zinfo = zipf.filelist[1] 2542 self.assertTrue(zinfo.filename, "y/") 2543 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2544 with zipfile.ZipFile(TESTFN, "r") as zipf: 2545 zinfo = zipf.filelist[0] 2546 self.assertTrue(zinfo.filename.endswith("/x/")) 2547 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2548 zinfo = zipf.filelist[1] 2549 self.assertTrue(zinfo.filename, "y/") 2550 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2551 target = os.path.join(TESTFN2, "target") 2552 os.mkdir(target) 2553 zipf.extractall(target) 2554 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2555 self.assertEqual(len(os.listdir(target)), 2) 2556 2557 def test_writestr_dir(self): 2558 os.mkdir(os.path.join(TESTFN2, "x")) 2559 with zipfile.ZipFile(TESTFN, "w") as zipf: 2560 zipf.writestr("x/", b'') 2561 zinfo = zipf.filelist[0] 2562 self.assertEqual(zinfo.filename, "x/") 2563 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2564 with zipfile.ZipFile(TESTFN, "r") as zipf: 2565 zinfo = zipf.filelist[0] 2566 self.assertTrue(zinfo.filename.endswith("x/")) 2567 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2568 target = os.path.join(TESTFN2, "target") 2569 os.mkdir(target) 2570 zipf.extractall(target) 2571 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 2572 self.assertEqual(os.listdir(target), ["x"]) 2573 2574 def tearDown(self): 2575 rmtree(TESTFN2) 2576 if os.path.exists(TESTFN): 2577 unlink(TESTFN) 2578 2579 2580class ZipInfoTests(unittest.TestCase): 2581 def test_from_file(self): 2582 zi = zipfile.ZipInfo.from_file(__file__) 2583 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2584 self.assertFalse(zi.is_dir()) 2585 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2586 2587 def test_from_file_pathlike(self): 2588 zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__)) 2589 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2590 self.assertFalse(zi.is_dir()) 2591 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2592 2593 def test_from_file_bytes(self): 2594 zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test') 2595 self.assertEqual(posixpath.basename(zi.filename), 'test') 2596 self.assertFalse(zi.is_dir()) 2597 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2598 2599 def test_from_file_fileno(self): 2600 with open(__file__, 'rb') as f: 2601 zi = zipfile.ZipInfo.from_file(f.fileno(), 'test') 2602 self.assertEqual(posixpath.basename(zi.filename), 'test') 2603 self.assertFalse(zi.is_dir()) 2604 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2605 2606 def test_from_dir(self): 2607 dirpath = os.path.dirname(os.path.abspath(__file__)) 2608 zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests') 2609 self.assertEqual(zi.filename, 'stdlib_tests/') 2610 self.assertTrue(zi.is_dir()) 2611 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2612 self.assertEqual(zi.file_size, 0) 2613 2614 2615class CommandLineTest(unittest.TestCase): 2616 2617 def zipfilecmd(self, *args, **kwargs): 2618 rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args, 2619 **kwargs) 2620 return out.replace(os.linesep.encode(), b'\n') 2621 2622 def zipfilecmd_failure(self, *args): 2623 return script_helper.assert_python_failure('-m', 'zipfile', *args) 2624 2625 def test_bad_use(self): 2626 rc, out, err = self.zipfilecmd_failure() 2627 self.assertEqual(out, b'') 2628 self.assertIn(b'usage', err.lower()) 2629 self.assertIn(b'error', err.lower()) 2630 self.assertIn(b'required', err.lower()) 2631 rc, out, err = self.zipfilecmd_failure('-l', '') 2632 self.assertEqual(out, b'') 2633 self.assertNotEqual(err.strip(), b'') 2634 2635 def test_test_command(self): 2636 zip_name = findfile('zipdir.zip') 2637 for opt in '-t', '--test': 2638 out = self.zipfilecmd(opt, zip_name) 2639 self.assertEqual(out.rstrip(), b'Done testing') 2640 zip_name = findfile('testtar.tar') 2641 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 2642 self.assertEqual(out, b'') 2643 2644 def test_list_command(self): 2645 zip_name = findfile('zipdir.zip') 2646 t = io.StringIO() 2647 with zipfile.ZipFile(zip_name, 'r') as tf: 2648 tf.printdir(t) 2649 expected = t.getvalue().encode('ascii', 'backslashreplace') 2650 for opt in '-l', '--list': 2651 out = self.zipfilecmd(opt, zip_name, 2652 PYTHONIOENCODING='ascii:backslashreplace') 2653 self.assertEqual(out, expected) 2654 2655 @requires_zlib 2656 def test_create_command(self): 2657 self.addCleanup(unlink, TESTFN) 2658 with open(TESTFN, 'w') as f: 2659 f.write('test 1') 2660 os.mkdir(TESTFNDIR) 2661 self.addCleanup(rmtree, TESTFNDIR) 2662 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f: 2663 f.write('test 2') 2664 files = [TESTFN, TESTFNDIR] 2665 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 2666 for opt in '-c', '--create': 2667 try: 2668 out = self.zipfilecmd(opt, TESTFN2, *files) 2669 self.assertEqual(out, b'') 2670 with zipfile.ZipFile(TESTFN2) as zf: 2671 self.assertEqual(zf.namelist(), namelist) 2672 self.assertEqual(zf.read(namelist[0]), b'test 1') 2673 self.assertEqual(zf.read(namelist[2]), b'test 2') 2674 finally: 2675 unlink(TESTFN2) 2676 2677 def test_extract_command(self): 2678 zip_name = findfile('zipdir.zip') 2679 for opt in '-e', '--extract': 2680 with temp_dir() as extdir: 2681 out = self.zipfilecmd(opt, zip_name, extdir) 2682 self.assertEqual(out, b'') 2683 with zipfile.ZipFile(zip_name) as zf: 2684 for zi in zf.infolist(): 2685 path = os.path.join(extdir, 2686 zi.filename.replace('/', os.sep)) 2687 if zi.is_dir(): 2688 self.assertTrue(os.path.isdir(path)) 2689 else: 2690 self.assertTrue(os.path.isfile(path)) 2691 with open(path, 'rb') as f: 2692 self.assertEqual(f.read(), zf.read(zi)) 2693 2694 2695class TestExecutablePrependedZip(unittest.TestCase): 2696 """Test our ability to open zip files with an executable prepended.""" 2697 2698 def setUp(self): 2699 self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata') 2700 self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata') 2701 2702 def _test_zip_works(self, name): 2703 # bpo28494 sanity check: ensure is_zipfile works on these. 2704 self.assertTrue(zipfile.is_zipfile(name), 2705 f'is_zipfile failed on {name}') 2706 # Ensure we can operate on these via ZipFile. 2707 with zipfile.ZipFile(name) as zipfp: 2708 for n in zipfp.namelist(): 2709 data = zipfp.read(n) 2710 self.assertIn(b'FAVORITE_NUMBER', data) 2711 2712 def test_read_zip_with_exe_prepended(self): 2713 self._test_zip_works(self.exe_zip) 2714 2715 def test_read_zip64_with_exe_prepended(self): 2716 self._test_zip_works(self.exe_zip64) 2717 2718 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2719 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2720 'Test relies on #!/bin/bash working.') 2721 def test_execute_zip2(self): 2722 output = subprocess.check_output([self.exe_zip, sys.executable]) 2723 self.assertIn(b'number in executable: 5', output) 2724 2725 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2726 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2727 'Test relies on #!/bin/bash working.') 2728 def test_execute_zip64(self): 2729 output = subprocess.check_output([self.exe_zip64, sys.executable]) 2730 self.assertIn(b'number in executable: 5', output) 2731 2732 2733# Poor man's technique to consume a (smallish) iterable. 2734consume = tuple 2735 2736 2737# from jaraco.itertools 5.0 2738class jaraco: 2739 class itertools: 2740 class Counter: 2741 def __init__(self, i): 2742 self.count = 0 2743 self._orig_iter = iter(i) 2744 2745 def __iter__(self): 2746 return self 2747 2748 def __next__(self): 2749 result = next(self._orig_iter) 2750 self.count += 1 2751 return result 2752 2753 2754def add_dirs(zf): 2755 """ 2756 Given a writable zip file zf, inject directory entries for 2757 any directories implied by the presence of children. 2758 """ 2759 for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()): 2760 zf.writestr(name, b"") 2761 return zf 2762 2763 2764def build_alpharep_fixture(): 2765 """ 2766 Create a zip file with this structure: 2767 2768 . 2769 ├── a.txt 2770 ├── b 2771 │ ├── c.txt 2772 │ ├── d 2773 │ │ └── e.txt 2774 │ └── f.txt 2775 └── g 2776 └── h 2777 └── i.txt 2778 2779 This fixture has the following key characteristics: 2780 2781 - a file at the root (a) 2782 - a file two levels deep (b/d/e) 2783 - multiple files in a directory (b/c, b/f) 2784 - a directory containing only a directory (g/h) 2785 2786 "alpha" because it uses alphabet 2787 "rep" because it's a representative example 2788 """ 2789 data = io.BytesIO() 2790 zf = zipfile.ZipFile(data, "w") 2791 zf.writestr("a.txt", b"content of a") 2792 zf.writestr("b/c.txt", b"content of c") 2793 zf.writestr("b/d/e.txt", b"content of e") 2794 zf.writestr("b/f.txt", b"content of f") 2795 zf.writestr("g/h/i.txt", b"content of i") 2796 zf.filename = "alpharep.zip" 2797 return zf 2798 2799 2800class TestPath(unittest.TestCase): 2801 def setUp(self): 2802 self.fixtures = contextlib.ExitStack() 2803 self.addCleanup(self.fixtures.close) 2804 2805 def zipfile_alpharep(self): 2806 with self.subTest(): 2807 yield build_alpharep_fixture() 2808 with self.subTest(): 2809 yield add_dirs(build_alpharep_fixture()) 2810 2811 def zipfile_ondisk(self): 2812 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 2813 for alpharep in self.zipfile_alpharep(): 2814 buffer = alpharep.fp 2815 alpharep.close() 2816 path = tmpdir / alpharep.filename 2817 with path.open("wb") as strm: 2818 strm.write(buffer.getvalue()) 2819 yield path 2820 2821 def test_iterdir_and_types(self): 2822 for alpharep in self.zipfile_alpharep(): 2823 root = zipfile.Path(alpharep) 2824 assert root.is_dir() 2825 a, b, g = root.iterdir() 2826 assert a.is_file() 2827 assert b.is_dir() 2828 assert g.is_dir() 2829 c, f, d = b.iterdir() 2830 assert c.is_file() and f.is_file() 2831 e, = d.iterdir() 2832 assert e.is_file() 2833 h, = g.iterdir() 2834 i, = h.iterdir() 2835 assert i.is_file() 2836 2837 def test_subdir_is_dir(self): 2838 for alpharep in self.zipfile_alpharep(): 2839 root = zipfile.Path(alpharep) 2840 assert (root / 'b').is_dir() 2841 assert (root / 'b/').is_dir() 2842 assert (root / 'g').is_dir() 2843 assert (root / 'g/').is_dir() 2844 2845 def test_open(self): 2846 for alpharep in self.zipfile_alpharep(): 2847 root = zipfile.Path(alpharep) 2848 a, b, g = root.iterdir() 2849 with a.open() as strm: 2850 data = strm.read() 2851 assert data == b"content of a" 2852 2853 def test_read(self): 2854 for alpharep in self.zipfile_alpharep(): 2855 root = zipfile.Path(alpharep) 2856 a, b, g = root.iterdir() 2857 assert a.read_text() == "content of a" 2858 assert a.read_bytes() == b"content of a" 2859 2860 def test_joinpath(self): 2861 for alpharep in self.zipfile_alpharep(): 2862 root = zipfile.Path(alpharep) 2863 a = root.joinpath("a") 2864 assert a.is_file() 2865 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 2866 assert e.read_text() == "content of e" 2867 2868 def test_traverse_truediv(self): 2869 for alpharep in self.zipfile_alpharep(): 2870 root = zipfile.Path(alpharep) 2871 a = root / "a" 2872 assert a.is_file() 2873 e = root / "b" / "d" / "e.txt" 2874 assert e.read_text() == "content of e" 2875 2876 def test_pathlike_construction(self): 2877 """ 2878 zipfile.Path should be constructable from a path-like object 2879 """ 2880 for zipfile_ondisk in self.zipfile_ondisk(): 2881 pathlike = pathlib.Path(str(zipfile_ondisk)) 2882 zipfile.Path(pathlike) 2883 2884 def test_traverse_pathlike(self): 2885 for alpharep in self.zipfile_alpharep(): 2886 root = zipfile.Path(alpharep) 2887 root / pathlib.Path("a") 2888 2889 def test_parent(self): 2890 for alpharep in self.zipfile_alpharep(): 2891 root = zipfile.Path(alpharep) 2892 assert (root / 'a').parent.at == '' 2893 assert (root / 'a' / 'b').parent.at == 'a/' 2894 2895 def test_dir_parent(self): 2896 for alpharep in self.zipfile_alpharep(): 2897 root = zipfile.Path(alpharep) 2898 assert (root / 'b').parent.at == '' 2899 assert (root / 'b/').parent.at == '' 2900 2901 def test_missing_dir_parent(self): 2902 for alpharep in self.zipfile_alpharep(): 2903 root = zipfile.Path(alpharep) 2904 assert (root / 'missing dir/').parent.at == '' 2905 2906 def test_mutability(self): 2907 """ 2908 If the underlying zipfile is changed, the Path object should 2909 reflect that change. 2910 """ 2911 for alpharep in self.zipfile_alpharep(): 2912 root = zipfile.Path(alpharep) 2913 a, b, g = root.iterdir() 2914 alpharep.writestr('foo.txt', 'foo') 2915 alpharep.writestr('bar/baz.txt', 'baz') 2916 assert any( 2917 child.name == 'foo.txt' 2918 for child in root.iterdir()) 2919 assert (root / 'foo.txt').read_text() == 'foo' 2920 baz, = (root / 'bar').iterdir() 2921 assert baz.read_text() == 'baz' 2922 2923 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 2924 2925 def huge_zipfile(self): 2926 """Create a read-only zipfile with a huge number of entries entries.""" 2927 strm = io.BytesIO() 2928 zf = zipfile.ZipFile(strm, "w") 2929 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 2930 zf.writestr(entry, entry) 2931 zf.mode = 'r' 2932 return zf 2933 2934 def test_joinpath_constant_time(self): 2935 """ 2936 Ensure joinpath on items in zipfile is linear time. 2937 """ 2938 root = zipfile.Path(self.huge_zipfile()) 2939 entries = jaraco.itertools.Counter(root.iterdir()) 2940 for entry in entries: 2941 entry.joinpath('suffix') 2942 # Check the file iterated all items 2943 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 2944 2945 # @func_timeout.func_set_timeout(3) 2946 def test_implied_dirs_performance(self): 2947 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 2948 zipfile.CompleteDirs._implied_dirs(data) 2949 2950 2951if __name__ == "__main__": 2952 unittest.main() 2953