1"""This is the normal test_zipfile test suite but using the 2zipfile_aes.AESZipFile object instead of the zipfile.ZipFile object. 3""" 4import contextlib 5import io 6import os 7import sys 8import importlib.util 9import pathlib 10import posixpath 11import time 12import struct 13from pyzipper import zipfile 14from pyzipper import zipfile_aes 15import unittest 16 17 18from tempfile import TemporaryFile 19from random import randint, random, getrandbits 20 21from test.support import script_helper 22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd, 23 requires_zlib, requires_bz2, requires_lzma, 24 captured_stdout) 25 26TESTFN2 = TESTFN + "2" 27TESTFNDIR = TESTFN + "d" 28FIXEDTEST_SIZE = 1000 29DATAFILES_DIR = 'zipfile_datafiles' 30 31SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 32 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 33 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 34 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 35 36def getrandbytes(size): 37 return getrandbits(8 * size).to_bytes(size, 'little') 38 39def get_files(test): 40 yield TESTFN2 41 with TemporaryFile() as f: 42 yield f 43 test.assertFalse(f.closed) 44 with io.BytesIO() as f: 45 yield f 46 test.assertFalse(f.closed) 47 48class AbstractTestsWithSourceFile: 49 @classmethod 50 def setUpClass(cls): 51 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 52 (i, random()), "ascii") 53 for i in range(FIXEDTEST_SIZE)] 54 cls.data = b''.join(cls.line_gen) 55 56 def setUp(self): 57 # Make a source file with some lines 58 with open(TESTFN, "wb") as fp: 59 fp.write(self.data) 60 61 def make_test_archive(self, f, compression, compresslevel=None): 62 kwargs = {'compression': compression, 'compresslevel': compresslevel} 63 # Create the ZIP archive 64 with zipfile_aes.AESZipFile(f, "w", **kwargs) as zipfp: 65 zipfp.write(TESTFN, "another.name") 66 zipfp.write(TESTFN, TESTFN) 67 zipfp.writestr("strfile", self.data) 68 with zipfp.open('written-open-w', mode='w') as f: 69 for line in self.line_gen: 70 f.write(line) 71 72 def zip_test(self, f, compression, compresslevel=None): 73 self.make_test_archive(f, compression, compresslevel) 74 75 # Read the ZIP archive 76 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 77 self.assertEqual(zipfp.read(TESTFN), self.data) 78 self.assertEqual(zipfp.read("another.name"), self.data) 79 self.assertEqual(zipfp.read("strfile"), self.data) 80 81 # Print the ZIP directory 82 fp = io.StringIO() 83 zipfp.printdir(file=fp) 84 directory = fp.getvalue() 85 lines = directory.splitlines() 86 self.assertEqual(len(lines), 5) # Number of files + header 87 88 self.assertIn('File Name', lines[0]) 89 self.assertIn('Modified', lines[0]) 90 self.assertIn('Size', lines[0]) 91 92 fn, date, time_, size = lines[1].split() 93 self.assertEqual(fn, 'another.name') 94 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 95 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 96 self.assertEqual(size, str(len(self.data))) 97 98 # Check the namelist 99 names = zipfp.namelist() 100 self.assertEqual(len(names), 4) 101 self.assertIn(TESTFN, names) 102 self.assertIn("another.name", names) 103 self.assertIn("strfile", names) 104 self.assertIn("written-open-w", names) 105 106 # Check infolist 107 infos = zipfp.infolist() 108 names = [i.filename for i in infos] 109 self.assertEqual(len(names), 4) 110 self.assertIn(TESTFN, names) 111 self.assertIn("another.name", names) 112 self.assertIn("strfile", names) 113 self.assertIn("written-open-w", names) 114 for i in infos: 115 self.assertEqual(i.file_size, len(self.data)) 116 117 # check getinfo 118 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 119 info = zipfp.getinfo(nm) 120 self.assertEqual(info.filename, nm) 121 self.assertEqual(info.file_size, len(self.data)) 122 123 # Check that testzip doesn't raise an exception 124 zipfp.testzip() 125 126 def test_basic(self): 127 for f in get_files(self): 128 self.zip_test(f, self.compression) 129 130 def zip_open_test(self, f, compression): 131 self.make_test_archive(f, compression) 132 133 # Read the ZIP archive 134 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 135 zipdata1 = [] 136 with zipfp.open(TESTFN) as zipopen1: 137 while True: 138 read_data = zipopen1.read(256) 139 if not read_data: 140 break 141 zipdata1.append(read_data) 142 143 zipdata2 = [] 144 with zipfp.open("another.name") as zipopen2: 145 while True: 146 read_data = zipopen2.read(256) 147 if not read_data: 148 break 149 zipdata2.append(read_data) 150 151 self.assertEqual(b''.join(zipdata1), self.data) 152 self.assertEqual(b''.join(zipdata2), self.data) 153 154 def test_open(self): 155 for f in get_files(self): 156 self.zip_open_test(f, self.compression) 157 158 def test_open_with_pathlike(self): 159 path = pathlib.Path(TESTFN2) 160 self.zip_open_test(path, self.compression) 161 with zipfile_aes.AESZipFile(path, "r", self.compression) as zipfp: 162 self.assertIsInstance(zipfp.filename, str) 163 164 def zip_random_open_test(self, f, compression): 165 self.make_test_archive(f, compression) 166 167 # Read the ZIP archive 168 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 169 zipdata1 = [] 170 with zipfp.open(TESTFN) as zipopen1: 171 while True: 172 read_data = zipopen1.read(randint(1, 1024)) 173 if not read_data: 174 break 175 zipdata1.append(read_data) 176 177 self.assertEqual(b''.join(zipdata1), self.data) 178 179 def test_random_open(self): 180 for f in get_files(self): 181 self.zip_random_open_test(f, self.compression) 182 183 def zip_read1_test(self, f, compression): 184 self.make_test_archive(f, compression) 185 186 # Read the ZIP archive 187 with zipfile_aes.AESZipFile(f, "r") as zipfp, \ 188 zipfp.open(TESTFN) as zipopen: 189 zipdata = [] 190 while True: 191 read_data = zipopen.read1(-1) 192 if not read_data: 193 break 194 zipdata.append(read_data) 195 196 self.assertEqual(b''.join(zipdata), self.data) 197 198 def test_read1(self): 199 for f in get_files(self): 200 self.zip_read1_test(f, self.compression) 201 202 def zip_read1_10_test(self, f, compression): 203 self.make_test_archive(f, compression) 204 205 # Read the ZIP archive 206 with zipfile_aes.AESZipFile(f, "r") as zipfp, \ 207 zipfp.open(TESTFN) as zipopen: 208 zipdata = [] 209 while True: 210 read_data = zipopen.read1(10) 211 self.assertLessEqual(len(read_data), 10) 212 if not read_data: 213 break 214 zipdata.append(read_data) 215 216 self.assertEqual(b''.join(zipdata), self.data) 217 218 def test_read1_10(self): 219 for f in get_files(self): 220 self.zip_read1_10_test(f, self.compression) 221 222 def zip_readline_read_test(self, f, compression): 223 self.make_test_archive(f, compression) 224 225 # Read the ZIP archive 226 with zipfile_aes.AESZipFile(f, "r") as zipfp, \ 227 zipfp.open(TESTFN) as zipopen: 228 data = b'' 229 while True: 230 read = zipopen.readline() 231 if not read: 232 break 233 data += read 234 235 read = zipopen.read(100) 236 if not read: 237 break 238 data += read 239 240 self.assertEqual(data, self.data) 241 242 def test_readline_read(self): 243 # Issue #7610: calls to readline() interleaved with calls to read(). 244 for f in get_files(self): 245 self.zip_readline_read_test(f, self.compression) 246 247 def zip_readline_test(self, f, compression): 248 self.make_test_archive(f, compression) 249 250 # Read the ZIP archive 251 with zipfile_aes.AESZipFile(f, "r") as zipfp: 252 with zipfp.open(TESTFN) as zipopen: 253 for line in self.line_gen: 254 linedata = zipopen.readline() 255 self.assertEqual(linedata, line) 256 257 def test_readline(self): 258 for f in get_files(self): 259 self.zip_readline_test(f, self.compression) 260 261 def zip_readlines_test(self, f, compression): 262 self.make_test_archive(f, compression) 263 264 # Read the ZIP archive 265 with zipfile_aes.AESZipFile(f, "r") as zipfp: 266 with zipfp.open(TESTFN) as zipopen: 267 ziplines = zipopen.readlines() 268 for line, zipline in zip(self.line_gen, ziplines): 269 self.assertEqual(zipline, line) 270 271 def test_readlines(self): 272 for f in get_files(self): 273 self.zip_readlines_test(f, self.compression) 274 275 def zip_iterlines_test(self, f, compression): 276 self.make_test_archive(f, compression) 277 278 # Read the ZIP archive 279 with zipfile_aes.AESZipFile(f, "r") as zipfp: 280 with zipfp.open(TESTFN) as zipopen: 281 for line, zipline in zip(self.line_gen, zipopen): 282 self.assertEqual(zipline, line) 283 284 def test_iterlines(self): 285 for f in get_files(self): 286 self.zip_iterlines_test(f, self.compression) 287 288 def test_low_compression(self): 289 """Check for cases where compressed data is larger than original.""" 290 # Create the ZIP archive 291 with zipfile_aes.AESZipFile(TESTFN2, "w", self.compression) as zipfp: 292 zipfp.writestr("strfile", '12') 293 294 # Get an open object for strfile 295 with zipfile_aes.AESZipFile(TESTFN2, "r", self.compression) as zipfp: 296 with zipfp.open("strfile") as openobj: 297 self.assertEqual(openobj.read(1), b'1') 298 self.assertEqual(openobj.read(1), b'2') 299 300 def test_writestr_compression(self): 301 zipfp = zipfile_aes.AESZipFile(TESTFN2, "w") 302 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 303 info = zipfp.getinfo('b.txt') 304 self.assertEqual(info.compress_type, self.compression) 305 306 def test_writestr_compresslevel(self): 307 zipfp = zipfile_aes.AESZipFile(TESTFN2, "w", compresslevel=1) 308 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 309 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 310 compresslevel=2) 311 312 # Compression level follows the constructor. 313 a_info = zipfp.getinfo('a.txt') 314 self.assertEqual(a_info.compress_type, self.compression) 315 self.assertEqual(a_info._compresslevel, 1) 316 317 # Compression level is overridden. 318 b_info = zipfp.getinfo('b.txt') 319 self.assertEqual(b_info.compress_type, self.compression) 320 self.assertEqual(b_info._compresslevel, 2) 321 322 def test_read_return_size(self): 323 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 324 # than requested. 325 for test_size in (1, 4095, 4096, 4097, 16384): 326 file_size = test_size + 1 327 junk = getrandbytes(file_size) 328 with zipfile_aes.AESZipFile(io.BytesIO(), "w", self.compression) as zipf: 329 zipf.writestr('foo', junk) 330 with zipf.open('foo', 'r') as fp: 331 buf = fp.read(test_size) 332 self.assertEqual(len(buf), test_size) 333 334 def test_truncated_zipfile(self): 335 fp = io.BytesIO() 336 with zipfile_aes.AESZipFile(fp, mode='w') as zipf: 337 zipf.writestr('strfile', self.data, compress_type=self.compression) 338 end_offset = fp.tell() 339 zipfiledata = fp.getvalue() 340 341 fp = io.BytesIO(zipfiledata) 342 with zipfile_aes.AESZipFile(fp) as zipf: 343 with zipf.open('strfile') as zipopen: 344 fp.truncate(end_offset - 20) 345 with self.assertRaises(EOFError): 346 zipopen.read() 347 348 fp = io.BytesIO(zipfiledata) 349 with zipfile_aes.AESZipFile(fp) as zipf: 350 with zipf.open('strfile') as zipopen: 351 fp.truncate(end_offset - 20) 352 with self.assertRaises(EOFError): 353 while zipopen.read(100): 354 pass 355 356 fp = io.BytesIO(zipfiledata) 357 with zipfile_aes.AESZipFile(fp) as zipf: 358 with zipf.open('strfile') as zipopen: 359 fp.truncate(end_offset - 20) 360 with self.assertRaises(EOFError): 361 while zipopen.read1(100): 362 pass 363 364 def test_repr(self): 365 fname = 'file.name' 366 for f in get_files(self): 367 with zipfile_aes.AESZipFile(f, 'w', self.compression) as zipfp: 368 zipfp.write(TESTFN, fname) 369 r = repr(zipfp) 370 self.assertIn("mode='w'", r) 371 372 with zipfile_aes.AESZipFile(f, 'r') as zipfp: 373 r = repr(zipfp) 374 if isinstance(f, str): 375 self.assertIn('filename=%r' % f, r) 376 else: 377 self.assertIn('file=%r' % f, r) 378 self.assertIn("mode='r'", r) 379 r = repr(zipfp.getinfo(fname)) 380 self.assertIn('filename=%r' % fname, r) 381 self.assertIn('filemode=', r) 382 self.assertIn('file_size=', r) 383 if self.compression != zipfile.ZIP_STORED: 384 self.assertIn('compress_type=', r) 385 self.assertIn('compress_size=', r) 386 with zipfp.open(fname) as zipopen: 387 r = repr(zipopen) 388 self.assertIn('name=%r' % fname, r) 389 self.assertIn("mode='r'", r) 390 if self.compression != zipfile.ZIP_STORED: 391 self.assertIn('compress_type=', r) 392 self.assertIn('[closed]', repr(zipopen)) 393 self.assertIn('[closed]', repr(zipfp)) 394 395 def test_compresslevel_basic(self): 396 for f in get_files(self): 397 self.zip_test(f, self.compression, compresslevel=9) 398 399 def test_per_file_compresslevel(self): 400 """Check that files within a Zip archive can have different 401 compression levels.""" 402 with zipfile_aes.AESZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 403 zipfp.write(TESTFN, 'compress_1') 404 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 405 one_info = zipfp.getinfo('compress_1') 406 nine_info = zipfp.getinfo('compress_9') 407 self.assertEqual(one_info._compresslevel, 1) 408 self.assertEqual(nine_info._compresslevel, 9) 409 410 def tearDown(self): 411 unlink(TESTFN) 412 unlink(TESTFN2) 413 414 415class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 416 unittest.TestCase): 417 compression = zipfile.ZIP_STORED 418 test_low_compression = None 419 420 def zip_test_writestr_permissions(self, f, compression): 421 # Make sure that writestr and open(... mode='w') create files with 422 # mode 0600, when they are passed a name rather than a ZipInfo 423 # instance. 424 425 self.make_test_archive(f, compression) 426 with zipfile_aes.AESZipFile(f, "r") as zipfp: 427 zinfo = zipfp.getinfo('strfile') 428 self.assertEqual(zinfo.external_attr, 0o600 << 16) 429 430 zinfo2 = zipfp.getinfo('written-open-w') 431 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 432 433 def test_writestr_permissions(self): 434 for f in get_files(self): 435 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 436 437 def test_absolute_arcnames(self): 438 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 439 zipfp.write(TESTFN, "/absolute") 440 441 with zipfile_aes.AESZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 442 self.assertEqual(zipfp.namelist(), ["absolute"]) 443 444 def test_append_to_zip_file(self): 445 """Test appending to an existing zipfile.""" 446 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 447 zipfp.write(TESTFN, TESTFN) 448 449 with zipfile_aes.AESZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 450 zipfp.writestr("strfile", self.data) 451 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 452 453 def test_append_to_non_zip_file(self): 454 """Test appending to an existing file that is not a zipfile.""" 455 # NOTE: this test fails if len(d) < 22 because of the first 456 # line "fpin.seek(-22, 2)" in _EndRecData 457 data = b'I am not a ZipFile!'*10 458 with open(TESTFN2, 'wb') as f: 459 f.write(data) 460 461 with zipfile_aes.AESZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 462 zipfp.write(TESTFN, TESTFN) 463 464 with open(TESTFN2, 'rb') as f: 465 f.seek(len(data)) 466 with zipfile_aes.AESZipFile(f, "r") as zipfp: 467 self.assertEqual(zipfp.namelist(), [TESTFN]) 468 self.assertEqual(zipfp.read(TESTFN), self.data) 469 with open(TESTFN2, 'rb') as f: 470 self.assertEqual(f.read(len(data)), data) 471 zipfiledata = f.read() 472 with io.BytesIO(zipfiledata) as bio, zipfile_aes.AESZipFile(bio) as zipfp: 473 self.assertEqual(zipfp.namelist(), [TESTFN]) 474 self.assertEqual(zipfp.read(TESTFN), self.data) 475 476 def test_read_concatenated_zip_file(self): 477 with io.BytesIO() as bio: 478 with zipfile_aes.AESZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 479 zipfp.write(TESTFN, TESTFN) 480 zipfiledata = bio.getvalue() 481 data = b'I am not a ZipFile!'*10 482 with open(TESTFN2, 'wb') as f: 483 f.write(data) 484 f.write(zipfiledata) 485 486 with zipfile_aes.AESZipFile(TESTFN2) as zipfp: 487 self.assertEqual(zipfp.namelist(), [TESTFN]) 488 self.assertEqual(zipfp.read(TESTFN), self.data) 489 490 def test_append_to_concatenated_zip_file(self): 491 with io.BytesIO() as bio: 492 with zipfile_aes.AESZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 493 zipfp.write(TESTFN, TESTFN) 494 zipfiledata = bio.getvalue() 495 data = b'I am not a ZipFile!'*1000000 496 with open(TESTFN2, 'wb') as f: 497 f.write(data) 498 f.write(zipfiledata) 499 500 with zipfile_aes.AESZipFile(TESTFN2, 'a') as zipfp: 501 self.assertEqual(zipfp.namelist(), [TESTFN]) 502 zipfp.writestr('strfile', self.data) 503 504 with open(TESTFN2, 'rb') as f: 505 self.assertEqual(f.read(len(data)), data) 506 zipfiledata = f.read() 507 with io.BytesIO(zipfiledata) as bio, zipfile_aes.AESZipFile(bio) as zipfp: 508 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 509 self.assertEqual(zipfp.read(TESTFN), self.data) 510 self.assertEqual(zipfp.read('strfile'), self.data) 511 512 def test_ignores_newline_at_end(self): 513 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 514 zipfp.write(TESTFN, TESTFN) 515 with open(TESTFN2, 'a') as f: 516 f.write("\r\n\00\00\00") 517 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 518 self.assertIsInstance(zipfp, zipfile_aes.AESZipFile) 519 520 def test_ignores_stuff_appended_past_comments(self): 521 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 522 zipfp.comment = b"this is a comment" 523 zipfp.write(TESTFN, TESTFN) 524 with open(TESTFN2, 'a') as f: 525 f.write("abcdef\r\n") 526 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 527 self.assertIsInstance(zipfp, zipfile_aes.AESZipFile) 528 self.assertEqual(zipfp.comment, b"this is a comment") 529 530 def test_write_default_name(self): 531 """Check that calling ZipFile.write without arcname specified 532 produces the expected result.""" 533 with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp: 534 zipfp.write(TESTFN) 535 with open(TESTFN, "rb") as f: 536 self.assertEqual(zipfp.read(TESTFN), f.read()) 537 538 def test_write_to_readonly(self): 539 """Check that trying to call write() on a readonly ZipFile object 540 raises a ValueError.""" 541 with zipfile_aes.AESZipFile(TESTFN2, mode="w") as zipfp: 542 zipfp.writestr("somefile.txt", "bogus") 543 544 with zipfile_aes.AESZipFile(TESTFN2, mode="r") as zipfp: 545 self.assertRaises(ValueError, zipfp.write, TESTFN) 546 547 with zipfile_aes.AESZipFile(TESTFN2, mode="r") as zipfp: 548 with self.assertRaises(ValueError): 549 zipfp.open(TESTFN, mode='w') 550 551 def test_add_file_before_1980(self): 552 # Set atime and mtime to 1970-01-01 553 os.utime(TESTFN, (0, 0)) 554 with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp: 555 self.assertRaises(ValueError, zipfp.write, TESTFN) 556 557 with zipfile_aes.AESZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 558 zipfp.write(TESTFN) 559 zinfo = zipfp.getinfo(TESTFN) 560 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 561 562 def test_add_file_after_2107(self): 563 # Set atime and mtime to 2108-12-30 564 try: 565 os.utime(TESTFN, (4386268800, 4386268800)) 566 except OverflowError: 567 self.skipTest('Host fs cannot set timestamp to required value.') 568 569 with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp: 570 self.assertRaises(struct.error, zipfp.write, TESTFN) 571 572 with zipfile_aes.AESZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 573 zipfp.write(TESTFN) 574 zinfo = zipfp.getinfo(TESTFN) 575 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 576 577 578@requires_zlib 579class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 580 unittest.TestCase): 581 compression = zipfile.ZIP_DEFLATED 582 583 def test_per_file_compression(self): 584 """Check that files within a Zip archive can have different 585 compression options.""" 586 with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp: 587 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 588 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 589 sinfo = zipfp.getinfo('storeme') 590 dinfo = zipfp.getinfo('deflateme') 591 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 592 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 593 594@requires_bz2 595class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 596 unittest.TestCase): 597 compression = zipfile.ZIP_BZIP2 598 599@requires_lzma 600class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 601 unittest.TestCase): 602 compression = zipfile.ZIP_LZMA 603 604 605class AbstractTestZip64InSmallFiles: 606 # These tests test the ZIP64 functionality without using large files, 607 # see test_zipfile64 for proper tests. 608 609 @classmethod 610 def setUpClass(cls): 611 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 612 for i in range(0, FIXEDTEST_SIZE)) 613 cls.data = b'\n'.join(line_gen) 614 615 def setUp(self): 616 self._limit = zipfile.ZIP64_LIMIT 617 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 618 zipfile.ZIP64_LIMIT = 1000 619 zipfile.ZIP_FILECOUNT_LIMIT = 9 620 621 # Make a source file with some lines 622 with open(TESTFN, "wb") as fp: 623 fp.write(self.data) 624 625 def zip_test(self, f, compression): 626 # Create the ZIP archive 627 with zipfile_aes.AESZipFile(f, "w", compression, allowZip64=True) as zipfp: 628 zipfp.write(TESTFN, "another.name") 629 zipfp.write(TESTFN, TESTFN) 630 zipfp.writestr("strfile", self.data) 631 632 # Read the ZIP archive 633 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 634 self.assertEqual(zipfp.read(TESTFN), self.data) 635 self.assertEqual(zipfp.read("another.name"), self.data) 636 self.assertEqual(zipfp.read("strfile"), self.data) 637 638 # Print the ZIP directory 639 fp = io.StringIO() 640 zipfp.printdir(fp) 641 642 directory = fp.getvalue() 643 lines = directory.splitlines() 644 self.assertEqual(len(lines), 4) # Number of files + header 645 646 self.assertIn('File Name', lines[0]) 647 self.assertIn('Modified', lines[0]) 648 self.assertIn('Size', lines[0]) 649 650 fn, date, time_, size = lines[1].split() 651 self.assertEqual(fn, 'another.name') 652 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 653 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 654 self.assertEqual(size, str(len(self.data))) 655 656 # Check the namelist 657 names = zipfp.namelist() 658 self.assertEqual(len(names), 3) 659 self.assertIn(TESTFN, names) 660 self.assertIn("another.name", names) 661 self.assertIn("strfile", names) 662 663 # Check infolist 664 infos = zipfp.infolist() 665 names = [i.filename for i in infos] 666 self.assertEqual(len(names), 3) 667 self.assertIn(TESTFN, names) 668 self.assertIn("another.name", names) 669 self.assertIn("strfile", names) 670 for i in infos: 671 self.assertEqual(i.file_size, len(self.data)) 672 673 # check getinfo 674 for nm in (TESTFN, "another.name", "strfile"): 675 info = zipfp.getinfo(nm) 676 self.assertEqual(info.filename, nm) 677 self.assertEqual(info.file_size, len(self.data)) 678 679 # Check that testzip doesn't raise an exception 680 zipfp.testzip() 681 682 def test_basic(self): 683 for f in get_files(self): 684 self.zip_test(f, self.compression) 685 686 def test_too_many_files(self): 687 # This test checks that more than 64k files can be added to an archive, 688 # and that the resulting archive can be read properly by ZipFile 689 zipf = zipfile_aes.AESZipFile(TESTFN, "w", self.compression, 690 allowZip64=True) 691 zipf.debug = 100 692 numfiles = 15 693 for i in range(numfiles): 694 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 695 self.assertEqual(len(zipf.namelist()), numfiles) 696 zipf.close() 697 698 zipf2 = zipfile_aes.AESZipFile(TESTFN, "r", self.compression) 699 self.assertEqual(len(zipf2.namelist()), numfiles) 700 for i in range(numfiles): 701 content = zipf2.read("foo%08d" % i).decode('ascii') 702 self.assertEqual(content, "%d" % (i**3 % 57)) 703 zipf2.close() 704 705 def test_too_many_files_append(self): 706 zipf = zipfile_aes.AESZipFile(TESTFN, "w", self.compression, 707 allowZip64=False) 708 zipf.debug = 100 709 numfiles = 9 710 for i in range(numfiles): 711 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 712 self.assertEqual(len(zipf.namelist()), numfiles) 713 with self.assertRaises(zipfile.LargeZipFile): 714 zipf.writestr("foo%08d" % numfiles, b'') 715 self.assertEqual(len(zipf.namelist()), numfiles) 716 zipf.close() 717 718 zipf = zipfile_aes.AESZipFile(TESTFN, "a", self.compression, 719 allowZip64=False) 720 zipf.debug = 100 721 self.assertEqual(len(zipf.namelist()), numfiles) 722 with self.assertRaises(zipfile.LargeZipFile): 723 zipf.writestr("foo%08d" % numfiles, b'') 724 self.assertEqual(len(zipf.namelist()), numfiles) 725 zipf.close() 726 727 zipf = zipfile_aes.AESZipFile(TESTFN, "a", self.compression, 728 allowZip64=True) 729 zipf.debug = 100 730 self.assertEqual(len(zipf.namelist()), numfiles) 731 numfiles2 = 15 732 for i in range(numfiles, numfiles2): 733 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 734 self.assertEqual(len(zipf.namelist()), numfiles2) 735 zipf.close() 736 737 zipf2 = zipfile_aes.AESZipFile(TESTFN, "r", self.compression) 738 self.assertEqual(len(zipf2.namelist()), numfiles2) 739 for i in range(numfiles2): 740 content = zipf2.read("foo%08d" % i).decode('ascii') 741 self.assertEqual(content, "%d" % (i**3 % 57)) 742 zipf2.close() 743 744 def tearDown(self): 745 zipfile.ZIP64_LIMIT = self._limit 746 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 747 unlink(TESTFN) 748 unlink(TESTFN2) 749 750 751class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 752 unittest.TestCase): 753 compression = zipfile.ZIP_STORED 754 755 def large_file_exception_test(self, f, compression): 756 with zipfile_aes.AESZipFile(f, "w", compression, allowZip64=False) as zipfp: 757 self.assertRaises(zipfile.LargeZipFile, 758 zipfp.write, TESTFN, "another.name") 759 760 def large_file_exception_test2(self, f, compression): 761 with zipfile_aes.AESZipFile(f, "w", compression, allowZip64=False) as zipfp: 762 self.assertRaises(zipfile.LargeZipFile, 763 zipfp.writestr, "another.name", self.data) 764 765 def test_large_file_exception(self): 766 for f in get_files(self): 767 self.large_file_exception_test(f, zipfile.ZIP_STORED) 768 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 769 770 def test_absolute_arcnames(self): 771 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 772 allowZip64=True) as zipfp: 773 zipfp.write(TESTFN, "/absolute") 774 775 with zipfile_aes.AESZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 776 self.assertEqual(zipfp.namelist(), ["absolute"]) 777 778 def test_append(self): 779 # Test that appending to the Zip64 archive doesn't change 780 # extra fields of existing entries. 781 with zipfile_aes.AESZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 782 zipfp.writestr("strfile", self.data) 783 with zipfile_aes.AESZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 784 zinfo = zipfp.getinfo("strfile") 785 extra = zinfo.extra 786 with zipfile_aes.AESZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 787 zipfp.writestr("strfile2", self.data) 788 with zipfile_aes.AESZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 789 zinfo = zipfp.getinfo("strfile") 790 self.assertEqual(zinfo.extra, extra) 791 792@requires_zlib 793class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 794 unittest.TestCase): 795 compression = zipfile.ZIP_DEFLATED 796 797@requires_bz2 798class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 799 unittest.TestCase): 800 compression = zipfile.ZIP_BZIP2 801 802@requires_lzma 803class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 804 unittest.TestCase): 805 compression = zipfile.ZIP_LZMA 806 807 808class AbstractWriterTests: 809 810 def tearDown(self): 811 unlink(TESTFN2) 812 813 def test_close_after_close(self): 814 data = b'content' 815 with zipfile_aes.AESZipFile(TESTFN2, "w", self.compression) as zipf: 816 w = zipf.open('test', 'w') 817 w.write(data) 818 w.close() 819 self.assertTrue(w.closed) 820 w.close() 821 self.assertTrue(w.closed) 822 self.assertEqual(zipf.read('test'), data) 823 824 def test_write_after_close(self): 825 data = b'content' 826 with zipfile_aes.AESZipFile(TESTFN2, "w", self.compression) as zipf: 827 w = zipf.open('test', 'w') 828 w.write(data) 829 w.close() 830 self.assertTrue(w.closed) 831 self.assertRaises(ValueError, w.write, b'') 832 self.assertEqual(zipf.read('test'), data) 833 834class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 835 compression = zipfile.ZIP_STORED 836 837@requires_zlib 838class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 839 compression = zipfile.ZIP_DEFLATED 840 841@requires_bz2 842class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 843 compression = zipfile.ZIP_BZIP2 844 845@requires_lzma 846class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 847 compression = zipfile.ZIP_LZMA 848 849 850@unittest.skipIf(sys.version_info[0:2] < (3, 5), 'Requires Python >= 3.5') 851class PyZipFileTests(unittest.TestCase): 852 def assertCompiledIn(self, name, namelist): 853 if name + 'o' not in namelist: 854 self.assertIn(name + 'c', namelist) 855 856 def requiresWriteAccess(self, path): 857 # effective_ids unavailable on windows 858 if not os.access(path, os.W_OK, 859 effective_ids=os.access in os.supports_effective_ids): 860 self.skipTest('requires write access to the installed location') 861 filename = os.path.join(path, 'test_zipfile.try') 862 try: 863 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 864 os.close(fd) 865 except Exception: 866 self.skipTest('requires write access to the installed location') 867 unlink(filename) 868 869 def test_write_pyfile(self): 870 self.requiresWriteAccess(os.path.dirname(__file__)) 871 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 872 fn = __file__ 873 if fn.endswith('.pyc'): 874 path_split = fn.split(os.sep) 875 if os.altsep is not None: 876 path_split.extend(fn.split(os.altsep)) 877 if '__pycache__' in path_split: 878 fn = importlib.util.source_from_cache(fn) 879 else: 880 fn = fn[:-1] 881 882 zipfp.writepy(fn) 883 884 bn = os.path.basename(fn) 885 self.assertNotIn(bn, zipfp.namelist()) 886 self.assertCompiledIn(bn, zipfp.namelist()) 887 888 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 889 fn = __file__ 890 if fn.endswith('.pyc'): 891 fn = fn[:-1] 892 893 zipfp.writepy(fn, "testpackage") 894 895 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 896 self.assertNotIn(bn, zipfp.namelist()) 897 self.assertCompiledIn(bn, zipfp.namelist()) 898 899 def test_write_python_package(self): 900 import email 901 packagedir = os.path.dirname(email.__file__) 902 self.requiresWriteAccess(packagedir) 903 904 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 905 zipfp.writepy(packagedir) 906 907 # Check for a couple of modules at different levels of the 908 # hierarchy 909 names = zipfp.namelist() 910 self.assertCompiledIn('email/__init__.py', names) 911 self.assertCompiledIn('email/mime/text.py', names) 912 913 def test_write_filtered_python_package(self): 914 import test 915 packagedir = os.path.dirname(test.__file__) 916 self.requiresWriteAccess(packagedir) 917 918 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 919 920 # first make sure that the test folder gives error messages 921 # (on the badsyntax_... files) 922 with captured_stdout() as reportSIO: 923 zipfp.writepy(packagedir) 924 reportStr = reportSIO.getvalue() 925 self.assertTrue('SyntaxError' in reportStr) 926 927 # then check that the filter works on the whole package 928 with captured_stdout() as reportSIO: 929 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 930 reportStr = reportSIO.getvalue() 931 self.assertTrue('SyntaxError' not in reportStr) 932 933 # then check that the filter works on individual files 934 def filter(path): 935 return not os.path.basename(path).startswith("bad") 936 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 937 zipfp.writepy(packagedir, filterfunc=filter) 938 reportStr = reportSIO.getvalue() 939 if reportStr: 940 print(reportStr) 941 self.assertTrue('SyntaxError' not in reportStr) 942 943 def test_write_with_optimization(self): 944 import email 945 packagedir = os.path.dirname(email.__file__) 946 self.requiresWriteAccess(packagedir) 947 optlevel = 1 if __debug__ else 0 948 ext = '.pyc' 949 950 with TemporaryFile() as t, \ 951 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 952 zipfp.writepy(packagedir) 953 954 names = zipfp.namelist() 955 self.assertIn('email/__init__' + ext, names) 956 self.assertIn('email/mime/text' + ext, names) 957 958 def test_write_python_directory(self): 959 os.mkdir(TESTFN2) 960 try: 961 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 962 fp.write("print(42)\n") 963 964 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 965 fp.write("print(42 * 42)\n") 966 967 with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: 968 fp.write("bla bla bla\n") 969 970 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 971 zipfp.writepy(TESTFN2) 972 973 names = zipfp.namelist() 974 self.assertCompiledIn('mod1.py', names) 975 self.assertCompiledIn('mod2.py', names) 976 self.assertNotIn('mod2.txt', names) 977 978 finally: 979 rmtree(TESTFN2) 980 981 def test_write_python_directory_filtered(self): 982 os.mkdir(TESTFN2) 983 try: 984 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 985 fp.write("print(42)\n") 986 987 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 988 fp.write("print(42 * 42)\n") 989 990 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 991 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 992 not fn.endswith('mod2.py')) 993 994 names = zipfp.namelist() 995 self.assertCompiledIn('mod1.py', names) 996 self.assertNotIn('mod2.py', names) 997 998 finally: 999 rmtree(TESTFN2) 1000 1001 def test_write_non_pyfile(self): 1002 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1003 with open(TESTFN, 'w') as f: 1004 f.write('most definitely not a python file') 1005 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1006 unlink(TESTFN) 1007 1008 def test_write_pyfile_bad_syntax(self): 1009 os.mkdir(TESTFN2) 1010 try: 1011 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1012 fp.write("Bad syntax in python file\n") 1013 1014 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1015 # syntax errors are printed to stdout 1016 with captured_stdout() as s: 1017 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1018 1019 self.assertIn("SyntaxError", s.getvalue()) 1020 1021 # as it will not have compiled the python file, it will 1022 # include the .py file not .pyc 1023 names = zipfp.namelist() 1024 self.assertIn('mod1.py', names) 1025 self.assertNotIn('mod1.pyc', names) 1026 1027 finally: 1028 rmtree(TESTFN2) 1029 1030 def test_write_pathlike(self): 1031 os.mkdir(TESTFN2) 1032 try: 1033 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1034 fp.write("print(42)\n") 1035 1036 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1037 zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py") 1038 names = zipfp.namelist() 1039 self.assertCompiledIn('mod1.py', names) 1040 finally: 1041 rmtree(TESTFN2) 1042 1043 1044class ExtractTests(unittest.TestCase): 1045 1046 def make_test_file(self): 1047 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1048 for fpath, fdata in SMALL_TEST_DATA: 1049 zipfp.writestr(fpath, fdata) 1050 1051 def test_extract(self): 1052 with temp_cwd(): 1053 self.make_test_file() 1054 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1055 for fpath, fdata in SMALL_TEST_DATA: 1056 writtenfile = zipfp.extract(fpath) 1057 1058 # make sure it was written to the right place 1059 correctfile = os.path.join(os.getcwd(), fpath) 1060 correctfile = os.path.normpath(correctfile) 1061 1062 self.assertEqual(writtenfile, correctfile) 1063 1064 # make sure correct data is in correct file 1065 with open(writtenfile, "rb") as f: 1066 self.assertEqual(fdata.encode(), f.read()) 1067 1068 unlink(writtenfile) 1069 1070 def _test_extract_with_target(self, target): 1071 self.make_test_file() 1072 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1073 for fpath, fdata in SMALL_TEST_DATA: 1074 writtenfile = zipfp.extract(fpath, target) 1075 # compat with Path objects were added in python 3.6 1076 if sys.version_info[0:2] < (3, 6): 1077 target = str(target) 1078 1079 # make sure it was written to the right place 1080 correctfile = os.path.join(target, fpath) 1081 correctfile = os.path.normpath(correctfile) 1082 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1083 1084 # make sure correct data is in correct file 1085 with open(writtenfile, "rb") as f: 1086 self.assertEqual(fdata.encode(), f.read()) 1087 1088 unlink(writtenfile) 1089 1090 unlink(TESTFN2) 1091 1092 def test_extract_with_target(self): 1093 with temp_dir() as extdir: 1094 self._test_extract_with_target(extdir) 1095 1096 def test_extract_with_target_pathlike(self): 1097 with temp_dir() as extdir: 1098 self._test_extract_with_target(pathlib.Path(extdir)) 1099 1100 def test_extract_all(self): 1101 with temp_cwd(): 1102 self.make_test_file() 1103 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1104 zipfp.extractall() 1105 for fpath, fdata in SMALL_TEST_DATA: 1106 outfile = os.path.join(os.getcwd(), fpath) 1107 1108 with open(outfile, "rb") as f: 1109 self.assertEqual(fdata.encode(), f.read()) 1110 1111 unlink(outfile) 1112 1113 def _test_extract_all_with_target(self, target): 1114 self.make_test_file() 1115 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1116 zipfp.extractall(target) 1117 for fpath, fdata in SMALL_TEST_DATA: 1118 # compat with Path objects were added in python 3.6 1119 if sys.version_info[0:2] < (3, 6): 1120 target = str(target) 1121 outfile = os.path.join(str(target), fpath) 1122 1123 with open(outfile, "rb") as f: 1124 self.assertEqual(fdata.encode(), f.read()) 1125 1126 unlink(outfile) 1127 1128 unlink(TESTFN2) 1129 1130 def test_extract_all_with_target(self): 1131 with temp_dir() as extdir: 1132 self._test_extract_all_with_target(extdir) 1133 1134 def test_extract_all_with_target_pathlike(self): 1135 with temp_dir() as extdir: 1136 self._test_extract_all_with_target(pathlib.Path(extdir)) 1137 1138 def check_file(self, filename, content): 1139 self.assertTrue(os.path.isfile(filename)) 1140 with open(filename, 'rb') as f: 1141 self.assertEqual(f.read(), content) 1142 1143 def test_sanitize_windows_name(self): 1144 san = zipfile_aes.AESZipFile._sanitize_windows_name 1145 # Passing pathsep in allows this test to work regardless of platform. 1146 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1147 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1148 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1149 1150 def test_extract_hackers_arcnames_common_cases(self): 1151 common_hacknames = [ 1152 ('../foo/bar', 'foo/bar'), 1153 ('foo/../bar', 'foo/bar'), 1154 ('foo/../../bar', 'foo/bar'), 1155 ('foo/bar/..', 'foo/bar'), 1156 ('./../foo/bar', 'foo/bar'), 1157 ('/foo/bar', 'foo/bar'), 1158 ('/foo/../bar', 'foo/bar'), 1159 ('/foo/../../bar', 'foo/bar'), 1160 ] 1161 self._test_extract_hackers_arcnames(common_hacknames) 1162 1163 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1164 def test_extract_hackers_arcnames_windows_only(self): 1165 """Test combination of path fixing and windows name sanitization.""" 1166 windows_hacknames = [ 1167 (r'..\foo\bar', 'foo/bar'), 1168 (r'..\/foo\/bar', 'foo/bar'), 1169 (r'foo/\..\/bar', 'foo/bar'), 1170 (r'foo\/../\bar', 'foo/bar'), 1171 (r'C:foo/bar', 'foo/bar'), 1172 (r'C:/foo/bar', 'foo/bar'), 1173 (r'C://foo/bar', 'foo/bar'), 1174 (r'C:\foo\bar', 'foo/bar'), 1175 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1176 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1177 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1178 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1179 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1180 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1181 (r'//?/C:/foo/bar', 'foo/bar'), 1182 (r'\\?\C:\foo\bar', 'foo/bar'), 1183 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1184 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1185 ('../../foo../../ba..r', 'foo/ba..r'), 1186 ] 1187 self._test_extract_hackers_arcnames(windows_hacknames) 1188 1189 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1190 def test_extract_hackers_arcnames_posix_only(self): 1191 posix_hacknames = [ 1192 ('//foo/bar', 'foo/bar'), 1193 ('../../foo../../ba..r', 'foo../ba..r'), 1194 (r'foo/..\bar', r'foo/..\bar'), 1195 ] 1196 self._test_extract_hackers_arcnames(posix_hacknames) 1197 1198 def _test_extract_hackers_arcnames(self, hacknames): 1199 for arcname, fixedname in hacknames: 1200 content = b'foobar' + arcname.encode() 1201 with zipfile_aes.AESZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1202 zinfo = zipfile_aes.AESZipInfo() 1203 # preserve backslashes 1204 zinfo.filename = arcname 1205 zinfo.external_attr = 0o600 << 16 1206 zipfp.writestr(zinfo, content) 1207 1208 arcname = arcname.replace(os.sep, "/") 1209 targetpath = os.path.join('target', 'subdir', 'subsub') 1210 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1211 1212 with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp: 1213 writtenfile = zipfp.extract(arcname, targetpath) 1214 self.assertEqual(writtenfile, correctfile, 1215 msg='extract %r: %r != %r' % 1216 (arcname, writtenfile, correctfile)) 1217 self.check_file(correctfile, content) 1218 rmtree('target') 1219 1220 with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp: 1221 zipfp.extractall(targetpath) 1222 self.check_file(correctfile, content) 1223 rmtree('target') 1224 1225 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1226 1227 with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp: 1228 writtenfile = zipfp.extract(arcname) 1229 self.assertEqual(writtenfile, correctfile, 1230 msg="extract %r" % arcname) 1231 self.check_file(correctfile, content) 1232 rmtree(fixedname.split('/')[0]) 1233 1234 with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp: 1235 zipfp.extractall() 1236 self.check_file(correctfile, content) 1237 rmtree(fixedname.split('/')[0]) 1238 1239 unlink(TESTFN2) 1240 1241 1242class OtherTests(unittest.TestCase): 1243 def test_open_via_zip_info(self): 1244 # Create the ZIP archive 1245 with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1246 zipfp.writestr("name", "foo") 1247 with self.assertWarns(UserWarning): 1248 zipfp.writestr("name", "bar") 1249 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1250 1251 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1252 infos = zipfp.infolist() 1253 data = b"" 1254 for info in infos: 1255 with zipfp.open(info) as zipopen: 1256 data += zipopen.read() 1257 self.assertIn(data, {b"foobar", b"barfoo"}) 1258 data = b"" 1259 for info in infos: 1260 data += zipfp.read(info) 1261 self.assertIn(data, {b"foobar", b"barfoo"}) 1262 1263 def test_writestr_extended_local_header_issue1202(self): 1264 with zipfile_aes.AESZipFile(TESTFN2, 'w') as orig_zip: 1265 for data in 'abcdefghijklmnop': 1266 zinfo = zipfile_aes.AESZipInfo(data) 1267 zinfo.flag_bits |= 0x08 # Include an extended local header. 1268 orig_zip.writestr(zinfo, data) 1269 1270 def test_close(self): 1271 """Check that the zipfile is closed after the 'with' block.""" 1272 with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp: 1273 for fpath, fdata in SMALL_TEST_DATA: 1274 zipfp.writestr(fpath, fdata) 1275 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1276 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1277 1278 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1279 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1280 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1281 1282 def test_close_on_exception(self): 1283 """Check that the zipfile is closed if an exception is raised in the 1284 'with' block.""" 1285 with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp: 1286 for fpath, fdata in SMALL_TEST_DATA: 1287 zipfp.writestr(fpath, fdata) 1288 1289 try: 1290 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp2: 1291 raise zipfile.BadZipFile() 1292 except zipfile.BadZipFile: 1293 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1294 1295 def test_unsupported_version(self): 1296 # File has an extract_version of 120 1297 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1298 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1299 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1300 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1301 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1302 1303 self.assertRaises(NotImplementedError, zipfile_aes.AESZipFile, 1304 io.BytesIO(data), 'r') 1305 1306 @requires_zlib 1307 def test_read_unicode_filenames(self): 1308 # bug #10801 1309 fname = findfile('zip_cp437_header.zip') 1310 with zipfile_aes.AESZipFile(fname) as zipfp: 1311 for name in zipfp.namelist(): 1312 zipfp.open(name).close() 1313 1314 def test_write_unicode_filenames(self): 1315 with zipfile_aes.AESZipFile(TESTFN, "w") as zf: 1316 zf.writestr("foo.txt", "Test for unicode filename") 1317 zf.writestr("\xf6.txt", "Test for unicode filename") 1318 self.assertIsInstance(zf.infolist()[0].filename, str) 1319 1320 with zipfile_aes.AESZipFile(TESTFN, "r") as zf: 1321 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1322 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1323 1324 def test_exclusive_create_zip_file(self): 1325 """Test exclusive creating a new zipfile.""" 1326 unlink(TESTFN2) 1327 filename = 'testfile.txt' 1328 content = b'hello, world. this is some content.' 1329 with zipfile_aes.AESZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1330 zipfp.writestr(filename, content) 1331 with self.assertRaises(FileExistsError): 1332 zipfile_aes.AESZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1333 with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp: 1334 self.assertEqual(zipfp.namelist(), [filename]) 1335 self.assertEqual(zipfp.read(filename), content) 1336 1337 def test_create_non_existent_file_for_append(self): 1338 if os.path.exists(TESTFN): 1339 os.unlink(TESTFN) 1340 1341 filename = 'testfile.txt' 1342 content = b'hello, world. this is some content.' 1343 1344 try: 1345 with zipfile_aes.AESZipFile(TESTFN, 'a') as zf: 1346 zf.writestr(filename, content) 1347 except OSError: 1348 self.fail('Could not append data to a non-existent zip file.') 1349 1350 self.assertTrue(os.path.exists(TESTFN)) 1351 1352 with zipfile_aes.AESZipFile(TESTFN, 'r') as zf: 1353 self.assertEqual(zf.read(filename), content) 1354 1355 def test_close_erroneous_file(self): 1356 # This test checks that the ZipFile constructor closes the file object 1357 # it opens if there's an error in the file. If it doesn't, the 1358 # traceback holds a reference to the ZipFile object and, indirectly, 1359 # the file object. 1360 # On Windows, this causes the os.unlink() call to fail because the 1361 # underlying file is still open. This is SF bug #412214. 1362 # 1363 with open(TESTFN, "w") as fp: 1364 fp.write("this is not a legal zip file\n") 1365 try: 1366 zf = zipfile_aes.AESZipFile(TESTFN) 1367 except zipfile.BadZipFile: 1368 pass 1369 1370 def test_is_zip_erroneous_file(self): 1371 """Check that is_zipfile() correctly identifies non-zip files.""" 1372 # - passing a filename 1373 with open(TESTFN, "w") as fp: 1374 fp.write("this is not a legal zip file\n") 1375 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1376 # - passing a path-like object 1377 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1378 # - passing a file object 1379 with open(TESTFN, "rb") as fp: 1380 self.assertFalse(zipfile.is_zipfile(fp)) 1381 # - passing a file-like object 1382 fp = io.BytesIO() 1383 fp.write(b"this is not a legal zip file\n") 1384 self.assertFalse(zipfile.is_zipfile(fp)) 1385 fp.seek(0, 0) 1386 self.assertFalse(zipfile.is_zipfile(fp)) 1387 1388 def test_damaged_zipfile(self): 1389 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1390 # - Create a valid zip file 1391 fp = io.BytesIO() 1392 with zipfile_aes.AESZipFile(fp, mode="w") as zipf: 1393 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1394 zipfiledata = fp.getvalue() 1395 1396 # - Now create copies of it missing the last N bytes and make sure 1397 # a BadZipFile exception is raised when we try to open it 1398 for N in range(len(zipfiledata)): 1399 fp = io.BytesIO(zipfiledata[:N]) 1400 self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, fp) 1401 1402 def test_is_zip_valid_file(self): 1403 """Check that is_zipfile() correctly identifies zip files.""" 1404 # - passing a filename 1405 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1406 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1407 1408 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1409 # - passing a file object 1410 with open(TESTFN, "rb") as fp: 1411 self.assertTrue(zipfile.is_zipfile(fp)) 1412 fp.seek(0, 0) 1413 zip_contents = fp.read() 1414 # - passing a file-like object 1415 fp = io.BytesIO() 1416 fp.write(zip_contents) 1417 self.assertTrue(zipfile.is_zipfile(fp)) 1418 fp.seek(0, 0) 1419 self.assertTrue(zipfile.is_zipfile(fp)) 1420 1421 def test_non_existent_file_raises_OSError(self): 1422 # make sure we don't raise an AttributeError when a partially-constructed 1423 # ZipFile instance is finalized; this tests for regression on SF tracker 1424 # bug #403871. 1425 1426 # The bug we're testing for caused an AttributeError to be raised 1427 # when a ZipFile instance was created for a file that did not 1428 # exist; the .fp member was not initialized but was needed by the 1429 # __del__() method. Since the AttributeError is in the __del__(), 1430 # it is ignored, but the user should be sufficiently annoyed by 1431 # the message on the output that regression will be noticed 1432 # quickly. 1433 self.assertRaises(OSError, zipfile_aes.AESZipFile, TESTFN) 1434 1435 def test_empty_file_raises_BadZipFile(self): 1436 f = open(TESTFN, 'w') 1437 f.close() 1438 self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, TESTFN) 1439 1440 with open(TESTFN, 'w') as fp: 1441 fp.write("short file") 1442 self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, TESTFN) 1443 1444 def test_closed_zip_raises_ValueError(self): 1445 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 1446 data = io.BytesIO() 1447 with zipfile_aes.AESZipFile(data, mode="w") as zipf: 1448 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1449 1450 # This is correct; calling .read on a closed ZipFile should raise 1451 # a ValueError, and so should calling .testzip. An earlier 1452 # version of .testzip would swallow this exception (and any other) 1453 # and report that the first file in the archive was corrupt. 1454 self.assertRaises(ValueError, zipf.read, "foo.txt") 1455 self.assertRaises(ValueError, zipf.open, "foo.txt") 1456 self.assertRaises(ValueError, zipf.testzip) 1457 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 1458 with open(TESTFN, 'w') as f: 1459 f.write('zipfile test data') 1460 self.assertRaises(ValueError, zipf.write, TESTFN) 1461 1462 def test_bad_constructor_mode(self): 1463 """Check that bad modes passed to ZipFile constructor are caught.""" 1464 self.assertRaises(ValueError, zipfile_aes.AESZipFile, TESTFN, "q") 1465 1466 def test_bad_open_mode(self): 1467 """Check that bad modes passed to ZipFile.open are caught.""" 1468 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1469 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1470 1471 with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipf: 1472 # read the data to make sure the file is there 1473 zipf.read("foo.txt") 1474 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 1475 # universal newlines support is removed 1476 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 1477 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 1478 1479 def test_read0(self): 1480 """Check that calling read(0) on a ZipExtFile object returns an empty 1481 string and doesn't advance file pointer.""" 1482 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1483 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1484 # read the data to make sure the file is there 1485 with zipf.open("foo.txt") as f: 1486 for i in range(FIXEDTEST_SIZE): 1487 self.assertEqual(f.read(0), b'') 1488 1489 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 1490 1491 def test_open_non_existent_item(self): 1492 """Check that attempting to call open() for an item that doesn't 1493 exist in the archive raises a RuntimeError.""" 1494 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1495 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 1496 1497 def test_bad_compression_mode(self): 1498 """Check that bad compression methods passed to ZipFile.open are 1499 caught.""" 1500 self.assertRaises(NotImplementedError, zipfile_aes.AESZipFile, TESTFN, "w", -1) 1501 1502 def test_unsupported_compression(self): 1503 # data is declared as shrunk, but actually deflated 1504 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 1505 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 1506 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 1507 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1508 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 1509 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 1510 with zipfile_aes.AESZipFile(io.BytesIO(data), 'r') as zipf: 1511 self.assertRaises(NotImplementedError, zipf.open, 'x') 1512 1513 def test_null_byte_in_filename(self): 1514 """Check that a filename containing a null byte is properly 1515 terminated.""" 1516 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1517 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 1518 self.assertEqual(zipf.namelist(), ['foo.txt']) 1519 1520 def test_struct_sizes(self): 1521 """Check that ZIP internal structure sizes are calculated correctly.""" 1522 self.assertEqual(zipfile.sizeEndCentDir, 22) 1523 self.assertEqual(zipfile.sizeCentralDir, 46) 1524 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1525 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1526 1527 def test_comments(self): 1528 """Check that comments on the archive are handled properly.""" 1529 1530 # check default comment is empty 1531 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1532 self.assertEqual(zipf.comment, b'') 1533 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1534 1535 with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr: 1536 self.assertEqual(zipfr.comment, b'') 1537 1538 # check a simple short comment 1539 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 1540 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1541 zipf.comment = comment 1542 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1543 with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr: 1544 self.assertEqual(zipf.comment, comment) 1545 1546 # check a comment of max length 1547 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 1548 comment2 = comment2.encode("ascii") 1549 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1550 zipf.comment = comment2 1551 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1552 1553 with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr: 1554 self.assertEqual(zipfr.comment, comment2) 1555 1556 # check a comment that is too long is truncated 1557 with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf: 1558 with self.assertWarns(UserWarning): 1559 zipf.comment = comment2 + b'oops' 1560 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1561 with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr: 1562 self.assertEqual(zipfr.comment, comment2) 1563 1564 # check that comments are correctly modified in append mode 1565 with zipfile_aes.AESZipFile(TESTFN,mode="w") as zipf: 1566 zipf.comment = b"original comment" 1567 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1568 with zipfile_aes.AESZipFile(TESTFN,mode="a") as zipf: 1569 zipf.comment = b"an updated comment" 1570 with zipfile_aes.AESZipFile(TESTFN,mode="r") as zipf: 1571 self.assertEqual(zipf.comment, b"an updated comment") 1572 1573 # check that comments are correctly shortened in append mode 1574 with zipfile_aes.AESZipFile(TESTFN,mode="w") as zipf: 1575 zipf.comment = b"original comment that's longer" 1576 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1577 with zipfile_aes.AESZipFile(TESTFN,mode="a") as zipf: 1578 zipf.comment = b"shorter comment" 1579 with zipfile_aes.AESZipFile(TESTFN,mode="r") as zipf: 1580 self.assertEqual(zipf.comment, b"shorter comment") 1581 1582 def test_unicode_comment(self): 1583 with zipfile_aes.AESZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1584 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1585 with self.assertRaises(TypeError): 1586 zipf.comment = "this is an error" 1587 1588 def test_change_comment_in_empty_archive(self): 1589 with zipfile_aes.AESZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1590 self.assertFalse(zipf.filelist) 1591 zipf.comment = b"this is a comment" 1592 with zipfile_aes.AESZipFile(TESTFN, "r") as zipf: 1593 self.assertEqual(zipf.comment, b"this is a comment") 1594 1595 def test_change_comment_in_nonempty_archive(self): 1596 with zipfile_aes.AESZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1597 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1598 with zipfile_aes.AESZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1599 self.assertTrue(zipf.filelist) 1600 zipf.comment = b"this is a comment" 1601 with zipfile_aes.AESZipFile(TESTFN, "r") as zipf: 1602 self.assertEqual(zipf.comment, b"this is a comment") 1603 1604 def test_empty_zipfile(self): 1605 # Check that creating a file in 'w' or 'a' mode and closing without 1606 # adding any files to the archives creates a valid empty ZIP file 1607 zipf = zipfile_aes.AESZipFile(TESTFN, mode="w") 1608 zipf.close() 1609 try: 1610 zipf = zipfile_aes.AESZipFile(TESTFN, mode="r") 1611 except zipfile.BadZipFile: 1612 self.fail("Unable to create empty ZIP file in 'w' mode") 1613 1614 zipf = zipfile_aes.AESZipFile(TESTFN, mode="a") 1615 zipf.close() 1616 try: 1617 zipf = zipfile_aes.AESZipFile(TESTFN, mode="r") 1618 except: 1619 self.fail("Unable to create empty ZIP file in 'a' mode") 1620 1621 def test_open_empty_file(self): 1622 # Issue 1710703: Check that opening a file with less than 22 bytes 1623 # raises a BadZipFile exception (rather than the previously unhelpful 1624 # OSError) 1625 f = open(TESTFN, 'w') 1626 f.close() 1627 self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, TESTFN, 'r') 1628 1629 def test_create_zipinfo_before_1980(self): 1630 self.assertRaises(ValueError, 1631 zipfile_aes.AESZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1632 1633 def test_zipfile_with_short_extra_field(self): 1634 """If an extra field in the header is less than 4 bytes, skip it.""" 1635 zipdata = ( 1636 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 1637 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 1638 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 1639 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 1640 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 1641 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 1642 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 1643 ) 1644 with zipfile_aes.AESZipFile(io.BytesIO(zipdata), 'r') as zipf: 1645 # testzip returns the name of the first corrupt file, or None 1646 self.assertIsNone(zipf.testzip()) 1647 1648 def test_open_conflicting_handles(self): 1649 # It's only possible to open one writable file handle at a time 1650 msg1 = b"It's fun to charter an accountant!" 1651 msg2 = b"And sail the wide accountant sea" 1652 msg3 = b"To find, explore the funds offshore" 1653 with zipfile_aes.AESZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1654 with zipf.open('foo', mode='w') as w2: 1655 w2.write(msg1) 1656 with zipf.open('bar', mode='w') as w1: 1657 with self.assertRaises(ValueError): 1658 zipf.open('handle', mode='w') 1659 with self.assertRaises(ValueError): 1660 zipf.open('foo', mode='r') 1661 with self.assertRaises(ValueError): 1662 zipf.writestr('str', 'abcde') 1663 with self.assertRaises(ValueError): 1664 zipf.write(__file__, 'file') 1665 with self.assertRaises(ValueError): 1666 zipf.close() 1667 w1.write(msg2) 1668 with zipf.open('baz', mode='w') as w2: 1669 w2.write(msg3) 1670 1671 with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipf: 1672 self.assertEqual(zipf.read('foo'), msg1) 1673 self.assertEqual(zipf.read('bar'), msg2) 1674 self.assertEqual(zipf.read('baz'), msg3) 1675 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 1676 1677 def test_seek_tell(self): 1678 # Test seek functionality 1679 txt = b"Where's Bruce?" 1680 bloc = txt.find(b"Bruce") 1681 # Check seek on a file 1682 with zipfile_aes.AESZipFile(TESTFN, "w") as zipf: 1683 zipf.writestr("foo.txt", txt) 1684 with zipfile_aes.AESZipFile(TESTFN, "r") as zipf: 1685 with zipf.open("foo.txt", "r") as fp: 1686 fp.seek(bloc, os.SEEK_SET) 1687 self.assertEqual(fp.tell(), bloc) 1688 fp.seek(-bloc, os.SEEK_CUR) 1689 self.assertEqual(fp.tell(), 0) 1690 fp.seek(bloc, os.SEEK_CUR) 1691 self.assertEqual(fp.tell(), bloc) 1692 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1693 fp.seek(0, os.SEEK_END) 1694 self.assertEqual(fp.tell(), len(txt)) 1695 fp.seek(0, os.SEEK_SET) 1696 self.assertEqual(fp.tell(), 0) 1697 # Check seek on memory file 1698 data = io.BytesIO() 1699 with zipfile_aes.AESZipFile(data, mode="w") as zipf: 1700 zipf.writestr("foo.txt", txt) 1701 with zipfile_aes.AESZipFile(data, mode="r") as zipf: 1702 with zipf.open("foo.txt", "r") as fp: 1703 fp.seek(bloc, os.SEEK_SET) 1704 self.assertEqual(fp.tell(), bloc) 1705 fp.seek(-bloc, os.SEEK_CUR) 1706 self.assertEqual(fp.tell(), 0) 1707 fp.seek(bloc, os.SEEK_CUR) 1708 self.assertEqual(fp.tell(), bloc) 1709 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1710 fp.seek(0, os.SEEK_END) 1711 self.assertEqual(fp.tell(), len(txt)) 1712 fp.seek(0, os.SEEK_SET) 1713 self.assertEqual(fp.tell(), 0) 1714 1715 def tearDown(self): 1716 unlink(TESTFN) 1717 unlink(TESTFN2) 1718 1719 1720class AbstractBadCrcTests: 1721 def test_testzip_with_bad_crc(self): 1722 """Tests that files with bad CRCs return their name from testzip.""" 1723 zipdata = self.zip_with_bad_crc 1724 1725 with zipfile_aes.AESZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1726 # testzip returns the name of the first corrupt file, or None 1727 self.assertEqual('afile', zipf.testzip()) 1728 1729 def test_read_with_bad_crc(self): 1730 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 1731 zipdata = self.zip_with_bad_crc 1732 1733 # Using ZipFile.read() 1734 with zipfile_aes.AESZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1735 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 1736 1737 # Using ZipExtFile.read() 1738 with zipfile_aes.AESZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1739 with zipf.open('afile', 'r') as corrupt_file: 1740 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 1741 1742 # Same with small reads (in order to exercise the buffering logic) 1743 with zipfile_aes.AESZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1744 with zipf.open('afile', 'r') as corrupt_file: 1745 corrupt_file.MIN_READ_SIZE = 2 1746 with self.assertRaises(zipfile.BadZipFile): 1747 while corrupt_file.read(2): 1748 pass 1749 1750 1751class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 1752 compression = zipfile.ZIP_STORED 1753 zip_with_bad_crc = ( 1754 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 1755 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 1756 b'ilehello,AworldP' 1757 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 1758 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 1759 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 1760 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 1761 b'\0\0/\0\0\0\0\0') 1762 1763@requires_zlib 1764class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 1765 compression = zipfile.ZIP_DEFLATED 1766 zip_with_bad_crc = ( 1767 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 1768 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 1769 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 1770 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 1771 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 1772 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 1773 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 1774 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 1775 1776@requires_bz2 1777class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 1778 compression = zipfile.ZIP_BZIP2 1779 zip_with_bad_crc = ( 1780 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 1781 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 1782 b'ileBZh91AY&SY\xd4\xa8\xca' 1783 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 1784 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 1785 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 1786 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 1787 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 1788 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 1789 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 1790 b'\x00\x00\x00\x00') 1791 1792@requires_lzma 1793class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 1794 compression = zipfile.ZIP_LZMA 1795 zip_with_bad_crc = ( 1796 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 1797 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 1798 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 1799 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 1800 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 1801 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 1802 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 1803 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 1804 b'\x00>\x00\x00\x00\x00\x00') 1805 1806 1807class DecryptionTests(unittest.TestCase): 1808 """Check that ZIP decryption works. Since the library does not 1809 support encryption at the moment, we use a pre-generated encrypted 1810 ZIP file.""" 1811 1812 data = ( 1813 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 1814 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 1815 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 1816 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 1817 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 1818 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 1819 b'\x00\x00L\x00\x00\x00\x00\x00' ) 1820 data2 = ( 1821 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 1822 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 1823 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 1824 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 1825 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 1826 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 1827 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 1828 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 1829 1830 plain = b'zipfile.py encryption test' 1831 plain2 = b'\x00'*512 1832 1833 def setUp(self): 1834 with open(TESTFN, "wb") as fp: 1835 fp.write(self.data) 1836 self.zip = zipfile_aes.AESZipFile(TESTFN, "r") 1837 with open(TESTFN2, "wb") as fp: 1838 fp.write(self.data2) 1839 self.zip2 = zipfile_aes.AESZipFile(TESTFN2, "r") 1840 1841 def tearDown(self): 1842 self.zip.close() 1843 os.unlink(TESTFN) 1844 self.zip2.close() 1845 os.unlink(TESTFN2) 1846 1847 def test_no_password(self): 1848 # Reading the encrypted file without password 1849 # must generate a RunTime exception 1850 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 1851 self.assertRaises(RuntimeError, self.zip2.read, "zero") 1852 1853 def test_bad_password(self): 1854 self.zip.setpassword(b"perl") 1855 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 1856 self.zip2.setpassword(b"perl") 1857 self.assertRaises(RuntimeError, self.zip2.read, "zero") 1858 1859 @requires_zlib 1860 def test_good_password(self): 1861 self.zip.setpassword(b"python") 1862 self.assertEqual(self.zip.read("test.txt"), self.plain) 1863 self.zip2.setpassword(b"12345") 1864 self.assertEqual(self.zip2.read("zero"), self.plain2) 1865 1866 def test_unicode_password(self): 1867 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 1868 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 1869 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 1870 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 1871 1872class AbstractTestsWithRandomBinaryFiles: 1873 @classmethod 1874 def setUpClass(cls): 1875 datacount = randint(16, 64)*1024 + randint(1, 1024) 1876 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 1877 for i in range(datacount)) 1878 1879 def setUp(self): 1880 # Make a source file with some lines 1881 with open(TESTFN, "wb") as fp: 1882 fp.write(self.data) 1883 1884 def tearDown(self): 1885 unlink(TESTFN) 1886 unlink(TESTFN2) 1887 1888 def make_test_archive(self, f, compression): 1889 # Create the ZIP archive 1890 with zipfile_aes.AESZipFile(f, "w", compression) as zipfp: 1891 zipfp.write(TESTFN, "another.name") 1892 zipfp.write(TESTFN, TESTFN) 1893 1894 def zip_test(self, f, compression): 1895 self.make_test_archive(f, compression) 1896 1897 # Read the ZIP archive 1898 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 1899 testdata = zipfp.read(TESTFN) 1900 self.assertEqual(len(testdata), len(self.data)) 1901 self.assertEqual(testdata, self.data) 1902 self.assertEqual(zipfp.read("another.name"), self.data) 1903 1904 def test_read(self): 1905 for f in get_files(self): 1906 self.zip_test(f, self.compression) 1907 1908 def zip_open_test(self, f, compression): 1909 self.make_test_archive(f, compression) 1910 1911 # Read the ZIP archive 1912 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 1913 zipdata1 = [] 1914 with zipfp.open(TESTFN) as zipopen1: 1915 while True: 1916 read_data = zipopen1.read(256) 1917 if not read_data: 1918 break 1919 zipdata1.append(read_data) 1920 1921 zipdata2 = [] 1922 with zipfp.open("another.name") as zipopen2: 1923 while True: 1924 read_data = zipopen2.read(256) 1925 if not read_data: 1926 break 1927 zipdata2.append(read_data) 1928 1929 testdata1 = b''.join(zipdata1) 1930 self.assertEqual(len(testdata1), len(self.data)) 1931 self.assertEqual(testdata1, self.data) 1932 1933 testdata2 = b''.join(zipdata2) 1934 self.assertEqual(len(testdata2), len(self.data)) 1935 self.assertEqual(testdata2, self.data) 1936 1937 def test_open(self): 1938 for f in get_files(self): 1939 self.zip_open_test(f, self.compression) 1940 1941 def zip_random_open_test(self, f, compression): 1942 self.make_test_archive(f, compression) 1943 1944 # Read the ZIP archive 1945 with zipfile_aes.AESZipFile(f, "r", compression) as zipfp: 1946 zipdata1 = [] 1947 with zipfp.open(TESTFN) as zipopen1: 1948 while True: 1949 read_data = zipopen1.read(randint(1, 1024)) 1950 if not read_data: 1951 break 1952 zipdata1.append(read_data) 1953 1954 testdata = b''.join(zipdata1) 1955 self.assertEqual(len(testdata), len(self.data)) 1956 self.assertEqual(testdata, self.data) 1957 1958 def test_random_open(self): 1959 for f in get_files(self): 1960 self.zip_random_open_test(f, self.compression) 1961 1962 1963class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 1964 unittest.TestCase): 1965 compression = zipfile.ZIP_STORED 1966 1967@requires_zlib 1968class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 1969 unittest.TestCase): 1970 compression = zipfile.ZIP_DEFLATED 1971 1972@requires_bz2 1973class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 1974 unittest.TestCase): 1975 compression = zipfile.ZIP_BZIP2 1976 1977@requires_lzma 1978class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 1979 unittest.TestCase): 1980 compression = zipfile.ZIP_LZMA 1981 1982 1983# Provide the tell() method but not seek() 1984class Tellable: 1985 def __init__(self, fp): 1986 self.fp = fp 1987 self.offset = 0 1988 1989 def write(self, data): 1990 n = self.fp.write(data) 1991 self.offset += n 1992 return n 1993 1994 def tell(self): 1995 return self.offset 1996 1997 def flush(self): 1998 self.fp.flush() 1999 2000class Unseekable: 2001 def __init__(self, fp): 2002 self.fp = fp 2003 2004 def write(self, data): 2005 return self.fp.write(data) 2006 2007 def flush(self): 2008 self.fp.flush() 2009 2010class UnseekableTests(unittest.TestCase): 2011 def test_writestr(self): 2012 for wrapper in (lambda f: f), Tellable, Unseekable: 2013 with self.subTest(wrapper=wrapper): 2014 f = io.BytesIO() 2015 f.write(b'abc') 2016 bf = io.BufferedWriter(f) 2017 with zipfile_aes.AESZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2018 zipfp.writestr('ones', b'111') 2019 zipfp.writestr('twos', b'222') 2020 self.assertEqual(f.getvalue()[:5], b'abcPK') 2021 with zipfile_aes.AESZipFile(f, mode='r') as zipf: 2022 with zipf.open('ones') as zopen: 2023 self.assertEqual(zopen.read(), b'111') 2024 with zipf.open('twos') as zopen: 2025 self.assertEqual(zopen.read(), b'222') 2026 2027 def test_write(self): 2028 for wrapper in (lambda f: f), Tellable, Unseekable: 2029 with self.subTest(wrapper=wrapper): 2030 f = io.BytesIO() 2031 f.write(b'abc') 2032 bf = io.BufferedWriter(f) 2033 with zipfile_aes.AESZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2034 self.addCleanup(unlink, TESTFN) 2035 with open(TESTFN, 'wb') as f2: 2036 f2.write(b'111') 2037 zipfp.write(TESTFN, 'ones') 2038 with open(TESTFN, 'wb') as f2: 2039 f2.write(b'222') 2040 zipfp.write(TESTFN, 'twos') 2041 self.assertEqual(f.getvalue()[:5], b'abcPK') 2042 with zipfile_aes.AESZipFile(f, mode='r') as zipf: 2043 with zipf.open('ones') as zopen: 2044 self.assertEqual(zopen.read(), b'111') 2045 with zipf.open('twos') as zopen: 2046 self.assertEqual(zopen.read(), b'222') 2047 2048 def test_open_write(self): 2049 for wrapper in (lambda f: f), Tellable, Unseekable: 2050 with self.subTest(wrapper=wrapper): 2051 f = io.BytesIO() 2052 f.write(b'abc') 2053 bf = io.BufferedWriter(f) 2054 with zipfile_aes.AESZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2055 with zipf.open('ones', 'w') as zopen: 2056 zopen.write(b'111') 2057 with zipf.open('twos', 'w') as zopen: 2058 zopen.write(b'222') 2059 self.assertEqual(f.getvalue()[:5], b'abcPK') 2060 with zipfile_aes.AESZipFile(f) as zipf: 2061 self.assertEqual(zipf.read('ones'), b'111') 2062 self.assertEqual(zipf.read('twos'), b'222') 2063 2064 2065@requires_zlib 2066class TestsWithMultipleOpens(unittest.TestCase): 2067 @classmethod 2068 def setUpClass(cls): 2069 cls.data1 = b'111' + getrandbytes(10000) 2070 cls.data2 = b'222' + getrandbytes(10000) 2071 2072 def make_test_archive(self, f): 2073 # Create the ZIP archive 2074 with zipfile_aes.AESZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2075 zipfp.writestr('ones', self.data1) 2076 zipfp.writestr('twos', self.data2) 2077 2078 def test_same_file(self): 2079 # Verify that (when the ZipFile is in control of creating file objects) 2080 # multiple open() calls can be made without interfering with each other. 2081 for f in get_files(self): 2082 self.make_test_archive(f) 2083 with zipfile_aes.AESZipFile(f, mode="r") as zipf: 2084 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2085 data1 = zopen1.read(500) 2086 data2 = zopen2.read(500) 2087 data1 += zopen1.read() 2088 data2 += zopen2.read() 2089 self.assertEqual(data1, data2) 2090 self.assertEqual(data1, self.data1) 2091 2092 def test_different_file(self): 2093 # Verify that (when the ZipFile is in control of creating file objects) 2094 # multiple open() calls can be made without interfering with each other. 2095 for f in get_files(self): 2096 self.make_test_archive(f) 2097 with zipfile_aes.AESZipFile(f, mode="r") as zipf: 2098 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2099 data1 = zopen1.read(500) 2100 data2 = zopen2.read(500) 2101 data1 += zopen1.read() 2102 data2 += zopen2.read() 2103 self.assertEqual(data1, self.data1) 2104 self.assertEqual(data2, self.data2) 2105 2106 def test_interleaved(self): 2107 # Verify that (when the ZipFile is in control of creating file objects) 2108 # multiple open() calls can be made without interfering with each other. 2109 for f in get_files(self): 2110 self.make_test_archive(f) 2111 with zipfile_aes.AESZipFile(f, mode="r") as zipf: 2112 with zipf.open('ones') as zopen1: 2113 data1 = zopen1.read(500) 2114 with zipf.open('twos') as zopen2: 2115 data2 = zopen2.read(500) 2116 data1 += zopen1.read() 2117 data2 += zopen2.read() 2118 self.assertEqual(data1, self.data1) 2119 self.assertEqual(data2, self.data2) 2120 2121 def test_read_after_close(self): 2122 for f in get_files(self): 2123 self.make_test_archive(f) 2124 with contextlib.ExitStack() as stack: 2125 with zipfile_aes.AESZipFile(f, 'r') as zipf: 2126 zopen1 = stack.enter_context(zipf.open('ones')) 2127 zopen2 = stack.enter_context(zipf.open('twos')) 2128 data1 = zopen1.read(500) 2129 data2 = zopen2.read(500) 2130 data1 += zopen1.read() 2131 data2 += zopen2.read() 2132 self.assertEqual(data1, self.data1) 2133 self.assertEqual(data2, self.data2) 2134 2135 def test_read_after_write(self): 2136 for f in get_files(self): 2137 with zipfile_aes.AESZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2138 zipf.writestr('ones', self.data1) 2139 zipf.writestr('twos', self.data2) 2140 with zipf.open('ones') as zopen1: 2141 data1 = zopen1.read(500) 2142 self.assertEqual(data1, self.data1[:500]) 2143 with zipfile_aes.AESZipFile(f, 'r') as zipf: 2144 data1 = zipf.read('ones') 2145 data2 = zipf.read('twos') 2146 self.assertEqual(data1, self.data1) 2147 self.assertEqual(data2, self.data2) 2148 2149 def test_write_after_read(self): 2150 for f in get_files(self): 2151 with zipfile_aes.AESZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2152 zipf.writestr('ones', self.data1) 2153 with zipf.open('ones') as zopen1: 2154 zopen1.read(500) 2155 zipf.writestr('twos', self.data2) 2156 with zipfile_aes.AESZipFile(f, 'r') as zipf: 2157 data1 = zipf.read('ones') 2158 data2 = zipf.read('twos') 2159 self.assertEqual(data1, self.data1) 2160 self.assertEqual(data2, self.data2) 2161 2162 def test_many_opens(self): 2163 # Verify that read() and open() promptly close the file descriptor, 2164 # and don't rely on the garbage collector to free resources. 2165 self.make_test_archive(TESTFN2) 2166 with zipfile_aes.AESZipFile(TESTFN2, mode="r") as zipf: 2167 for x in range(100): 2168 zipf.read('ones') 2169 with zipf.open('ones') as zopen1: 2170 pass 2171 with open(os.devnull) as f: 2172 self.assertLess(f.fileno(), 100) 2173 2174 def test_write_while_reading(self): 2175 with zipfile_aes.AESZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2176 zipf.writestr('ones', self.data1) 2177 with zipfile_aes.AESZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2178 with zipf.open('ones', 'r') as r1: 2179 data1 = r1.read(500) 2180 with zipf.open('twos', 'w') as w1: 2181 w1.write(self.data2) 2182 data1 += r1.read() 2183 self.assertEqual(data1, self.data1) 2184 with zipfile_aes.AESZipFile(TESTFN2) as zipf: 2185 self.assertEqual(zipf.read('twos'), self.data2) 2186 2187 def tearDown(self): 2188 unlink(TESTFN2) 2189 2190 2191class TestWithDirectory(unittest.TestCase): 2192 def setUp(self): 2193 os.mkdir(TESTFN2) 2194 2195 def test_extract_dir(self): 2196 with zipfile_aes.AESZipFile(findfile("zipdir.zip")) as zipf: 2197 zipf.extractall(TESTFN2) 2198 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2199 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2200 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2201 2202 def test_bug_6050(self): 2203 # Extraction should succeed if directories already exist 2204 os.mkdir(os.path.join(TESTFN2, "a")) 2205 self.test_extract_dir() 2206 2207 def test_write_dir(self): 2208 dirpath = os.path.join(TESTFN2, "x") 2209 os.mkdir(dirpath) 2210 mode = os.stat(dirpath).st_mode & 0xFFFF 2211 with zipfile_aes.AESZipFile(TESTFN, "w") as zipf: 2212 zipf.write(dirpath) 2213 zinfo = zipf.filelist[0] 2214 self.assertTrue(zinfo.filename.endswith("/x/")) 2215 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2216 zipf.write(dirpath, "y") 2217 zinfo = zipf.filelist[1] 2218 self.assertTrue(zinfo.filename, "y/") 2219 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2220 with zipfile_aes.AESZipFile(TESTFN, "r") as zipf: 2221 zinfo = zipf.filelist[0] 2222 self.assertTrue(zinfo.filename.endswith("/x/")) 2223 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2224 zinfo = zipf.filelist[1] 2225 self.assertTrue(zinfo.filename, "y/") 2226 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2227 target = os.path.join(TESTFN2, "target") 2228 os.mkdir(target) 2229 zipf.extractall(target) 2230 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2231 self.assertEqual(len(os.listdir(target)), 2) 2232 2233 def test_writestr_dir(self): 2234 os.mkdir(os.path.join(TESTFN2, "x")) 2235 with zipfile_aes.AESZipFile(TESTFN, "w") as zipf: 2236 zipf.writestr("x/", b'') 2237 zinfo = zipf.filelist[0] 2238 self.assertEqual(zinfo.filename, "x/") 2239 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2240 with zipfile_aes.AESZipFile(TESTFN, "r") as zipf: 2241 zinfo = zipf.filelist[0] 2242 self.assertTrue(zinfo.filename.endswith("x/")) 2243 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2244 target = os.path.join(TESTFN2, "target") 2245 os.mkdir(target) 2246 zipf.extractall(target) 2247 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 2248 self.assertEqual(os.listdir(target), ["x"]) 2249 2250 def tearDown(self): 2251 rmtree(TESTFN2) 2252 if os.path.exists(TESTFN): 2253 unlink(TESTFN) 2254 2255 2256class ZipInfoTests(unittest.TestCase): 2257 def test_from_file(self): 2258 zi = zipfile_aes.AESZipInfo.from_file(__file__) 2259 expected_fname = os.path.basename(__file__) 2260 self.assertEqual(posixpath.basename(zi.filename), expected_fname) 2261 self.assertFalse(zi.is_dir()) 2262 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2263 2264 def test_from_file_pathlike(self): 2265 zi = zipfile_aes.AESZipInfo.from_file(pathlib.Path(__file__)) 2266 expected_fname = os.path.basename(__file__) 2267 self.assertEqual(posixpath.basename(zi.filename), expected_fname) 2268 self.assertFalse(zi.is_dir()) 2269 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2270 2271 def test_from_file_bytes(self): 2272 zi = zipfile_aes.AESZipInfo.from_file(os.fsencode(__file__), 'test') 2273 self.assertEqual(posixpath.basename(zi.filename), 'test') 2274 self.assertFalse(zi.is_dir()) 2275 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2276 2277 def test_from_file_fileno(self): 2278 with open(__file__, 'rb') as f: 2279 zi = zipfile_aes.AESZipInfo.from_file(f.fileno(), 'test') 2280 self.assertEqual(posixpath.basename(zi.filename), 'test') 2281 self.assertFalse(zi.is_dir()) 2282 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2283 2284 def test_from_dir(self): 2285 dirpath = os.path.dirname(os.path.abspath(__file__)) 2286 zi = zipfile_aes.AESZipInfo.from_file(dirpath, 'stdlib_tests') 2287 self.assertEqual(zi.filename, 'stdlib_tests/') 2288 self.assertTrue(zi.is_dir()) 2289 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2290 self.assertEqual(zi.file_size, 0) 2291 2292 2293class CommandLineTest(unittest.TestCase): 2294 2295 def zipfilecmd(self, *args, **kwargs): 2296 rc, out, err = script_helper.assert_python_ok('-m', 'pyzipper.zipfile', *args, 2297 **kwargs) 2298 return out.replace(os.linesep.encode(), b'\n') 2299 2300 def zipfilecmd_failure(self, *args): 2301 return script_helper.assert_python_failure('-m', 'pyzipper.zipfile', *args) 2302 2303 def test_bad_use(self): 2304 rc, out, err = self.zipfilecmd_failure() 2305 self.assertEqual(out, b'') 2306 self.assertIn(b'usage', err.lower()) 2307 self.assertIn(b'error', err.lower()) 2308 self.assertIn(b'required', err.lower()) 2309 rc, out, err = self.zipfilecmd_failure('-l', '') 2310 self.assertEqual(out, b'') 2311 self.assertNotEqual(err.strip(), b'') 2312 2313 def test_test_command(self): 2314 zip_name = findfile('zipdir.zip') 2315 for opt in '-t', '--test': 2316 out = self.zipfilecmd(opt, zip_name) 2317 self.assertEqual(out.rstrip(), b'Done testing') 2318 zip_name = findfile('testtar.tar') 2319 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 2320 self.assertEqual(out, b'') 2321 2322 def test_list_command(self): 2323 zip_name = findfile('zipdir.zip') 2324 t = io.StringIO() 2325 with zipfile_aes.AESZipFile(zip_name, 'r') as tf: 2326 tf.printdir(t) 2327 expected = t.getvalue().encode('ascii', 'backslashreplace') 2328 for opt in '-l', '--list': 2329 out = self.zipfilecmd(opt, zip_name, 2330 PYTHONIOENCODING='ascii:backslashreplace') 2331 self.assertEqual(out, expected) 2332 2333 @requires_zlib 2334 def test_create_command(self): 2335 self.addCleanup(unlink, TESTFN) 2336 with open(TESTFN, 'w') as f: 2337 f.write('test 1') 2338 os.mkdir(TESTFNDIR) 2339 self.addCleanup(rmtree, TESTFNDIR) 2340 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f: 2341 f.write('test 2') 2342 files = [TESTFN, TESTFNDIR] 2343 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 2344 for opt in '-c', '--create': 2345 try: 2346 out = self.zipfilecmd(opt, TESTFN2, *files) 2347 self.assertEqual(out, b'') 2348 with zipfile_aes.AESZipFile(TESTFN2) as zf: 2349 self.assertEqual(zf.namelist(), namelist) 2350 self.assertEqual(zf.read(namelist[0]), b'test 1') 2351 self.assertEqual(zf.read(namelist[2]), b'test 2') 2352 finally: 2353 unlink(TESTFN2) 2354 2355 def test_extract_command(self): 2356 zip_name = findfile('zipdir.zip') 2357 for opt in '-e', '--extract': 2358 with temp_dir() as extdir: 2359 out = self.zipfilecmd(opt, zip_name, extdir) 2360 self.assertEqual(out, b'') 2361 with zipfile_aes.AESZipFile(zip_name) as zf: 2362 for zi in zf.infolist(): 2363 path = os.path.join(extdir, 2364 zi.filename.replace('/', os.sep)) 2365 if zi.is_dir(): 2366 self.assertTrue(os.path.isdir(path)) 2367 else: 2368 self.assertTrue(os.path.isfile(path)) 2369 with open(path, 'rb') as f: 2370 self.assertEqual(f.read(), zf.read(zi)) 2371 2372if __name__ == "__main__": 2373 unittest.main() 2374