1import contextlib 2import importlib.util 3import io 4import itertools 5import os 6import pathlib 7import posixpath 8import string 9import struct 10import subprocess 11import sys 12import time 13import unittest 14import unittest.mock as mock 15import zipfile 16import functools 17 18 19from tempfile import TemporaryFile 20from random import randint, random, randbytes 21 22from test.support import script_helper 23from test.support import (findfile, requires_zlib, requires_bz2, 24 requires_lzma, captured_stdout) 25from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd 26 27 28TESTFN2 = TESTFN + "2" 29TESTFNDIR = TESTFN + "d" 30FIXEDTEST_SIZE = 1000 31DATAFILES_DIR = 'zipfile_datafiles' 32 33SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 34 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 35 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 36 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 37 38def get_files(test): 39 yield TESTFN2 40 with TemporaryFile() as f: 41 yield f 42 test.assertFalse(f.closed) 43 with io.BytesIO() as f: 44 yield f 45 test.assertFalse(f.closed) 46 47class AbstractTestsWithSourceFile: 48 @classmethod 49 def setUpClass(cls): 50 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 51 (i, random()), "ascii") 52 for i in range(FIXEDTEST_SIZE)] 53 cls.data = b''.join(cls.line_gen) 54 55 def setUp(self): 56 # Make a source file with some lines 57 with open(TESTFN, "wb") as fp: 58 fp.write(self.data) 59 60 def make_test_archive(self, f, compression, compresslevel=None): 61 kwargs = {'compression': compression, 'compresslevel': compresslevel} 62 # Create the ZIP archive 63 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 64 zipfp.write(TESTFN, "another.name") 65 zipfp.write(TESTFN, TESTFN) 66 zipfp.writestr("strfile", self.data) 67 with zipfp.open('written-open-w', mode='w') as f: 68 for line in self.line_gen: 69 f.write(line) 70 71 def zip_test(self, f, compression, compresslevel=None): 72 self.make_test_archive(f, compression, compresslevel) 73 74 # Read the ZIP archive 75 with zipfile.ZipFile(f, "r", compression) as zipfp: 76 self.assertEqual(zipfp.read(TESTFN), self.data) 77 self.assertEqual(zipfp.read("another.name"), self.data) 78 self.assertEqual(zipfp.read("strfile"), self.data) 79 80 # Print the ZIP directory 81 fp = io.StringIO() 82 zipfp.printdir(file=fp) 83 directory = fp.getvalue() 84 lines = directory.splitlines() 85 self.assertEqual(len(lines), 5) # Number of files + header 86 87 self.assertIn('File Name', lines[0]) 88 self.assertIn('Modified', lines[0]) 89 self.assertIn('Size', lines[0]) 90 91 fn, date, time_, size = lines[1].split() 92 self.assertEqual(fn, 'another.name') 93 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 94 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 95 self.assertEqual(size, str(len(self.data))) 96 97 # Check the namelist 98 names = zipfp.namelist() 99 self.assertEqual(len(names), 4) 100 self.assertIn(TESTFN, names) 101 self.assertIn("another.name", names) 102 self.assertIn("strfile", names) 103 self.assertIn("written-open-w", names) 104 105 # Check infolist 106 infos = zipfp.infolist() 107 names = [i.filename for i in infos] 108 self.assertEqual(len(names), 4) 109 self.assertIn(TESTFN, names) 110 self.assertIn("another.name", names) 111 self.assertIn("strfile", names) 112 self.assertIn("written-open-w", names) 113 for i in infos: 114 self.assertEqual(i.file_size, len(self.data)) 115 116 # check getinfo 117 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 118 info = zipfp.getinfo(nm) 119 self.assertEqual(info.filename, nm) 120 self.assertEqual(info.file_size, len(self.data)) 121 122 # Check that testzip doesn't raise an exception 123 zipfp.testzip() 124 125 def test_basic(self): 126 for f in get_files(self): 127 self.zip_test(f, self.compression) 128 129 def zip_open_test(self, f, compression): 130 self.make_test_archive(f, compression) 131 132 # Read the ZIP archive 133 with zipfile.ZipFile(f, "r", compression) as zipfp: 134 zipdata1 = [] 135 with zipfp.open(TESTFN) as zipopen1: 136 while True: 137 read_data = zipopen1.read(256) 138 if not read_data: 139 break 140 zipdata1.append(read_data) 141 142 zipdata2 = [] 143 with zipfp.open("another.name") as zipopen2: 144 while True: 145 read_data = zipopen2.read(256) 146 if not read_data: 147 break 148 zipdata2.append(read_data) 149 150 self.assertEqual(b''.join(zipdata1), self.data) 151 self.assertEqual(b''.join(zipdata2), self.data) 152 153 def test_open(self): 154 for f in get_files(self): 155 self.zip_open_test(f, self.compression) 156 157 def test_open_with_pathlike(self): 158 path = pathlib.Path(TESTFN2) 159 self.zip_open_test(path, self.compression) 160 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 161 self.assertIsInstance(zipfp.filename, str) 162 163 def zip_random_open_test(self, f, compression): 164 self.make_test_archive(f, compression) 165 166 # Read the ZIP archive 167 with zipfile.ZipFile(f, "r", compression) as zipfp: 168 zipdata1 = [] 169 with zipfp.open(TESTFN) as zipopen1: 170 while True: 171 read_data = zipopen1.read(randint(1, 1024)) 172 if not read_data: 173 break 174 zipdata1.append(read_data) 175 176 self.assertEqual(b''.join(zipdata1), self.data) 177 178 def test_random_open(self): 179 for f in get_files(self): 180 self.zip_random_open_test(f, self.compression) 181 182 def zip_read1_test(self, f, compression): 183 self.make_test_archive(f, compression) 184 185 # Read the ZIP archive 186 with zipfile.ZipFile(f, "r") as zipfp, \ 187 zipfp.open(TESTFN) as zipopen: 188 zipdata = [] 189 while True: 190 read_data = zipopen.read1(-1) 191 if not read_data: 192 break 193 zipdata.append(read_data) 194 195 self.assertEqual(b''.join(zipdata), self.data) 196 197 def test_read1(self): 198 for f in get_files(self): 199 self.zip_read1_test(f, self.compression) 200 201 def zip_read1_10_test(self, f, compression): 202 self.make_test_archive(f, compression) 203 204 # Read the ZIP archive 205 with zipfile.ZipFile(f, "r") as zipfp, \ 206 zipfp.open(TESTFN) as zipopen: 207 zipdata = [] 208 while True: 209 read_data = zipopen.read1(10) 210 self.assertLessEqual(len(read_data), 10) 211 if not read_data: 212 break 213 zipdata.append(read_data) 214 215 self.assertEqual(b''.join(zipdata), self.data) 216 217 def test_read1_10(self): 218 for f in get_files(self): 219 self.zip_read1_10_test(f, self.compression) 220 221 def zip_readline_read_test(self, f, compression): 222 self.make_test_archive(f, compression) 223 224 # Read the ZIP archive 225 with zipfile.ZipFile(f, "r") as zipfp, \ 226 zipfp.open(TESTFN) as zipopen: 227 data = b'' 228 while True: 229 read = zipopen.readline() 230 if not read: 231 break 232 data += read 233 234 read = zipopen.read(100) 235 if not read: 236 break 237 data += read 238 239 self.assertEqual(data, self.data) 240 241 def test_readline_read(self): 242 # Issue #7610: calls to readline() interleaved with calls to read(). 243 for f in get_files(self): 244 self.zip_readline_read_test(f, self.compression) 245 246 def zip_readline_test(self, f, compression): 247 self.make_test_archive(f, compression) 248 249 # Read the ZIP archive 250 with zipfile.ZipFile(f, "r") as zipfp: 251 with zipfp.open(TESTFN) as zipopen: 252 for line in self.line_gen: 253 linedata = zipopen.readline() 254 self.assertEqual(linedata, line) 255 256 def test_readline(self): 257 for f in get_files(self): 258 self.zip_readline_test(f, self.compression) 259 260 def zip_readlines_test(self, f, compression): 261 self.make_test_archive(f, compression) 262 263 # Read the ZIP archive 264 with zipfile.ZipFile(f, "r") as zipfp: 265 with zipfp.open(TESTFN) as zipopen: 266 ziplines = zipopen.readlines() 267 for line, zipline in zip(self.line_gen, ziplines): 268 self.assertEqual(zipline, line) 269 270 def test_readlines(self): 271 for f in get_files(self): 272 self.zip_readlines_test(f, self.compression) 273 274 def zip_iterlines_test(self, f, compression): 275 self.make_test_archive(f, compression) 276 277 # Read the ZIP archive 278 with zipfile.ZipFile(f, "r") as zipfp: 279 with zipfp.open(TESTFN) as zipopen: 280 for line, zipline in zip(self.line_gen, zipopen): 281 self.assertEqual(zipline, line) 282 283 def test_iterlines(self): 284 for f in get_files(self): 285 self.zip_iterlines_test(f, self.compression) 286 287 def test_low_compression(self): 288 """Check for cases where compressed data is larger than original.""" 289 # Create the ZIP archive 290 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 291 zipfp.writestr("strfile", '12') 292 293 # Get an open object for strfile 294 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 295 with zipfp.open("strfile") as openobj: 296 self.assertEqual(openobj.read(1), b'1') 297 self.assertEqual(openobj.read(1), b'2') 298 299 def test_writestr_compression(self): 300 zipfp = zipfile.ZipFile(TESTFN2, "w") 301 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 302 info = zipfp.getinfo('b.txt') 303 self.assertEqual(info.compress_type, self.compression) 304 305 def test_writestr_compresslevel(self): 306 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 307 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 308 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 309 compresslevel=2) 310 311 # Compression level follows the constructor. 312 a_info = zipfp.getinfo('a.txt') 313 self.assertEqual(a_info.compress_type, self.compression) 314 self.assertEqual(a_info._compresslevel, 1) 315 316 # Compression level is overridden. 317 b_info = zipfp.getinfo('b.txt') 318 self.assertEqual(b_info.compress_type, self.compression) 319 self.assertEqual(b_info._compresslevel, 2) 320 321 def test_read_return_size(self): 322 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 323 # than requested. 324 for test_size in (1, 4095, 4096, 4097, 16384): 325 file_size = test_size + 1 326 junk = randbytes(file_size) 327 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 328 zipf.writestr('foo', junk) 329 with zipf.open('foo', 'r') as fp: 330 buf = fp.read(test_size) 331 self.assertEqual(len(buf), test_size) 332 333 def test_truncated_zipfile(self): 334 fp = io.BytesIO() 335 with zipfile.ZipFile(fp, mode='w') as zipf: 336 zipf.writestr('strfile', self.data, compress_type=self.compression) 337 end_offset = fp.tell() 338 zipfiledata = fp.getvalue() 339 340 fp = io.BytesIO(zipfiledata) 341 with zipfile.ZipFile(fp) as zipf: 342 with zipf.open('strfile') as zipopen: 343 fp.truncate(end_offset - 20) 344 with self.assertRaises(EOFError): 345 zipopen.read() 346 347 fp = io.BytesIO(zipfiledata) 348 with zipfile.ZipFile(fp) as zipf: 349 with zipf.open('strfile') as zipopen: 350 fp.truncate(end_offset - 20) 351 with self.assertRaises(EOFError): 352 while zipopen.read(100): 353 pass 354 355 fp = io.BytesIO(zipfiledata) 356 with zipfile.ZipFile(fp) as zipf: 357 with zipf.open('strfile') as zipopen: 358 fp.truncate(end_offset - 20) 359 with self.assertRaises(EOFError): 360 while zipopen.read1(100): 361 pass 362 363 def test_repr(self): 364 fname = 'file.name' 365 for f in get_files(self): 366 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 367 zipfp.write(TESTFN, fname) 368 r = repr(zipfp) 369 self.assertIn("mode='w'", r) 370 371 with zipfile.ZipFile(f, 'r') as zipfp: 372 r = repr(zipfp) 373 if isinstance(f, str): 374 self.assertIn('filename=%r' % f, r) 375 else: 376 self.assertIn('file=%r' % f, r) 377 self.assertIn("mode='r'", r) 378 r = repr(zipfp.getinfo(fname)) 379 self.assertIn('filename=%r' % fname, r) 380 self.assertIn('filemode=', r) 381 self.assertIn('file_size=', r) 382 if self.compression != zipfile.ZIP_STORED: 383 self.assertIn('compress_type=', r) 384 self.assertIn('compress_size=', r) 385 with zipfp.open(fname) as zipopen: 386 r = repr(zipopen) 387 self.assertIn('name=%r' % fname, r) 388 self.assertIn("mode='r'", r) 389 if self.compression != zipfile.ZIP_STORED: 390 self.assertIn('compress_type=', r) 391 self.assertIn('[closed]', repr(zipopen)) 392 self.assertIn('[closed]', repr(zipfp)) 393 394 def test_compresslevel_basic(self): 395 for f in get_files(self): 396 self.zip_test(f, self.compression, compresslevel=9) 397 398 def test_per_file_compresslevel(self): 399 """Check that files within a Zip archive can have different 400 compression levels.""" 401 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 402 zipfp.write(TESTFN, 'compress_1') 403 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 404 one_info = zipfp.getinfo('compress_1') 405 nine_info = zipfp.getinfo('compress_9') 406 self.assertEqual(one_info._compresslevel, 1) 407 self.assertEqual(nine_info._compresslevel, 9) 408 409 def test_writing_errors(self): 410 class BrokenFile(io.BytesIO): 411 def write(self, data): 412 nonlocal count 413 if count is not None: 414 if count == stop: 415 raise OSError 416 count += 1 417 super().write(data) 418 419 stop = 0 420 while True: 421 testfile = BrokenFile() 422 count = None 423 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 424 with zipfp.open('file1', 'w') as f: 425 f.write(b'data1') 426 count = 0 427 try: 428 with zipfp.open('file2', 'w') as f: 429 f.write(b'data2') 430 except OSError: 431 stop += 1 432 else: 433 break 434 finally: 435 count = None 436 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 437 self.assertEqual(zipfp.namelist(), ['file1']) 438 self.assertEqual(zipfp.read('file1'), b'data1') 439 440 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 441 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 442 self.assertEqual(zipfp.read('file1'), b'data1') 443 self.assertEqual(zipfp.read('file2'), b'data2') 444 445 446 def tearDown(self): 447 unlink(TESTFN) 448 unlink(TESTFN2) 449 450 451class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 452 unittest.TestCase): 453 compression = zipfile.ZIP_STORED 454 test_low_compression = None 455 456 def zip_test_writestr_permissions(self, f, compression): 457 # Make sure that writestr and open(... mode='w') create files with 458 # mode 0600, when they are passed a name rather than a ZipInfo 459 # instance. 460 461 self.make_test_archive(f, compression) 462 with zipfile.ZipFile(f, "r") as zipfp: 463 zinfo = zipfp.getinfo('strfile') 464 self.assertEqual(zinfo.external_attr, 0o600 << 16) 465 466 zinfo2 = zipfp.getinfo('written-open-w') 467 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 468 469 def test_writestr_permissions(self): 470 for f in get_files(self): 471 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 472 473 def test_absolute_arcnames(self): 474 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 475 zipfp.write(TESTFN, "/absolute") 476 477 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 478 self.assertEqual(zipfp.namelist(), ["absolute"]) 479 480 def test_append_to_zip_file(self): 481 """Test appending to an existing zipfile.""" 482 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 483 zipfp.write(TESTFN, TESTFN) 484 485 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 486 zipfp.writestr("strfile", self.data) 487 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 488 489 def test_append_to_non_zip_file(self): 490 """Test appending to an existing file that is not a zipfile.""" 491 # NOTE: this test fails if len(d) < 22 because of the first 492 # line "fpin.seek(-22, 2)" in _EndRecData 493 data = b'I am not a ZipFile!'*10 494 with open(TESTFN2, 'wb') as f: 495 f.write(data) 496 497 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 498 zipfp.write(TESTFN, TESTFN) 499 500 with open(TESTFN2, 'rb') as f: 501 f.seek(len(data)) 502 with zipfile.ZipFile(f, "r") as zipfp: 503 self.assertEqual(zipfp.namelist(), [TESTFN]) 504 self.assertEqual(zipfp.read(TESTFN), self.data) 505 with open(TESTFN2, 'rb') as f: 506 self.assertEqual(f.read(len(data)), data) 507 zipfiledata = f.read() 508 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 509 self.assertEqual(zipfp.namelist(), [TESTFN]) 510 self.assertEqual(zipfp.read(TESTFN), self.data) 511 512 def test_read_concatenated_zip_file(self): 513 with io.BytesIO() as bio: 514 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 515 zipfp.write(TESTFN, TESTFN) 516 zipfiledata = bio.getvalue() 517 data = b'I am not a ZipFile!'*10 518 with open(TESTFN2, 'wb') as f: 519 f.write(data) 520 f.write(zipfiledata) 521 522 with zipfile.ZipFile(TESTFN2) as zipfp: 523 self.assertEqual(zipfp.namelist(), [TESTFN]) 524 self.assertEqual(zipfp.read(TESTFN), self.data) 525 526 def test_append_to_concatenated_zip_file(self): 527 with io.BytesIO() as bio: 528 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 529 zipfp.write(TESTFN, TESTFN) 530 zipfiledata = bio.getvalue() 531 data = b'I am not a ZipFile!'*1000000 532 with open(TESTFN2, 'wb') as f: 533 f.write(data) 534 f.write(zipfiledata) 535 536 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 537 self.assertEqual(zipfp.namelist(), [TESTFN]) 538 zipfp.writestr('strfile', self.data) 539 540 with open(TESTFN2, 'rb') as f: 541 self.assertEqual(f.read(len(data)), data) 542 zipfiledata = f.read() 543 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 544 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 545 self.assertEqual(zipfp.read(TESTFN), self.data) 546 self.assertEqual(zipfp.read('strfile'), self.data) 547 548 def test_ignores_newline_at_end(self): 549 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 550 zipfp.write(TESTFN, TESTFN) 551 with open(TESTFN2, 'a', encoding='utf-8') as f: 552 f.write("\r\n\00\00\00") 553 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 554 self.assertIsInstance(zipfp, zipfile.ZipFile) 555 556 def test_ignores_stuff_appended_past_comments(self): 557 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 558 zipfp.comment = b"this is a comment" 559 zipfp.write(TESTFN, TESTFN) 560 with open(TESTFN2, 'a', encoding='utf-8') as f: 561 f.write("abcdef\r\n") 562 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 563 self.assertIsInstance(zipfp, zipfile.ZipFile) 564 self.assertEqual(zipfp.comment, b"this is a comment") 565 566 def test_write_default_name(self): 567 """Check that calling ZipFile.write without arcname specified 568 produces the expected result.""" 569 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 570 zipfp.write(TESTFN) 571 with open(TESTFN, "rb") as f: 572 self.assertEqual(zipfp.read(TESTFN), f.read()) 573 574 def test_io_on_closed_zipextfile(self): 575 fname = "somefile.txt" 576 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 577 zipfp.writestr(fname, "bogus") 578 579 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 580 with zipfp.open(fname) as fid: 581 fid.close() 582 self.assertRaises(ValueError, fid.read) 583 self.assertRaises(ValueError, fid.seek, 0) 584 self.assertRaises(ValueError, fid.tell) 585 self.assertRaises(ValueError, fid.readable) 586 self.assertRaises(ValueError, fid.seekable) 587 588 def test_write_to_readonly(self): 589 """Check that trying to call write() on a readonly ZipFile object 590 raises a ValueError.""" 591 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 592 zipfp.writestr("somefile.txt", "bogus") 593 594 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 595 self.assertRaises(ValueError, zipfp.write, TESTFN) 596 597 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 598 with self.assertRaises(ValueError): 599 zipfp.open(TESTFN, mode='w') 600 601 def test_add_file_before_1980(self): 602 # Set atime and mtime to 1970-01-01 603 os.utime(TESTFN, (0, 0)) 604 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 605 self.assertRaises(ValueError, zipfp.write, TESTFN) 606 607 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 608 zipfp.write(TESTFN) 609 zinfo = zipfp.getinfo(TESTFN) 610 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 611 612 def test_add_file_after_2107(self): 613 # Set atime and mtime to 2108-12-30 614 ts = 4386268800 615 try: 616 time.localtime(ts) 617 except OverflowError: 618 self.skipTest(f'time.localtime({ts}) raises OverflowError') 619 try: 620 os.utime(TESTFN, (ts, ts)) 621 except OverflowError: 622 self.skipTest('Host fs cannot set timestamp to required value.') 623 624 mtime_ns = os.stat(TESTFN).st_mtime_ns 625 if mtime_ns != (4386268800 * 10**9): 626 # XFS filesystem is limited to 32-bit timestamp, but the syscall 627 # didn't fail. Moreover, there is a VFS bug which returns 628 # a cached timestamp which is different than the value on disk. 629 # 630 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 631 # 632 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 633 # https://bugs.python.org/issue39460#msg360952 634 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 635 636 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 637 self.assertRaises(struct.error, zipfp.write, TESTFN) 638 639 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 640 zipfp.write(TESTFN) 641 zinfo = zipfp.getinfo(TESTFN) 642 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 643 644 645@requires_zlib() 646class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 647 unittest.TestCase): 648 compression = zipfile.ZIP_DEFLATED 649 650 def test_per_file_compression(self): 651 """Check that files within a Zip archive can have different 652 compression options.""" 653 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 654 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 655 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 656 sinfo = zipfp.getinfo('storeme') 657 dinfo = zipfp.getinfo('deflateme') 658 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 659 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 660 661@requires_bz2() 662class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 663 unittest.TestCase): 664 compression = zipfile.ZIP_BZIP2 665 666@requires_lzma() 667class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 668 unittest.TestCase): 669 compression = zipfile.ZIP_LZMA 670 671 672class AbstractTestZip64InSmallFiles: 673 # These tests test the ZIP64 functionality without using large files, 674 # see test_zipfile64 for proper tests. 675 676 @classmethod 677 def setUpClass(cls): 678 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 679 for i in range(0, FIXEDTEST_SIZE)) 680 cls.data = b'\n'.join(line_gen) 681 682 def setUp(self): 683 self._limit = zipfile.ZIP64_LIMIT 684 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 685 zipfile.ZIP64_LIMIT = 1000 686 zipfile.ZIP_FILECOUNT_LIMIT = 9 687 688 # Make a source file with some lines 689 with open(TESTFN, "wb") as fp: 690 fp.write(self.data) 691 692 def zip_test(self, f, compression): 693 # Create the ZIP archive 694 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 695 zipfp.write(TESTFN, "another.name") 696 zipfp.write(TESTFN, TESTFN) 697 zipfp.writestr("strfile", self.data) 698 699 # Read the ZIP archive 700 with zipfile.ZipFile(f, "r", compression) as zipfp: 701 self.assertEqual(zipfp.read(TESTFN), self.data) 702 self.assertEqual(zipfp.read("another.name"), self.data) 703 self.assertEqual(zipfp.read("strfile"), self.data) 704 705 # Print the ZIP directory 706 fp = io.StringIO() 707 zipfp.printdir(fp) 708 709 directory = fp.getvalue() 710 lines = directory.splitlines() 711 self.assertEqual(len(lines), 4) # Number of files + header 712 713 self.assertIn('File Name', lines[0]) 714 self.assertIn('Modified', lines[0]) 715 self.assertIn('Size', lines[0]) 716 717 fn, date, time_, size = lines[1].split() 718 self.assertEqual(fn, 'another.name') 719 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 720 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 721 self.assertEqual(size, str(len(self.data))) 722 723 # Check the namelist 724 names = zipfp.namelist() 725 self.assertEqual(len(names), 3) 726 self.assertIn(TESTFN, names) 727 self.assertIn("another.name", names) 728 self.assertIn("strfile", names) 729 730 # Check infolist 731 infos = zipfp.infolist() 732 names = [i.filename for i in infos] 733 self.assertEqual(len(names), 3) 734 self.assertIn(TESTFN, names) 735 self.assertIn("another.name", names) 736 self.assertIn("strfile", names) 737 for i in infos: 738 self.assertEqual(i.file_size, len(self.data)) 739 740 # check getinfo 741 for nm in (TESTFN, "another.name", "strfile"): 742 info = zipfp.getinfo(nm) 743 self.assertEqual(info.filename, nm) 744 self.assertEqual(info.file_size, len(self.data)) 745 746 # Check that testzip doesn't raise an exception 747 zipfp.testzip() 748 749 def test_basic(self): 750 for f in get_files(self): 751 self.zip_test(f, self.compression) 752 753 def test_too_many_files(self): 754 # This test checks that more than 64k files can be added to an archive, 755 # and that the resulting archive can be read properly by ZipFile 756 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 757 allowZip64=True) 758 zipf.debug = 100 759 numfiles = 15 760 for i in range(numfiles): 761 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 762 self.assertEqual(len(zipf.namelist()), numfiles) 763 zipf.close() 764 765 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 766 self.assertEqual(len(zipf2.namelist()), numfiles) 767 for i in range(numfiles): 768 content = zipf2.read("foo%08d" % i).decode('ascii') 769 self.assertEqual(content, "%d" % (i**3 % 57)) 770 zipf2.close() 771 772 def test_too_many_files_append(self): 773 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 774 allowZip64=False) 775 zipf.debug = 100 776 numfiles = 9 777 for i in range(numfiles): 778 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 779 self.assertEqual(len(zipf.namelist()), numfiles) 780 with self.assertRaises(zipfile.LargeZipFile): 781 zipf.writestr("foo%08d" % numfiles, b'') 782 self.assertEqual(len(zipf.namelist()), numfiles) 783 zipf.close() 784 785 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 786 allowZip64=False) 787 zipf.debug = 100 788 self.assertEqual(len(zipf.namelist()), numfiles) 789 with self.assertRaises(zipfile.LargeZipFile): 790 zipf.writestr("foo%08d" % numfiles, b'') 791 self.assertEqual(len(zipf.namelist()), numfiles) 792 zipf.close() 793 794 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 795 allowZip64=True) 796 zipf.debug = 100 797 self.assertEqual(len(zipf.namelist()), numfiles) 798 numfiles2 = 15 799 for i in range(numfiles, numfiles2): 800 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 801 self.assertEqual(len(zipf.namelist()), numfiles2) 802 zipf.close() 803 804 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 805 self.assertEqual(len(zipf2.namelist()), numfiles2) 806 for i in range(numfiles2): 807 content = zipf2.read("foo%08d" % i).decode('ascii') 808 self.assertEqual(content, "%d" % (i**3 % 57)) 809 zipf2.close() 810 811 def tearDown(self): 812 zipfile.ZIP64_LIMIT = self._limit 813 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 814 unlink(TESTFN) 815 unlink(TESTFN2) 816 817 818class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 819 unittest.TestCase): 820 compression = zipfile.ZIP_STORED 821 822 def large_file_exception_test(self, f, compression): 823 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 824 self.assertRaises(zipfile.LargeZipFile, 825 zipfp.write, TESTFN, "another.name") 826 827 def large_file_exception_test2(self, f, compression): 828 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 829 self.assertRaises(zipfile.LargeZipFile, 830 zipfp.writestr, "another.name", self.data) 831 832 def test_large_file_exception(self): 833 for f in get_files(self): 834 self.large_file_exception_test(f, zipfile.ZIP_STORED) 835 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 836 837 def test_absolute_arcnames(self): 838 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 839 allowZip64=True) as zipfp: 840 zipfp.write(TESTFN, "/absolute") 841 842 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 843 self.assertEqual(zipfp.namelist(), ["absolute"]) 844 845 def test_append(self): 846 # Test that appending to the Zip64 archive doesn't change 847 # extra fields of existing entries. 848 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 849 zipfp.writestr("strfile", self.data) 850 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 851 zinfo = zipfp.getinfo("strfile") 852 extra = zinfo.extra 853 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 854 zipfp.writestr("strfile2", self.data) 855 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 856 zinfo = zipfp.getinfo("strfile") 857 self.assertEqual(zinfo.extra, extra) 858 859 def make_zip64_file( 860 self, file_size_64_set=False, file_size_extra=False, 861 compress_size_64_set=False, compress_size_extra=False, 862 header_offset_64_set=False, header_offset_extra=False, 863 ): 864 """Generate bytes sequence for a zip with (incomplete) zip64 data. 865 866 The actual values (not the zip 64 0xffffffff values) stored in the file 867 are: 868 file_size: 8 869 compress_size: 8 870 header_offset: 0 871 """ 872 actual_size = 8 873 actual_header_offset = 0 874 local_zip64_fields = [] 875 central_zip64_fields = [] 876 877 file_size = actual_size 878 if file_size_64_set: 879 file_size = 0xffffffff 880 if file_size_extra: 881 local_zip64_fields.append(actual_size) 882 central_zip64_fields.append(actual_size) 883 file_size = struct.pack("<L", file_size) 884 885 compress_size = actual_size 886 if compress_size_64_set: 887 compress_size = 0xffffffff 888 if compress_size_extra: 889 local_zip64_fields.append(actual_size) 890 central_zip64_fields.append(actual_size) 891 compress_size = struct.pack("<L", compress_size) 892 893 header_offset = actual_header_offset 894 if header_offset_64_set: 895 header_offset = 0xffffffff 896 if header_offset_extra: 897 central_zip64_fields.append(actual_header_offset) 898 header_offset = struct.pack("<L", header_offset) 899 900 local_extra = struct.pack( 901 '<HH' + 'Q'*len(local_zip64_fields), 902 0x0001, 903 8*len(local_zip64_fields), 904 *local_zip64_fields 905 ) 906 907 central_extra = struct.pack( 908 '<HH' + 'Q'*len(central_zip64_fields), 909 0x0001, 910 8*len(central_zip64_fields), 911 *central_zip64_fields 912 ) 913 914 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 915 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 916 917 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 918 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 919 920 filename = b"test.txt" 921 content = b"test1234" 922 filename_length = struct.pack("<H", len(filename)) 923 zip64_contents = ( 924 # Local file header 925 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 926 + compress_size 927 + file_size 928 + filename_length 929 + local_extra_length 930 + filename 931 + local_extra 932 + content 933 # Central directory: 934 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 935 + compress_size 936 + file_size 937 + filename_length 938 + central_extra_length 939 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 940 + header_offset 941 + filename 942 + central_extra 943 # Zip64 end of central directory 944 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 945 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 946 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 947 + central_dir_size 948 + offset_to_central_dir 949 # Zip64 end of central directory locator 950 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 951 + b"\x00\x00\x00" 952 # end of central directory 953 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 954 + b"\x00\x00\x00\x00" 955 ) 956 return zip64_contents 957 958 def test_bad_zip64_extra(self): 959 """Missing zip64 extra records raises an exception. 960 961 There are 4 fields that the zip64 format handles (the disk number is 962 not used in this module and so is ignored here). According to the zip 963 spec: 964 The order of the fields in the zip64 extended 965 information record is fixed, but the fields MUST 966 only appear if the corresponding Local or Central 967 directory record field is set to 0xFFFF or 0xFFFFFFFF. 968 969 If the zip64 extra content doesn't contain enough entries for the 970 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 971 This test mismatches the length of the zip64 extra field and the number 972 of fields set to indicate the presence of zip64 data. 973 """ 974 # zip64 file size present, no fields in extra, expecting one, equals 975 # missing file size. 976 missing_file_size_extra = self.make_zip64_file( 977 file_size_64_set=True, 978 ) 979 with self.assertRaises(zipfile.BadZipFile) as e: 980 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 981 self.assertIn('file size', str(e.exception).lower()) 982 983 # zip64 file size present, zip64 compress size present, one field in 984 # extra, expecting two, equals missing compress size. 985 missing_compress_size_extra = self.make_zip64_file( 986 file_size_64_set=True, 987 file_size_extra=True, 988 compress_size_64_set=True, 989 ) 990 with self.assertRaises(zipfile.BadZipFile) as e: 991 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 992 self.assertIn('compress size', str(e.exception).lower()) 993 994 # zip64 compress size present, no fields in extra, expecting one, 995 # equals missing compress size. 996 missing_compress_size_extra = self.make_zip64_file( 997 compress_size_64_set=True, 998 ) 999 with self.assertRaises(zipfile.BadZipFile) as e: 1000 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1001 self.assertIn('compress size', str(e.exception).lower()) 1002 1003 # zip64 file size present, zip64 compress size present, zip64 header 1004 # offset present, two fields in extra, expecting three, equals missing 1005 # header offset 1006 missing_header_offset_extra = self.make_zip64_file( 1007 file_size_64_set=True, 1008 file_size_extra=True, 1009 compress_size_64_set=True, 1010 compress_size_extra=True, 1011 header_offset_64_set=True, 1012 ) 1013 with self.assertRaises(zipfile.BadZipFile) as e: 1014 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1015 self.assertIn('header offset', str(e.exception).lower()) 1016 1017 # zip64 compress size present, zip64 header offset present, one field 1018 # in extra, expecting two, equals missing header offset 1019 missing_header_offset_extra = self.make_zip64_file( 1020 file_size_64_set=False, 1021 compress_size_64_set=True, 1022 compress_size_extra=True, 1023 header_offset_64_set=True, 1024 ) 1025 with self.assertRaises(zipfile.BadZipFile) as e: 1026 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1027 self.assertIn('header offset', str(e.exception).lower()) 1028 1029 # zip64 file size present, zip64 header offset present, one field in 1030 # extra, expecting two, equals missing header offset 1031 missing_header_offset_extra = self.make_zip64_file( 1032 file_size_64_set=True, 1033 file_size_extra=True, 1034 compress_size_64_set=False, 1035 header_offset_64_set=True, 1036 ) 1037 with self.assertRaises(zipfile.BadZipFile) as e: 1038 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1039 self.assertIn('header offset', str(e.exception).lower()) 1040 1041 # zip64 header offset present, no fields in extra, expecting one, 1042 # equals missing header offset 1043 missing_header_offset_extra = self.make_zip64_file( 1044 file_size_64_set=False, 1045 compress_size_64_set=False, 1046 header_offset_64_set=True, 1047 ) 1048 with self.assertRaises(zipfile.BadZipFile) as e: 1049 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1050 self.assertIn('header offset', str(e.exception).lower()) 1051 1052 def test_generated_valid_zip64_extra(self): 1053 # These values are what is set in the make_zip64_file method. 1054 expected_file_size = 8 1055 expected_compress_size = 8 1056 expected_header_offset = 0 1057 expected_content = b"test1234" 1058 1059 # Loop through the various valid combinations of zip64 masks 1060 # present and extra fields present. 1061 params = ( 1062 {"file_size_64_set": True, "file_size_extra": True}, 1063 {"compress_size_64_set": True, "compress_size_extra": True}, 1064 {"header_offset_64_set": True, "header_offset_extra": True}, 1065 ) 1066 1067 for r in range(1, len(params) + 1): 1068 for combo in itertools.combinations(params, r): 1069 kwargs = {} 1070 for c in combo: 1071 kwargs.update(c) 1072 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1073 zinfo = zf.infolist()[0] 1074 self.assertEqual(zinfo.file_size, expected_file_size) 1075 self.assertEqual(zinfo.compress_size, expected_compress_size) 1076 self.assertEqual(zinfo.header_offset, expected_header_offset) 1077 self.assertEqual(zf.read(zinfo), expected_content) 1078 1079 1080@requires_zlib() 1081class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1082 unittest.TestCase): 1083 compression = zipfile.ZIP_DEFLATED 1084 1085@requires_bz2() 1086class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1087 unittest.TestCase): 1088 compression = zipfile.ZIP_BZIP2 1089 1090@requires_lzma() 1091class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1092 unittest.TestCase): 1093 compression = zipfile.ZIP_LZMA 1094 1095 1096class AbstractWriterTests: 1097 1098 def tearDown(self): 1099 unlink(TESTFN2) 1100 1101 def test_close_after_close(self): 1102 data = b'content' 1103 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1104 w = zipf.open('test', 'w') 1105 w.write(data) 1106 w.close() 1107 self.assertTrue(w.closed) 1108 w.close() 1109 self.assertTrue(w.closed) 1110 self.assertEqual(zipf.read('test'), data) 1111 1112 def test_write_after_close(self): 1113 data = b'content' 1114 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1115 w = zipf.open('test', 'w') 1116 w.write(data) 1117 w.close() 1118 self.assertTrue(w.closed) 1119 self.assertRaises(ValueError, w.write, b'') 1120 self.assertEqual(zipf.read('test'), data) 1121 1122class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1123 compression = zipfile.ZIP_STORED 1124 1125@requires_zlib() 1126class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1127 compression = zipfile.ZIP_DEFLATED 1128 1129@requires_bz2() 1130class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1131 compression = zipfile.ZIP_BZIP2 1132 1133@requires_lzma() 1134class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1135 compression = zipfile.ZIP_LZMA 1136 1137 1138class PyZipFileTests(unittest.TestCase): 1139 def assertCompiledIn(self, name, namelist): 1140 if name + 'o' not in namelist: 1141 self.assertIn(name + 'c', namelist) 1142 1143 def requiresWriteAccess(self, path): 1144 # effective_ids unavailable on windows 1145 if not os.access(path, os.W_OK, 1146 effective_ids=os.access in os.supports_effective_ids): 1147 self.skipTest('requires write access to the installed location') 1148 filename = os.path.join(path, 'test_zipfile.try') 1149 try: 1150 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 1151 os.close(fd) 1152 except Exception: 1153 self.skipTest('requires write access to the installed location') 1154 unlink(filename) 1155 1156 def test_write_pyfile(self): 1157 self.requiresWriteAccess(os.path.dirname(__file__)) 1158 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1159 fn = __file__ 1160 if fn.endswith('.pyc'): 1161 path_split = fn.split(os.sep) 1162 if os.altsep is not None: 1163 path_split.extend(fn.split(os.altsep)) 1164 if '__pycache__' in path_split: 1165 fn = importlib.util.source_from_cache(fn) 1166 else: 1167 fn = fn[:-1] 1168 1169 zipfp.writepy(fn) 1170 1171 bn = os.path.basename(fn) 1172 self.assertNotIn(bn, zipfp.namelist()) 1173 self.assertCompiledIn(bn, zipfp.namelist()) 1174 1175 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1176 fn = __file__ 1177 if fn.endswith('.pyc'): 1178 fn = fn[:-1] 1179 1180 zipfp.writepy(fn, "testpackage") 1181 1182 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 1183 self.assertNotIn(bn, zipfp.namelist()) 1184 self.assertCompiledIn(bn, zipfp.namelist()) 1185 1186 def test_write_python_package(self): 1187 import email 1188 packagedir = os.path.dirname(email.__file__) 1189 self.requiresWriteAccess(packagedir) 1190 1191 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1192 zipfp.writepy(packagedir) 1193 1194 # Check for a couple of modules at different levels of the 1195 # hierarchy 1196 names = zipfp.namelist() 1197 self.assertCompiledIn('email/__init__.py', names) 1198 self.assertCompiledIn('email/mime/text.py', names) 1199 1200 def test_write_filtered_python_package(self): 1201 import test 1202 packagedir = os.path.dirname(test.__file__) 1203 self.requiresWriteAccess(packagedir) 1204 1205 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1206 1207 # first make sure that the test folder gives error messages 1208 # (on the badsyntax_... files) 1209 with captured_stdout() as reportSIO: 1210 zipfp.writepy(packagedir) 1211 reportStr = reportSIO.getvalue() 1212 self.assertTrue('SyntaxError' in reportStr) 1213 1214 # then check that the filter works on the whole package 1215 with captured_stdout() as reportSIO: 1216 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 1217 reportStr = reportSIO.getvalue() 1218 self.assertTrue('SyntaxError' not in reportStr) 1219 1220 # then check that the filter works on individual files 1221 def filter(path): 1222 return not os.path.basename(path).startswith("bad") 1223 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 1224 zipfp.writepy(packagedir, filterfunc=filter) 1225 reportStr = reportSIO.getvalue() 1226 if reportStr: 1227 print(reportStr) 1228 self.assertTrue('SyntaxError' not in reportStr) 1229 1230 def test_write_with_optimization(self): 1231 import email 1232 packagedir = os.path.dirname(email.__file__) 1233 self.requiresWriteAccess(packagedir) 1234 optlevel = 1 if __debug__ else 0 1235 ext = '.pyc' 1236 1237 with TemporaryFile() as t, \ 1238 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 1239 zipfp.writepy(packagedir) 1240 1241 names = zipfp.namelist() 1242 self.assertIn('email/__init__' + ext, names) 1243 self.assertIn('email/mime/text' + ext, names) 1244 1245 def test_write_python_directory(self): 1246 os.mkdir(TESTFN2) 1247 try: 1248 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1249 fp.write("print(42)\n") 1250 1251 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1252 fp.write("print(42 * 42)\n") 1253 1254 with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp: 1255 fp.write("bla bla bla\n") 1256 1257 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1258 zipfp.writepy(TESTFN2) 1259 1260 names = zipfp.namelist() 1261 self.assertCompiledIn('mod1.py', names) 1262 self.assertCompiledIn('mod2.py', names) 1263 self.assertNotIn('mod2.txt', names) 1264 1265 finally: 1266 rmtree(TESTFN2) 1267 1268 def test_write_python_directory_filtered(self): 1269 os.mkdir(TESTFN2) 1270 try: 1271 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1272 fp.write("print(42)\n") 1273 1274 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1275 fp.write("print(42 * 42)\n") 1276 1277 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1278 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 1279 not fn.endswith('mod2.py')) 1280 1281 names = zipfp.namelist() 1282 self.assertCompiledIn('mod1.py', names) 1283 self.assertNotIn('mod2.py', names) 1284 1285 finally: 1286 rmtree(TESTFN2) 1287 1288 def test_write_non_pyfile(self): 1289 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1290 with open(TESTFN, 'w', encoding='utf-8') as f: 1291 f.write('most definitely not a python file') 1292 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1293 unlink(TESTFN) 1294 1295 def test_write_pyfile_bad_syntax(self): 1296 os.mkdir(TESTFN2) 1297 try: 1298 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1299 fp.write("Bad syntax in python file\n") 1300 1301 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1302 # syntax errors are printed to stdout 1303 with captured_stdout() as s: 1304 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1305 1306 self.assertIn("SyntaxError", s.getvalue()) 1307 1308 # as it will not have compiled the python file, it will 1309 # include the .py file not .pyc 1310 names = zipfp.namelist() 1311 self.assertIn('mod1.py', names) 1312 self.assertNotIn('mod1.pyc', names) 1313 1314 finally: 1315 rmtree(TESTFN2) 1316 1317 def test_write_pathlike(self): 1318 os.mkdir(TESTFN2) 1319 try: 1320 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1321 fp.write("print(42)\n") 1322 1323 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1324 zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py") 1325 names = zipfp.namelist() 1326 self.assertCompiledIn('mod1.py', names) 1327 finally: 1328 rmtree(TESTFN2) 1329 1330 1331class ExtractTests(unittest.TestCase): 1332 1333 def make_test_file(self): 1334 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1335 for fpath, fdata in SMALL_TEST_DATA: 1336 zipfp.writestr(fpath, fdata) 1337 1338 def test_extract(self): 1339 with temp_cwd(): 1340 self.make_test_file() 1341 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1342 for fpath, fdata in SMALL_TEST_DATA: 1343 writtenfile = zipfp.extract(fpath) 1344 1345 # make sure it was written to the right place 1346 correctfile = os.path.join(os.getcwd(), fpath) 1347 correctfile = os.path.normpath(correctfile) 1348 1349 self.assertEqual(writtenfile, correctfile) 1350 1351 # make sure correct data is in correct file 1352 with open(writtenfile, "rb") as f: 1353 self.assertEqual(fdata.encode(), f.read()) 1354 1355 unlink(writtenfile) 1356 1357 def _test_extract_with_target(self, target): 1358 self.make_test_file() 1359 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1360 for fpath, fdata in SMALL_TEST_DATA: 1361 writtenfile = zipfp.extract(fpath, target) 1362 1363 # make sure it was written to the right place 1364 correctfile = os.path.join(target, fpath) 1365 correctfile = os.path.normpath(correctfile) 1366 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1367 1368 # make sure correct data is in correct file 1369 with open(writtenfile, "rb") as f: 1370 self.assertEqual(fdata.encode(), f.read()) 1371 1372 unlink(writtenfile) 1373 1374 unlink(TESTFN2) 1375 1376 def test_extract_with_target(self): 1377 with temp_dir() as extdir: 1378 self._test_extract_with_target(extdir) 1379 1380 def test_extract_with_target_pathlike(self): 1381 with temp_dir() as extdir: 1382 self._test_extract_with_target(pathlib.Path(extdir)) 1383 1384 def test_extract_all(self): 1385 with temp_cwd(): 1386 self.make_test_file() 1387 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1388 zipfp.extractall() 1389 for fpath, fdata in SMALL_TEST_DATA: 1390 outfile = os.path.join(os.getcwd(), fpath) 1391 1392 with open(outfile, "rb") as f: 1393 self.assertEqual(fdata.encode(), f.read()) 1394 1395 unlink(outfile) 1396 1397 def _test_extract_all_with_target(self, target): 1398 self.make_test_file() 1399 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1400 zipfp.extractall(target) 1401 for fpath, fdata in SMALL_TEST_DATA: 1402 outfile = os.path.join(target, fpath) 1403 1404 with open(outfile, "rb") as f: 1405 self.assertEqual(fdata.encode(), f.read()) 1406 1407 unlink(outfile) 1408 1409 unlink(TESTFN2) 1410 1411 def test_extract_all_with_target(self): 1412 with temp_dir() as extdir: 1413 self._test_extract_all_with_target(extdir) 1414 1415 def test_extract_all_with_target_pathlike(self): 1416 with temp_dir() as extdir: 1417 self._test_extract_all_with_target(pathlib.Path(extdir)) 1418 1419 def check_file(self, filename, content): 1420 self.assertTrue(os.path.isfile(filename)) 1421 with open(filename, 'rb') as f: 1422 self.assertEqual(f.read(), content) 1423 1424 def test_sanitize_windows_name(self): 1425 san = zipfile.ZipFile._sanitize_windows_name 1426 # Passing pathsep in allows this test to work regardless of platform. 1427 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1428 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1429 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1430 1431 def test_extract_hackers_arcnames_common_cases(self): 1432 common_hacknames = [ 1433 ('../foo/bar', 'foo/bar'), 1434 ('foo/../bar', 'foo/bar'), 1435 ('foo/../../bar', 'foo/bar'), 1436 ('foo/bar/..', 'foo/bar'), 1437 ('./../foo/bar', 'foo/bar'), 1438 ('/foo/bar', 'foo/bar'), 1439 ('/foo/../bar', 'foo/bar'), 1440 ('/foo/../../bar', 'foo/bar'), 1441 ] 1442 self._test_extract_hackers_arcnames(common_hacknames) 1443 1444 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1445 def test_extract_hackers_arcnames_windows_only(self): 1446 """Test combination of path fixing and windows name sanitization.""" 1447 windows_hacknames = [ 1448 (r'..\foo\bar', 'foo/bar'), 1449 (r'..\/foo\/bar', 'foo/bar'), 1450 (r'foo/\..\/bar', 'foo/bar'), 1451 (r'foo\/../\bar', 'foo/bar'), 1452 (r'C:foo/bar', 'foo/bar'), 1453 (r'C:/foo/bar', 'foo/bar'), 1454 (r'C://foo/bar', 'foo/bar'), 1455 (r'C:\foo\bar', 'foo/bar'), 1456 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1457 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1458 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1459 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1460 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1461 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1462 (r'//?/C:/foo/bar', 'foo/bar'), 1463 (r'\\?\C:\foo\bar', 'foo/bar'), 1464 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1465 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1466 ('../../foo../../ba..r', 'foo/ba..r'), 1467 ] 1468 self._test_extract_hackers_arcnames(windows_hacknames) 1469 1470 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1471 def test_extract_hackers_arcnames_posix_only(self): 1472 posix_hacknames = [ 1473 ('//foo/bar', 'foo/bar'), 1474 ('../../foo../../ba..r', 'foo../ba..r'), 1475 (r'foo/..\bar', r'foo/..\bar'), 1476 ] 1477 self._test_extract_hackers_arcnames(posix_hacknames) 1478 1479 def _test_extract_hackers_arcnames(self, hacknames): 1480 for arcname, fixedname in hacknames: 1481 content = b'foobar' + arcname.encode() 1482 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1483 zinfo = zipfile.ZipInfo() 1484 # preserve backslashes 1485 zinfo.filename = arcname 1486 zinfo.external_attr = 0o600 << 16 1487 zipfp.writestr(zinfo, content) 1488 1489 arcname = arcname.replace(os.sep, "/") 1490 targetpath = os.path.join('target', 'subdir', 'subsub') 1491 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1492 1493 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1494 writtenfile = zipfp.extract(arcname, targetpath) 1495 self.assertEqual(writtenfile, correctfile, 1496 msg='extract %r: %r != %r' % 1497 (arcname, writtenfile, correctfile)) 1498 self.check_file(correctfile, content) 1499 rmtree('target') 1500 1501 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1502 zipfp.extractall(targetpath) 1503 self.check_file(correctfile, content) 1504 rmtree('target') 1505 1506 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1507 1508 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1509 writtenfile = zipfp.extract(arcname) 1510 self.assertEqual(writtenfile, correctfile, 1511 msg="extract %r" % arcname) 1512 self.check_file(correctfile, content) 1513 rmtree(fixedname.split('/')[0]) 1514 1515 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1516 zipfp.extractall() 1517 self.check_file(correctfile, content) 1518 rmtree(fixedname.split('/')[0]) 1519 1520 unlink(TESTFN2) 1521 1522 1523class OtherTests(unittest.TestCase): 1524 def test_open_via_zip_info(self): 1525 # Create the ZIP archive 1526 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1527 zipfp.writestr("name", "foo") 1528 with self.assertWarns(UserWarning): 1529 zipfp.writestr("name", "bar") 1530 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1531 1532 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1533 infos = zipfp.infolist() 1534 data = b"" 1535 for info in infos: 1536 with zipfp.open(info) as zipopen: 1537 data += zipopen.read() 1538 self.assertIn(data, {b"foobar", b"barfoo"}) 1539 data = b"" 1540 for info in infos: 1541 data += zipfp.read(info) 1542 self.assertIn(data, {b"foobar", b"barfoo"}) 1543 1544 def test_writestr_extended_local_header_issue1202(self): 1545 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1546 for data in 'abcdefghijklmnop': 1547 zinfo = zipfile.ZipInfo(data) 1548 zinfo.flag_bits |= 0x08 # Include an extended local header. 1549 orig_zip.writestr(zinfo, data) 1550 1551 def test_close(self): 1552 """Check that the zipfile is closed after the 'with' block.""" 1553 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1554 for fpath, fdata in SMALL_TEST_DATA: 1555 zipfp.writestr(fpath, fdata) 1556 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1557 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1558 1559 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1560 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1561 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1562 1563 def test_close_on_exception(self): 1564 """Check that the zipfile is closed if an exception is raised in the 1565 'with' block.""" 1566 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1567 for fpath, fdata in SMALL_TEST_DATA: 1568 zipfp.writestr(fpath, fdata) 1569 1570 try: 1571 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1572 raise zipfile.BadZipFile() 1573 except zipfile.BadZipFile: 1574 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1575 1576 def test_unsupported_version(self): 1577 # File has an extract_version of 120 1578 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1579 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1580 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1581 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1582 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1583 1584 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1585 io.BytesIO(data), 'r') 1586 1587 @requires_zlib() 1588 def test_read_unicode_filenames(self): 1589 # bug #10801 1590 fname = findfile('zip_cp437_header.zip') 1591 with zipfile.ZipFile(fname) as zipfp: 1592 for name in zipfp.namelist(): 1593 zipfp.open(name).close() 1594 1595 def test_write_unicode_filenames(self): 1596 with zipfile.ZipFile(TESTFN, "w") as zf: 1597 zf.writestr("foo.txt", "Test for unicode filename") 1598 zf.writestr("\xf6.txt", "Test for unicode filename") 1599 self.assertIsInstance(zf.infolist()[0].filename, str) 1600 1601 with zipfile.ZipFile(TESTFN, "r") as zf: 1602 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1603 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1604 1605 def test_read_after_write_unicode_filenames(self): 1606 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1607 zipfp.writestr('приклад', b'sample') 1608 self.assertEqual(zipfp.read('приклад'), b'sample') 1609 1610 def test_exclusive_create_zip_file(self): 1611 """Test exclusive creating a new zipfile.""" 1612 unlink(TESTFN2) 1613 filename = 'testfile.txt' 1614 content = b'hello, world. this is some content.' 1615 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1616 zipfp.writestr(filename, content) 1617 with self.assertRaises(FileExistsError): 1618 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1619 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1620 self.assertEqual(zipfp.namelist(), [filename]) 1621 self.assertEqual(zipfp.read(filename), content) 1622 1623 def test_create_non_existent_file_for_append(self): 1624 if os.path.exists(TESTFN): 1625 os.unlink(TESTFN) 1626 1627 filename = 'testfile.txt' 1628 content = b'hello, world. this is some content.' 1629 1630 try: 1631 with zipfile.ZipFile(TESTFN, 'a') as zf: 1632 zf.writestr(filename, content) 1633 except OSError: 1634 self.fail('Could not append data to a non-existent zip file.') 1635 1636 self.assertTrue(os.path.exists(TESTFN)) 1637 1638 with zipfile.ZipFile(TESTFN, 'r') as zf: 1639 self.assertEqual(zf.read(filename), content) 1640 1641 def test_close_erroneous_file(self): 1642 # This test checks that the ZipFile constructor closes the file object 1643 # it opens if there's an error in the file. If it doesn't, the 1644 # traceback holds a reference to the ZipFile object and, indirectly, 1645 # the file object. 1646 # On Windows, this causes the os.unlink() call to fail because the 1647 # underlying file is still open. This is SF bug #412214. 1648 # 1649 with open(TESTFN, "w", encoding="utf-8") as fp: 1650 fp.write("this is not a legal zip file\n") 1651 try: 1652 zf = zipfile.ZipFile(TESTFN) 1653 except zipfile.BadZipFile: 1654 pass 1655 1656 def test_is_zip_erroneous_file(self): 1657 """Check that is_zipfile() correctly identifies non-zip files.""" 1658 # - passing a filename 1659 with open(TESTFN, "w", encoding='utf-8') as fp: 1660 fp.write("this is not a legal zip file\n") 1661 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1662 # - passing a path-like object 1663 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1664 # - passing a file object 1665 with open(TESTFN, "rb") as fp: 1666 self.assertFalse(zipfile.is_zipfile(fp)) 1667 # - passing a file-like object 1668 fp = io.BytesIO() 1669 fp.write(b"this is not a legal zip file\n") 1670 self.assertFalse(zipfile.is_zipfile(fp)) 1671 fp.seek(0, 0) 1672 self.assertFalse(zipfile.is_zipfile(fp)) 1673 1674 def test_damaged_zipfile(self): 1675 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1676 # - Create a valid zip file 1677 fp = io.BytesIO() 1678 with zipfile.ZipFile(fp, mode="w") as zipf: 1679 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1680 zipfiledata = fp.getvalue() 1681 1682 # - Now create copies of it missing the last N bytes and make sure 1683 # a BadZipFile exception is raised when we try to open it 1684 for N in range(len(zipfiledata)): 1685 fp = io.BytesIO(zipfiledata[:N]) 1686 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1687 1688 def test_is_zip_valid_file(self): 1689 """Check that is_zipfile() correctly identifies zip files.""" 1690 # - passing a filename 1691 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1692 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1693 1694 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1695 # - passing a file object 1696 with open(TESTFN, "rb") as fp: 1697 self.assertTrue(zipfile.is_zipfile(fp)) 1698 fp.seek(0, 0) 1699 zip_contents = fp.read() 1700 # - passing a file-like object 1701 fp = io.BytesIO() 1702 fp.write(zip_contents) 1703 self.assertTrue(zipfile.is_zipfile(fp)) 1704 fp.seek(0, 0) 1705 self.assertTrue(zipfile.is_zipfile(fp)) 1706 1707 def test_non_existent_file_raises_OSError(self): 1708 # make sure we don't raise an AttributeError when a partially-constructed 1709 # ZipFile instance is finalized; this tests for regression on SF tracker 1710 # bug #403871. 1711 1712 # The bug we're testing for caused an AttributeError to be raised 1713 # when a ZipFile instance was created for a file that did not 1714 # exist; the .fp member was not initialized but was needed by the 1715 # __del__() method. Since the AttributeError is in the __del__(), 1716 # it is ignored, but the user should be sufficiently annoyed by 1717 # the message on the output that regression will be noticed 1718 # quickly. 1719 self.assertRaises(OSError, zipfile.ZipFile, TESTFN) 1720 1721 def test_empty_file_raises_BadZipFile(self): 1722 f = open(TESTFN, 'w', encoding='utf-8') 1723 f.close() 1724 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1725 1726 with open(TESTFN, 'w', encoding='utf-8') as fp: 1727 fp.write("short file") 1728 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1729 1730 def test_closed_zip_raises_ValueError(self): 1731 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 1732 data = io.BytesIO() 1733 with zipfile.ZipFile(data, mode="w") as zipf: 1734 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1735 1736 # This is correct; calling .read on a closed ZipFile should raise 1737 # a ValueError, and so should calling .testzip. An earlier 1738 # version of .testzip would swallow this exception (and any other) 1739 # and report that the first file in the archive was corrupt. 1740 self.assertRaises(ValueError, zipf.read, "foo.txt") 1741 self.assertRaises(ValueError, zipf.open, "foo.txt") 1742 self.assertRaises(ValueError, zipf.testzip) 1743 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 1744 with open(TESTFN, 'w', encoding='utf-8') as f: 1745 f.write('zipfile test data') 1746 self.assertRaises(ValueError, zipf.write, TESTFN) 1747 1748 def test_bad_constructor_mode(self): 1749 """Check that bad modes passed to ZipFile constructor are caught.""" 1750 self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q") 1751 1752 def test_bad_open_mode(self): 1753 """Check that bad modes passed to ZipFile.open are caught.""" 1754 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1755 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1756 1757 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1758 # read the data to make sure the file is there 1759 zipf.read("foo.txt") 1760 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 1761 # universal newlines support is removed 1762 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 1763 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 1764 1765 def test_read0(self): 1766 """Check that calling read(0) on a ZipExtFile object returns an empty 1767 string and doesn't advance file pointer.""" 1768 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1769 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1770 # read the data to make sure the file is there 1771 with zipf.open("foo.txt") as f: 1772 for i in range(FIXEDTEST_SIZE): 1773 self.assertEqual(f.read(0), b'') 1774 1775 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 1776 1777 def test_open_non_existent_item(self): 1778 """Check that attempting to call open() for an item that doesn't 1779 exist in the archive raises a RuntimeError.""" 1780 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1781 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 1782 1783 def test_bad_compression_mode(self): 1784 """Check that bad compression methods passed to ZipFile.open are 1785 caught.""" 1786 self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1) 1787 1788 def test_unsupported_compression(self): 1789 # data is declared as shrunk, but actually deflated 1790 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 1791 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 1792 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 1793 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1794 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 1795 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 1796 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 1797 self.assertRaises(NotImplementedError, zipf.open, 'x') 1798 1799 def test_null_byte_in_filename(self): 1800 """Check that a filename containing a null byte is properly 1801 terminated.""" 1802 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1803 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 1804 self.assertEqual(zipf.namelist(), ['foo.txt']) 1805 1806 def test_struct_sizes(self): 1807 """Check that ZIP internal structure sizes are calculated correctly.""" 1808 self.assertEqual(zipfile.sizeEndCentDir, 22) 1809 self.assertEqual(zipfile.sizeCentralDir, 46) 1810 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1811 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1812 1813 def test_comments(self): 1814 """Check that comments on the archive are handled properly.""" 1815 1816 # check default comment is empty 1817 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1818 self.assertEqual(zipf.comment, b'') 1819 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1820 1821 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1822 self.assertEqual(zipfr.comment, b'') 1823 1824 # check a simple short comment 1825 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 1826 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1827 zipf.comment = comment 1828 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1829 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1830 self.assertEqual(zipf.comment, comment) 1831 1832 # check a comment of max length 1833 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 1834 comment2 = comment2.encode("ascii") 1835 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1836 zipf.comment = comment2 1837 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1838 1839 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1840 self.assertEqual(zipfr.comment, comment2) 1841 1842 # check a comment that is too long is truncated 1843 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1844 with self.assertWarns(UserWarning): 1845 zipf.comment = comment2 + b'oops' 1846 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1847 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1848 self.assertEqual(zipfr.comment, comment2) 1849 1850 # check that comments are correctly modified in append mode 1851 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1852 zipf.comment = b"original comment" 1853 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1854 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1855 zipf.comment = b"an updated comment" 1856 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1857 self.assertEqual(zipf.comment, b"an updated comment") 1858 1859 # check that comments are correctly shortened in append mode 1860 # and the file is indeed truncated 1861 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1862 zipf.comment = b"original comment that's longer" 1863 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1864 original_zip_size = os.path.getsize(TESTFN) 1865 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1866 zipf.comment = b"shorter comment" 1867 self.assertTrue(original_zip_size > os.path.getsize(TESTFN)) 1868 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1869 self.assertEqual(zipf.comment, b"shorter comment") 1870 1871 def test_unicode_comment(self): 1872 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1873 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1874 with self.assertRaises(TypeError): 1875 zipf.comment = "this is an error" 1876 1877 def test_change_comment_in_empty_archive(self): 1878 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1879 self.assertFalse(zipf.filelist) 1880 zipf.comment = b"this is a comment" 1881 with zipfile.ZipFile(TESTFN, "r") as zipf: 1882 self.assertEqual(zipf.comment, b"this is a comment") 1883 1884 def test_change_comment_in_nonempty_archive(self): 1885 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1886 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1887 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1888 self.assertTrue(zipf.filelist) 1889 zipf.comment = b"this is a comment" 1890 with zipfile.ZipFile(TESTFN, "r") as zipf: 1891 self.assertEqual(zipf.comment, b"this is a comment") 1892 1893 def test_empty_zipfile(self): 1894 # Check that creating a file in 'w' or 'a' mode and closing without 1895 # adding any files to the archives creates a valid empty ZIP file 1896 zipf = zipfile.ZipFile(TESTFN, mode="w") 1897 zipf.close() 1898 try: 1899 zipf = zipfile.ZipFile(TESTFN, mode="r") 1900 except zipfile.BadZipFile: 1901 self.fail("Unable to create empty ZIP file in 'w' mode") 1902 1903 zipf = zipfile.ZipFile(TESTFN, mode="a") 1904 zipf.close() 1905 try: 1906 zipf = zipfile.ZipFile(TESTFN, mode="r") 1907 except: 1908 self.fail("Unable to create empty ZIP file in 'a' mode") 1909 1910 def test_open_empty_file(self): 1911 # Issue 1710703: Check that opening a file with less than 22 bytes 1912 # raises a BadZipFile exception (rather than the previously unhelpful 1913 # OSError) 1914 f = open(TESTFN, 'w', encoding='utf-8') 1915 f.close() 1916 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 1917 1918 def test_create_zipinfo_before_1980(self): 1919 self.assertRaises(ValueError, 1920 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1921 1922 def test_create_empty_zipinfo_repr(self): 1923 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 1924 zi = zipfile.ZipInfo(filename="empty") 1925 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 1926 1927 def test_create_empty_zipinfo_default_attributes(self): 1928 """Ensure all required attributes are set.""" 1929 zi = zipfile.ZipInfo() 1930 self.assertEqual(zi.orig_filename, "NoName") 1931 self.assertEqual(zi.filename, "NoName") 1932 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 1933 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 1934 self.assertEqual(zi.comment, b"") 1935 self.assertEqual(zi.extra, b"") 1936 self.assertIn(zi.create_system, (0, 3)) 1937 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 1938 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 1939 self.assertEqual(zi.reserved, 0) 1940 self.assertEqual(zi.flag_bits, 0) 1941 self.assertEqual(zi.volume, 0) 1942 self.assertEqual(zi.internal_attr, 0) 1943 self.assertEqual(zi.external_attr, 0) 1944 1945 # Before bpo-26185, both were missing 1946 self.assertEqual(zi.file_size, 0) 1947 self.assertEqual(zi.compress_size, 0) 1948 1949 def test_zipfile_with_short_extra_field(self): 1950 """If an extra field in the header is less than 4 bytes, skip it.""" 1951 zipdata = ( 1952 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 1953 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 1954 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 1955 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 1956 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 1957 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 1958 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 1959 ) 1960 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 1961 # testzip returns the name of the first corrupt file, or None 1962 self.assertIsNone(zipf.testzip()) 1963 1964 def test_open_conflicting_handles(self): 1965 # It's only possible to open one writable file handle at a time 1966 msg1 = b"It's fun to charter an accountant!" 1967 msg2 = b"And sail the wide accountant sea" 1968 msg3 = b"To find, explore the funds offshore" 1969 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1970 with zipf.open('foo', mode='w') as w2: 1971 w2.write(msg1) 1972 with zipf.open('bar', mode='w') as w1: 1973 with self.assertRaises(ValueError): 1974 zipf.open('handle', mode='w') 1975 with self.assertRaises(ValueError): 1976 zipf.open('foo', mode='r') 1977 with self.assertRaises(ValueError): 1978 zipf.writestr('str', 'abcde') 1979 with self.assertRaises(ValueError): 1980 zipf.write(__file__, 'file') 1981 with self.assertRaises(ValueError): 1982 zipf.close() 1983 w1.write(msg2) 1984 with zipf.open('baz', mode='w') as w2: 1985 w2.write(msg3) 1986 1987 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 1988 self.assertEqual(zipf.read('foo'), msg1) 1989 self.assertEqual(zipf.read('bar'), msg2) 1990 self.assertEqual(zipf.read('baz'), msg3) 1991 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 1992 1993 def test_seek_tell(self): 1994 # Test seek functionality 1995 txt = b"Where's Bruce?" 1996 bloc = txt.find(b"Bruce") 1997 # Check seek on a file 1998 with zipfile.ZipFile(TESTFN, "w") as zipf: 1999 zipf.writestr("foo.txt", txt) 2000 with zipfile.ZipFile(TESTFN, "r") as zipf: 2001 with zipf.open("foo.txt", "r") as fp: 2002 fp.seek(bloc, os.SEEK_SET) 2003 self.assertEqual(fp.tell(), bloc) 2004 fp.seek(-bloc, os.SEEK_CUR) 2005 self.assertEqual(fp.tell(), 0) 2006 fp.seek(bloc, os.SEEK_CUR) 2007 self.assertEqual(fp.tell(), bloc) 2008 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2009 fp.seek(0, os.SEEK_END) 2010 self.assertEqual(fp.tell(), len(txt)) 2011 fp.seek(0, os.SEEK_SET) 2012 self.assertEqual(fp.tell(), 0) 2013 # Check seek on memory file 2014 data = io.BytesIO() 2015 with zipfile.ZipFile(data, mode="w") as zipf: 2016 zipf.writestr("foo.txt", txt) 2017 with zipfile.ZipFile(data, mode="r") as zipf: 2018 with zipf.open("foo.txt", "r") as fp: 2019 fp.seek(bloc, os.SEEK_SET) 2020 self.assertEqual(fp.tell(), bloc) 2021 fp.seek(-bloc, os.SEEK_CUR) 2022 self.assertEqual(fp.tell(), 0) 2023 fp.seek(bloc, os.SEEK_CUR) 2024 self.assertEqual(fp.tell(), bloc) 2025 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2026 fp.seek(0, os.SEEK_END) 2027 self.assertEqual(fp.tell(), len(txt)) 2028 fp.seek(0, os.SEEK_SET) 2029 self.assertEqual(fp.tell(), 0) 2030 2031 @requires_bz2() 2032 def test_decompress_without_3rd_party_library(self): 2033 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2034 zip_file = io.BytesIO(data) 2035 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 2036 zf.writestr('a.txt', b'a') 2037 with mock.patch('zipfile.bz2', None): 2038 with zipfile.ZipFile(zip_file) as zf: 2039 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 2040 2041 def tearDown(self): 2042 unlink(TESTFN) 2043 unlink(TESTFN2) 2044 2045 2046class AbstractBadCrcTests: 2047 def test_testzip_with_bad_crc(self): 2048 """Tests that files with bad CRCs return their name from testzip.""" 2049 zipdata = self.zip_with_bad_crc 2050 2051 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2052 # testzip returns the name of the first corrupt file, or None 2053 self.assertEqual('afile', zipf.testzip()) 2054 2055 def test_read_with_bad_crc(self): 2056 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2057 zipdata = self.zip_with_bad_crc 2058 2059 # Using ZipFile.read() 2060 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2061 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2062 2063 # Using ZipExtFile.read() 2064 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2065 with zipf.open('afile', 'r') as corrupt_file: 2066 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2067 2068 # Same with small reads (in order to exercise the buffering logic) 2069 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2070 with zipf.open('afile', 'r') as corrupt_file: 2071 corrupt_file.MIN_READ_SIZE = 2 2072 with self.assertRaises(zipfile.BadZipFile): 2073 while corrupt_file.read(2): 2074 pass 2075 2076 2077class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2078 compression = zipfile.ZIP_STORED 2079 zip_with_bad_crc = ( 2080 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2081 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2082 b'ilehello,AworldP' 2083 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2084 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2085 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2086 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2087 b'\0\0/\0\0\0\0\0') 2088 2089@requires_zlib() 2090class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2091 compression = zipfile.ZIP_DEFLATED 2092 zip_with_bad_crc = ( 2093 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2094 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2095 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2096 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2097 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2098 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2099 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2100 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2101 2102@requires_bz2() 2103class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2104 compression = zipfile.ZIP_BZIP2 2105 zip_with_bad_crc = ( 2106 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2107 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2108 b'ileBZh91AY&SY\xd4\xa8\xca' 2109 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2110 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2111 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2112 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2113 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2114 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2115 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2116 b'\x00\x00\x00\x00') 2117 2118@requires_lzma() 2119class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2120 compression = zipfile.ZIP_LZMA 2121 zip_with_bad_crc = ( 2122 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2123 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2124 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2125 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2126 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2127 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2128 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2129 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2130 b'\x00>\x00\x00\x00\x00\x00') 2131 2132 2133class DecryptionTests(unittest.TestCase): 2134 """Check that ZIP decryption works. Since the library does not 2135 support encryption at the moment, we use a pre-generated encrypted 2136 ZIP file.""" 2137 2138 data = ( 2139 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2140 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2141 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2142 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2143 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2144 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2145 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2146 data2 = ( 2147 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2148 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2149 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2150 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2151 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2152 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2153 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2154 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2155 2156 plain = b'zipfile.py encryption test' 2157 plain2 = b'\x00'*512 2158 2159 def setUp(self): 2160 with open(TESTFN, "wb") as fp: 2161 fp.write(self.data) 2162 self.zip = zipfile.ZipFile(TESTFN, "r") 2163 with open(TESTFN2, "wb") as fp: 2164 fp.write(self.data2) 2165 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2166 2167 def tearDown(self): 2168 self.zip.close() 2169 os.unlink(TESTFN) 2170 self.zip2.close() 2171 os.unlink(TESTFN2) 2172 2173 def test_no_password(self): 2174 # Reading the encrypted file without password 2175 # must generate a RunTime exception 2176 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2177 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2178 2179 def test_bad_password(self): 2180 self.zip.setpassword(b"perl") 2181 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2182 self.zip2.setpassword(b"perl") 2183 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2184 2185 @requires_zlib() 2186 def test_good_password(self): 2187 self.zip.setpassword(b"python") 2188 self.assertEqual(self.zip.read("test.txt"), self.plain) 2189 self.zip2.setpassword(b"12345") 2190 self.assertEqual(self.zip2.read("zero"), self.plain2) 2191 2192 def test_unicode_password(self): 2193 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 2194 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 2195 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 2196 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 2197 2198 def test_seek_tell(self): 2199 self.zip.setpassword(b"python") 2200 txt = self.plain 2201 test_word = b'encryption' 2202 bloc = txt.find(test_word) 2203 bloc_len = len(test_word) 2204 with self.zip.open("test.txt", "r") as fp: 2205 fp.seek(bloc, os.SEEK_SET) 2206 self.assertEqual(fp.tell(), bloc) 2207 fp.seek(-bloc, os.SEEK_CUR) 2208 self.assertEqual(fp.tell(), 0) 2209 fp.seek(bloc, os.SEEK_CUR) 2210 self.assertEqual(fp.tell(), bloc) 2211 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2212 2213 # Make sure that the second read after seeking back beyond 2214 # _readbuffer returns the same content (ie. rewind to the start of 2215 # the file to read forward to the required position). 2216 old_read_size = fp.MIN_READ_SIZE 2217 fp.MIN_READ_SIZE = 1 2218 fp._readbuffer = b'' 2219 fp._offset = 0 2220 fp.seek(0, os.SEEK_SET) 2221 self.assertEqual(fp.tell(), 0) 2222 fp.seek(bloc, os.SEEK_CUR) 2223 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2224 fp.MIN_READ_SIZE = old_read_size 2225 2226 fp.seek(0, os.SEEK_END) 2227 self.assertEqual(fp.tell(), len(txt)) 2228 fp.seek(0, os.SEEK_SET) 2229 self.assertEqual(fp.tell(), 0) 2230 2231 # Read the file completely to definitely call any eof integrity 2232 # checks (crc) and make sure they still pass. 2233 fp.read() 2234 2235 2236class AbstractTestsWithRandomBinaryFiles: 2237 @classmethod 2238 def setUpClass(cls): 2239 datacount = randint(16, 64)*1024 + randint(1, 1024) 2240 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2241 for i in range(datacount)) 2242 2243 def setUp(self): 2244 # Make a source file with some lines 2245 with open(TESTFN, "wb") as fp: 2246 fp.write(self.data) 2247 2248 def tearDown(self): 2249 unlink(TESTFN) 2250 unlink(TESTFN2) 2251 2252 def make_test_archive(self, f, compression): 2253 # Create the ZIP archive 2254 with zipfile.ZipFile(f, "w", compression) as zipfp: 2255 zipfp.write(TESTFN, "another.name") 2256 zipfp.write(TESTFN, TESTFN) 2257 2258 def zip_test(self, f, compression): 2259 self.make_test_archive(f, compression) 2260 2261 # Read the ZIP archive 2262 with zipfile.ZipFile(f, "r", compression) as zipfp: 2263 testdata = zipfp.read(TESTFN) 2264 self.assertEqual(len(testdata), len(self.data)) 2265 self.assertEqual(testdata, self.data) 2266 self.assertEqual(zipfp.read("another.name"), self.data) 2267 2268 def test_read(self): 2269 for f in get_files(self): 2270 self.zip_test(f, self.compression) 2271 2272 def zip_open_test(self, f, compression): 2273 self.make_test_archive(f, compression) 2274 2275 # Read the ZIP archive 2276 with zipfile.ZipFile(f, "r", compression) as zipfp: 2277 zipdata1 = [] 2278 with zipfp.open(TESTFN) as zipopen1: 2279 while True: 2280 read_data = zipopen1.read(256) 2281 if not read_data: 2282 break 2283 zipdata1.append(read_data) 2284 2285 zipdata2 = [] 2286 with zipfp.open("another.name") as zipopen2: 2287 while True: 2288 read_data = zipopen2.read(256) 2289 if not read_data: 2290 break 2291 zipdata2.append(read_data) 2292 2293 testdata1 = b''.join(zipdata1) 2294 self.assertEqual(len(testdata1), len(self.data)) 2295 self.assertEqual(testdata1, self.data) 2296 2297 testdata2 = b''.join(zipdata2) 2298 self.assertEqual(len(testdata2), len(self.data)) 2299 self.assertEqual(testdata2, self.data) 2300 2301 def test_open(self): 2302 for f in get_files(self): 2303 self.zip_open_test(f, self.compression) 2304 2305 def zip_random_open_test(self, f, compression): 2306 self.make_test_archive(f, compression) 2307 2308 # Read the ZIP archive 2309 with zipfile.ZipFile(f, "r", compression) as zipfp: 2310 zipdata1 = [] 2311 with zipfp.open(TESTFN) as zipopen1: 2312 while True: 2313 read_data = zipopen1.read(randint(1, 1024)) 2314 if not read_data: 2315 break 2316 zipdata1.append(read_data) 2317 2318 testdata = b''.join(zipdata1) 2319 self.assertEqual(len(testdata), len(self.data)) 2320 self.assertEqual(testdata, self.data) 2321 2322 def test_random_open(self): 2323 for f in get_files(self): 2324 self.zip_random_open_test(f, self.compression) 2325 2326 2327class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2328 unittest.TestCase): 2329 compression = zipfile.ZIP_STORED 2330 2331@requires_zlib() 2332class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2333 unittest.TestCase): 2334 compression = zipfile.ZIP_DEFLATED 2335 2336@requires_bz2() 2337class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2338 unittest.TestCase): 2339 compression = zipfile.ZIP_BZIP2 2340 2341@requires_lzma() 2342class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2343 unittest.TestCase): 2344 compression = zipfile.ZIP_LZMA 2345 2346 2347# Provide the tell() method but not seek() 2348class Tellable: 2349 def __init__(self, fp): 2350 self.fp = fp 2351 self.offset = 0 2352 2353 def write(self, data): 2354 n = self.fp.write(data) 2355 self.offset += n 2356 return n 2357 2358 def tell(self): 2359 return self.offset 2360 2361 def flush(self): 2362 self.fp.flush() 2363 2364class Unseekable: 2365 def __init__(self, fp): 2366 self.fp = fp 2367 2368 def write(self, data): 2369 return self.fp.write(data) 2370 2371 def flush(self): 2372 self.fp.flush() 2373 2374class UnseekableTests(unittest.TestCase): 2375 def test_writestr(self): 2376 for wrapper in (lambda f: f), Tellable, Unseekable: 2377 with self.subTest(wrapper=wrapper): 2378 f = io.BytesIO() 2379 f.write(b'abc') 2380 bf = io.BufferedWriter(f) 2381 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2382 zipfp.writestr('ones', b'111') 2383 zipfp.writestr('twos', b'222') 2384 self.assertEqual(f.getvalue()[:5], b'abcPK') 2385 with zipfile.ZipFile(f, mode='r') as zipf: 2386 with zipf.open('ones') as zopen: 2387 self.assertEqual(zopen.read(), b'111') 2388 with zipf.open('twos') as zopen: 2389 self.assertEqual(zopen.read(), b'222') 2390 2391 def test_write(self): 2392 for wrapper in (lambda f: f), Tellable, Unseekable: 2393 with self.subTest(wrapper=wrapper): 2394 f = io.BytesIO() 2395 f.write(b'abc') 2396 bf = io.BufferedWriter(f) 2397 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2398 self.addCleanup(unlink, TESTFN) 2399 with open(TESTFN, 'wb') as f2: 2400 f2.write(b'111') 2401 zipfp.write(TESTFN, 'ones') 2402 with open(TESTFN, 'wb') as f2: 2403 f2.write(b'222') 2404 zipfp.write(TESTFN, 'twos') 2405 self.assertEqual(f.getvalue()[:5], b'abcPK') 2406 with zipfile.ZipFile(f, mode='r') as zipf: 2407 with zipf.open('ones') as zopen: 2408 self.assertEqual(zopen.read(), b'111') 2409 with zipf.open('twos') as zopen: 2410 self.assertEqual(zopen.read(), b'222') 2411 2412 def test_open_write(self): 2413 for wrapper in (lambda f: f), Tellable, Unseekable: 2414 with self.subTest(wrapper=wrapper): 2415 f = io.BytesIO() 2416 f.write(b'abc') 2417 bf = io.BufferedWriter(f) 2418 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2419 with zipf.open('ones', 'w') as zopen: 2420 zopen.write(b'111') 2421 with zipf.open('twos', 'w') as zopen: 2422 zopen.write(b'222') 2423 self.assertEqual(f.getvalue()[:5], b'abcPK') 2424 with zipfile.ZipFile(f) as zipf: 2425 self.assertEqual(zipf.read('ones'), b'111') 2426 self.assertEqual(zipf.read('twos'), b'222') 2427 2428 2429@requires_zlib() 2430class TestsWithMultipleOpens(unittest.TestCase): 2431 @classmethod 2432 def setUpClass(cls): 2433 cls.data1 = b'111' + randbytes(10000) 2434 cls.data2 = b'222' + randbytes(10000) 2435 2436 def make_test_archive(self, f): 2437 # Create the ZIP archive 2438 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2439 zipfp.writestr('ones', self.data1) 2440 zipfp.writestr('twos', self.data2) 2441 2442 def test_same_file(self): 2443 # Verify that (when the ZipFile is in control of creating file objects) 2444 # multiple open() calls can be made without interfering with each other. 2445 for f in get_files(self): 2446 self.make_test_archive(f) 2447 with zipfile.ZipFile(f, mode="r") as zipf: 2448 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2449 data1 = zopen1.read(500) 2450 data2 = zopen2.read(500) 2451 data1 += zopen1.read() 2452 data2 += zopen2.read() 2453 self.assertEqual(data1, data2) 2454 self.assertEqual(data1, self.data1) 2455 2456 def test_different_file(self): 2457 # Verify that (when the ZipFile is in control of creating file objects) 2458 # multiple open() calls can be made without interfering with each other. 2459 for f in get_files(self): 2460 self.make_test_archive(f) 2461 with zipfile.ZipFile(f, mode="r") as zipf: 2462 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2463 data1 = zopen1.read(500) 2464 data2 = zopen2.read(500) 2465 data1 += zopen1.read() 2466 data2 += zopen2.read() 2467 self.assertEqual(data1, self.data1) 2468 self.assertEqual(data2, self.data2) 2469 2470 def test_interleaved(self): 2471 # Verify that (when the ZipFile is in control of creating file objects) 2472 # multiple open() calls can be made without interfering with each other. 2473 for f in get_files(self): 2474 self.make_test_archive(f) 2475 with zipfile.ZipFile(f, mode="r") as zipf: 2476 with zipf.open('ones') as zopen1: 2477 data1 = zopen1.read(500) 2478 with zipf.open('twos') as zopen2: 2479 data2 = zopen2.read(500) 2480 data1 += zopen1.read() 2481 data2 += zopen2.read() 2482 self.assertEqual(data1, self.data1) 2483 self.assertEqual(data2, self.data2) 2484 2485 def test_read_after_close(self): 2486 for f in get_files(self): 2487 self.make_test_archive(f) 2488 with contextlib.ExitStack() as stack: 2489 with zipfile.ZipFile(f, 'r') as zipf: 2490 zopen1 = stack.enter_context(zipf.open('ones')) 2491 zopen2 = stack.enter_context(zipf.open('twos')) 2492 data1 = zopen1.read(500) 2493 data2 = zopen2.read(500) 2494 data1 += zopen1.read() 2495 data2 += zopen2.read() 2496 self.assertEqual(data1, self.data1) 2497 self.assertEqual(data2, self.data2) 2498 2499 def test_read_after_write(self): 2500 for f in get_files(self): 2501 with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2502 zipf.writestr('ones', self.data1) 2503 zipf.writestr('twos', self.data2) 2504 with zipf.open('ones') as zopen1: 2505 data1 = zopen1.read(500) 2506 self.assertEqual(data1, self.data1[:500]) 2507 with zipfile.ZipFile(f, 'r') as zipf: 2508 data1 = zipf.read('ones') 2509 data2 = zipf.read('twos') 2510 self.assertEqual(data1, self.data1) 2511 self.assertEqual(data2, self.data2) 2512 2513 def test_write_after_read(self): 2514 for f in get_files(self): 2515 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2516 zipf.writestr('ones', self.data1) 2517 with zipf.open('ones') as zopen1: 2518 zopen1.read(500) 2519 zipf.writestr('twos', self.data2) 2520 with zipfile.ZipFile(f, 'r') as zipf: 2521 data1 = zipf.read('ones') 2522 data2 = zipf.read('twos') 2523 self.assertEqual(data1, self.data1) 2524 self.assertEqual(data2, self.data2) 2525 2526 def test_many_opens(self): 2527 # Verify that read() and open() promptly close the file descriptor, 2528 # and don't rely on the garbage collector to free resources. 2529 self.make_test_archive(TESTFN2) 2530 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 2531 for x in range(100): 2532 zipf.read('ones') 2533 with zipf.open('ones') as zopen1: 2534 pass 2535 with open(os.devnull, "rb") as f: 2536 self.assertLess(f.fileno(), 100) 2537 2538 def test_write_while_reading(self): 2539 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2540 zipf.writestr('ones', self.data1) 2541 with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2542 with zipf.open('ones', 'r') as r1: 2543 data1 = r1.read(500) 2544 with zipf.open('twos', 'w') as w1: 2545 w1.write(self.data2) 2546 data1 += r1.read() 2547 self.assertEqual(data1, self.data1) 2548 with zipfile.ZipFile(TESTFN2) as zipf: 2549 self.assertEqual(zipf.read('twos'), self.data2) 2550 2551 def tearDown(self): 2552 unlink(TESTFN2) 2553 2554 2555class TestWithDirectory(unittest.TestCase): 2556 def setUp(self): 2557 os.mkdir(TESTFN2) 2558 2559 def test_extract_dir(self): 2560 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 2561 zipf.extractall(TESTFN2) 2562 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2563 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2564 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2565 2566 def test_bug_6050(self): 2567 # Extraction should succeed if directories already exist 2568 os.mkdir(os.path.join(TESTFN2, "a")) 2569 self.test_extract_dir() 2570 2571 def test_write_dir(self): 2572 dirpath = os.path.join(TESTFN2, "x") 2573 os.mkdir(dirpath) 2574 mode = os.stat(dirpath).st_mode & 0xFFFF 2575 with zipfile.ZipFile(TESTFN, "w") as zipf: 2576 zipf.write(dirpath) 2577 zinfo = zipf.filelist[0] 2578 self.assertTrue(zinfo.filename.endswith("/x/")) 2579 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2580 zipf.write(dirpath, "y") 2581 zinfo = zipf.filelist[1] 2582 self.assertTrue(zinfo.filename, "y/") 2583 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2584 with zipfile.ZipFile(TESTFN, "r") as zipf: 2585 zinfo = zipf.filelist[0] 2586 self.assertTrue(zinfo.filename.endswith("/x/")) 2587 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2588 zinfo = zipf.filelist[1] 2589 self.assertTrue(zinfo.filename, "y/") 2590 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2591 target = os.path.join(TESTFN2, "target") 2592 os.mkdir(target) 2593 zipf.extractall(target) 2594 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2595 self.assertEqual(len(os.listdir(target)), 2) 2596 2597 def test_writestr_dir(self): 2598 os.mkdir(os.path.join(TESTFN2, "x")) 2599 with zipfile.ZipFile(TESTFN, "w") as zipf: 2600 zipf.writestr("x/", b'') 2601 zinfo = zipf.filelist[0] 2602 self.assertEqual(zinfo.filename, "x/") 2603 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2604 with zipfile.ZipFile(TESTFN, "r") as zipf: 2605 zinfo = zipf.filelist[0] 2606 self.assertTrue(zinfo.filename.endswith("x/")) 2607 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2608 target = os.path.join(TESTFN2, "target") 2609 os.mkdir(target) 2610 zipf.extractall(target) 2611 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 2612 self.assertEqual(os.listdir(target), ["x"]) 2613 2614 def tearDown(self): 2615 rmtree(TESTFN2) 2616 if os.path.exists(TESTFN): 2617 unlink(TESTFN) 2618 2619 2620class ZipInfoTests(unittest.TestCase): 2621 def test_from_file(self): 2622 zi = zipfile.ZipInfo.from_file(__file__) 2623 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2624 self.assertFalse(zi.is_dir()) 2625 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2626 2627 def test_from_file_pathlike(self): 2628 zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__)) 2629 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2630 self.assertFalse(zi.is_dir()) 2631 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2632 2633 def test_from_file_bytes(self): 2634 zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test') 2635 self.assertEqual(posixpath.basename(zi.filename), 'test') 2636 self.assertFalse(zi.is_dir()) 2637 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2638 2639 def test_from_file_fileno(self): 2640 with open(__file__, 'rb') as f: 2641 zi = zipfile.ZipInfo.from_file(f.fileno(), 'test') 2642 self.assertEqual(posixpath.basename(zi.filename), 'test') 2643 self.assertFalse(zi.is_dir()) 2644 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2645 2646 def test_from_dir(self): 2647 dirpath = os.path.dirname(os.path.abspath(__file__)) 2648 zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests') 2649 self.assertEqual(zi.filename, 'stdlib_tests/') 2650 self.assertTrue(zi.is_dir()) 2651 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2652 self.assertEqual(zi.file_size, 0) 2653 2654 2655class CommandLineTest(unittest.TestCase): 2656 2657 def zipfilecmd(self, *args, **kwargs): 2658 rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args, 2659 **kwargs) 2660 return out.replace(os.linesep.encode(), b'\n') 2661 2662 def zipfilecmd_failure(self, *args): 2663 return script_helper.assert_python_failure('-m', 'zipfile', *args) 2664 2665 def test_bad_use(self): 2666 rc, out, err = self.zipfilecmd_failure() 2667 self.assertEqual(out, b'') 2668 self.assertIn(b'usage', err.lower()) 2669 self.assertIn(b'error', err.lower()) 2670 self.assertIn(b'required', err.lower()) 2671 rc, out, err = self.zipfilecmd_failure('-l', '') 2672 self.assertEqual(out, b'') 2673 self.assertNotEqual(err.strip(), b'') 2674 2675 def test_test_command(self): 2676 zip_name = findfile('zipdir.zip') 2677 for opt in '-t', '--test': 2678 out = self.zipfilecmd(opt, zip_name) 2679 self.assertEqual(out.rstrip(), b'Done testing') 2680 zip_name = findfile('testtar.tar') 2681 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 2682 self.assertEqual(out, b'') 2683 2684 def test_list_command(self): 2685 zip_name = findfile('zipdir.zip') 2686 t = io.StringIO() 2687 with zipfile.ZipFile(zip_name, 'r') as tf: 2688 tf.printdir(t) 2689 expected = t.getvalue().encode('ascii', 'backslashreplace') 2690 for opt in '-l', '--list': 2691 out = self.zipfilecmd(opt, zip_name, 2692 PYTHONIOENCODING='ascii:backslashreplace') 2693 self.assertEqual(out, expected) 2694 2695 @requires_zlib() 2696 def test_create_command(self): 2697 self.addCleanup(unlink, TESTFN) 2698 with open(TESTFN, 'w', encoding='utf-8') as f: 2699 f.write('test 1') 2700 os.mkdir(TESTFNDIR) 2701 self.addCleanup(rmtree, TESTFNDIR) 2702 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f: 2703 f.write('test 2') 2704 files = [TESTFN, TESTFNDIR] 2705 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 2706 for opt in '-c', '--create': 2707 try: 2708 out = self.zipfilecmd(opt, TESTFN2, *files) 2709 self.assertEqual(out, b'') 2710 with zipfile.ZipFile(TESTFN2) as zf: 2711 self.assertEqual(zf.namelist(), namelist) 2712 self.assertEqual(zf.read(namelist[0]), b'test 1') 2713 self.assertEqual(zf.read(namelist[2]), b'test 2') 2714 finally: 2715 unlink(TESTFN2) 2716 2717 def test_extract_command(self): 2718 zip_name = findfile('zipdir.zip') 2719 for opt in '-e', '--extract': 2720 with temp_dir() as extdir: 2721 out = self.zipfilecmd(opt, zip_name, extdir) 2722 self.assertEqual(out, b'') 2723 with zipfile.ZipFile(zip_name) as zf: 2724 for zi in zf.infolist(): 2725 path = os.path.join(extdir, 2726 zi.filename.replace('/', os.sep)) 2727 if zi.is_dir(): 2728 self.assertTrue(os.path.isdir(path)) 2729 else: 2730 self.assertTrue(os.path.isfile(path)) 2731 with open(path, 'rb') as f: 2732 self.assertEqual(f.read(), zf.read(zi)) 2733 2734 2735class TestExecutablePrependedZip(unittest.TestCase): 2736 """Test our ability to open zip files with an executable prepended.""" 2737 2738 def setUp(self): 2739 self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata') 2740 self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata') 2741 2742 def _test_zip_works(self, name): 2743 # bpo28494 sanity check: ensure is_zipfile works on these. 2744 self.assertTrue(zipfile.is_zipfile(name), 2745 f'is_zipfile failed on {name}') 2746 # Ensure we can operate on these via ZipFile. 2747 with zipfile.ZipFile(name) as zipfp: 2748 for n in zipfp.namelist(): 2749 data = zipfp.read(n) 2750 self.assertIn(b'FAVORITE_NUMBER', data) 2751 2752 def test_read_zip_with_exe_prepended(self): 2753 self._test_zip_works(self.exe_zip) 2754 2755 def test_read_zip64_with_exe_prepended(self): 2756 self._test_zip_works(self.exe_zip64) 2757 2758 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2759 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2760 'Test relies on #!/bin/bash working.') 2761 def test_execute_zip2(self): 2762 output = subprocess.check_output([self.exe_zip, sys.executable]) 2763 self.assertIn(b'number in executable: 5', output) 2764 2765 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2766 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2767 'Test relies on #!/bin/bash working.') 2768 def test_execute_zip64(self): 2769 output = subprocess.check_output([self.exe_zip64, sys.executable]) 2770 self.assertIn(b'number in executable: 5', output) 2771 2772 2773# Poor man's technique to consume a (smallish) iterable. 2774consume = tuple 2775 2776 2777# from jaraco.itertools 5.0 2778class jaraco: 2779 class itertools: 2780 class Counter: 2781 def __init__(self, i): 2782 self.count = 0 2783 self._orig_iter = iter(i) 2784 2785 def __iter__(self): 2786 return self 2787 2788 def __next__(self): 2789 result = next(self._orig_iter) 2790 self.count += 1 2791 return result 2792 2793 2794def add_dirs(zf): 2795 """ 2796 Given a writable zip file zf, inject directory entries for 2797 any directories implied by the presence of children. 2798 """ 2799 for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()): 2800 zf.writestr(name, b"") 2801 return zf 2802 2803 2804def build_alpharep_fixture(): 2805 """ 2806 Create a zip file with this structure: 2807 2808 . 2809 ├── a.txt 2810 ├── b 2811 │ ├── c.txt 2812 │ ├── d 2813 │ │ └── e.txt 2814 │ └── f.txt 2815 └── g 2816 └── h 2817 └── i.txt 2818 2819 This fixture has the following key characteristics: 2820 2821 - a file at the root (a) 2822 - a file two levels deep (b/d/e) 2823 - multiple files in a directory (b/c, b/f) 2824 - a directory containing only a directory (g/h) 2825 2826 "alpha" because it uses alphabet 2827 "rep" because it's a representative example 2828 """ 2829 data = io.BytesIO() 2830 zf = zipfile.ZipFile(data, "w") 2831 zf.writestr("a.txt", b"content of a") 2832 zf.writestr("b/c.txt", b"content of c") 2833 zf.writestr("b/d/e.txt", b"content of e") 2834 zf.writestr("b/f.txt", b"content of f") 2835 zf.writestr("g/h/i.txt", b"content of i") 2836 zf.filename = "alpharep.zip" 2837 return zf 2838 2839 2840def pass_alpharep(meth): 2841 """ 2842 Given a method, wrap it in a for loop that invokes method 2843 with each subtest. 2844 """ 2845 2846 @functools.wraps(meth) 2847 def wrapper(self): 2848 for alpharep in self.zipfile_alpharep(): 2849 meth(self, alpharep=alpharep) 2850 2851 return wrapper 2852 2853 2854class TestPath(unittest.TestCase): 2855 def setUp(self): 2856 self.fixtures = contextlib.ExitStack() 2857 self.addCleanup(self.fixtures.close) 2858 2859 def zipfile_alpharep(self): 2860 with self.subTest(): 2861 yield build_alpharep_fixture() 2862 with self.subTest(): 2863 yield add_dirs(build_alpharep_fixture()) 2864 2865 def zipfile_ondisk(self, alpharep): 2866 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 2867 buffer = alpharep.fp 2868 alpharep.close() 2869 path = tmpdir / alpharep.filename 2870 with path.open("wb") as strm: 2871 strm.write(buffer.getvalue()) 2872 return path 2873 2874 @pass_alpharep 2875 def test_iterdir_and_types(self, alpharep): 2876 root = zipfile.Path(alpharep) 2877 assert root.is_dir() 2878 a, b, g = root.iterdir() 2879 assert a.is_file() 2880 assert b.is_dir() 2881 assert g.is_dir() 2882 c, f, d = b.iterdir() 2883 assert c.is_file() and f.is_file() 2884 (e,) = d.iterdir() 2885 assert e.is_file() 2886 (h,) = g.iterdir() 2887 (i,) = h.iterdir() 2888 assert i.is_file() 2889 2890 @pass_alpharep 2891 def test_is_file_missing(self, alpharep): 2892 root = zipfile.Path(alpharep) 2893 assert not root.joinpath('missing.txt').is_file() 2894 2895 @pass_alpharep 2896 def test_iterdir_on_file(self, alpharep): 2897 root = zipfile.Path(alpharep) 2898 a, b, g = root.iterdir() 2899 with self.assertRaises(ValueError): 2900 a.iterdir() 2901 2902 @pass_alpharep 2903 def test_subdir_is_dir(self, alpharep): 2904 root = zipfile.Path(alpharep) 2905 assert (root / 'b').is_dir() 2906 assert (root / 'b/').is_dir() 2907 assert (root / 'g').is_dir() 2908 assert (root / 'g/').is_dir() 2909 2910 @pass_alpharep 2911 def test_open(self, alpharep): 2912 root = zipfile.Path(alpharep) 2913 a, b, g = root.iterdir() 2914 with a.open(encoding="utf-8") as strm: 2915 data = strm.read() 2916 assert data == "content of a" 2917 2918 def test_open_write(self): 2919 """ 2920 If the zipfile is open for write, it should be possible to 2921 write bytes or text to it. 2922 """ 2923 zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w')) 2924 with zf.joinpath('file.bin').open('wb') as strm: 2925 strm.write(b'binary contents') 2926 with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm: 2927 strm.write('text file') 2928 2929 def test_open_extant_directory(self): 2930 """ 2931 Attempting to open a directory raises IsADirectoryError. 2932 """ 2933 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 2934 with self.assertRaises(IsADirectoryError): 2935 zf.joinpath('b').open() 2936 2937 @pass_alpharep 2938 def test_open_binary_invalid_args(self, alpharep): 2939 root = zipfile.Path(alpharep) 2940 with self.assertRaises(ValueError): 2941 root.joinpath('a.txt').open('rb', encoding='utf-8') 2942 with self.assertRaises(ValueError): 2943 root.joinpath('a.txt').open('rb', 'utf-8') 2944 2945 def test_open_missing_directory(self): 2946 """ 2947 Attempting to open a missing directory raises FileNotFoundError. 2948 """ 2949 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 2950 with self.assertRaises(FileNotFoundError): 2951 zf.joinpath('z').open() 2952 2953 @pass_alpharep 2954 def test_read(self, alpharep): 2955 root = zipfile.Path(alpharep) 2956 a, b, g = root.iterdir() 2957 assert a.read_text(encoding="utf-8") == "content of a" 2958 assert a.read_bytes() == b"content of a" 2959 2960 @pass_alpharep 2961 def test_joinpath(self, alpharep): 2962 root = zipfile.Path(alpharep) 2963 a = root.joinpath("a.txt") 2964 assert a.is_file() 2965 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 2966 assert e.read_text(encoding="utf-8") == "content of e" 2967 2968 @pass_alpharep 2969 def test_joinpath_multiple(self, alpharep): 2970 root = zipfile.Path(alpharep) 2971 e = root.joinpath("b", "d", "e.txt") 2972 assert e.read_text(encoding="utf-8") == "content of e" 2973 2974 @pass_alpharep 2975 def test_traverse_truediv(self, alpharep): 2976 root = zipfile.Path(alpharep) 2977 a = root / "a.txt" 2978 assert a.is_file() 2979 e = root / "b" / "d" / "e.txt" 2980 assert e.read_text(encoding="utf-8") == "content of e" 2981 2982 @pass_alpharep 2983 def test_traverse_simplediv(self, alpharep): 2984 """ 2985 Disable the __future__.division when testing traversal. 2986 """ 2987 code = compile( 2988 source="zipfile.Path(alpharep) / 'a'", 2989 filename="(test)", 2990 mode="eval", 2991 dont_inherit=True, 2992 ) 2993 eval(code) 2994 2995 @pass_alpharep 2996 def test_pathlike_construction(self, alpharep): 2997 """ 2998 zipfile.Path should be constructable from a path-like object 2999 """ 3000 zipfile_ondisk = self.zipfile_ondisk(alpharep) 3001 pathlike = pathlib.Path(str(zipfile_ondisk)) 3002 zipfile.Path(pathlike) 3003 3004 @pass_alpharep 3005 def test_traverse_pathlike(self, alpharep): 3006 root = zipfile.Path(alpharep) 3007 root / pathlib.Path("a") 3008 3009 @pass_alpharep 3010 def test_parent(self, alpharep): 3011 root = zipfile.Path(alpharep) 3012 assert (root / 'a').parent.at == '' 3013 assert (root / 'a' / 'b').parent.at == 'a/' 3014 3015 @pass_alpharep 3016 def test_dir_parent(self, alpharep): 3017 root = zipfile.Path(alpharep) 3018 assert (root / 'b').parent.at == '' 3019 assert (root / 'b/').parent.at == '' 3020 3021 @pass_alpharep 3022 def test_missing_dir_parent(self, alpharep): 3023 root = zipfile.Path(alpharep) 3024 assert (root / 'missing dir/').parent.at == '' 3025 3026 @pass_alpharep 3027 def test_mutability(self, alpharep): 3028 """ 3029 If the underlying zipfile is changed, the Path object should 3030 reflect that change. 3031 """ 3032 root = zipfile.Path(alpharep) 3033 a, b, g = root.iterdir() 3034 alpharep.writestr('foo.txt', 'foo') 3035 alpharep.writestr('bar/baz.txt', 'baz') 3036 assert any(child.name == 'foo.txt' for child in root.iterdir()) 3037 assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo' 3038 (baz,) = (root / 'bar').iterdir() 3039 assert baz.read_text(encoding="utf-8") == 'baz' 3040 3041 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 3042 3043 def huge_zipfile(self): 3044 """Create a read-only zipfile with a huge number of entries entries.""" 3045 strm = io.BytesIO() 3046 zf = zipfile.ZipFile(strm, "w") 3047 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 3048 zf.writestr(entry, entry) 3049 zf.mode = 'r' 3050 return zf 3051 3052 def test_joinpath_constant_time(self): 3053 """ 3054 Ensure joinpath on items in zipfile is linear time. 3055 """ 3056 root = zipfile.Path(self.huge_zipfile()) 3057 entries = jaraco.itertools.Counter(root.iterdir()) 3058 for entry in entries: 3059 entry.joinpath('suffix') 3060 # Check the file iterated all items 3061 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 3062 3063 # @func_timeout.func_set_timeout(3) 3064 def test_implied_dirs_performance(self): 3065 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 3066 zipfile.CompleteDirs._implied_dirs(data) 3067 3068 @pass_alpharep 3069 def test_read_does_not_close(self, alpharep): 3070 alpharep = self.zipfile_ondisk(alpharep) 3071 with zipfile.ZipFile(alpharep) as file: 3072 for rep in range(2): 3073 zipfile.Path(file, 'a.txt').read_text(encoding="utf-8") 3074 3075 @pass_alpharep 3076 def test_subclass(self, alpharep): 3077 class Subclass(zipfile.Path): 3078 pass 3079 3080 root = Subclass(alpharep) 3081 assert isinstance(root / 'b', Subclass) 3082 3083 @pass_alpharep 3084 def test_filename(self, alpharep): 3085 root = zipfile.Path(alpharep) 3086 assert root.filename == pathlib.Path('alpharep.zip') 3087 3088 @pass_alpharep 3089 def test_root_name(self, alpharep): 3090 """ 3091 The name of the root should be the name of the zipfile 3092 """ 3093 root = zipfile.Path(alpharep) 3094 assert root.name == 'alpharep.zip' == root.filename.name 3095 3096 @pass_alpharep 3097 def test_root_parent(self, alpharep): 3098 root = zipfile.Path(alpharep) 3099 assert root.parent == pathlib.Path('.') 3100 root.root.filename = 'foo/bar.zip' 3101 assert root.parent == pathlib.Path('foo') 3102 3103 @pass_alpharep 3104 def test_root_unnamed(self, alpharep): 3105 """ 3106 It is an error to attempt to get the name 3107 or parent of an unnamed zipfile. 3108 """ 3109 alpharep.filename = None 3110 root = zipfile.Path(alpharep) 3111 with self.assertRaises(TypeError): 3112 root.name 3113 with self.assertRaises(TypeError): 3114 root.parent 3115 3116 # .name and .parent should still work on subs 3117 sub = root / "b" 3118 assert sub.name == "b" 3119 assert sub.parent 3120 3121 @pass_alpharep 3122 def test_inheritance(self, alpharep): 3123 cls = type('PathChild', (zipfile.Path,), {}) 3124 for alpharep in self.zipfile_alpharep(): 3125 file = cls(alpharep).joinpath('some dir').parent 3126 assert isinstance(file, cls) 3127 3128 3129if __name__ == "__main__": 3130 unittest.main() 3131