1import contextlib 2import importlib.util 3import io 4import itertools 5import os 6import pathlib 7import posixpath 8import string 9import struct 10import subprocess 11import sys 12import time 13import unittest 14import unittest.mock as mock 15import zipfile 16import functools 17 18 19from tempfile import TemporaryFile 20from random import randint, random, randbytes 21 22from test.support import script_helper 23from test.support import (findfile, requires_zlib, requires_bz2, 24 requires_lzma, captured_stdout) 25from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd 26 27 28TESTFN2 = TESTFN + "2" 29TESTFNDIR = TESTFN + "d" 30FIXEDTEST_SIZE = 1000 31DATAFILES_DIR = 'zipfile_datafiles' 32 33SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 34 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 35 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 36 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 37 38def get_files(test): 39 yield TESTFN2 40 with TemporaryFile() as f: 41 yield f 42 test.assertFalse(f.closed) 43 with io.BytesIO() as f: 44 yield f 45 test.assertFalse(f.closed) 46 47class AbstractTestsWithSourceFile: 48 @classmethod 49 def setUpClass(cls): 50 cls.line_gen = [bytes("Zipfile test line %d. 
random float: %f\n" % 51 (i, random()), "ascii") 52 for i in range(FIXEDTEST_SIZE)] 53 cls.data = b''.join(cls.line_gen) 54 55 def setUp(self): 56 # Make a source file with some lines 57 with open(TESTFN, "wb") as fp: 58 fp.write(self.data) 59 60 def make_test_archive(self, f, compression, compresslevel=None): 61 kwargs = {'compression': compression, 'compresslevel': compresslevel} 62 # Create the ZIP archive 63 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 64 zipfp.write(TESTFN, "another.name") 65 zipfp.write(TESTFN, TESTFN) 66 zipfp.writestr("strfile", self.data) 67 with zipfp.open('written-open-w', mode='w') as f: 68 for line in self.line_gen: 69 f.write(line) 70 71 def zip_test(self, f, compression, compresslevel=None): 72 self.make_test_archive(f, compression, compresslevel) 73 74 # Read the ZIP archive 75 with zipfile.ZipFile(f, "r", compression) as zipfp: 76 self.assertEqual(zipfp.read(TESTFN), self.data) 77 self.assertEqual(zipfp.read("another.name"), self.data) 78 self.assertEqual(zipfp.read("strfile"), self.data) 79 80 # Print the ZIP directory 81 fp = io.StringIO() 82 zipfp.printdir(file=fp) 83 directory = fp.getvalue() 84 lines = directory.splitlines() 85 self.assertEqual(len(lines), 5) # Number of files + header 86 87 self.assertIn('File Name', lines[0]) 88 self.assertIn('Modified', lines[0]) 89 self.assertIn('Size', lines[0]) 90 91 fn, date, time_, size = lines[1].split() 92 self.assertEqual(fn, 'another.name') 93 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 94 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 95 self.assertEqual(size, str(len(self.data))) 96 97 # Check the namelist 98 names = zipfp.namelist() 99 self.assertEqual(len(names), 4) 100 self.assertIn(TESTFN, names) 101 self.assertIn("another.name", names) 102 self.assertIn("strfile", names) 103 self.assertIn("written-open-w", names) 104 105 # Check infolist 106 infos = zipfp.infolist() 107 names = [i.filename for i in infos] 108 self.assertEqual(len(names), 4) 109 
self.assertIn(TESTFN, names) 110 self.assertIn("another.name", names) 111 self.assertIn("strfile", names) 112 self.assertIn("written-open-w", names) 113 for i in infos: 114 self.assertEqual(i.file_size, len(self.data)) 115 116 # check getinfo 117 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 118 info = zipfp.getinfo(nm) 119 self.assertEqual(info.filename, nm) 120 self.assertEqual(info.file_size, len(self.data)) 121 122 # Check that testzip doesn't raise an exception 123 zipfp.testzip() 124 125 def test_basic(self): 126 for f in get_files(self): 127 self.zip_test(f, self.compression) 128 129 def zip_open_test(self, f, compression): 130 self.make_test_archive(f, compression) 131 132 # Read the ZIP archive 133 with zipfile.ZipFile(f, "r", compression) as zipfp: 134 zipdata1 = [] 135 with zipfp.open(TESTFN) as zipopen1: 136 while True: 137 read_data = zipopen1.read(256) 138 if not read_data: 139 break 140 zipdata1.append(read_data) 141 142 zipdata2 = [] 143 with zipfp.open("another.name") as zipopen2: 144 while True: 145 read_data = zipopen2.read(256) 146 if not read_data: 147 break 148 zipdata2.append(read_data) 149 150 self.assertEqual(b''.join(zipdata1), self.data) 151 self.assertEqual(b''.join(zipdata2), self.data) 152 153 def test_open(self): 154 for f in get_files(self): 155 self.zip_open_test(f, self.compression) 156 157 def test_open_with_pathlike(self): 158 path = pathlib.Path(TESTFN2) 159 self.zip_open_test(path, self.compression) 160 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 161 self.assertIsInstance(zipfp.filename, str) 162 163 def zip_random_open_test(self, f, compression): 164 self.make_test_archive(f, compression) 165 166 # Read the ZIP archive 167 with zipfile.ZipFile(f, "r", compression) as zipfp: 168 zipdata1 = [] 169 with zipfp.open(TESTFN) as zipopen1: 170 while True: 171 read_data = zipopen1.read(randint(1, 1024)) 172 if not read_data: 173 break 174 zipdata1.append(read_data) 175 176 
self.assertEqual(b''.join(zipdata1), self.data) 177 178 def test_random_open(self): 179 for f in get_files(self): 180 self.zip_random_open_test(f, self.compression) 181 182 def zip_read1_test(self, f, compression): 183 self.make_test_archive(f, compression) 184 185 # Read the ZIP archive 186 with zipfile.ZipFile(f, "r") as zipfp, \ 187 zipfp.open(TESTFN) as zipopen: 188 zipdata = [] 189 while True: 190 read_data = zipopen.read1(-1) 191 if not read_data: 192 break 193 zipdata.append(read_data) 194 195 self.assertEqual(b''.join(zipdata), self.data) 196 197 def test_read1(self): 198 for f in get_files(self): 199 self.zip_read1_test(f, self.compression) 200 201 def zip_read1_10_test(self, f, compression): 202 self.make_test_archive(f, compression) 203 204 # Read the ZIP archive 205 with zipfile.ZipFile(f, "r") as zipfp, \ 206 zipfp.open(TESTFN) as zipopen: 207 zipdata = [] 208 while True: 209 read_data = zipopen.read1(10) 210 self.assertLessEqual(len(read_data), 10) 211 if not read_data: 212 break 213 zipdata.append(read_data) 214 215 self.assertEqual(b''.join(zipdata), self.data) 216 217 def test_read1_10(self): 218 for f in get_files(self): 219 self.zip_read1_10_test(f, self.compression) 220 221 def zip_readline_read_test(self, f, compression): 222 self.make_test_archive(f, compression) 223 224 # Read the ZIP archive 225 with zipfile.ZipFile(f, "r") as zipfp, \ 226 zipfp.open(TESTFN) as zipopen: 227 data = b'' 228 while True: 229 read = zipopen.readline() 230 if not read: 231 break 232 data += read 233 234 read = zipopen.read(100) 235 if not read: 236 break 237 data += read 238 239 self.assertEqual(data, self.data) 240 241 def test_readline_read(self): 242 # Issue #7610: calls to readline() interleaved with calls to read(). 
243 for f in get_files(self): 244 self.zip_readline_read_test(f, self.compression) 245 246 def zip_readline_test(self, f, compression): 247 self.make_test_archive(f, compression) 248 249 # Read the ZIP archive 250 with zipfile.ZipFile(f, "r") as zipfp: 251 with zipfp.open(TESTFN) as zipopen: 252 for line in self.line_gen: 253 linedata = zipopen.readline() 254 self.assertEqual(linedata, line) 255 256 def test_readline(self): 257 for f in get_files(self): 258 self.zip_readline_test(f, self.compression) 259 260 def zip_readlines_test(self, f, compression): 261 self.make_test_archive(f, compression) 262 263 # Read the ZIP archive 264 with zipfile.ZipFile(f, "r") as zipfp: 265 with zipfp.open(TESTFN) as zipopen: 266 ziplines = zipopen.readlines() 267 for line, zipline in zip(self.line_gen, ziplines): 268 self.assertEqual(zipline, line) 269 270 def test_readlines(self): 271 for f in get_files(self): 272 self.zip_readlines_test(f, self.compression) 273 274 def zip_iterlines_test(self, f, compression): 275 self.make_test_archive(f, compression) 276 277 # Read the ZIP archive 278 with zipfile.ZipFile(f, "r") as zipfp: 279 with zipfp.open(TESTFN) as zipopen: 280 for line, zipline in zip(self.line_gen, zipopen): 281 self.assertEqual(zipline, line) 282 283 def test_iterlines(self): 284 for f in get_files(self): 285 self.zip_iterlines_test(f, self.compression) 286 287 def test_low_compression(self): 288 """Check for cases where compressed data is larger than original.""" 289 # Create the ZIP archive 290 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 291 zipfp.writestr("strfile", '12') 292 293 # Get an open object for strfile 294 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 295 with zipfp.open("strfile") as openobj: 296 self.assertEqual(openobj.read(1), b'1') 297 self.assertEqual(openobj.read(1), b'2') 298 299 def test_writestr_compression(self): 300 zipfp = zipfile.ZipFile(TESTFN2, "w") 301 zipfp.writestr("b.txt", "hello world", 
compress_type=self.compression) 302 info = zipfp.getinfo('b.txt') 303 self.assertEqual(info.compress_type, self.compression) 304 305 def test_writestr_compresslevel(self): 306 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 307 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 308 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 309 compresslevel=2) 310 311 # Compression level follows the constructor. 312 a_info = zipfp.getinfo('a.txt') 313 self.assertEqual(a_info.compress_type, self.compression) 314 self.assertEqual(a_info._compresslevel, 1) 315 316 # Compression level is overridden. 317 b_info = zipfp.getinfo('b.txt') 318 self.assertEqual(b_info.compress_type, self.compression) 319 self.assertEqual(b_info._compresslevel, 2) 320 321 def test_read_return_size(self): 322 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 323 # than requested. 324 for test_size in (1, 4095, 4096, 4097, 16384): 325 file_size = test_size + 1 326 junk = randbytes(file_size) 327 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 328 zipf.writestr('foo', junk) 329 with zipf.open('foo', 'r') as fp: 330 buf = fp.read(test_size) 331 self.assertEqual(len(buf), test_size) 332 333 def test_truncated_zipfile(self): 334 fp = io.BytesIO() 335 with zipfile.ZipFile(fp, mode='w') as zipf: 336 zipf.writestr('strfile', self.data, compress_type=self.compression) 337 end_offset = fp.tell() 338 zipfiledata = fp.getvalue() 339 340 fp = io.BytesIO(zipfiledata) 341 with zipfile.ZipFile(fp) as zipf: 342 with zipf.open('strfile') as zipopen: 343 fp.truncate(end_offset - 20) 344 with self.assertRaises(EOFError): 345 zipopen.read() 346 347 fp = io.BytesIO(zipfiledata) 348 with zipfile.ZipFile(fp) as zipf: 349 with zipf.open('strfile') as zipopen: 350 fp.truncate(end_offset - 20) 351 with self.assertRaises(EOFError): 352 while zipopen.read(100): 353 pass 354 355 fp = io.BytesIO(zipfiledata) 356 with zipfile.ZipFile(fp) as zipf: 357 with 
zipf.open('strfile') as zipopen: 358 fp.truncate(end_offset - 20) 359 with self.assertRaises(EOFError): 360 while zipopen.read1(100): 361 pass 362 363 def test_repr(self): 364 fname = 'file.name' 365 for f in get_files(self): 366 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 367 zipfp.write(TESTFN, fname) 368 r = repr(zipfp) 369 self.assertIn("mode='w'", r) 370 371 with zipfile.ZipFile(f, 'r') as zipfp: 372 r = repr(zipfp) 373 if isinstance(f, str): 374 self.assertIn('filename=%r' % f, r) 375 else: 376 self.assertIn('file=%r' % f, r) 377 self.assertIn("mode='r'", r) 378 r = repr(zipfp.getinfo(fname)) 379 self.assertIn('filename=%r' % fname, r) 380 self.assertIn('filemode=', r) 381 self.assertIn('file_size=', r) 382 if self.compression != zipfile.ZIP_STORED: 383 self.assertIn('compress_type=', r) 384 self.assertIn('compress_size=', r) 385 with zipfp.open(fname) as zipopen: 386 r = repr(zipopen) 387 self.assertIn('name=%r' % fname, r) 388 self.assertIn("mode='r'", r) 389 if self.compression != zipfile.ZIP_STORED: 390 self.assertIn('compress_type=', r) 391 self.assertIn('[closed]', repr(zipopen)) 392 self.assertIn('[closed]', repr(zipfp)) 393 394 def test_compresslevel_basic(self): 395 for f in get_files(self): 396 self.zip_test(f, self.compression, compresslevel=9) 397 398 def test_per_file_compresslevel(self): 399 """Check that files within a Zip archive can have different 400 compression levels.""" 401 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 402 zipfp.write(TESTFN, 'compress_1') 403 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 404 one_info = zipfp.getinfo('compress_1') 405 nine_info = zipfp.getinfo('compress_9') 406 self.assertEqual(one_info._compresslevel, 1) 407 self.assertEqual(nine_info._compresslevel, 9) 408 409 def test_writing_errors(self): 410 class BrokenFile(io.BytesIO): 411 def write(self, data): 412 nonlocal count 413 if count is not None: 414 if count == stop: 415 raise OSError 416 count += 1 417 
super().write(data) 418 419 stop = 0 420 while True: 421 testfile = BrokenFile() 422 count = None 423 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 424 with zipfp.open('file1', 'w') as f: 425 f.write(b'data1') 426 count = 0 427 try: 428 with zipfp.open('file2', 'w') as f: 429 f.write(b'data2') 430 except OSError: 431 stop += 1 432 else: 433 break 434 finally: 435 count = None 436 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 437 self.assertEqual(zipfp.namelist(), ['file1']) 438 self.assertEqual(zipfp.read('file1'), b'data1') 439 440 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 441 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 442 self.assertEqual(zipfp.read('file1'), b'data1') 443 self.assertEqual(zipfp.read('file2'), b'data2') 444 445 446 def tearDown(self): 447 unlink(TESTFN) 448 unlink(TESTFN2) 449 450 451class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 452 unittest.TestCase): 453 compression = zipfile.ZIP_STORED 454 test_low_compression = None 455 456 def zip_test_writestr_permissions(self, f, compression): 457 # Make sure that writestr and open(... mode='w') create files with 458 # mode 0600, when they are passed a name rather than a ZipInfo 459 # instance. 
460 461 self.make_test_archive(f, compression) 462 with zipfile.ZipFile(f, "r") as zipfp: 463 zinfo = zipfp.getinfo('strfile') 464 self.assertEqual(zinfo.external_attr, 0o600 << 16) 465 466 zinfo2 = zipfp.getinfo('written-open-w') 467 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 468 469 def test_writestr_permissions(self): 470 for f in get_files(self): 471 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 472 473 def test_absolute_arcnames(self): 474 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 475 zipfp.write(TESTFN, "/absolute") 476 477 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 478 self.assertEqual(zipfp.namelist(), ["absolute"]) 479 480 def test_append_to_zip_file(self): 481 """Test appending to an existing zipfile.""" 482 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 483 zipfp.write(TESTFN, TESTFN) 484 485 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 486 zipfp.writestr("strfile", self.data) 487 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 488 489 def test_append_to_non_zip_file(self): 490 """Test appending to an existing file that is not a zipfile.""" 491 # NOTE: this test fails if len(d) < 22 because of the first 492 # line "fpin.seek(-22, 2)" in _EndRecData 493 data = b'I am not a ZipFile!'*10 494 with open(TESTFN2, 'wb') as f: 495 f.write(data) 496 497 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 498 zipfp.write(TESTFN, TESTFN) 499 500 with open(TESTFN2, 'rb') as f: 501 f.seek(len(data)) 502 with zipfile.ZipFile(f, "r") as zipfp: 503 self.assertEqual(zipfp.namelist(), [TESTFN]) 504 self.assertEqual(zipfp.read(TESTFN), self.data) 505 with open(TESTFN2, 'rb') as f: 506 self.assertEqual(f.read(len(data)), data) 507 zipfiledata = f.read() 508 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 509 self.assertEqual(zipfp.namelist(), [TESTFN]) 510 self.assertEqual(zipfp.read(TESTFN), self.data) 511 512 def 
test_read_concatenated_zip_file(self): 513 with io.BytesIO() as bio: 514 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 515 zipfp.write(TESTFN, TESTFN) 516 zipfiledata = bio.getvalue() 517 data = b'I am not a ZipFile!'*10 518 with open(TESTFN2, 'wb') as f: 519 f.write(data) 520 f.write(zipfiledata) 521 522 with zipfile.ZipFile(TESTFN2) as zipfp: 523 self.assertEqual(zipfp.namelist(), [TESTFN]) 524 self.assertEqual(zipfp.read(TESTFN), self.data) 525 526 def test_append_to_concatenated_zip_file(self): 527 with io.BytesIO() as bio: 528 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 529 zipfp.write(TESTFN, TESTFN) 530 zipfiledata = bio.getvalue() 531 data = b'I am not a ZipFile!'*1000000 532 with open(TESTFN2, 'wb') as f: 533 f.write(data) 534 f.write(zipfiledata) 535 536 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 537 self.assertEqual(zipfp.namelist(), [TESTFN]) 538 zipfp.writestr('strfile', self.data) 539 540 with open(TESTFN2, 'rb') as f: 541 self.assertEqual(f.read(len(data)), data) 542 zipfiledata = f.read() 543 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 544 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 545 self.assertEqual(zipfp.read(TESTFN), self.data) 546 self.assertEqual(zipfp.read('strfile'), self.data) 547 548 def test_ignores_newline_at_end(self): 549 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 550 zipfp.write(TESTFN, TESTFN) 551 with open(TESTFN2, 'a', encoding='utf-8') as f: 552 f.write("\r\n\00\00\00") 553 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 554 self.assertIsInstance(zipfp, zipfile.ZipFile) 555 556 def test_ignores_stuff_appended_past_comments(self): 557 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 558 zipfp.comment = b"this is a comment" 559 zipfp.write(TESTFN, TESTFN) 560 with open(TESTFN2, 'a', encoding='utf-8') as f: 561 f.write("abcdef\r\n") 562 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 563 self.assertIsInstance(zipfp, 
zipfile.ZipFile) 564 self.assertEqual(zipfp.comment, b"this is a comment") 565 566 def test_write_default_name(self): 567 """Check that calling ZipFile.write without arcname specified 568 produces the expected result.""" 569 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 570 zipfp.write(TESTFN) 571 with open(TESTFN, "rb") as f: 572 self.assertEqual(zipfp.read(TESTFN), f.read()) 573 574 def test_io_on_closed_zipextfile(self): 575 fname = "somefile.txt" 576 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 577 zipfp.writestr(fname, "bogus") 578 579 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 580 with zipfp.open(fname) as fid: 581 fid.close() 582 self.assertRaises(ValueError, fid.read) 583 self.assertRaises(ValueError, fid.seek, 0) 584 self.assertRaises(ValueError, fid.tell) 585 self.assertRaises(ValueError, fid.readable) 586 self.assertRaises(ValueError, fid.seekable) 587 588 def test_write_to_readonly(self): 589 """Check that trying to call write() on a readonly ZipFile object 590 raises a ValueError.""" 591 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 592 zipfp.writestr("somefile.txt", "bogus") 593 594 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 595 self.assertRaises(ValueError, zipfp.write, TESTFN) 596 597 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 598 with self.assertRaises(ValueError): 599 zipfp.open(TESTFN, mode='w') 600 601 def test_add_file_before_1980(self): 602 # Set atime and mtime to 1970-01-01 603 os.utime(TESTFN, (0, 0)) 604 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 605 self.assertRaises(ValueError, zipfp.write, TESTFN) 606 607 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 608 zipfp.write(TESTFN) 609 zinfo = zipfp.getinfo(TESTFN) 610 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 611 612 def test_add_file_after_2107(self): 613 # Set atime and mtime to 2108-12-30 614 ts = 4386268800 615 try: 616 time.localtime(ts) 617 except OverflowError: 618 self.skipTest(f'time.localtime({ts}) raises 
OverflowError') 619 try: 620 os.utime(TESTFN, (ts, ts)) 621 except OverflowError: 622 self.skipTest('Host fs cannot set timestamp to required value.') 623 624 mtime_ns = os.stat(TESTFN).st_mtime_ns 625 if mtime_ns != (4386268800 * 10**9): 626 # XFS filesystem is limited to 32-bit timestamp, but the syscall 627 # didn't fail. Moreover, there is a VFS bug which returns 628 # a cached timestamp which is different than the value on disk. 629 # 630 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 631 # 632 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 633 # https://bugs.python.org/issue39460#msg360952 634 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 635 636 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 637 self.assertRaises(struct.error, zipfp.write, TESTFN) 638 639 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 640 zipfp.write(TESTFN) 641 zinfo = zipfp.getinfo(TESTFN) 642 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 643 644 645@requires_zlib() 646class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 647 unittest.TestCase): 648 compression = zipfile.ZIP_DEFLATED 649 650 def test_per_file_compression(self): 651 """Check that files within a Zip archive can have different 652 compression options.""" 653 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 654 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 655 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 656 sinfo = zipfp.getinfo('storeme') 657 dinfo = zipfp.getinfo('deflateme') 658 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 659 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 660 661@requires_bz2() 662class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 663 unittest.TestCase): 664 compression = zipfile.ZIP_BZIP2 665 666@requires_lzma() 667class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 668 unittest.TestCase): 669 compression = zipfile.ZIP_LZMA 670 671 672class 
AbstractTestZip64InSmallFiles: 673 # These tests test the ZIP64 functionality without using large files, 674 # see test_zipfile64 for proper tests. 675 676 @classmethod 677 def setUpClass(cls): 678 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 679 for i in range(0, FIXEDTEST_SIZE)) 680 cls.data = b'\n'.join(line_gen) 681 682 def setUp(self): 683 self._limit = zipfile.ZIP64_LIMIT 684 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 685 zipfile.ZIP64_LIMIT = 1000 686 zipfile.ZIP_FILECOUNT_LIMIT = 9 687 688 # Make a source file with some lines 689 with open(TESTFN, "wb") as fp: 690 fp.write(self.data) 691 692 def zip_test(self, f, compression): 693 # Create the ZIP archive 694 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 695 zipfp.write(TESTFN, "another.name") 696 zipfp.write(TESTFN, TESTFN) 697 zipfp.writestr("strfile", self.data) 698 699 # Read the ZIP archive 700 with zipfile.ZipFile(f, "r", compression) as zipfp: 701 self.assertEqual(zipfp.read(TESTFN), self.data) 702 self.assertEqual(zipfp.read("another.name"), self.data) 703 self.assertEqual(zipfp.read("strfile"), self.data) 704 705 # Print the ZIP directory 706 fp = io.StringIO() 707 zipfp.printdir(fp) 708 709 directory = fp.getvalue() 710 lines = directory.splitlines() 711 self.assertEqual(len(lines), 4) # Number of files + header 712 713 self.assertIn('File Name', lines[0]) 714 self.assertIn('Modified', lines[0]) 715 self.assertIn('Size', lines[0]) 716 717 fn, date, time_, size = lines[1].split() 718 self.assertEqual(fn, 'another.name') 719 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 720 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 721 self.assertEqual(size, str(len(self.data))) 722 723 # Check the namelist 724 names = zipfp.namelist() 725 self.assertEqual(len(names), 3) 726 self.assertIn(TESTFN, names) 727 self.assertIn("another.name", names) 728 self.assertIn("strfile", names) 729 730 # Check infolist 731 infos = zipfp.infolist() 732 names = [i.filename for 
i in infos] 733 self.assertEqual(len(names), 3) 734 self.assertIn(TESTFN, names) 735 self.assertIn("another.name", names) 736 self.assertIn("strfile", names) 737 for i in infos: 738 self.assertEqual(i.file_size, len(self.data)) 739 740 # check getinfo 741 for nm in (TESTFN, "another.name", "strfile"): 742 info = zipfp.getinfo(nm) 743 self.assertEqual(info.filename, nm) 744 self.assertEqual(info.file_size, len(self.data)) 745 746 # Check that testzip doesn't raise an exception 747 zipfp.testzip() 748 749 def test_basic(self): 750 for f in get_files(self): 751 self.zip_test(f, self.compression) 752 753 def test_too_many_files(self): 754 # This test checks that more than 64k files can be added to an archive, 755 # and that the resulting archive can be read properly by ZipFile 756 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 757 allowZip64=True) 758 zipf.debug = 100 759 numfiles = 15 760 for i in range(numfiles): 761 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 762 self.assertEqual(len(zipf.namelist()), numfiles) 763 zipf.close() 764 765 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 766 self.assertEqual(len(zipf2.namelist()), numfiles) 767 for i in range(numfiles): 768 content = zipf2.read("foo%08d" % i).decode('ascii') 769 self.assertEqual(content, "%d" % (i**3 % 57)) 770 zipf2.close() 771 772 def test_too_many_files_append(self): 773 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 774 allowZip64=False) 775 zipf.debug = 100 776 numfiles = 9 777 for i in range(numfiles): 778 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 779 self.assertEqual(len(zipf.namelist()), numfiles) 780 with self.assertRaises(zipfile.LargeZipFile): 781 zipf.writestr("foo%08d" % numfiles, b'') 782 self.assertEqual(len(zipf.namelist()), numfiles) 783 zipf.close() 784 785 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 786 allowZip64=False) 787 zipf.debug = 100 788 self.assertEqual(len(zipf.namelist()), numfiles) 789 with self.assertRaises(zipfile.LargeZipFile): 
790 zipf.writestr("foo%08d" % numfiles, b'') 791 self.assertEqual(len(zipf.namelist()), numfiles) 792 zipf.close() 793 794 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 795 allowZip64=True) 796 zipf.debug = 100 797 self.assertEqual(len(zipf.namelist()), numfiles) 798 numfiles2 = 15 799 for i in range(numfiles, numfiles2): 800 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 801 self.assertEqual(len(zipf.namelist()), numfiles2) 802 zipf.close() 803 804 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 805 self.assertEqual(len(zipf2.namelist()), numfiles2) 806 for i in range(numfiles2): 807 content = zipf2.read("foo%08d" % i).decode('ascii') 808 self.assertEqual(content, "%d" % (i**3 % 57)) 809 zipf2.close() 810 811 def tearDown(self): 812 zipfile.ZIP64_LIMIT = self._limit 813 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 814 unlink(TESTFN) 815 unlink(TESTFN2) 816 817 818class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 819 unittest.TestCase): 820 compression = zipfile.ZIP_STORED 821 822 def large_file_exception_test(self, f, compression): 823 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 824 self.assertRaises(zipfile.LargeZipFile, 825 zipfp.write, TESTFN, "another.name") 826 827 def large_file_exception_test2(self, f, compression): 828 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 829 self.assertRaises(zipfile.LargeZipFile, 830 zipfp.writestr, "another.name", self.data) 831 832 def test_large_file_exception(self): 833 for f in get_files(self): 834 self.large_file_exception_test(f, zipfile.ZIP_STORED) 835 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 836 837 def test_absolute_arcnames(self): 838 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 839 allowZip64=True) as zipfp: 840 zipfp.write(TESTFN, "/absolute") 841 842 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 843 self.assertEqual(zipfp.namelist(), ["absolute"]) 844 845 def test_append(self): 846 # 
Test that appending to the Zip64 archive doesn't change 847 # extra fields of existing entries. 848 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 849 zipfp.writestr("strfile", self.data) 850 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 851 zinfo = zipfp.getinfo("strfile") 852 extra = zinfo.extra 853 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 854 zipfp.writestr("strfile2", self.data) 855 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 856 zinfo = zipfp.getinfo("strfile") 857 self.assertEqual(zinfo.extra, extra) 858 859 def make_zip64_file( 860 self, file_size_64_set=False, file_size_extra=False, 861 compress_size_64_set=False, compress_size_extra=False, 862 header_offset_64_set=False, header_offset_extra=False, 863 ): 864 """Generate bytes sequence for a zip with (incomplete) zip64 data. 865 866 The actual values (not the zip 64 0xffffffff values) stored in the file 867 are: 868 file_size: 8 869 compress_size: 8 870 header_offset: 0 871 """ 872 actual_size = 8 873 actual_header_offset = 0 874 local_zip64_fields = [] 875 central_zip64_fields = [] 876 877 file_size = actual_size 878 if file_size_64_set: 879 file_size = 0xffffffff 880 if file_size_extra: 881 local_zip64_fields.append(actual_size) 882 central_zip64_fields.append(actual_size) 883 file_size = struct.pack("<L", file_size) 884 885 compress_size = actual_size 886 if compress_size_64_set: 887 compress_size = 0xffffffff 888 if compress_size_extra: 889 local_zip64_fields.append(actual_size) 890 central_zip64_fields.append(actual_size) 891 compress_size = struct.pack("<L", compress_size) 892 893 header_offset = actual_header_offset 894 if header_offset_64_set: 895 header_offset = 0xffffffff 896 if header_offset_extra: 897 central_zip64_fields.append(actual_header_offset) 898 header_offset = struct.pack("<L", header_offset) 899 900 local_extra = struct.pack( 901 '<HH' + 'Q'*len(local_zip64_fields), 902 0x0001, 903 8*len(local_zip64_fields), 904 
*local_zip64_fields 905 ) 906 907 central_extra = struct.pack( 908 '<HH' + 'Q'*len(central_zip64_fields), 909 0x0001, 910 8*len(central_zip64_fields), 911 *central_zip64_fields 912 ) 913 914 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 915 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 916 917 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 918 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 919 920 filename = b"test.txt" 921 content = b"test1234" 922 filename_length = struct.pack("<H", len(filename)) 923 zip64_contents = ( 924 # Local file header 925 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 926 + compress_size 927 + file_size 928 + filename_length 929 + local_extra_length 930 + filename 931 + local_extra 932 + content 933 # Central directory: 934 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 935 + compress_size 936 + file_size 937 + filename_length 938 + central_extra_length 939 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 940 + header_offset 941 + filename 942 + central_extra 943 # Zip64 end of central directory 944 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 945 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 946 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 947 + central_dir_size 948 + offset_to_central_dir 949 # Zip64 end of central directory locator 950 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 951 + b"\x00\x00\x00" 952 # end of central directory 953 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 954 + b"\x00\x00\x00\x00" 955 ) 956 return zip64_contents 957 958 def test_bad_zip64_extra(self): 959 """Missing zip64 extra records raises an exception. 960 961 There are 4 fields that the zip64 format handles (the disk number is 962 not used in this module and so is ignored here). 
According to the zip 963 spec: 964 The order of the fields in the zip64 extended 965 information record is fixed, but the fields MUST 966 only appear if the corresponding Local or Central 967 directory record field is set to 0xFFFF or 0xFFFFFFFF. 968 969 If the zip64 extra content doesn't contain enough entries for the 970 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 971 This test mismatches the length of the zip64 extra field and the number 972 of fields set to indicate the presence of zip64 data. 973 """ 974 # zip64 file size present, no fields in extra, expecting one, equals 975 # missing file size. 976 missing_file_size_extra = self.make_zip64_file( 977 file_size_64_set=True, 978 ) 979 with self.assertRaises(zipfile.BadZipFile) as e: 980 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 981 self.assertIn('file size', str(e.exception).lower()) 982 983 # zip64 file size present, zip64 compress size present, one field in 984 # extra, expecting two, equals missing compress size. 985 missing_compress_size_extra = self.make_zip64_file( 986 file_size_64_set=True, 987 file_size_extra=True, 988 compress_size_64_set=True, 989 ) 990 with self.assertRaises(zipfile.BadZipFile) as e: 991 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 992 self.assertIn('compress size', str(e.exception).lower()) 993 994 # zip64 compress size present, no fields in extra, expecting one, 995 # equals missing compress size. 
996 missing_compress_size_extra = self.make_zip64_file( 997 compress_size_64_set=True, 998 ) 999 with self.assertRaises(zipfile.BadZipFile) as e: 1000 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1001 self.assertIn('compress size', str(e.exception).lower()) 1002 1003 # zip64 file size present, zip64 compress size present, zip64 header 1004 # offset present, two fields in extra, expecting three, equals missing 1005 # header offset 1006 missing_header_offset_extra = self.make_zip64_file( 1007 file_size_64_set=True, 1008 file_size_extra=True, 1009 compress_size_64_set=True, 1010 compress_size_extra=True, 1011 header_offset_64_set=True, 1012 ) 1013 with self.assertRaises(zipfile.BadZipFile) as e: 1014 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1015 self.assertIn('header offset', str(e.exception).lower()) 1016 1017 # zip64 compress size present, zip64 header offset present, one field 1018 # in extra, expecting two, equals missing header offset 1019 missing_header_offset_extra = self.make_zip64_file( 1020 file_size_64_set=False, 1021 compress_size_64_set=True, 1022 compress_size_extra=True, 1023 header_offset_64_set=True, 1024 ) 1025 with self.assertRaises(zipfile.BadZipFile) as e: 1026 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1027 self.assertIn('header offset', str(e.exception).lower()) 1028 1029 # zip64 file size present, zip64 header offset present, one field in 1030 # extra, expecting two, equals missing header offset 1031 missing_header_offset_extra = self.make_zip64_file( 1032 file_size_64_set=True, 1033 file_size_extra=True, 1034 compress_size_64_set=False, 1035 header_offset_64_set=True, 1036 ) 1037 with self.assertRaises(zipfile.BadZipFile) as e: 1038 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1039 self.assertIn('header offset', str(e.exception).lower()) 1040 1041 # zip64 header offset present, no fields in extra, expecting one, 1042 # equals missing header offset 1043 missing_header_offset_extra = 
def test_generated_valid_zip64_extra(self):
    """Check that every consistent zip64 mask/extra combination is readable.

    For each non-empty combination of the three maskable header fields,
    an archive is generated with the field masked *and* its real value
    present in the zip64 extra record, then opened and compared against
    the values hard-coded in make_zip64_file.
    """
    # These values are what is set in the make_zip64_file method.
    expected_file_size = 8
    expected_compress_size = 8
    expected_header_offset = 0
    expected_content = b"test1234"

    # Loop through the various valid combinations of zip64 masks
    # present and extra fields present.
    params = (
        {"file_size_64_set": True, "file_size_extra": True},
        {"compress_size_64_set": True, "compress_size_extra": True},
        {"header_offset_64_set": True, "header_offset_extra": True},
    )

    for r in range(1, len(params) + 1):
        for combo in itertools.combinations(params, r):
            kwargs = {}
            for c in combo:
                kwargs.update(c)
            # Run each combination as its own sub-test so a failure names
            # the offending flags and the remaining combinations still run.
            with self.subTest(**kwargs):
                archive = io.BytesIO(self.make_zip64_file(**kwargs))
                with zipfile.ZipFile(archive) as zf:
                    zinfo = zf.infolist()[0]
                    self.assertEqual(zinfo.file_size, expected_file_size)
                    self.assertEqual(zinfo.compress_size, expected_compress_size)
                    self.assertEqual(zinfo.header_offset, expected_header_offset)
                    self.assertEqual(zf.read(zinfo), expected_content)
def assertCompiledIn(self, name, namelist):
    """Assert that a compiled form of *name* appears in *namelist*.

    The check is satisfied silently when ``name + 'o'`` is listed;
    otherwise ``name + 'c'`` must be present.
    """
    optimized_name = name + 'o'
    if optimized_name in namelist:
        return
    self.assertIn(name + 'c', namelist)
def test_write_filtered_python_package(self):
    """Check that writepy() honours *filterfunc* for a whole package.

    The ``test`` package is written three times: unfiltered (expected to
    report SyntaxError output), with a filter rejecting everything, and
    with a filter rejecting only files whose basename starts with "bad".
    """
    import test
    packagedir = os.path.dirname(test.__file__)
    self.requiresWriteAccess(packagedir)

    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:

        # first make sure that the test folder gives error messages
        # (on the badsyntax_... files)
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir)
        reportStr = reportSIO.getvalue()
        self.assertIn('SyntaxError', reportStr)

        # then check that the filter works on the whole package
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
        reportStr = reportSIO.getvalue()
        self.assertNotIn('SyntaxError', reportStr)

        # then check that the filter works on individual files
        # (named so that it does not shadow the builtin ``filter``)
        def skip_bad_files(path):
            return not os.path.basename(path).startswith("bad")
        with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
            zipfp.writepy(packagedir, filterfunc=skip_bad_files)
        reportStr = reportSIO.getvalue()
        if reportStr:
            # Echo anything unexpected so it shows up in the test log.
            print(reportStr)
        self.assertNotIn('SyntaxError', reportStr)
def test_write_non_pyfile(self):
    """Check that writepy() raises RuntimeError for a non-.py file."""
    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
        with open(TESTFN, 'w', encoding='utf-8') as f:
            f.write('most definitely not a python file')
        try:
            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
        finally:
            # Remove the scratch file even when the assertion fails, so a
            # failure here cannot leak TESTFN into later tests.
            unlink(TESTFN)
def test_extract(self):
    """Extract each sample member into the cwd and verify path and data."""
    with temp_cwd():
        self.make_test_file()
        with zipfile.ZipFile(TESTFN2, "r") as archive:
            for member, text in SMALL_TEST_DATA:
                extracted = archive.extract(member)

                # The member must land below the current directory.
                expected_path = os.path.normpath(
                    os.path.join(os.getcwd(), member))
                self.assertEqual(extracted, expected_path)

                # The extracted bytes must match what was archived.
                with open(extracted, "rb") as stream:
                    self.assertEqual(text.encode(), stream.read())

                unlink(extracted)
def _test_extract_all_with_target(self, target):
    """Run extractall() into *target* and verify every sample member.

    *target* is passed both to extractall() and to os.path.join().
    Each extracted file and the archive itself are removed afterwards.
    """
    self.make_test_file()
    with zipfile.ZipFile(TESTFN2, "r") as archive:
        archive.extractall(target)
        for member, text in SMALL_TEST_DATA:
            extracted = os.path.join(target, member)

            with open(extracted, "rb") as stream:
                self.assertEqual(text.encode(), stream.read())

            unlink(extracted)

    unlink(TESTFN2)
def test_extract_hackers_arcnames_common_cases(self):
    """Run the hostile-arcname extraction checks on the shared cases.

    Each pair is (name stored in the archive, expected relative path
    after extraction).
    """
    self._test_extract_hackers_arcnames([
        ('../foo/bar', 'foo/bar'),
        ('foo/../bar', 'foo/bar'),
        ('foo/../../bar', 'foo/bar'),
        ('foo/bar/..', 'foo/bar'),
        ('./../foo/bar', 'foo/bar'),
        ('/foo/bar', 'foo/bar'),
        ('/foo/../bar', 'foo/bar'),
        ('/foo/../../bar', 'foo/bar'),
    ])
def _test_extract_hackers_arcnames(self, hacknames):
    """Check extraction of archive members with hostile or odd names.

    *hacknames* is an iterable of ``(arcname, fixedname)`` pairs:
    *arcname* is the name stored in the archive, *fixedname* the
    '/'-separated relative path the member is expected to end up at.

    For each pair a fresh one-member archive is written to TESTFN2 and
    the member is extracted four ways: extract() and extractall() into an
    explicit target directory, then extract() and extractall() into the
    current directory.  The extracted tree is removed after each step.
    """
    for arcname, fixedname in hacknames:
        content = b'foobar' + arcname.encode()
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
            zinfo = zipfile.ZipInfo()
            # preserve backslashes
            zinfo.filename = arcname
            # 0o600 placed in the upper 16 bits of external_attr
            # (presumably the Unix permission bits -- confirm against the
            # zipfile documentation).
            zinfo.external_attr = 0o600 << 16
            zipfp.writestr(zinfo, content)

        # From here on the member is looked up with '/' separators.
        arcname = arcname.replace(os.sep, "/")
        targetpath = os.path.join('target', 'subdir', 'subsub')
        correctfile = os.path.join(targetpath, *fixedname.split('/'))

        # 1. extract() into an explicit target directory.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            writtenfile = zipfp.extract(arcname, targetpath)
            self.assertEqual(writtenfile, correctfile,
                             msg='extract %r: %r != %r' %
                             (arcname, writtenfile, correctfile))
        self.check_file(correctfile, content)
        rmtree('target')

        # 2. extractall() into an explicit target directory.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            zipfp.extractall(targetpath)
        self.check_file(correctfile, content)
        rmtree('target')

        # The remaining steps extract relative to the current directory.
        correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

        # 3. extract() into the current directory.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            writtenfile = zipfp.extract(arcname)
            self.assertEqual(writtenfile, correctfile,
                             msg="extract %r" % arcname)
        self.check_file(correctfile, content)
        # Remove the top-level directory created by the extraction.
        rmtree(fixedname.split('/')[0])

        # 4. extractall() into the current directory.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            zipfp.extractall()
        self.check_file(correctfile, content)
        rmtree(fixedname.split('/')[0])

        unlink(TESTFN2)
def test_close(self):
    """Check that the zipfile is closed after the 'with' block."""
    writer = zipfile.ZipFile(TESTFN2, "w")
    with writer:
        for member, text in SMALL_TEST_DATA:
            writer.writestr(member, text)
            self.assertIsNotNone(writer.fp, 'zipfp is not open')
    # Leaving the block must have released the underlying file object.
    self.assertIsNone(writer.fp, 'zipfp is not closed')

    reader = zipfile.ZipFile(TESTFN2, "r")
    with reader:
        self.assertIsNotNone(reader.fp, 'zipfp is not open')
    self.assertIsNone(reader.fp, 'zipfp is not closed')
def test_exclusive_create_zip_file(self):
    """Test exclusive creating a new zipfile."""
    unlink(TESTFN2)
    member = 'testfile.txt'
    payload = b'hello, world. this is some content.'

    # First exclusive creation succeeds because the file was just removed.
    with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as archive:
        archive.writestr(member, payload)

    # A second exclusive creation of the same path must be refused.
    self.assertRaises(FileExistsError,
                      zipfile.ZipFile, TESTFN2, "x", zipfile.ZIP_STORED)

    # The archive written the first time is still intact.
    with zipfile.ZipFile(TESTFN2, "r") as archive:
        self.assertEqual(archive.namelist(), [member])
        self.assertEqual(archive.read(member), payload)
def test_close_erroneous_file(self):
    """Check that ZipFile() closes its file when the archive is invalid."""
    # This test checks that the ZipFile constructor closes the file object
    # it opens if there's an error in the file.  If it doesn't, the
    # traceback holds a reference to the ZipFile object and, indirectly,
    # the file object.
    # On Windows, this causes the os.unlink() call to fail because the
    # underlying file is still open.  This is SF bug #412214.
    #
    with open(TESTFN, "w", encoding="utf-8") as fp:
        fp.write("this is not a legal zip file\n")
    try:
        zf = zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        pass
    else:
        # The constructor accepted garbage: release the file it opened and
        # fail loudly instead of letting the test pass vacuously.
        zf.close()
        self.fail("ZipFile() did not raise BadZipFile for a non-zip file")
def test_is_zip_valid_file(self):
    """Check that is_zipfile() correctly identifies zip files."""
    # Create a small valid archive on disk.
    with zipfile.ZipFile(TESTFN, mode="w") as archive:
        archive.writestr("foo.txt", b"O, for a Muse of Fire!")

    # - passing a filename
    self.assertTrue(zipfile.is_zipfile(TESTFN))

    # - passing a file object
    with open(TESTFN, "rb") as disk_file:
        self.assertTrue(zipfile.is_zipfile(disk_file))
        disk_file.seek(0, 0)
        raw_archive = disk_file.read()

    # - passing a file-like object, first positioned at the end of the
    #   data (right after the write), then rewound to the start
    buffer = io.BytesIO()
    buffer.write(raw_archive)
    self.assertTrue(zipfile.is_zipfile(buffer))
    buffer.seek(0, 0)
    self.assertTrue(zipfile.is_zipfile(buffer))
def test_empty_file_raises_BadZipFile(self):
    """Check that empty and too-short files are rejected as BadZipFile."""
    # Create (or truncate to) an empty file; the context manager
    # guarantees the handle is closed before ZipFile opens the path.
    with open(TESTFN, 'w', encoding='utf-8'):
        pass
    self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

    with open(TESTFN, 'w', encoding='utf-8') as fp:
        fp.write("short file")
    self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
def test_open_non_existent_item(self):
    """Check that attempting to call open() for an item that doesn't
    exist in the archive raises a KeyError."""
    # The archive is freshly created in write mode, so it has no members.
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
def test_comments(self):
    """Check that comments on the archive are handled properly.

    Covers the default (empty) comment, a short comment, a comment of the
    maximum length, truncation of an over-long comment (with a
    UserWarning), and replacing or shortening a comment in append mode.
    """

    # check default comment is empty
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertEqual(zipf.comment, b'')
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b'')

    # check a simple short comment
    comment = b'Bravely taking to his feet, he beat a very brave retreat.'
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        # Assert on the re-read archive (zipfr), not on the writer object,
        # so the comment is verified to have round-tripped through disk.
        self.assertEqual(zipfr.comment, comment)

    # check a comment of max length
    comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
    comment2 = comment2.encode("ascii")
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment2
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check a comment that is too long is truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        with self.assertWarns(UserWarning):
            zipf.comment = comment2 + b'oops'
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check that comments are correctly modified in append mode
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"an updated comment"
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"an updated comment")

    # check that comments are correctly shortened in append mode
    # and the file is indeed truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment that's longer"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    original_zip_size = os.path.getsize(TESTFN)
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"shorter comment"
    self.assertGreater(original_zip_size, os.path.getsize(TESTFN))
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"shorter comment")
adding any files to the archives creates a valid empty ZIP file 1896 zipf = zipfile.ZipFile(TESTFN, mode="w") 1897 zipf.close() 1898 try: 1899 zipf = zipfile.ZipFile(TESTFN, mode="r") 1900 except zipfile.BadZipFile: 1901 self.fail("Unable to create empty ZIP file in 'w' mode") 1902 1903 zipf = zipfile.ZipFile(TESTFN, mode="a") 1904 zipf.close() 1905 try: 1906 zipf = zipfile.ZipFile(TESTFN, mode="r") 1907 except: 1908 self.fail("Unable to create empty ZIP file in 'a' mode") 1909 1910 def test_open_empty_file(self): 1911 # Issue 1710703: Check that opening a file with less than 22 bytes 1912 # raises a BadZipFile exception (rather than the previously unhelpful 1913 # OSError) 1914 f = open(TESTFN, 'w', encoding='utf-8') 1915 f.close() 1916 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 1917 1918 def test_create_zipinfo_before_1980(self): 1919 self.assertRaises(ValueError, 1920 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1921 1922 def test_create_empty_zipinfo_repr(self): 1923 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 1924 zi = zipfile.ZipInfo(filename="empty") 1925 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 1926 1927 def test_create_empty_zipinfo_default_attributes(self): 1928 """Ensure all required attributes are set.""" 1929 zi = zipfile.ZipInfo() 1930 self.assertEqual(zi.orig_filename, "NoName") 1931 self.assertEqual(zi.filename, "NoName") 1932 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 1933 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 1934 self.assertEqual(zi.comment, b"") 1935 self.assertEqual(zi.extra, b"") 1936 self.assertIn(zi.create_system, (0, 3)) 1937 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 1938 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 1939 self.assertEqual(zi.reserved, 0) 1940 self.assertEqual(zi.flag_bits, 0) 1941 self.assertEqual(zi.volume, 0) 1942 self.assertEqual(zi.internal_attr, 0) 1943 
self.assertEqual(zi.external_attr, 0) 1944 1945 # Before bpo-26185, both were missing 1946 self.assertEqual(zi.file_size, 0) 1947 self.assertEqual(zi.compress_size, 0) 1948 1949 def test_zipfile_with_short_extra_field(self): 1950 """If an extra field in the header is less than 4 bytes, skip it.""" 1951 zipdata = ( 1952 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 1953 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 1954 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 1955 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 1956 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 1957 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 1958 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 1959 ) 1960 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 1961 # testzip returns the name of the first corrupt file, or None 1962 self.assertIsNone(zipf.testzip()) 1963 1964 def test_open_conflicting_handles(self): 1965 # It's only possible to open one writable file handle at a time 1966 msg1 = b"It's fun to charter an accountant!" 
1967 msg2 = b"And sail the wide accountant sea" 1968 msg3 = b"To find, explore the funds offshore" 1969 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1970 with zipf.open('foo', mode='w') as w2: 1971 w2.write(msg1) 1972 with zipf.open('bar', mode='w') as w1: 1973 with self.assertRaises(ValueError): 1974 zipf.open('handle', mode='w') 1975 with self.assertRaises(ValueError): 1976 zipf.open('foo', mode='r') 1977 with self.assertRaises(ValueError): 1978 zipf.writestr('str', 'abcde') 1979 with self.assertRaises(ValueError): 1980 zipf.write(__file__, 'file') 1981 with self.assertRaises(ValueError): 1982 zipf.close() 1983 w1.write(msg2) 1984 with zipf.open('baz', mode='w') as w2: 1985 w2.write(msg3) 1986 1987 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 1988 self.assertEqual(zipf.read('foo'), msg1) 1989 self.assertEqual(zipf.read('bar'), msg2) 1990 self.assertEqual(zipf.read('baz'), msg3) 1991 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 1992 1993 def test_seek_tell(self): 1994 # Test seek functionality 1995 txt = b"Where's Bruce?" 
1996 bloc = txt.find(b"Bruce") 1997 # Check seek on a file 1998 with zipfile.ZipFile(TESTFN, "w") as zipf: 1999 zipf.writestr("foo.txt", txt) 2000 with zipfile.ZipFile(TESTFN, "r") as zipf: 2001 with zipf.open("foo.txt", "r") as fp: 2002 fp.seek(bloc, os.SEEK_SET) 2003 self.assertEqual(fp.tell(), bloc) 2004 fp.seek(-bloc, os.SEEK_CUR) 2005 self.assertEqual(fp.tell(), 0) 2006 fp.seek(bloc, os.SEEK_CUR) 2007 self.assertEqual(fp.tell(), bloc) 2008 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2009 fp.seek(0, os.SEEK_END) 2010 self.assertEqual(fp.tell(), len(txt)) 2011 fp.seek(0, os.SEEK_SET) 2012 self.assertEqual(fp.tell(), 0) 2013 # Check seek on memory file 2014 data = io.BytesIO() 2015 with zipfile.ZipFile(data, mode="w") as zipf: 2016 zipf.writestr("foo.txt", txt) 2017 with zipfile.ZipFile(data, mode="r") as zipf: 2018 with zipf.open("foo.txt", "r") as fp: 2019 fp.seek(bloc, os.SEEK_SET) 2020 self.assertEqual(fp.tell(), bloc) 2021 fp.seek(-bloc, os.SEEK_CUR) 2022 self.assertEqual(fp.tell(), 0) 2023 fp.seek(bloc, os.SEEK_CUR) 2024 self.assertEqual(fp.tell(), bloc) 2025 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2026 fp.seek(0, os.SEEK_END) 2027 self.assertEqual(fp.tell(), len(txt)) 2028 fp.seek(0, os.SEEK_SET) 2029 self.assertEqual(fp.tell(), 0) 2030 2031 @requires_bz2() 2032 def test_decompress_without_3rd_party_library(self): 2033 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2034 zip_file = io.BytesIO(data) 2035 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 2036 zf.writestr('a.txt', b'a') 2037 with mock.patch('zipfile.bz2', None): 2038 with zipfile.ZipFile(zip_file) as zf: 2039 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 2040 2041 def tearDown(self): 2042 unlink(TESTFN) 2043 unlink(TESTFN2) 2044 2045 2046class AbstractBadCrcTests: 2047 def test_testzip_with_bad_crc(self): 2048 """Tests that files with bad CRCs return their name from testzip.""" 2049 zipdata = 
self.zip_with_bad_crc 2050 2051 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2052 # testzip returns the name of the first corrupt file, or None 2053 self.assertEqual('afile', zipf.testzip()) 2054 2055 def test_read_with_bad_crc(self): 2056 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2057 zipdata = self.zip_with_bad_crc 2058 2059 # Using ZipFile.read() 2060 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2061 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2062 2063 # Using ZipExtFile.read() 2064 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2065 with zipf.open('afile', 'r') as corrupt_file: 2066 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2067 2068 # Same with small reads (in order to exercise the buffering logic) 2069 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2070 with zipf.open('afile', 'r') as corrupt_file: 2071 corrupt_file.MIN_READ_SIZE = 2 2072 with self.assertRaises(zipfile.BadZipFile): 2073 while corrupt_file.read(2): 2074 pass 2075 2076 2077class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2078 compression = zipfile.ZIP_STORED 2079 zip_with_bad_crc = ( 2080 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2081 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2082 b'ilehello,AworldP' 2083 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2084 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2085 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2086 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2087 b'\0\0/\0\0\0\0\0') 2088 2089@requires_zlib() 2090class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2091 compression = zipfile.ZIP_DEFLATED 2092 zip_with_bad_crc = ( 2093 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2094 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2095 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2096 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2097 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2098 
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2099 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2100 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2101 2102@requires_bz2() 2103class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2104 compression = zipfile.ZIP_BZIP2 2105 zip_with_bad_crc = ( 2106 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2107 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2108 b'ileBZh91AY&SY\xd4\xa8\xca' 2109 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2110 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2111 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2112 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2113 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2114 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2115 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2116 b'\x00\x00\x00\x00') 2117 2118@requires_lzma() 2119class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2120 compression = zipfile.ZIP_LZMA 2121 zip_with_bad_crc = ( 2122 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2123 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2124 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2125 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2126 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2127 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2128 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2129 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2130 b'\x00>\x00\x00\x00\x00\x00') 2131 2132 2133class DecryptionTests(unittest.TestCase): 2134 """Check that ZIP decryption works. 
Since the library does not 2135 support encryption at the moment, we use a pre-generated encrypted 2136 ZIP file.""" 2137 2138 data = ( 2139 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2140 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2141 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2142 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2143 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2144 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2145 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2146 data2 = ( 2147 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2148 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2149 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2150 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2151 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2152 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2153 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2154 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2155 2156 plain = b'zipfile.py encryption test' 2157 plain2 = b'\x00'*512 2158 2159 def setUp(self): 2160 with open(TESTFN, "wb") as fp: 2161 fp.write(self.data) 2162 self.zip = zipfile.ZipFile(TESTFN, "r") 2163 with open(TESTFN2, "wb") as fp: 2164 fp.write(self.data2) 2165 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2166 2167 def tearDown(self): 2168 self.zip.close() 2169 os.unlink(TESTFN) 2170 self.zip2.close() 2171 os.unlink(TESTFN2) 2172 2173 def test_no_password(self): 2174 # Reading the encrypted file without password 2175 # must generate a RunTime exception 2176 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2177 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2178 2179 def 
test_bad_password(self): 2180 self.zip.setpassword(b"perl") 2181 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2182 self.zip2.setpassword(b"perl") 2183 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2184 2185 @requires_zlib() 2186 def test_good_password(self): 2187 self.zip.setpassword(b"python") 2188 self.assertEqual(self.zip.read("test.txt"), self.plain) 2189 self.zip2.setpassword(b"12345") 2190 self.assertEqual(self.zip2.read("zero"), self.plain2) 2191 2192 def test_unicode_password(self): 2193 expected_msg = "pwd: expected bytes, got str" 2194 2195 with self.assertRaisesRegex(TypeError, expected_msg): 2196 self.zip.setpassword("unicode") 2197 2198 with self.assertRaisesRegex(TypeError, expected_msg): 2199 self.zip.read("test.txt", "python") 2200 2201 with self.assertRaisesRegex(TypeError, expected_msg): 2202 self.zip.open("test.txt", pwd="python") 2203 2204 with self.assertRaisesRegex(TypeError, expected_msg): 2205 self.zip.extract("test.txt", pwd="python") 2206 2207 with self.assertRaisesRegex(TypeError, expected_msg): 2208 self.zip.pwd = "python" 2209 self.zip.open("test.txt") 2210 2211 def test_seek_tell(self): 2212 self.zip.setpassword(b"python") 2213 txt = self.plain 2214 test_word = b'encryption' 2215 bloc = txt.find(test_word) 2216 bloc_len = len(test_word) 2217 with self.zip.open("test.txt", "r") as fp: 2218 fp.seek(bloc, os.SEEK_SET) 2219 self.assertEqual(fp.tell(), bloc) 2220 fp.seek(-bloc, os.SEEK_CUR) 2221 self.assertEqual(fp.tell(), 0) 2222 fp.seek(bloc, os.SEEK_CUR) 2223 self.assertEqual(fp.tell(), bloc) 2224 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2225 2226 # Make sure that the second read after seeking back beyond 2227 # _readbuffer returns the same content (ie. rewind to the start of 2228 # the file to read forward to the required position). 
2229 old_read_size = fp.MIN_READ_SIZE 2230 fp.MIN_READ_SIZE = 1 2231 fp._readbuffer = b'' 2232 fp._offset = 0 2233 fp.seek(0, os.SEEK_SET) 2234 self.assertEqual(fp.tell(), 0) 2235 fp.seek(bloc, os.SEEK_CUR) 2236 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2237 fp.MIN_READ_SIZE = old_read_size 2238 2239 fp.seek(0, os.SEEK_END) 2240 self.assertEqual(fp.tell(), len(txt)) 2241 fp.seek(0, os.SEEK_SET) 2242 self.assertEqual(fp.tell(), 0) 2243 2244 # Read the file completely to definitely call any eof integrity 2245 # checks (crc) and make sure they still pass. 2246 fp.read() 2247 2248 2249class AbstractTestsWithRandomBinaryFiles: 2250 @classmethod 2251 def setUpClass(cls): 2252 datacount = randint(16, 64)*1024 + randint(1, 1024) 2253 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2254 for i in range(datacount)) 2255 2256 def setUp(self): 2257 # Make a source file with some lines 2258 with open(TESTFN, "wb") as fp: 2259 fp.write(self.data) 2260 2261 def tearDown(self): 2262 unlink(TESTFN) 2263 unlink(TESTFN2) 2264 2265 def make_test_archive(self, f, compression): 2266 # Create the ZIP archive 2267 with zipfile.ZipFile(f, "w", compression) as zipfp: 2268 zipfp.write(TESTFN, "another.name") 2269 zipfp.write(TESTFN, TESTFN) 2270 2271 def zip_test(self, f, compression): 2272 self.make_test_archive(f, compression) 2273 2274 # Read the ZIP archive 2275 with zipfile.ZipFile(f, "r", compression) as zipfp: 2276 testdata = zipfp.read(TESTFN) 2277 self.assertEqual(len(testdata), len(self.data)) 2278 self.assertEqual(testdata, self.data) 2279 self.assertEqual(zipfp.read("another.name"), self.data) 2280 2281 def test_read(self): 2282 for f in get_files(self): 2283 self.zip_test(f, self.compression) 2284 2285 def zip_open_test(self, f, compression): 2286 self.make_test_archive(f, compression) 2287 2288 # Read the ZIP archive 2289 with zipfile.ZipFile(f, "r", compression) as zipfp: 2290 zipdata1 = [] 2291 with zipfp.open(TESTFN) as zipopen1: 2292 
while True: 2293 read_data = zipopen1.read(256) 2294 if not read_data: 2295 break 2296 zipdata1.append(read_data) 2297 2298 zipdata2 = [] 2299 with zipfp.open("another.name") as zipopen2: 2300 while True: 2301 read_data = zipopen2.read(256) 2302 if not read_data: 2303 break 2304 zipdata2.append(read_data) 2305 2306 testdata1 = b''.join(zipdata1) 2307 self.assertEqual(len(testdata1), len(self.data)) 2308 self.assertEqual(testdata1, self.data) 2309 2310 testdata2 = b''.join(zipdata2) 2311 self.assertEqual(len(testdata2), len(self.data)) 2312 self.assertEqual(testdata2, self.data) 2313 2314 def test_open(self): 2315 for f in get_files(self): 2316 self.zip_open_test(f, self.compression) 2317 2318 def zip_random_open_test(self, f, compression): 2319 self.make_test_archive(f, compression) 2320 2321 # Read the ZIP archive 2322 with zipfile.ZipFile(f, "r", compression) as zipfp: 2323 zipdata1 = [] 2324 with zipfp.open(TESTFN) as zipopen1: 2325 while True: 2326 read_data = zipopen1.read(randint(1, 1024)) 2327 if not read_data: 2328 break 2329 zipdata1.append(read_data) 2330 2331 testdata = b''.join(zipdata1) 2332 self.assertEqual(len(testdata), len(self.data)) 2333 self.assertEqual(testdata, self.data) 2334 2335 def test_random_open(self): 2336 for f in get_files(self): 2337 self.zip_random_open_test(f, self.compression) 2338 2339 2340class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2341 unittest.TestCase): 2342 compression = zipfile.ZIP_STORED 2343 2344@requires_zlib() 2345class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2346 unittest.TestCase): 2347 compression = zipfile.ZIP_DEFLATED 2348 2349@requires_bz2() 2350class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2351 unittest.TestCase): 2352 compression = zipfile.ZIP_BZIP2 2353 2354@requires_lzma() 2355class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2356 unittest.TestCase): 2357 compression = zipfile.ZIP_LZMA 2358 2359 
# Provide the tell() method but not seek()
class Tellable:
    def __init__(self, fp):
        self.fp = fp
        # Number of bytes written so far, as reported by fp.write().
        self.offset = 0

    def write(self, data):
        n = self.fp.write(data)
        self.offset += n
        return n

    def tell(self):
        return self.offset

    def flush(self):
        self.fp.flush()

# Provide neither tell() nor seek(): only write() and flush()
class Unseekable:
    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        return self.fp.write(data)

    def flush(self):
        self.fp.flush()

class UnseekableTests(unittest.TestCase):
    # Each test writes an archive through a plain, a Tellable and an
    # Unseekable wrapper around a buffer that already holds b'abc', then
    # reads the members back from the underlying BytesIO.

    def test_writestr(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                f = io.BytesIO()
                f.write(b'abc')
                bf = io.BufferedWriter(f)
                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
                    zipfp.writestr('ones', b'111')
                    zipfp.writestr('twos', b'222')
                self.assertEqual(f.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(f, mode='r') as zipf:
                    with zipf.open('ones') as zopen:
                        self.assertEqual(zopen.read(), b'111')
                    with zipf.open('twos') as zopen:
                        self.assertEqual(zopen.read(), b'222')

    def test_write(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                f = io.BytesIO()
                f.write(b'abc')
                bf = io.BufferedWriter(f)
                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
                    self.addCleanup(unlink, TESTFN)
                    with open(TESTFN, 'wb') as f2:
                        f2.write(b'111')
                    zipfp.write(TESTFN, 'ones')
                    with open(TESTFN, 'wb') as f2:
                        f2.write(b'222')
                    zipfp.write(TESTFN, 'twos')
                self.assertEqual(f.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(f, mode='r') as zipf:
                    with zipf.open('ones') as zopen:
                        self.assertEqual(zopen.read(), b'111')
                    with zipf.open('twos') as zopen:
                        self.assertEqual(zopen.read(), b'222')

    def test_open_write(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                f = io.BytesIO()
                f.write(b'abc')
                bf = io.BufferedWriter(f)
                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
                    with zipf.open('ones', 'w') as zopen:
                        zopen.write(b'111')
                    with zipf.open('twos', 'w') as zopen:
                        zopen.write(b'222')
                self.assertEqual(f.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(f) as zipf:
                    self.assertEqual(zipf.read('ones'), b'111')
                    self.assertEqual(zipf.read('twos'), b'222')


@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Two payloads with distinct 3-byte prefixes followed by random bytes.
        cls.data1 = b'111' + randbytes(10000)
        cls.data2 = b'222' + randbytes(10000)

    def make_test_archive(self, f):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', self.data1)
            zipfp.writestr('twos', self.data2)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, data2)
                self.assertEqual(data1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
                    with zipf.open('twos') as zopen2:
                        data2 = zopen2.read(500)
                        data1 += zopen1.read()
                        data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_read_after_close(self):
        # Members opened before the ZipFile is closed are read afterwards;
        # the ExitStack keeps the member handles open past the inner `with`.
        for f in get_files(self):
            self.make_test_archive(f)
            with contextlib.ExitStack() as stack:
                with zipfile.ZipFile(f, 'r') as zipf:
                    zopen1 = stack.enter_context(zipf.open('ones'))
                    zopen2 = stack.enter_context(zipf.open('twos'))
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read()
                data2 += zopen2.read()
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_read_after_write(self):
        # Read a member back from an archive that is still open for writing.
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                zipf.writestr('twos', self.data2)
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
            self.assertEqual(data1, self.data1[:500])
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_write_after_read(self):
        # Add a member while a read handle on another member is still open.
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                with zipf.open('ones') as zopen1:
                    zopen1.read(500)
                    zipf.writestr('twos', self.data2)
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            for x in range(100):
                zipf.read('ones')
                with zipf.open('ones'):
                    pass
        with open(os.devnull, "rb") as f:
            self.assertLess(f.fileno(), 100)

    def test_write_while_reading(self):
        # In append mode, write a new member while an existing one is only
        # partially read, then finish the read.
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
            with zipf.open('ones', 'r') as r1:
                data1 = r1.read(500)
                with zipf.open('twos', 'w') as w1:
                    w1.write(self.data2)
                data1 += r1.read()
        self.assertEqual(data1, self.data1)
        with zipfile.ZipFile(TESTFN2) as zipf:
            self.assertEqual(zipf.read('twos'), self.data2)

    def tearDown(self):
        unlink(TESTFN2)


class TestWithDirectory(unittest.TestCase):
    def setUp(self):
        # TESTFN2 is used as a scratch directory by every test in this class.
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        # Add the same directory twice (default name, then arcname "y") and
        # check both entries while writing and after reopening.
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: with assertTrue the second
            # argument is only the failure message, so nothing was compared.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        # A name ending in "/" passed to writestr() yields a directory entry.
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)


class ZipInfoTests(unittest.TestCase):
    # ZipInfo.from_file() with the different kinds of path argument.

    def test_from_file(self):
        zi = zipfile.ZipInfo.from_file(__file__)
        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_pathlike(self):
        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_bytes(self):
        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
        self.assertEqual(posixpath.basename(zi.filename), 'test')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_fileno(self):
        with open(__file__, 'rb') as f:
            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
            self.assertEqual(posixpath.basename(zi.filename), 'test')
            self.assertFalse(zi.is_dir())
            self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_dir(self):
        dirpath = os.path.dirname(os.path.abspath(__file__))
        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
        self.assertEqual(zi.filename, 'stdlib_tests/')
        self.assertTrue(zi.is_dir())
        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
        self.assertEqual(zi.file_size, 0)


class CommandLineTest(unittest.TestCase):

    def zipfilecmd(self, *args, **kwargs):
        # Run `python -m zipfile *args`, expecting success, and return its
        # stdout with os.linesep replaced by b'\n'.
        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                      **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def zipfilecmd_failure(self, *args):
        # Run `python -m zipfile *args`, expecting failure; the callers
        # unpack the result as (rc, out, err).
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        rc, out, err = self.zipfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        rc, out, err = self.zipfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        zip_name = findfile('zipdir.zip')
        for opt in '-t', '--test':
            out = self.zipfilecmd(opt, zip_name)
            self.assertEqual(out.rstrip(), b'Done testing')
        # A tar file passed to -t must make the command fail.
        zip_name = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
        self.assertEqual(out, b'')

    def test_list_command(self):
        zip_name = findfile('zipdir.zip')
        t = io.StringIO()
        with zipfile.ZipFile(zip_name, 'r') as tf:
            tf.printdir(t)
        expected = t.getvalue().encode('ascii', 'backslashreplace')
        for opt in '-l', '--list':
            out = self.zipfilecmd(opt, zip_name,
                                  PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(out, expected)

    @requires_zlib()
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w', encoding='utf-8') as f:
            f.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f:
            f.write('test 2')
        files = [TESTFN, TESTFNDIR]
        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for opt in '-c', '--create':
            try:
                out = self.zipfilecmd(opt, TESTFN2, *files)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), namelist)
                    self.assertEqual(zf.read(namelist[0]), b'test 1')
                    self.assertEqual(zf.read(namelist[2]), b'test 2')
            finally:
                unlink(TESTFN2)

    def test_extract_command(self):
        zip_name = findfile('zipdir.zip')
        for opt in '-e', '--extract':
            with temp_dir() as extdir:
                out = self.zipfilecmd(opt, zip_name, extdir)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(zip_name) as zf:
                    for zi in zf.infolist():
                        path = os.path.join(extdir,
                                    zi.filename.replace('/', os.sep))
                        if zi.is_dir():
                            self.assertTrue(os.path.isdir(path))
                        else:
                            self.assertTrue(os.path.isfile(path))
                            with open(path, 'rb') as f:
                                self.assertEqual(f.read(), zf.read(zi))


class TestExecutablePrependedZip(unittest.TestCase):
    """Test our ability to open zip files with an executable prepended."""

    def setUp(self):
        self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
        self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')

    def _test_zip_works(self, name):
        # bpo28494 sanity check: ensure is_zipfile works on these.
        self.assertTrue(zipfile.is_zipfile(name),
                        f'is_zipfile failed on {name}')
        # Ensure we can operate on these via ZipFile.
        with zipfile.ZipFile(name) as zipfp:
            for n in zipfp.namelist():
                data = zipfp.read(n)
                self.assertIn(b'FAVORITE_NUMBER', data)

    def test_read_zip_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip)

    def test_read_zip64_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip64)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip2(self):
        output = subprocess.check_output([self.exe_zip, sys.executable])
        self.assertIn(b'number in executable: 5', output)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip64(self):
        output = subprocess.check_output([self.exe_zip64, sys.executable])
        self.assertIn(b'number in executable: 5', output)


# Poor man's technique to consume a (smallish) iterable.
2787consume = tuple 2788 2789 2790# from jaraco.itertools 5.0 2791class jaraco: 2792 class itertools: 2793 class Counter: 2794 def __init__(self, i): 2795 self.count = 0 2796 self._orig_iter = iter(i) 2797 2798 def __iter__(self): 2799 return self 2800 2801 def __next__(self): 2802 result = next(self._orig_iter) 2803 self.count += 1 2804 return result 2805 2806 2807def add_dirs(zf): 2808 """ 2809 Given a writable zip file zf, inject directory entries for 2810 any directories implied by the presence of children. 2811 """ 2812 for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()): 2813 zf.writestr(name, b"") 2814 return zf 2815 2816 2817def build_alpharep_fixture(): 2818 """ 2819 Create a zip file with this structure: 2820 2821 . 2822 ├── a.txt 2823 ├── b 2824 │ ├── c.txt 2825 │ ├── d 2826 │ │ └── e.txt 2827 │ └── f.txt 2828 └── g 2829 └── h 2830 └── i.txt 2831 2832 This fixture has the following key characteristics: 2833 2834 - a file at the root (a) 2835 - a file two levels deep (b/d/e) 2836 - multiple files in a directory (b/c, b/f) 2837 - a directory containing only a directory (g/h) 2838 2839 "alpha" because it uses alphabet 2840 "rep" because it's a representative example 2841 """ 2842 data = io.BytesIO() 2843 zf = zipfile.ZipFile(data, "w") 2844 zf.writestr("a.txt", b"content of a") 2845 zf.writestr("b/c.txt", b"content of c") 2846 zf.writestr("b/d/e.txt", b"content of e") 2847 zf.writestr("b/f.txt", b"content of f") 2848 zf.writestr("g/h/i.txt", b"content of i") 2849 zf.filename = "alpharep.zip" 2850 return zf 2851 2852 2853def pass_alpharep(meth): 2854 """ 2855 Given a method, wrap it in a for loop that invokes method 2856 with each subtest. 
2857 """ 2858 2859 @functools.wraps(meth) 2860 def wrapper(self): 2861 for alpharep in self.zipfile_alpharep(): 2862 meth(self, alpharep=alpharep) 2863 2864 return wrapper 2865 2866 2867class TestPath(unittest.TestCase): 2868 def setUp(self): 2869 self.fixtures = contextlib.ExitStack() 2870 self.addCleanup(self.fixtures.close) 2871 2872 def zipfile_alpharep(self): 2873 with self.subTest(): 2874 yield build_alpharep_fixture() 2875 with self.subTest(): 2876 yield add_dirs(build_alpharep_fixture()) 2877 2878 def zipfile_ondisk(self, alpharep): 2879 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 2880 buffer = alpharep.fp 2881 alpharep.close() 2882 path = tmpdir / alpharep.filename 2883 with path.open("wb") as strm: 2884 strm.write(buffer.getvalue()) 2885 return path 2886 2887 @pass_alpharep 2888 def test_iterdir_and_types(self, alpharep): 2889 root = zipfile.Path(alpharep) 2890 assert root.is_dir() 2891 a, b, g = root.iterdir() 2892 assert a.is_file() 2893 assert b.is_dir() 2894 assert g.is_dir() 2895 c, f, d = b.iterdir() 2896 assert c.is_file() and f.is_file() 2897 (e,) = d.iterdir() 2898 assert e.is_file() 2899 (h,) = g.iterdir() 2900 (i,) = h.iterdir() 2901 assert i.is_file() 2902 2903 @pass_alpharep 2904 def test_is_file_missing(self, alpharep): 2905 root = zipfile.Path(alpharep) 2906 assert not root.joinpath('missing.txt').is_file() 2907 2908 @pass_alpharep 2909 def test_iterdir_on_file(self, alpharep): 2910 root = zipfile.Path(alpharep) 2911 a, b, g = root.iterdir() 2912 with self.assertRaises(ValueError): 2913 a.iterdir() 2914 2915 @pass_alpharep 2916 def test_subdir_is_dir(self, alpharep): 2917 root = zipfile.Path(alpharep) 2918 assert (root / 'b').is_dir() 2919 assert (root / 'b/').is_dir() 2920 assert (root / 'g').is_dir() 2921 assert (root / 'g/').is_dir() 2922 2923 @pass_alpharep 2924 def test_open(self, alpharep): 2925 root = zipfile.Path(alpharep) 2926 a, b, g = root.iterdir() 2927 with a.open(encoding="utf-8") as strm: 2928 data = 
strm.read() 2929 assert data == "content of a" 2930 2931 def test_open_write(self): 2932 """ 2933 If the zipfile is open for write, it should be possible to 2934 write bytes or text to it. 2935 """ 2936 zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w')) 2937 with zf.joinpath('file.bin').open('wb') as strm: 2938 strm.write(b'binary contents') 2939 with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm: 2940 strm.write('text file') 2941 2942 def test_open_extant_directory(self): 2943 """ 2944 Attempting to open a directory raises IsADirectoryError. 2945 """ 2946 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 2947 with self.assertRaises(IsADirectoryError): 2948 zf.joinpath('b').open() 2949 2950 @pass_alpharep 2951 def test_open_binary_invalid_args(self, alpharep): 2952 root = zipfile.Path(alpharep) 2953 with self.assertRaises(ValueError): 2954 root.joinpath('a.txt').open('rb', encoding='utf-8') 2955 with self.assertRaises(ValueError): 2956 root.joinpath('a.txt').open('rb', 'utf-8') 2957 2958 def test_open_missing_directory(self): 2959 """ 2960 Attempting to open a missing directory raises FileNotFoundError. 
2961 """ 2962 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 2963 with self.assertRaises(FileNotFoundError): 2964 zf.joinpath('z').open() 2965 2966 @pass_alpharep 2967 def test_read(self, alpharep): 2968 root = zipfile.Path(alpharep) 2969 a, b, g = root.iterdir() 2970 assert a.read_text(encoding="utf-8") == "content of a" 2971 assert a.read_bytes() == b"content of a" 2972 2973 @pass_alpharep 2974 def test_joinpath(self, alpharep): 2975 root = zipfile.Path(alpharep) 2976 a = root.joinpath("a.txt") 2977 assert a.is_file() 2978 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 2979 assert e.read_text(encoding="utf-8") == "content of e" 2980 2981 @pass_alpharep 2982 def test_joinpath_multiple(self, alpharep): 2983 root = zipfile.Path(alpharep) 2984 e = root.joinpath("b", "d", "e.txt") 2985 assert e.read_text(encoding="utf-8") == "content of e" 2986 2987 @pass_alpharep 2988 def test_traverse_truediv(self, alpharep): 2989 root = zipfile.Path(alpharep) 2990 a = root / "a.txt" 2991 assert a.is_file() 2992 e = root / "b" / "d" / "e.txt" 2993 assert e.read_text(encoding="utf-8") == "content of e" 2994 2995 @pass_alpharep 2996 def test_traverse_simplediv(self, alpharep): 2997 """ 2998 Disable the __future__.division when testing traversal. 
2999 """ 3000 code = compile( 3001 source="zipfile.Path(alpharep) / 'a'", 3002 filename="(test)", 3003 mode="eval", 3004 dont_inherit=True, 3005 ) 3006 eval(code) 3007 3008 @pass_alpharep 3009 def test_pathlike_construction(self, alpharep): 3010 """ 3011 zipfile.Path should be constructable from a path-like object 3012 """ 3013 zipfile_ondisk = self.zipfile_ondisk(alpharep) 3014 pathlike = pathlib.Path(str(zipfile_ondisk)) 3015 zipfile.Path(pathlike) 3016 3017 @pass_alpharep 3018 def test_traverse_pathlike(self, alpharep): 3019 root = zipfile.Path(alpharep) 3020 root / pathlib.Path("a") 3021 3022 @pass_alpharep 3023 def test_parent(self, alpharep): 3024 root = zipfile.Path(alpharep) 3025 assert (root / 'a').parent.at == '' 3026 assert (root / 'a' / 'b').parent.at == 'a/' 3027 3028 @pass_alpharep 3029 def test_dir_parent(self, alpharep): 3030 root = zipfile.Path(alpharep) 3031 assert (root / 'b').parent.at == '' 3032 assert (root / 'b/').parent.at == '' 3033 3034 @pass_alpharep 3035 def test_missing_dir_parent(self, alpharep): 3036 root = zipfile.Path(alpharep) 3037 assert (root / 'missing dir/').parent.at == '' 3038 3039 @pass_alpharep 3040 def test_mutability(self, alpharep): 3041 """ 3042 If the underlying zipfile is changed, the Path object should 3043 reflect that change. 
3044 """ 3045 root = zipfile.Path(alpharep) 3046 a, b, g = root.iterdir() 3047 alpharep.writestr('foo.txt', 'foo') 3048 alpharep.writestr('bar/baz.txt', 'baz') 3049 assert any(child.name == 'foo.txt' for child in root.iterdir()) 3050 assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo' 3051 (baz,) = (root / 'bar').iterdir() 3052 assert baz.read_text(encoding="utf-8") == 'baz' 3053 3054 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 3055 3056 def huge_zipfile(self): 3057 """Create a read-only zipfile with a huge number of entries entries.""" 3058 strm = io.BytesIO() 3059 zf = zipfile.ZipFile(strm, "w") 3060 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 3061 zf.writestr(entry, entry) 3062 zf.mode = 'r' 3063 return zf 3064 3065 def test_joinpath_constant_time(self): 3066 """ 3067 Ensure joinpath on items in zipfile is linear time. 3068 """ 3069 root = zipfile.Path(self.huge_zipfile()) 3070 entries = jaraco.itertools.Counter(root.iterdir()) 3071 for entry in entries: 3072 entry.joinpath('suffix') 3073 # Check the file iterated all items 3074 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 3075 3076 # @func_timeout.func_set_timeout(3) 3077 def test_implied_dirs_performance(self): 3078 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 3079 zipfile.CompleteDirs._implied_dirs(data) 3080 3081 @pass_alpharep 3082 def test_read_does_not_close(self, alpharep): 3083 alpharep = self.zipfile_ondisk(alpharep) 3084 with zipfile.ZipFile(alpharep) as file: 3085 for rep in range(2): 3086 zipfile.Path(file, 'a.txt').read_text(encoding="utf-8") 3087 3088 @pass_alpharep 3089 def test_subclass(self, alpharep): 3090 class Subclass(zipfile.Path): 3091 pass 3092 3093 root = Subclass(alpharep) 3094 assert isinstance(root / 'b', Subclass) 3095 3096 @pass_alpharep 3097 def test_filename(self, alpharep): 3098 root = zipfile.Path(alpharep) 3099 assert root.filename == pathlib.Path('alpharep.zip') 3100 3101 @pass_alpharep 3102 def test_root_name(self, 
alpharep): 3103 """ 3104 The name of the root should be the name of the zipfile 3105 """ 3106 root = zipfile.Path(alpharep) 3107 assert root.name == 'alpharep.zip' == root.filename.name 3108 3109 @pass_alpharep 3110 def test_suffix(self, alpharep): 3111 """ 3112 The suffix of the root should be the suffix of the zipfile. 3113 The suffix of each nested file is the final component's last suffix, if any. 3114 Includes the leading period, just like pathlib.Path. 3115 """ 3116 root = zipfile.Path(alpharep) 3117 assert root.suffix == '.zip' == root.filename.suffix 3118 3119 b = root / "b.txt" 3120 assert b.suffix == ".txt" 3121 3122 c = root / "c" / "filename.tar.gz" 3123 assert c.suffix == ".gz" 3124 3125 d = root / "d" 3126 assert d.suffix == "" 3127 3128 @pass_alpharep 3129 def test_suffixes(self, alpharep): 3130 """ 3131 The suffix of the root should be the suffix of the zipfile. 3132 The suffix of each nested file is the final component's last suffix, if any. 3133 Includes the leading period, just like pathlib.Path. 
3134 """ 3135 root = zipfile.Path(alpharep) 3136 assert root.suffixes == ['.zip'] == root.filename.suffixes 3137 3138 b = root / 'b.txt' 3139 assert b.suffixes == ['.txt'] 3140 3141 c = root / 'c' / 'filename.tar.gz' 3142 assert c.suffixes == ['.tar', '.gz'] 3143 3144 d = root / 'd' 3145 assert d.suffixes == [] 3146 3147 e = root / '.hgrc' 3148 assert e.suffixes == [] 3149 3150 @pass_alpharep 3151 def test_stem(self, alpharep): 3152 """ 3153 The final path component, without its suffix 3154 """ 3155 root = zipfile.Path(alpharep) 3156 assert root.stem == 'alpharep' == root.filename.stem 3157 3158 b = root / "b.txt" 3159 assert b.stem == "b" 3160 3161 c = root / "c" / "filename.tar.gz" 3162 assert c.stem == "filename.tar" 3163 3164 d = root / "d" 3165 assert d.stem == "d" 3166 3167 @pass_alpharep 3168 def test_root_parent(self, alpharep): 3169 root = zipfile.Path(alpharep) 3170 assert root.parent == pathlib.Path('.') 3171 root.root.filename = 'foo/bar.zip' 3172 assert root.parent == pathlib.Path('foo') 3173 3174 @pass_alpharep 3175 def test_root_unnamed(self, alpharep): 3176 """ 3177 It is an error to attempt to get the name 3178 or parent of an unnamed zipfile. 3179 """ 3180 alpharep.filename = None 3181 root = zipfile.Path(alpharep) 3182 with self.assertRaises(TypeError): 3183 root.name 3184 with self.assertRaises(TypeError): 3185 root.parent 3186 3187 # .name and .parent should still work on subs 3188 sub = root / "b" 3189 assert sub.name == "b" 3190 assert sub.parent 3191 3192 @pass_alpharep 3193 def test_inheritance(self, alpharep): 3194 cls = type('PathChild', (zipfile.Path,), {}) 3195 for alpharep in self.zipfile_alpharep(): 3196 file = cls(alpharep).joinpath('some dir').parent 3197 assert isinstance(file, cls) 3198 3199 3200if __name__ == "__main__": 3201 unittest.main() 3202