1"""This is the normal test_zipfile test suite but using the
2zipfile_aes.AESZipFile object instead of the zipfile.ZipFile object.
3"""
4import contextlib
5import io
6import os
7import sys
8import importlib.util
9import pathlib
10import posixpath
11import time
12import struct
13from pyzipper import zipfile
14from pyzipper import zipfile_aes
15import unittest
16
17
18from tempfile import TemporaryFile
19from random import randint, random, getrandbits
20
21from test.support import script_helper
22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd,
23                          requires_zlib, requires_bz2, requires_lzma,
24                          captured_stdout)
25
26TESTFN2 = TESTFN + "2"
27TESTFNDIR = TESTFN + "d"
28FIXEDTEST_SIZE = 1000
29DATAFILES_DIR = 'zipfile_datafiles'
30
31SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
32                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
33                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
34                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
35
def getrandbytes(size):
    """Return *size* pseudo-random bytes produced by ``getrandbits``."""
    bit_count = size * 8
    value = getrandbits(bit_count)
    return value.to_bytes(size, 'little')
38
def get_files(test):
    """Yield every kind of archive target the tests exercise.

    Yields, in order, the TESTFN2 path string, an open ``TemporaryFile`` and
    an ``io.BytesIO``.  When the consumer resumes the generator after using a
    file object, *test* asserts that the object has not been closed.
    """
    yield TESTFN2
    for factory in (TemporaryFile, io.BytesIO):
        with factory() as fileobj:
            yield fileobj
            test.assertFalse(fileobj.closed)
47
48class AbstractTestsWithSourceFile:
49    @classmethod
50    def setUpClass(cls):
51        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
52                              (i, random()), "ascii")
53                        for i in range(FIXEDTEST_SIZE)]
54        cls.data = b''.join(cls.line_gen)
55
56    def setUp(self):
57        # Make a source file with some lines
58        with open(TESTFN, "wb") as fp:
59            fp.write(self.data)
60
61    def make_test_archive(self, f, compression, compresslevel=None):
62        kwargs = {'compression': compression, 'compresslevel': compresslevel}
63        # Create the ZIP archive
64        with zipfile_aes.AESZipFile(f, "w", **kwargs) as zipfp:
65            zipfp.write(TESTFN, "another.name")
66            zipfp.write(TESTFN, TESTFN)
67            zipfp.writestr("strfile", self.data)
68            with zipfp.open('written-open-w', mode='w') as f:
69                for line in self.line_gen:
70                    f.write(line)
71
72    def zip_test(self, f, compression, compresslevel=None):
73        self.make_test_archive(f, compression, compresslevel)
74
75        # Read the ZIP archive
76        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
77            self.assertEqual(zipfp.read(TESTFN), self.data)
78            self.assertEqual(zipfp.read("another.name"), self.data)
79            self.assertEqual(zipfp.read("strfile"), self.data)
80
81            # Print the ZIP directory
82            fp = io.StringIO()
83            zipfp.printdir(file=fp)
84            directory = fp.getvalue()
85            lines = directory.splitlines()
86            self.assertEqual(len(lines), 5) # Number of files + header
87
88            self.assertIn('File Name', lines[0])
89            self.assertIn('Modified', lines[0])
90            self.assertIn('Size', lines[0])
91
92            fn, date, time_, size = lines[1].split()
93            self.assertEqual(fn, 'another.name')
94            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
95            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
96            self.assertEqual(size, str(len(self.data)))
97
98            # Check the namelist
99            names = zipfp.namelist()
100            self.assertEqual(len(names), 4)
101            self.assertIn(TESTFN, names)
102            self.assertIn("another.name", names)
103            self.assertIn("strfile", names)
104            self.assertIn("written-open-w", names)
105
106            # Check infolist
107            infos = zipfp.infolist()
108            names = [i.filename for i in infos]
109            self.assertEqual(len(names), 4)
110            self.assertIn(TESTFN, names)
111            self.assertIn("another.name", names)
112            self.assertIn("strfile", names)
113            self.assertIn("written-open-w", names)
114            for i in infos:
115                self.assertEqual(i.file_size, len(self.data))
116
117            # check getinfo
118            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
119                info = zipfp.getinfo(nm)
120                self.assertEqual(info.filename, nm)
121                self.assertEqual(info.file_size, len(self.data))
122
123            # Check that testzip doesn't raise an exception
124            zipfp.testzip()
125
126    def test_basic(self):
127        for f in get_files(self):
128            self.zip_test(f, self.compression)
129
130    def zip_open_test(self, f, compression):
131        self.make_test_archive(f, compression)
132
133        # Read the ZIP archive
134        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
135            zipdata1 = []
136            with zipfp.open(TESTFN) as zipopen1:
137                while True:
138                    read_data = zipopen1.read(256)
139                    if not read_data:
140                        break
141                    zipdata1.append(read_data)
142
143            zipdata2 = []
144            with zipfp.open("another.name") as zipopen2:
145                while True:
146                    read_data = zipopen2.read(256)
147                    if not read_data:
148                        break
149                    zipdata2.append(read_data)
150
151            self.assertEqual(b''.join(zipdata1), self.data)
152            self.assertEqual(b''.join(zipdata2), self.data)
153
154    def test_open(self):
155        for f in get_files(self):
156            self.zip_open_test(f, self.compression)
157
158    def test_open_with_pathlike(self):
159        path = pathlib.Path(TESTFN2)
160        self.zip_open_test(path, self.compression)
161        with zipfile_aes.AESZipFile(path, "r", self.compression) as zipfp:
162            self.assertIsInstance(zipfp.filename, str)
163
164    def zip_random_open_test(self, f, compression):
165        self.make_test_archive(f, compression)
166
167        # Read the ZIP archive
168        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
169            zipdata1 = []
170            with zipfp.open(TESTFN) as zipopen1:
171                while True:
172                    read_data = zipopen1.read(randint(1, 1024))
173                    if not read_data:
174                        break
175                    zipdata1.append(read_data)
176
177            self.assertEqual(b''.join(zipdata1), self.data)
178
179    def test_random_open(self):
180        for f in get_files(self):
181            self.zip_random_open_test(f, self.compression)
182
183    def zip_read1_test(self, f, compression):
184        self.make_test_archive(f, compression)
185
186        # Read the ZIP archive
187        with zipfile_aes.AESZipFile(f, "r") as zipfp, \
188             zipfp.open(TESTFN) as zipopen:
189            zipdata = []
190            while True:
191                read_data = zipopen.read1(-1)
192                if not read_data:
193                    break
194                zipdata.append(read_data)
195
196        self.assertEqual(b''.join(zipdata), self.data)
197
198    def test_read1(self):
199        for f in get_files(self):
200            self.zip_read1_test(f, self.compression)
201
202    def zip_read1_10_test(self, f, compression):
203        self.make_test_archive(f, compression)
204
205        # Read the ZIP archive
206        with zipfile_aes.AESZipFile(f, "r") as zipfp, \
207             zipfp.open(TESTFN) as zipopen:
208            zipdata = []
209            while True:
210                read_data = zipopen.read1(10)
211                self.assertLessEqual(len(read_data), 10)
212                if not read_data:
213                    break
214                zipdata.append(read_data)
215
216        self.assertEqual(b''.join(zipdata), self.data)
217
218    def test_read1_10(self):
219        for f in get_files(self):
220            self.zip_read1_10_test(f, self.compression)
221
222    def zip_readline_read_test(self, f, compression):
223        self.make_test_archive(f, compression)
224
225        # Read the ZIP archive
226        with zipfile_aes.AESZipFile(f, "r") as zipfp, \
227             zipfp.open(TESTFN) as zipopen:
228            data = b''
229            while True:
230                read = zipopen.readline()
231                if not read:
232                    break
233                data += read
234
235                read = zipopen.read(100)
236                if not read:
237                    break
238                data += read
239
240        self.assertEqual(data, self.data)
241
242    def test_readline_read(self):
243        # Issue #7610: calls to readline() interleaved with calls to read().
244        for f in get_files(self):
245            self.zip_readline_read_test(f, self.compression)
246
247    def zip_readline_test(self, f, compression):
248        self.make_test_archive(f, compression)
249
250        # Read the ZIP archive
251        with zipfile_aes.AESZipFile(f, "r") as zipfp:
252            with zipfp.open(TESTFN) as zipopen:
253                for line in self.line_gen:
254                    linedata = zipopen.readline()
255                    self.assertEqual(linedata, line)
256
257    def test_readline(self):
258        for f in get_files(self):
259            self.zip_readline_test(f, self.compression)
260
261    def zip_readlines_test(self, f, compression):
262        self.make_test_archive(f, compression)
263
264        # Read the ZIP archive
265        with zipfile_aes.AESZipFile(f, "r") as zipfp:
266            with zipfp.open(TESTFN) as zipopen:
267                ziplines = zipopen.readlines()
268            for line, zipline in zip(self.line_gen, ziplines):
269                self.assertEqual(zipline, line)
270
271    def test_readlines(self):
272        for f in get_files(self):
273            self.zip_readlines_test(f, self.compression)
274
275    def zip_iterlines_test(self, f, compression):
276        self.make_test_archive(f, compression)
277
278        # Read the ZIP archive
279        with zipfile_aes.AESZipFile(f, "r") as zipfp:
280            with zipfp.open(TESTFN) as zipopen:
281                for line, zipline in zip(self.line_gen, zipopen):
282                    self.assertEqual(zipline, line)
283
284    def test_iterlines(self):
285        for f in get_files(self):
286            self.zip_iterlines_test(f, self.compression)
287
288    def test_low_compression(self):
289        """Check for cases where compressed data is larger than original."""
290        # Create the ZIP archive
291        with zipfile_aes.AESZipFile(TESTFN2, "w", self.compression) as zipfp:
292            zipfp.writestr("strfile", '12')
293
294        # Get an open object for strfile
295        with zipfile_aes.AESZipFile(TESTFN2, "r", self.compression) as zipfp:
296            with zipfp.open("strfile") as openobj:
297                self.assertEqual(openobj.read(1), b'1')
298                self.assertEqual(openobj.read(1), b'2')
299
300    def test_writestr_compression(self):
301        zipfp = zipfile_aes.AESZipFile(TESTFN2, "w")
302        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
303        info = zipfp.getinfo('b.txt')
304        self.assertEqual(info.compress_type, self.compression)
305
306    def test_writestr_compresslevel(self):
307        zipfp = zipfile_aes.AESZipFile(TESTFN2, "w", compresslevel=1)
308        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
309        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
310                       compresslevel=2)
311
312        # Compression level follows the constructor.
313        a_info = zipfp.getinfo('a.txt')
314        self.assertEqual(a_info.compress_type, self.compression)
315        self.assertEqual(a_info._compresslevel, 1)
316
317        # Compression level is overridden.
318        b_info = zipfp.getinfo('b.txt')
319        self.assertEqual(b_info.compress_type, self.compression)
320        self.assertEqual(b_info._compresslevel, 2)
321
322    def test_read_return_size(self):
323        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
324        # than requested.
325        for test_size in (1, 4095, 4096, 4097, 16384):
326            file_size = test_size + 1
327            junk = getrandbytes(file_size)
328            with zipfile_aes.AESZipFile(io.BytesIO(), "w", self.compression) as zipf:
329                zipf.writestr('foo', junk)
330                with zipf.open('foo', 'r') as fp:
331                    buf = fp.read(test_size)
332                    self.assertEqual(len(buf), test_size)
333
334    def test_truncated_zipfile(self):
335        fp = io.BytesIO()
336        with zipfile_aes.AESZipFile(fp, mode='w') as zipf:
337            zipf.writestr('strfile', self.data, compress_type=self.compression)
338            end_offset = fp.tell()
339        zipfiledata = fp.getvalue()
340
341        fp = io.BytesIO(zipfiledata)
342        with zipfile_aes.AESZipFile(fp) as zipf:
343            with zipf.open('strfile') as zipopen:
344                fp.truncate(end_offset - 20)
345                with self.assertRaises(EOFError):
346                    zipopen.read()
347
348        fp = io.BytesIO(zipfiledata)
349        with zipfile_aes.AESZipFile(fp) as zipf:
350            with zipf.open('strfile') as zipopen:
351                fp.truncate(end_offset - 20)
352                with self.assertRaises(EOFError):
353                    while zipopen.read(100):
354                        pass
355
356        fp = io.BytesIO(zipfiledata)
357        with zipfile_aes.AESZipFile(fp) as zipf:
358            with zipf.open('strfile') as zipopen:
359                fp.truncate(end_offset - 20)
360                with self.assertRaises(EOFError):
361                    while zipopen.read1(100):
362                        pass
363
364    def test_repr(self):
365        fname = 'file.name'
366        for f in get_files(self):
367            with zipfile_aes.AESZipFile(f, 'w', self.compression) as zipfp:
368                zipfp.write(TESTFN, fname)
369                r = repr(zipfp)
370                self.assertIn("mode='w'", r)
371
372            with zipfile_aes.AESZipFile(f, 'r') as zipfp:
373                r = repr(zipfp)
374                if isinstance(f, str):
375                    self.assertIn('filename=%r' % f, r)
376                else:
377                    self.assertIn('file=%r' % f, r)
378                self.assertIn("mode='r'", r)
379                r = repr(zipfp.getinfo(fname))
380                self.assertIn('filename=%r' % fname, r)
381                self.assertIn('filemode=', r)
382                self.assertIn('file_size=', r)
383                if self.compression != zipfile.ZIP_STORED:
384                    self.assertIn('compress_type=', r)
385                    self.assertIn('compress_size=', r)
386                with zipfp.open(fname) as zipopen:
387                    r = repr(zipopen)
388                    self.assertIn('name=%r' % fname, r)
389                    self.assertIn("mode='r'", r)
390                    if self.compression != zipfile.ZIP_STORED:
391                        self.assertIn('compress_type=', r)
392                self.assertIn('[closed]', repr(zipopen))
393            self.assertIn('[closed]', repr(zipfp))
394
395    def test_compresslevel_basic(self):
396        for f in get_files(self):
397            self.zip_test(f, self.compression, compresslevel=9)
398
399    def test_per_file_compresslevel(self):
400        """Check that files within a Zip archive can have different
401        compression levels."""
402        with zipfile_aes.AESZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
403            zipfp.write(TESTFN, 'compress_1')
404            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
405            one_info = zipfp.getinfo('compress_1')
406            nine_info = zipfp.getinfo('compress_9')
407            self.assertEqual(one_info._compresslevel, 1)
408            self.assertEqual(nine_info._compresslevel, 9)
409
410    def tearDown(self):
411        unlink(TESTFN)
412        unlink(TESTFN2)
413
414
415class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
416                                unittest.TestCase):
417    compression = zipfile.ZIP_STORED
418    test_low_compression = None
419
420    def zip_test_writestr_permissions(self, f, compression):
421        # Make sure that writestr and open(... mode='w') create files with
422        # mode 0600, when they are passed a name rather than a ZipInfo
423        # instance.
424
425        self.make_test_archive(f, compression)
426        with zipfile_aes.AESZipFile(f, "r") as zipfp:
427            zinfo = zipfp.getinfo('strfile')
428            self.assertEqual(zinfo.external_attr, 0o600 << 16)
429
430            zinfo2 = zipfp.getinfo('written-open-w')
431            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
432
433    def test_writestr_permissions(self):
434        for f in get_files(self):
435            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
436
437    def test_absolute_arcnames(self):
438        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
439            zipfp.write(TESTFN, "/absolute")
440
441        with zipfile_aes.AESZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
442            self.assertEqual(zipfp.namelist(), ["absolute"])
443
444    def test_append_to_zip_file(self):
445        """Test appending to an existing zipfile."""
446        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
447            zipfp.write(TESTFN, TESTFN)
448
449        with zipfile_aes.AESZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
450            zipfp.writestr("strfile", self.data)
451            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
452
453    def test_append_to_non_zip_file(self):
454        """Test appending to an existing file that is not a zipfile."""
455        # NOTE: this test fails if len(d) < 22 because of the first
456        # line "fpin.seek(-22, 2)" in _EndRecData
457        data = b'I am not a ZipFile!'*10
458        with open(TESTFN2, 'wb') as f:
459            f.write(data)
460
461        with zipfile_aes.AESZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
462            zipfp.write(TESTFN, TESTFN)
463
464        with open(TESTFN2, 'rb') as f:
465            f.seek(len(data))
466            with zipfile_aes.AESZipFile(f, "r") as zipfp:
467                self.assertEqual(zipfp.namelist(), [TESTFN])
468                self.assertEqual(zipfp.read(TESTFN), self.data)
469        with open(TESTFN2, 'rb') as f:
470            self.assertEqual(f.read(len(data)), data)
471            zipfiledata = f.read()
472        with io.BytesIO(zipfiledata) as bio, zipfile_aes.AESZipFile(bio) as zipfp:
473            self.assertEqual(zipfp.namelist(), [TESTFN])
474            self.assertEqual(zipfp.read(TESTFN), self.data)
475
476    def test_read_concatenated_zip_file(self):
477        with io.BytesIO() as bio:
478            with zipfile_aes.AESZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
479                zipfp.write(TESTFN, TESTFN)
480            zipfiledata = bio.getvalue()
481        data = b'I am not a ZipFile!'*10
482        with open(TESTFN2, 'wb') as f:
483            f.write(data)
484            f.write(zipfiledata)
485
486        with zipfile_aes.AESZipFile(TESTFN2) as zipfp:
487            self.assertEqual(zipfp.namelist(), [TESTFN])
488            self.assertEqual(zipfp.read(TESTFN), self.data)
489
490    def test_append_to_concatenated_zip_file(self):
491        with io.BytesIO() as bio:
492            with zipfile_aes.AESZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
493                zipfp.write(TESTFN, TESTFN)
494            zipfiledata = bio.getvalue()
495        data = b'I am not a ZipFile!'*1000000
496        with open(TESTFN2, 'wb') as f:
497            f.write(data)
498            f.write(zipfiledata)
499
500        with zipfile_aes.AESZipFile(TESTFN2, 'a') as zipfp:
501            self.assertEqual(zipfp.namelist(), [TESTFN])
502            zipfp.writestr('strfile', self.data)
503
504        with open(TESTFN2, 'rb') as f:
505            self.assertEqual(f.read(len(data)), data)
506            zipfiledata = f.read()
507        with io.BytesIO(zipfiledata) as bio, zipfile_aes.AESZipFile(bio) as zipfp:
508            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
509            self.assertEqual(zipfp.read(TESTFN), self.data)
510            self.assertEqual(zipfp.read('strfile'), self.data)
511
512    def test_ignores_newline_at_end(self):
513        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
514            zipfp.write(TESTFN, TESTFN)
515        with open(TESTFN2, 'a') as f:
516            f.write("\r\n\00\00\00")
517        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
518            self.assertIsInstance(zipfp, zipfile_aes.AESZipFile)
519
520    def test_ignores_stuff_appended_past_comments(self):
521        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
522            zipfp.comment = b"this is a comment"
523            zipfp.write(TESTFN, TESTFN)
524        with open(TESTFN2, 'a') as f:
525            f.write("abcdef\r\n")
526        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
527            self.assertIsInstance(zipfp, zipfile_aes.AESZipFile)
528            self.assertEqual(zipfp.comment, b"this is a comment")
529
530    def test_write_default_name(self):
531        """Check that calling ZipFile.write without arcname specified
532        produces the expected result."""
533        with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp:
534            zipfp.write(TESTFN)
535            with open(TESTFN, "rb") as f:
536                self.assertEqual(zipfp.read(TESTFN), f.read())
537
538    def test_write_to_readonly(self):
539        """Check that trying to call write() on a readonly ZipFile object
540        raises a ValueError."""
541        with zipfile_aes.AESZipFile(TESTFN2, mode="w") as zipfp:
542            zipfp.writestr("somefile.txt", "bogus")
543
544        with zipfile_aes.AESZipFile(TESTFN2, mode="r") as zipfp:
545            self.assertRaises(ValueError, zipfp.write, TESTFN)
546
547        with zipfile_aes.AESZipFile(TESTFN2, mode="r") as zipfp:
548            with self.assertRaises(ValueError):
549                zipfp.open(TESTFN, mode='w')
550
551    def test_add_file_before_1980(self):
552        # Set atime and mtime to 1970-01-01
553        os.utime(TESTFN, (0, 0))
554        with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp:
555            self.assertRaises(ValueError, zipfp.write, TESTFN)
556
557        with zipfile_aes.AESZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
558            zipfp.write(TESTFN)
559            zinfo = zipfp.getinfo(TESTFN)
560            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
561
562    def test_add_file_after_2107(self):
563        # Set atime and mtime to 2108-12-30
564        try:
565            os.utime(TESTFN, (4386268800, 4386268800))
566        except OverflowError:
567            self.skipTest('Host fs cannot set timestamp to required value.')
568
569        with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp:
570            self.assertRaises(struct.error, zipfp.write, TESTFN)
571
572        with zipfile_aes.AESZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
573            zipfp.write(TESTFN)
574            zinfo = zipfp.getinfo(TESTFN)
575            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
576
577
@requires_zlib
class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
                                 unittest.TestCase):
    """Run the source-file tests with ZIP_DEFLATED as the compression."""
    compression = zipfile.ZIP_DEFLATED

    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        cases = (('storeme', zipfile.ZIP_STORED),
                 ('deflateme', zipfile.ZIP_DEFLATED))
        with zipfile_aes.AESZipFile(TESTFN2, "w") as archive:
            # Add the same source file once per compression method ...
            for arcname, method in cases:
                archive.write(TESTFN, arcname, method)
            # ... and check each member recorded the method it was given.
            entries = [archive.getinfo(arcname) for arcname, _ in cases]
            for entry, (_, method) in zip(entries, cases):
                self.assertEqual(entry.compress_type, method)
593
@requires_bz2
class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
                               unittest.TestCase):
    """Run the source-file tests with ZIP_BZIP2 as the compression."""
    compression = zipfile.ZIP_BZIP2
598
@requires_lzma
class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
                              unittest.TestCase):
    """Run the source-file tests with ZIP_LZMA as the compression."""
    compression = zipfile.ZIP_LZMA
603
604
605class AbstractTestZip64InSmallFiles:
606    # These tests test the ZIP64 functionality without using large files,
607    # see test_zipfile64 for proper tests.
608
609    @classmethod
610    def setUpClass(cls):
611        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
612                    for i in range(0, FIXEDTEST_SIZE))
613        cls.data = b'\n'.join(line_gen)
614
615    def setUp(self):
616        self._limit = zipfile.ZIP64_LIMIT
617        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
618        zipfile.ZIP64_LIMIT = 1000
619        zipfile.ZIP_FILECOUNT_LIMIT = 9
620
621        # Make a source file with some lines
622        with open(TESTFN, "wb") as fp:
623            fp.write(self.data)
624
625    def zip_test(self, f, compression):
626        # Create the ZIP archive
627        with zipfile_aes.AESZipFile(f, "w", compression, allowZip64=True) as zipfp:
628            zipfp.write(TESTFN, "another.name")
629            zipfp.write(TESTFN, TESTFN)
630            zipfp.writestr("strfile", self.data)
631
632        # Read the ZIP archive
633        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
634            self.assertEqual(zipfp.read(TESTFN), self.data)
635            self.assertEqual(zipfp.read("another.name"), self.data)
636            self.assertEqual(zipfp.read("strfile"), self.data)
637
638            # Print the ZIP directory
639            fp = io.StringIO()
640            zipfp.printdir(fp)
641
642            directory = fp.getvalue()
643            lines = directory.splitlines()
644            self.assertEqual(len(lines), 4) # Number of files + header
645
646            self.assertIn('File Name', lines[0])
647            self.assertIn('Modified', lines[0])
648            self.assertIn('Size', lines[0])
649
650            fn, date, time_, size = lines[1].split()
651            self.assertEqual(fn, 'another.name')
652            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
653            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
654            self.assertEqual(size, str(len(self.data)))
655
656            # Check the namelist
657            names = zipfp.namelist()
658            self.assertEqual(len(names), 3)
659            self.assertIn(TESTFN, names)
660            self.assertIn("another.name", names)
661            self.assertIn("strfile", names)
662
663            # Check infolist
664            infos = zipfp.infolist()
665            names = [i.filename for i in infos]
666            self.assertEqual(len(names), 3)
667            self.assertIn(TESTFN, names)
668            self.assertIn("another.name", names)
669            self.assertIn("strfile", names)
670            for i in infos:
671                self.assertEqual(i.file_size, len(self.data))
672
673            # check getinfo
674            for nm in (TESTFN, "another.name", "strfile"):
675                info = zipfp.getinfo(nm)
676                self.assertEqual(info.filename, nm)
677                self.assertEqual(info.file_size, len(self.data))
678
679            # Check that testzip doesn't raise an exception
680            zipfp.testzip()
681
682    def test_basic(self):
683        for f in get_files(self):
684            self.zip_test(f, self.compression)
685
686    def test_too_many_files(self):
687        # This test checks that more than 64k files can be added to an archive,
688        # and that the resulting archive can be read properly by ZipFile
689        zipf = zipfile_aes.AESZipFile(TESTFN, "w", self.compression,
690                               allowZip64=True)
691        zipf.debug = 100
692        numfiles = 15
693        for i in range(numfiles):
694            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
695        self.assertEqual(len(zipf.namelist()), numfiles)
696        zipf.close()
697
698        zipf2 = zipfile_aes.AESZipFile(TESTFN, "r", self.compression)
699        self.assertEqual(len(zipf2.namelist()), numfiles)
700        for i in range(numfiles):
701            content = zipf2.read("foo%08d" % i).decode('ascii')
702            self.assertEqual(content, "%d" % (i**3 % 57))
703        zipf2.close()
704
705    def test_too_many_files_append(self):
706        zipf = zipfile_aes.AESZipFile(TESTFN, "w", self.compression,
707                               allowZip64=False)
708        zipf.debug = 100
709        numfiles = 9
710        for i in range(numfiles):
711            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
712        self.assertEqual(len(zipf.namelist()), numfiles)
713        with self.assertRaises(zipfile.LargeZipFile):
714            zipf.writestr("foo%08d" % numfiles, b'')
715        self.assertEqual(len(zipf.namelist()), numfiles)
716        zipf.close()
717
718        zipf = zipfile_aes.AESZipFile(TESTFN, "a", self.compression,
719                               allowZip64=False)
720        zipf.debug = 100
721        self.assertEqual(len(zipf.namelist()), numfiles)
722        with self.assertRaises(zipfile.LargeZipFile):
723            zipf.writestr("foo%08d" % numfiles, b'')
724        self.assertEqual(len(zipf.namelist()), numfiles)
725        zipf.close()
726
727        zipf = zipfile_aes.AESZipFile(TESTFN, "a", self.compression,
728                               allowZip64=True)
729        zipf.debug = 100
730        self.assertEqual(len(zipf.namelist()), numfiles)
731        numfiles2 = 15
732        for i in range(numfiles, numfiles2):
733            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
734        self.assertEqual(len(zipf.namelist()), numfiles2)
735        zipf.close()
736
737        zipf2 = zipfile_aes.AESZipFile(TESTFN, "r", self.compression)
738        self.assertEqual(len(zipf2.namelist()), numfiles2)
739        for i in range(numfiles2):
740            content = zipf2.read("foo%08d" % i).decode('ascii')
741            self.assertEqual(content, "%d" % (i**3 % 57))
742        zipf2.close()
743
744    def tearDown(self):
745        zipfile.ZIP64_LIMIT = self._limit
746        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
747        unlink(TESTFN)
748        unlink(TESTFN2)
749
750
class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                  unittest.TestCase):
    """Run the small-file Zip64 tests with ZIP_STORED as the compression."""
    compression = zipfile.ZIP_STORED

    def large_file_exception_test(self, f, compression):
        """write() must raise LargeZipFile when allowZip64 is False."""
        with zipfile_aes.AESZipFile(f, "w", compression,
                                    allowZip64=False) as archive:
            with self.assertRaises(zipfile.LargeZipFile):
                archive.write(TESTFN, "another.name")

    def large_file_exception_test2(self, f, compression):
        """writestr() must raise LargeZipFile when allowZip64 is False."""
        with zipfile_aes.AESZipFile(f, "w", compression,
                                    allowZip64=False) as archive:
            with self.assertRaises(zipfile.LargeZipFile):
                archive.writestr("another.name", self.data)

    def test_large_file_exception(self):
        # Run both over-limit checks against every archive target kind.
        for target in get_files(self):
            self.large_file_exception_test(target, zipfile.ZIP_STORED)
            self.large_file_exception_test2(target, zipfile.ZIP_STORED)

    def test_absolute_arcnames(self):
        # An arcname with a leading slash must be stored without it.
        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
                                    allowZip64=True) as archive:
            archive.write(TESTFN, "/absolute")

        with zipfile_aes.AESZipFile(TESTFN2, "r",
                                    zipfile.ZIP_STORED) as archive:
            stored_names = archive.namelist()
            self.assertEqual(stored_names, ["absolute"])

    def test_append(self):
        # Test that appending to the Zip64 archive doesn't change
        # extra fields of existing entries.
        with zipfile_aes.AESZipFile(TESTFN2, "w", allowZip64=True) as archive:
            archive.writestr("strfile", self.data)
        with zipfile_aes.AESZipFile(TESTFN2, "r", allowZip64=True) as archive:
            extra_before = archive.getinfo("strfile").extra
        with zipfile_aes.AESZipFile(TESTFN2, "a", allowZip64=True) as archive:
            archive.writestr("strfile2", self.data)
        with zipfile_aes.AESZipFile(TESTFN2, "r", allowZip64=True) as archive:
            extra_after = archive.getinfo("strfile").extra
            self.assertEqual(extra_after, extra_before)
791
@requires_zlib
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                   unittest.TestCase):
    # Runs the inherited AbstractTestZip64InSmallFiles tests with
    # ZIP_DEFLATED; gated by test.support's requires_zlib decorator.
    compression = zipfile.ZIP_DEFLATED
796
@requires_bz2
class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                 unittest.TestCase):
    # Runs the inherited AbstractTestZip64InSmallFiles tests with
    # ZIP_BZIP2; gated by test.support's requires_bz2 decorator.
    compression = zipfile.ZIP_BZIP2
801
@requires_lzma
class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                unittest.TestCase):
    # Runs the inherited AbstractTestZip64InSmallFiles tests with
    # ZIP_LZMA; gated by test.support's requires_lzma decorator.
    compression = zipfile.ZIP_LZMA
806
807
class AbstractWriterTests:
    """Mixin of tests for the writable handle returned by open(name, 'w').

    Concrete subclasses provide the ``compression`` attribute.
    """

    def tearDown(self):
        unlink(TESTFN2)

    def test_close_after_close(self):
        # Closing an already closed writer is expected to be harmless.
        payload = b'content'
        with zipfile_aes.AESZipFile(TESTFN2, "w", self.compression) as archive:
            writer = archive.open('test', 'w')
            writer.write(payload)
            # Close twice; the handle must report closed after each call.
            for _ in range(2):
                writer.close()
                self.assertTrue(writer.closed)
            self.assertEqual(archive.read('test'), payload)

    def test_write_after_close(self):
        # Writing through a closed writer is expected to raise ValueError
        # and leave the stored member untouched.
        payload = b'content'
        with zipfile_aes.AESZipFile(TESTFN2, "w", self.compression) as archive:
            writer = archive.open('test', 'w')
            writer.write(payload)
            writer.close()
            self.assertTrue(writer.closed)
            with self.assertRaises(ValueError):
                writer.write(b'')
            self.assertEqual(archive.read('test'), payload)
833
class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests mixin with ZIP_STORED.
    compression = zipfile.ZIP_STORED
836
@requires_zlib
class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests mixin with ZIP_DEFLATED; gated by
    # test.support's requires_zlib decorator.
    compression = zipfile.ZIP_DEFLATED
840
@requires_bz2
class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests mixin with ZIP_BZIP2; gated by
    # test.support's requires_bz2 decorator.
    compression = zipfile.ZIP_BZIP2
844
@requires_lzma
class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests mixin with ZIP_LZMA; gated by
    # test.support's requires_lzma decorator.
    compression = zipfile.ZIP_LZMA
848
849
850@unittest.skipIf(sys.version_info[0:2] < (3, 5), 'Requires Python >= 3.5')
851class PyZipFileTests(unittest.TestCase):
852    def assertCompiledIn(self, name, namelist):
853        if name + 'o' not in namelist:
854            self.assertIn(name + 'c', namelist)
855
856    def requiresWriteAccess(self, path):
857        # effective_ids unavailable on windows
858        if not os.access(path, os.W_OK,
859                         effective_ids=os.access in os.supports_effective_ids):
860            self.skipTest('requires write access to the installed location')
861        filename = os.path.join(path, 'test_zipfile.try')
862        try:
863            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
864            os.close(fd)
865        except Exception:
866            self.skipTest('requires write access to the installed location')
867        unlink(filename)
868
869    def test_write_pyfile(self):
870        self.requiresWriteAccess(os.path.dirname(__file__))
871        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
872            fn = __file__
873            if fn.endswith('.pyc'):
874                path_split = fn.split(os.sep)
875                if os.altsep is not None:
876                    path_split.extend(fn.split(os.altsep))
877                if '__pycache__' in path_split:
878                    fn = importlib.util.source_from_cache(fn)
879                else:
880                    fn = fn[:-1]
881
882            zipfp.writepy(fn)
883
884            bn = os.path.basename(fn)
885            self.assertNotIn(bn, zipfp.namelist())
886            self.assertCompiledIn(bn, zipfp.namelist())
887
888        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
889            fn = __file__
890            if fn.endswith('.pyc'):
891                fn = fn[:-1]
892
893            zipfp.writepy(fn, "testpackage")
894
895            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
896            self.assertNotIn(bn, zipfp.namelist())
897            self.assertCompiledIn(bn, zipfp.namelist())
898
899    def test_write_python_package(self):
900        import email
901        packagedir = os.path.dirname(email.__file__)
902        self.requiresWriteAccess(packagedir)
903
904        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
905            zipfp.writepy(packagedir)
906
907            # Check for a couple of modules at different levels of the
908            # hierarchy
909            names = zipfp.namelist()
910            self.assertCompiledIn('email/__init__.py', names)
911            self.assertCompiledIn('email/mime/text.py', names)
912
913    def test_write_filtered_python_package(self):
914        import test
915        packagedir = os.path.dirname(test.__file__)
916        self.requiresWriteAccess(packagedir)
917
918        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
919
920            # first make sure that the test folder gives error messages
921            # (on the badsyntax_... files)
922            with captured_stdout() as reportSIO:
923                zipfp.writepy(packagedir)
924            reportStr = reportSIO.getvalue()
925            self.assertTrue('SyntaxError' in reportStr)
926
927            # then check that the filter works on the whole package
928            with captured_stdout() as reportSIO:
929                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
930            reportStr = reportSIO.getvalue()
931            self.assertTrue('SyntaxError' not in reportStr)
932
933            # then check that the filter works on individual files
934            def filter(path):
935                return not os.path.basename(path).startswith("bad")
936            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
937                zipfp.writepy(packagedir, filterfunc=filter)
938            reportStr = reportSIO.getvalue()
939            if reportStr:
940                print(reportStr)
941            self.assertTrue('SyntaxError' not in reportStr)
942
943    def test_write_with_optimization(self):
944        import email
945        packagedir = os.path.dirname(email.__file__)
946        self.requiresWriteAccess(packagedir)
947        optlevel = 1 if __debug__ else 0
948        ext = '.pyc'
949
950        with TemporaryFile() as t, \
951             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
952            zipfp.writepy(packagedir)
953
954            names = zipfp.namelist()
955            self.assertIn('email/__init__' + ext, names)
956            self.assertIn('email/mime/text' + ext, names)
957
958    def test_write_python_directory(self):
959        os.mkdir(TESTFN2)
960        try:
961            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
962                fp.write("print(42)\n")
963
964            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
965                fp.write("print(42 * 42)\n")
966
967            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
968                fp.write("bla bla bla\n")
969
970            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
971                zipfp.writepy(TESTFN2)
972
973                names = zipfp.namelist()
974                self.assertCompiledIn('mod1.py', names)
975                self.assertCompiledIn('mod2.py', names)
976                self.assertNotIn('mod2.txt', names)
977
978        finally:
979            rmtree(TESTFN2)
980
981    def test_write_python_directory_filtered(self):
982        os.mkdir(TESTFN2)
983        try:
984            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
985                fp.write("print(42)\n")
986
987            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
988                fp.write("print(42 * 42)\n")
989
990            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
991                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
992                                                  not fn.endswith('mod2.py'))
993
994                names = zipfp.namelist()
995                self.assertCompiledIn('mod1.py', names)
996                self.assertNotIn('mod2.py', names)
997
998        finally:
999            rmtree(TESTFN2)
1000
1001    def test_write_non_pyfile(self):
1002        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1003            with open(TESTFN, 'w') as f:
1004                f.write('most definitely not a python file')
1005            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1006            unlink(TESTFN)
1007
1008    def test_write_pyfile_bad_syntax(self):
1009        os.mkdir(TESTFN2)
1010        try:
1011            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1012                fp.write("Bad syntax in python file\n")
1013
1014            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1015                # syntax errors are printed to stdout
1016                with captured_stdout() as s:
1017                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1018
1019                self.assertIn("SyntaxError", s.getvalue())
1020
1021                # as it will not have compiled the python file, it will
1022                # include the .py file not .pyc
1023                names = zipfp.namelist()
1024                self.assertIn('mod1.py', names)
1025                self.assertNotIn('mod1.pyc', names)
1026
1027        finally:
1028            rmtree(TESTFN2)
1029
1030    def test_write_pathlike(self):
1031        os.mkdir(TESTFN2)
1032        try:
1033            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1034                fp.write("print(42)\n")
1035
1036            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1037                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1038                names = zipfp.namelist()
1039                self.assertCompiledIn('mod1.py', names)
1040        finally:
1041            rmtree(TESTFN2)
1042
1043
1044class ExtractTests(unittest.TestCase):
1045
1046    def make_test_file(self):
1047        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1048            for fpath, fdata in SMALL_TEST_DATA:
1049                zipfp.writestr(fpath, fdata)
1050
1051    def test_extract(self):
1052        with temp_cwd():
1053            self.make_test_file()
1054            with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1055                for fpath, fdata in SMALL_TEST_DATA:
1056                    writtenfile = zipfp.extract(fpath)
1057
1058                    # make sure it was written to the right place
1059                    correctfile = os.path.join(os.getcwd(), fpath)
1060                    correctfile = os.path.normpath(correctfile)
1061
1062                    self.assertEqual(writtenfile, correctfile)
1063
1064                    # make sure correct data is in correct file
1065                    with open(writtenfile, "rb") as f:
1066                        self.assertEqual(fdata.encode(), f.read())
1067
1068                    unlink(writtenfile)
1069
1070    def _test_extract_with_target(self, target):
1071        self.make_test_file()
1072        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1073            for fpath, fdata in SMALL_TEST_DATA:
1074                writtenfile = zipfp.extract(fpath, target)
1075                # compat with Path objects were added in python 3.6
1076                if sys.version_info[0:2] < (3, 6):
1077                    target = str(target)
1078
1079                # make sure it was written to the right place
1080                correctfile = os.path.join(target, fpath)
1081                correctfile = os.path.normpath(correctfile)
1082                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1083
1084                # make sure correct data is in correct file
1085                with open(writtenfile, "rb") as f:
1086                    self.assertEqual(fdata.encode(), f.read())
1087
1088                unlink(writtenfile)
1089
1090        unlink(TESTFN2)
1091
1092    def test_extract_with_target(self):
1093        with temp_dir() as extdir:
1094            self._test_extract_with_target(extdir)
1095
1096    def test_extract_with_target_pathlike(self):
1097        with temp_dir() as extdir:
1098            self._test_extract_with_target(pathlib.Path(extdir))
1099
1100    def test_extract_all(self):
1101        with temp_cwd():
1102            self.make_test_file()
1103            with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1104                zipfp.extractall()
1105                for fpath, fdata in SMALL_TEST_DATA:
1106                    outfile = os.path.join(os.getcwd(), fpath)
1107
1108                    with open(outfile, "rb") as f:
1109                        self.assertEqual(fdata.encode(), f.read())
1110
1111                    unlink(outfile)
1112
1113    def _test_extract_all_with_target(self, target):
1114        self.make_test_file()
1115        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1116            zipfp.extractall(target)
1117            for fpath, fdata in SMALL_TEST_DATA:
1118                # compat with Path objects were added in python 3.6
1119                if sys.version_info[0:2] < (3, 6):
1120                    target = str(target)
1121                outfile = os.path.join(str(target), fpath)
1122
1123                with open(outfile, "rb") as f:
1124                    self.assertEqual(fdata.encode(), f.read())
1125
1126                unlink(outfile)
1127
1128        unlink(TESTFN2)
1129
1130    def test_extract_all_with_target(self):
1131        with temp_dir() as extdir:
1132            self._test_extract_all_with_target(extdir)
1133
1134    def test_extract_all_with_target_pathlike(self):
1135        with temp_dir() as extdir:
1136            self._test_extract_all_with_target(pathlib.Path(extdir))
1137
1138    def check_file(self, filename, content):
1139        self.assertTrue(os.path.isfile(filename))
1140        with open(filename, 'rb') as f:
1141            self.assertEqual(f.read(), content)
1142
1143    def test_sanitize_windows_name(self):
1144        san = zipfile_aes.AESZipFile._sanitize_windows_name
1145        # Passing pathsep in allows this test to work regardless of platform.
1146        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1147        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1148        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1149
    def test_extract_hackers_arcnames_common_cases(self):
        # Each pair is (member name stored in the archive, '/'-separated
        # path expected after extraction, relative to the extraction
        # directory).  These cases run on every platform.
        common_hacknames = [
            ('../foo/bar', 'foo/bar'),
            ('foo/../bar', 'foo/bar'),
            ('foo/../../bar', 'foo/bar'),
            ('foo/bar/..', 'foo/bar'),
            ('./../foo/bar', 'foo/bar'),
            ('/foo/bar', 'foo/bar'),
            ('/foo/../bar', 'foo/bar'),
            ('/foo/../../bar', 'foo/bar'),
        ]
        self._test_extract_hackers_arcnames(common_hacknames)
1162
    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
    def test_extract_hackers_arcnames_windows_only(self):
        """Test combination of path fixing and windows name sanitization."""
        # Each pair is (member name stored in the archive, '/'-separated
        # path expected after extraction, relative to the extraction
        # directory).  Raw strings keep the backslashes literal.
        windows_hacknames = [
            (r'..\foo\bar', 'foo/bar'),
            (r'..\/foo\/bar', 'foo/bar'),
            (r'foo/\..\/bar', 'foo/bar'),
            (r'foo\/../\bar', 'foo/bar'),
            (r'C:foo/bar', 'foo/bar'),
            (r'C:/foo/bar', 'foo/bar'),
            (r'C://foo/bar', 'foo/bar'),
            (r'C:\foo\bar', 'foo/bar'),
            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
            (r'//?/C:/foo/bar', 'foo/bar'),
            (r'\\?\C:\foo\bar', 'foo/bar'),
            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
            ('../../foo../../ba..r', 'foo/ba..r'),
        ]
        self._test_extract_hackers_arcnames(windows_hacknames)
1188
    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
    def test_extract_hackers_arcnames_posix_only(self):
        # Each pair is (member name stored in the archive, '/'-separated
        # path expected after extraction, relative to the extraction
        # directory).  The last case expects the backslash to survive as an
        # ordinary character in the extracted name.
        posix_hacknames = [
            ('//foo/bar', 'foo/bar'),
            ('../../foo../../ba..r', 'foo../ba..r'),
            (r'foo/..\bar', r'foo/..\bar'),
        ]
        self._test_extract_hackers_arcnames(posix_hacknames)
1197
    def _test_extract_hackers_arcnames(self, hacknames):
        """Check that hostile member names are extracted to the expected place.

        *hacknames* is an iterable of ``(arcname, fixedname)`` pairs.
        *arcname* is stored as the member name of a one-member archive and
        *fixedname* is the '/'-separated path the member is expected to end
        up at, relative to the extraction directory.  Every pair is checked
        with extract() and extractall(), first with an explicit target
        directory and then with the current working directory as default.
        """
        for arcname, fixedname in hacknames:
            content = b'foobar' + arcname.encode()
            with zipfile_aes.AESZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile_aes.AESZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                # 0o600 placed in the upper 16 bits (presumably the Unix
                # permission field of external_attr -- confirm).
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            # The member is looked up below under its '/'-separated name.
            arcname = arcname.replace(os.sep, "/")
            targetpath = os.path.join('target', 'subdir', 'subsub')
            correctfile = os.path.join(targetpath, *fixedname.split('/'))

            # extract() into an explicit target directory
            with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg='extract %r: %r != %r' %
                                 (arcname, writtenfile, correctfile))
            self.check_file(correctfile, content)
            rmtree('target')

            # extractall() into an explicit target directory
            with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            rmtree('target')

            # From here on the expected location is below the current
            # working directory.
            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

            # extract() with the default target
            with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            # Remove the top-level directory the extraction created.
            rmtree(fixedname.split('/')[0])

            # extractall() with the default target
            with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            rmtree(fixedname.split('/')[0])

            unlink(TESTFN2)
1240
1241
1242class OtherTests(unittest.TestCase):
1243    def test_open_via_zip_info(self):
1244        # Create the ZIP archive
1245        with zipfile_aes.AESZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1246            zipfp.writestr("name", "foo")
1247            with self.assertWarns(UserWarning):
1248                zipfp.writestr("name", "bar")
1249            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1250
1251        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1252            infos = zipfp.infolist()
1253            data = b""
1254            for info in infos:
1255                with zipfp.open(info) as zipopen:
1256                    data += zipopen.read()
1257            self.assertIn(data, {b"foobar", b"barfoo"})
1258            data = b""
1259            for info in infos:
1260                data += zipfp.read(info)
1261            self.assertIn(data, {b"foobar", b"barfoo"})
1262
1263    def test_writestr_extended_local_header_issue1202(self):
1264        with zipfile_aes.AESZipFile(TESTFN2, 'w') as orig_zip:
1265            for data in 'abcdefghijklmnop':
1266                zinfo = zipfile_aes.AESZipInfo(data)
1267                zinfo.flag_bits |= 0x08  # Include an extended local header.
1268                orig_zip.writestr(zinfo, data)
1269
1270    def test_close(self):
1271        """Check that the zipfile is closed after the 'with' block."""
1272        with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp:
1273            for fpath, fdata in SMALL_TEST_DATA:
1274                zipfp.writestr(fpath, fdata)
1275                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1276        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1277
1278        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1279            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1280        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1281
1282    def test_close_on_exception(self):
1283        """Check that the zipfile is closed if an exception is raised in the
1284        'with' block."""
1285        with zipfile_aes.AESZipFile(TESTFN2, "w") as zipfp:
1286            for fpath, fdata in SMALL_TEST_DATA:
1287                zipfp.writestr(fpath, fdata)
1288
1289        try:
1290            with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp2:
1291                raise zipfile.BadZipFile()
1292        except zipfile.BadZipFile:
1293            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1294
    def test_unsupported_version(self):
        # File has an extract_version of 120
        # (the b'x' byte, 0x78 == 120, that follows the b'PK\x03\x04'
        # signature at the start of the data).  The fixture is a complete
        # in-memory archive and must stay byte-for-byte as written.
        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')

        # Merely opening the archive for reading is expected to fail.
        self.assertRaises(NotImplementedError, zipfile_aes.AESZipFile,
                          io.BytesIO(data), 'r')
1305
1306    @requires_zlib
1307    def test_read_unicode_filenames(self):
1308        # bug #10801
1309        fname = findfile('zip_cp437_header.zip')
1310        with zipfile_aes.AESZipFile(fname) as zipfp:
1311            for name in zipfp.namelist():
1312                zipfp.open(name).close()
1313
1314    def test_write_unicode_filenames(self):
1315        with zipfile_aes.AESZipFile(TESTFN, "w") as zf:
1316            zf.writestr("foo.txt", "Test for unicode filename")
1317            zf.writestr("\xf6.txt", "Test for unicode filename")
1318            self.assertIsInstance(zf.infolist()[0].filename, str)
1319
1320        with zipfile_aes.AESZipFile(TESTFN, "r") as zf:
1321            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1322            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1323
1324    def test_exclusive_create_zip_file(self):
1325        """Test exclusive creating a new zipfile."""
1326        unlink(TESTFN2)
1327        filename = 'testfile.txt'
1328        content = b'hello, world. this is some content.'
1329        with zipfile_aes.AESZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1330            zipfp.writestr(filename, content)
1331        with self.assertRaises(FileExistsError):
1332            zipfile_aes.AESZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1333        with zipfile_aes.AESZipFile(TESTFN2, "r") as zipfp:
1334            self.assertEqual(zipfp.namelist(), [filename])
1335            self.assertEqual(zipfp.read(filename), content)
1336
1337    def test_create_non_existent_file_for_append(self):
1338        if os.path.exists(TESTFN):
1339            os.unlink(TESTFN)
1340
1341        filename = 'testfile.txt'
1342        content = b'hello, world. this is some content.'
1343
1344        try:
1345            with zipfile_aes.AESZipFile(TESTFN, 'a') as zf:
1346                zf.writestr(filename, content)
1347        except OSError:
1348            self.fail('Could not append data to a non-existent zip file.')
1349
1350        self.assertTrue(os.path.exists(TESTFN))
1351
1352        with zipfile_aes.AESZipFile(TESTFN, 'r') as zf:
1353            self.assertEqual(zf.read(filename), content)
1354
1355    def test_close_erroneous_file(self):
1356        # This test checks that the ZipFile constructor closes the file object
1357        # it opens if there's an error in the file.  If it doesn't, the
1358        # traceback holds a reference to the ZipFile object and, indirectly,
1359        # the file object.
1360        # On Windows, this causes the os.unlink() call to fail because the
1361        # underlying file is still open.  This is SF bug #412214.
1362        #
1363        with open(TESTFN, "w") as fp:
1364            fp.write("this is not a legal zip file\n")
1365        try:
1366            zf = zipfile_aes.AESZipFile(TESTFN)
1367        except zipfile.BadZipFile:
1368            pass
1369
1370    def test_is_zip_erroneous_file(self):
1371        """Check that is_zipfile() correctly identifies non-zip files."""
1372        # - passing a filename
1373        with open(TESTFN, "w") as fp:
1374            fp.write("this is not a legal zip file\n")
1375        self.assertFalse(zipfile.is_zipfile(TESTFN))
1376        # - passing a path-like object
1377        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1378        # - passing a file object
1379        with open(TESTFN, "rb") as fp:
1380            self.assertFalse(zipfile.is_zipfile(fp))
1381        # - passing a file-like object
1382        fp = io.BytesIO()
1383        fp.write(b"this is not a legal zip file\n")
1384        self.assertFalse(zipfile.is_zipfile(fp))
1385        fp.seek(0, 0)
1386        self.assertFalse(zipfile.is_zipfile(fp))
1387
1388    def test_damaged_zipfile(self):
1389        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1390        # - Create a valid zip file
1391        fp = io.BytesIO()
1392        with zipfile_aes.AESZipFile(fp, mode="w") as zipf:
1393            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1394        zipfiledata = fp.getvalue()
1395
1396        # - Now create copies of it missing the last N bytes and make sure
1397        #   a BadZipFile exception is raised when we try to open it
1398        for N in range(len(zipfiledata)):
1399            fp = io.BytesIO(zipfiledata[:N])
1400            self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, fp)
1401
1402    def test_is_zip_valid_file(self):
1403        """Check that is_zipfile() correctly identifies zip files."""
1404        # - passing a filename
1405        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1406            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1407
1408        self.assertTrue(zipfile.is_zipfile(TESTFN))
1409        # - passing a file object
1410        with open(TESTFN, "rb") as fp:
1411            self.assertTrue(zipfile.is_zipfile(fp))
1412            fp.seek(0, 0)
1413            zip_contents = fp.read()
1414        # - passing a file-like object
1415        fp = io.BytesIO()
1416        fp.write(zip_contents)
1417        self.assertTrue(zipfile.is_zipfile(fp))
1418        fp.seek(0, 0)
1419        self.assertTrue(zipfile.is_zipfile(fp))
1420
1421    def test_non_existent_file_raises_OSError(self):
1422        # make sure we don't raise an AttributeError when a partially-constructed
1423        # ZipFile instance is finalized; this tests for regression on SF tracker
1424        # bug #403871.
1425
1426        # The bug we're testing for caused an AttributeError to be raised
1427        # when a ZipFile instance was created for a file that did not
1428        # exist; the .fp member was not initialized but was needed by the
1429        # __del__() method.  Since the AttributeError is in the __del__(),
1430        # it is ignored, but the user should be sufficiently annoyed by
1431        # the message on the output that regression will be noticed
1432        # quickly.
1433        self.assertRaises(OSError, zipfile_aes.AESZipFile, TESTFN)
1434
1435    def test_empty_file_raises_BadZipFile(self):
1436        f = open(TESTFN, 'w')
1437        f.close()
1438        self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, TESTFN)
1439
1440        with open(TESTFN, 'w') as fp:
1441            fp.write("short file")
1442        self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, TESTFN)
1443
1444    def test_closed_zip_raises_ValueError(self):
1445        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1446        data = io.BytesIO()
1447        with zipfile_aes.AESZipFile(data, mode="w") as zipf:
1448            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1449
1450        # This is correct; calling .read on a closed ZipFile should raise
1451        # a ValueError, and so should calling .testzip.  An earlier
1452        # version of .testzip would swallow this exception (and any other)
1453        # and report that the first file in the archive was corrupt.
1454        self.assertRaises(ValueError, zipf.read, "foo.txt")
1455        self.assertRaises(ValueError, zipf.open, "foo.txt")
1456        self.assertRaises(ValueError, zipf.testzip)
1457        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1458        with open(TESTFN, 'w') as f:
1459            f.write('zipfile test data')
1460        self.assertRaises(ValueError, zipf.write, TESTFN)
1461
1462    def test_bad_constructor_mode(self):
1463        """Check that bad modes passed to ZipFile constructor are caught."""
1464        self.assertRaises(ValueError, zipfile_aes.AESZipFile, TESTFN, "q")
1465
1466    def test_bad_open_mode(self):
1467        """Check that bad modes passed to ZipFile.open are caught."""
1468        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1469            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1470
1471        with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipf:
1472            # read the data to make sure the file is there
1473            zipf.read("foo.txt")
1474            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1475            # universal newlines support is removed
1476            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1477            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1478
1479    def test_read0(self):
1480        """Check that calling read(0) on a ZipExtFile object returns an empty
1481        string and doesn't advance file pointer."""
1482        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1483            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1484            # read the data to make sure the file is there
1485            with zipf.open("foo.txt") as f:
1486                for i in range(FIXEDTEST_SIZE):
1487                    self.assertEqual(f.read(0), b'')
1488
1489                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1490
1491    def test_open_non_existent_item(self):
1492        """Check that attempting to call open() for an item that doesn't
1493        exist in the archive raises a RuntimeError."""
1494        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1495            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1496
1497    def test_bad_compression_mode(self):
1498        """Check that bad compression methods passed to ZipFile.open are
1499        caught."""
1500        self.assertRaises(NotImplementedError, zipfile_aes.AESZipFile, TESTFN, "w", -1)
1501
1502    def test_unsupported_compression(self):
1503        # data is declared as shrunk, but actually deflated
1504        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1505                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1506                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1507                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1508                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1509                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1510        with zipfile_aes.AESZipFile(io.BytesIO(data), 'r') as zipf:
1511            self.assertRaises(NotImplementedError, zipf.open, 'x')
1512
1513    def test_null_byte_in_filename(self):
1514        """Check that a filename containing a null byte is properly
1515        terminated."""
1516        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1517            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1518            self.assertEqual(zipf.namelist(), ['foo.txt'])
1519
1520    def test_struct_sizes(self):
1521        """Check that ZIP internal structure sizes are calculated correctly."""
1522        self.assertEqual(zipfile.sizeEndCentDir, 22)
1523        self.assertEqual(zipfile.sizeCentralDir, 46)
1524        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1525        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1526
1527    def test_comments(self):
1528        """Check that comments on the archive are handled properly."""
1529
1530        # check default comment is empty
1531        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1532            self.assertEqual(zipf.comment, b'')
1533            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1534
1535        with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr:
1536            self.assertEqual(zipfr.comment, b'')
1537
1538        # check a simple short comment
1539        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1540        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1541            zipf.comment = comment
1542            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1543        with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr:
1544            self.assertEqual(zipf.comment, comment)
1545
1546        # check a comment of max length
1547        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1548        comment2 = comment2.encode("ascii")
1549        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1550            zipf.comment = comment2
1551            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1552
1553        with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr:
1554            self.assertEqual(zipfr.comment, comment2)
1555
1556        # check a comment that is too long is truncated
1557        with zipfile_aes.AESZipFile(TESTFN, mode="w") as zipf:
1558            with self.assertWarns(UserWarning):
1559                zipf.comment = comment2 + b'oops'
1560            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1561        with zipfile_aes.AESZipFile(TESTFN, mode="r") as zipfr:
1562            self.assertEqual(zipfr.comment, comment2)
1563
1564        # check that comments are correctly modified in append mode
1565        with zipfile_aes.AESZipFile(TESTFN,mode="w") as zipf:
1566            zipf.comment = b"original comment"
1567            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1568        with zipfile_aes.AESZipFile(TESTFN,mode="a") as zipf:
1569            zipf.comment = b"an updated comment"
1570        with zipfile_aes.AESZipFile(TESTFN,mode="r") as zipf:
1571            self.assertEqual(zipf.comment, b"an updated comment")
1572
1573        # check that comments are correctly shortened in append mode
1574        with zipfile_aes.AESZipFile(TESTFN,mode="w") as zipf:
1575            zipf.comment = b"original comment that's longer"
1576            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1577        with zipfile_aes.AESZipFile(TESTFN,mode="a") as zipf:
1578            zipf.comment = b"shorter comment"
1579        with zipfile_aes.AESZipFile(TESTFN,mode="r") as zipf:
1580            self.assertEqual(zipf.comment, b"shorter comment")
1581
1582    def test_unicode_comment(self):
1583        with zipfile_aes.AESZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1584            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1585            with self.assertRaises(TypeError):
1586                zipf.comment = "this is an error"
1587
1588    def test_change_comment_in_empty_archive(self):
1589        with zipfile_aes.AESZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1590            self.assertFalse(zipf.filelist)
1591            zipf.comment = b"this is a comment"
1592        with zipfile_aes.AESZipFile(TESTFN, "r") as zipf:
1593            self.assertEqual(zipf.comment, b"this is a comment")
1594
1595    def test_change_comment_in_nonempty_archive(self):
1596        with zipfile_aes.AESZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1597            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1598        with zipfile_aes.AESZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1599            self.assertTrue(zipf.filelist)
1600            zipf.comment = b"this is a comment"
1601        with zipfile_aes.AESZipFile(TESTFN, "r") as zipf:
1602            self.assertEqual(zipf.comment, b"this is a comment")
1603
1604    def test_empty_zipfile(self):
1605        # Check that creating a file in 'w' or 'a' mode and closing without
1606        # adding any files to the archives creates a valid empty ZIP file
1607        zipf = zipfile_aes.AESZipFile(TESTFN, mode="w")
1608        zipf.close()
1609        try:
1610            zipf = zipfile_aes.AESZipFile(TESTFN, mode="r")
1611        except zipfile.BadZipFile:
1612            self.fail("Unable to create empty ZIP file in 'w' mode")
1613
1614        zipf = zipfile_aes.AESZipFile(TESTFN, mode="a")
1615        zipf.close()
1616        try:
1617            zipf = zipfile_aes.AESZipFile(TESTFN, mode="r")
1618        except:
1619            self.fail("Unable to create empty ZIP file in 'a' mode")
1620
1621    def test_open_empty_file(self):
1622        # Issue 1710703: Check that opening a file with less than 22 bytes
1623        # raises a BadZipFile exception (rather than the previously unhelpful
1624        # OSError)
1625        f = open(TESTFN, 'w')
1626        f.close()
1627        self.assertRaises(zipfile.BadZipFile, zipfile_aes.AESZipFile, TESTFN, 'r')
1628
1629    def test_create_zipinfo_before_1980(self):
1630        self.assertRaises(ValueError,
1631                          zipfile_aes.AESZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1632
1633    def test_zipfile_with_short_extra_field(self):
1634        """If an extra field in the header is less than 4 bytes, skip it."""
1635        zipdata = (
1636            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1637            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1638            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1639            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1640            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1641            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1642            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1643        )
1644        with zipfile_aes.AESZipFile(io.BytesIO(zipdata), 'r') as zipf:
1645            # testzip returns the name of the first corrupt file, or None
1646            self.assertIsNone(zipf.testzip())
1647
1648    def test_open_conflicting_handles(self):
1649        # It's only possible to open one writable file handle at a time
1650        msg1 = b"It's fun to charter an accountant!"
1651        msg2 = b"And sail the wide accountant sea"
1652        msg3 = b"To find, explore the funds offshore"
1653        with zipfile_aes.AESZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1654            with zipf.open('foo', mode='w') as w2:
1655                w2.write(msg1)
1656            with zipf.open('bar', mode='w') as w1:
1657                with self.assertRaises(ValueError):
1658                    zipf.open('handle', mode='w')
1659                with self.assertRaises(ValueError):
1660                    zipf.open('foo', mode='r')
1661                with self.assertRaises(ValueError):
1662                    zipf.writestr('str', 'abcde')
1663                with self.assertRaises(ValueError):
1664                    zipf.write(__file__, 'file')
1665                with self.assertRaises(ValueError):
1666                    zipf.close()
1667                w1.write(msg2)
1668            with zipf.open('baz', mode='w') as w2:
1669                w2.write(msg3)
1670
1671        with zipfile_aes.AESZipFile(TESTFN2, 'r') as zipf:
1672            self.assertEqual(zipf.read('foo'), msg1)
1673            self.assertEqual(zipf.read('bar'), msg2)
1674            self.assertEqual(zipf.read('baz'), msg3)
1675            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1676
1677    def test_seek_tell(self):
1678        # Test seek functionality
1679        txt = b"Where's Bruce?"
1680        bloc = txt.find(b"Bruce")
1681        # Check seek on a file
1682        with zipfile_aes.AESZipFile(TESTFN, "w") as zipf:
1683            zipf.writestr("foo.txt", txt)
1684        with zipfile_aes.AESZipFile(TESTFN, "r") as zipf:
1685            with zipf.open("foo.txt", "r") as fp:
1686                fp.seek(bloc, os.SEEK_SET)
1687                self.assertEqual(fp.tell(), bloc)
1688                fp.seek(-bloc, os.SEEK_CUR)
1689                self.assertEqual(fp.tell(), 0)
1690                fp.seek(bloc, os.SEEK_CUR)
1691                self.assertEqual(fp.tell(), bloc)
1692                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1693                fp.seek(0, os.SEEK_END)
1694                self.assertEqual(fp.tell(), len(txt))
1695                fp.seek(0, os.SEEK_SET)
1696                self.assertEqual(fp.tell(), 0)
1697        # Check seek on memory file
1698        data = io.BytesIO()
1699        with zipfile_aes.AESZipFile(data, mode="w") as zipf:
1700            zipf.writestr("foo.txt", txt)
1701        with zipfile_aes.AESZipFile(data, mode="r") as zipf:
1702            with zipf.open("foo.txt", "r") as fp:
1703                fp.seek(bloc, os.SEEK_SET)
1704                self.assertEqual(fp.tell(), bloc)
1705                fp.seek(-bloc, os.SEEK_CUR)
1706                self.assertEqual(fp.tell(), 0)
1707                fp.seek(bloc, os.SEEK_CUR)
1708                self.assertEqual(fp.tell(), bloc)
1709                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1710                fp.seek(0, os.SEEK_END)
1711                self.assertEqual(fp.tell(), len(txt))
1712                fp.seek(0, os.SEEK_SET)
1713                self.assertEqual(fp.tell(), 0)
1714
1715    def tearDown(self):
1716        unlink(TESTFN)
1717        unlink(TESTFN2)
1718
1719
class AbstractBadCrcTests:
    """Mixin with tests for archives whose member has a bad CRC.

    Concrete subclasses provide ``zip_with_bad_crc``: the raw bytes of an
    archive containing the member 'afile'.
    """

    def _open_corrupt_archive(self):
        # A fresh in-memory archive per call, so the checks cannot affect
        # one another.
        return zipfile_aes.AESZipFile(io.BytesIO(self.zip_with_bad_crc),
                                      mode="r")

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        with self._open_corrupt_archive() as archive:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', archive.testzip())

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        # Using ZipFile.read()
        with self._open_corrupt_archive() as archive:
            with self.assertRaises(zipfile.BadZipFile):
                archive.read('afile')

        # Using ZipExtFile.read()
        with self._open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as corrupt_file:
                with self.assertRaises(zipfile.BadZipFile):
                    corrupt_file.read()

        # Same with small reads (in order to exercise the buffering logic)
        with self._open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while corrupt_file.read(2):
                        pass
1749
1750
class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks against a ZIP_STORED fixture.
    compression = zipfile.ZIP_STORED
    # Raw archive bytes with a single member named 'afile'; the inherited
    # tests expect reading it to fail the CRC check.
    zip_with_bad_crc = (
        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
        b'ilehello,AworldP'
        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
        b'\0\0/\0\0\0\0\0')
1762
@requires_zlib
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks against a ZIP_DEFLATED fixture.
    compression = zipfile.ZIP_DEFLATED
    # Raw archive bytes with a single member named 'afile'; note the
    # literal b'FAKE' bytes embedded in both of its headers.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
1775
@requires_bz2
class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks against a ZIP_BZIP2 fixture.
    compression = zipfile.ZIP_BZIP2
    # Raw archive bytes with a single member named 'afile'; note the
    # literal b'FAKE' bytes embedded in both of its headers.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ileBZh91AY&SY\xd4\xa8\xca'
        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
        b'\x00\x00\x00\x00')
1791
@requires_lzma
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks against a ZIP_LZMA fixture.
    compression = zipfile.ZIP_LZMA
    # Raw archive bytes with a single member named 'afile'; note the
    # literal b'FAKE' bytes embedded in both of its headers.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
1805
1806
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works, using two pre-generated encrypted
    archives embedded below as byte strings."""

    # First fixture: holds the member 'test.txt' (password b"python").
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00' )
    # Second fixture: holds the member 'zero' (password b"12345").
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected decrypted contents of 'test.txt' and 'zero' respectively.
    plain = b'zipfile.py encryption test'
    plain2 = bytes(512)

    @staticmethod
    def _open_fixture(path, raw_bytes):
        # Dump the fixture bytes to *path* and hand back a reader on it.
        with open(path, "wb") as fp:
            fp.write(raw_bytes)
        return zipfile_aes.AESZipFile(path, "r")

    def setUp(self):
        """Write both fixtures to disk and open them for reading."""
        self.zip = self._open_fixture(TESTFN, self.data)
        self.zip2 = self._open_fixture(TESTFN2, self.data2)

    def tearDown(self):
        """Close each archive and delete its backing file."""
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        """Reading an encrypted member without a password raises
        RuntimeError."""
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            with self.assertRaises(RuntimeError):
                archive.read(member)

    def test_bad_password(self):
        """Reading with the wrong password raises RuntimeError."""
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            archive.setpassword(b"perl")
            with self.assertRaises(RuntimeError):
                archive.read(member)

    @requires_zlib
    def test_good_password(self):
        """With the right password each member decrypts to its plaintext."""
        self.zip.setpassword(b"python")
        decrypted = self.zip.read("test.txt")
        self.assertEqual(decrypted, self.plain)
        self.zip2.setpassword(b"12345")
        decrypted2 = self.zip2.read("zero")
        self.assertEqual(decrypted2, self.plain2)

    def test_unicode_password(self):
        """Passwords given as str instead of bytes raise TypeError."""
        with self.assertRaises(TypeError):
            self.zip.setpassword("unicode")
        with self.assertRaises(TypeError):
            self.zip.read("test.txt", "python")
        with self.assertRaises(TypeError):
            self.zip.open("test.txt", pwd="python")
        with self.assertRaises(TypeError):
            self.zip.extract("test.txt", pwd="python")
1871
class AbstractTestsWithRandomBinaryFiles:
    """Mixin that round-trips a blob of random binary data through an
    archive.

    Concrete subclasses set ``compression`` to the method under test.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the random payload shared by all tests of the class."""
        datacount = randint(16, 64)*1024 + randint(1, 1024)
        packed_floats = (struct.pack('<f', random()*randint(-1000, 1000))
                         for _ in range(datacount))
        cls.data = b''.join(packed_floats)

    def setUp(self):
        """Write the payload to TESTFN so it can be added to archives."""
        with open(TESTFN, "wb") as source:
            source.write(self.data)

    def tearDown(self):
        """Remove the scratch files."""
        for scratch_path in (TESTFN, TESTFN2):
            unlink(scratch_path)

    def make_test_archive(self, f, compression):
        """Create an archive at *f* holding TESTFN under two names."""
        with zipfile_aes.AESZipFile(f, "w", compression) as zipfp:
            for arcname in ("another.name", TESTFN):
                zipfp.write(TESTFN, arcname)

    def zip_test(self, f, compression):
        """Round-trip the payload using whole-member read() calls."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
            roundtripped = zipfp.read(TESTFN)
            self.assertEqual(len(roundtripped), len(self.data))
            self.assertEqual(roundtripped, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_read(self):
        for target in get_files(self):
            self.zip_test(target, self.compression)

    def zip_open_test(self, f, compression):
        """Round-trip the payload by reading both members in fixed
        256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
            # iter(callable, b'') keeps reading until read() returns b''.
            with zipfp.open(TESTFN) as zipopen1:
                zipdata1 = list(iter(lambda: zipopen1.read(256), b''))

            with zipfp.open("another.name") as zipopen2:
                zipdata2 = list(iter(lambda: zipopen2.read(256), b''))

            for chunks in (zipdata1, zipdata2):
                joined = b''.join(chunks)
                self.assertEqual(len(joined), len(self.data))
                self.assertEqual(joined, self.data)

    def test_open(self):
        for target in get_files(self):
            self.zip_open_test(target, self.compression)

    def zip_random_open_test(self, f, compression):
        """Round-trip the payload by reading one member in chunks of
        random size (1 to 1024 bytes)."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile_aes.AESZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as zipopen1:
                # A new random chunk size is drawn for every read() call.
                chunks = list(iter(lambda: zipopen1.read(randint(1, 1024)),
                                   b''))

            testdata = b''.join(chunks)
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)

    def test_random_open(self):
        for target in get_files(self):
            self.zip_random_open_test(target, self.compression)
1961
1962
class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                       unittest.TestCase):
    # Compression method the inherited tests pass to their zip_*_test helpers.
    compression = zipfile.ZIP_STORED
1966
@requires_zlib
class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                        unittest.TestCase):
    # Compression method the inherited tests pass to their zip_*_test helpers.
    compression = zipfile.ZIP_DEFLATED
1971
@requires_bz2
class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                      unittest.TestCase):
    # Compression method the inherited tests pass to their zip_*_test helpers.
    compression = zipfile.ZIP_BZIP2
1976
@requires_lzma
class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                     unittest.TestCase):
    # Compression method the inherited tests pass to their zip_*_test helpers.
    compression = zipfile.ZIP_LZMA
1981
1982
1983# Provide the tell() method but not seek()
class Tellable:
    """Write-only wrapper around a file object that offers tell() but
    no seek().

    The reported position is the running total of what write() returned
    through this wrapper; it always starts at 0.
    """

    def __init__(self, fp):
        self.fp = fp
        self.offset = 0

    def write(self, data):
        """Forward *data* to the wrapped file and advance the offset by
        the count it reports."""
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        """Return the number of bytes written through this wrapper."""
        return self.offset

    def flush(self):
        """Flush the wrapped file."""
        self.fp.flush()
1999
class Unseekable:
    """Write-only wrapper around a file object that exposes only write()
    and flush(), so neither seek() nor tell() is available."""

    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        """Forward *data* to the wrapped file and return its result."""
        result = self.fp.write(data)
        return result

    def flush(self):
        """Flush the wrapped file."""
        self.fp.flush()
2009
class UnseekableTests(unittest.TestCase):
    """Archives can be written to streams that cannot seek and/or tell.

    Every test runs with three outputs: the plain buffered writer, a
    Tellable wrapper and an Unseekable wrapper. The underlying buffer
    already holds b'abc', so the archive does not start at offset 0.
    """

    def test_writestr(self):
        """Members added with writestr() can be read back."""
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile_aes.AESZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as zipfp:
                    zipfp.writestr('ones', b'111')
                    zipfp.writestr('twos', b'222')
                # The archive must start right after the existing prefix.
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile_aes.AESZipFile(raw, mode='r') as zipf:
                    for member, expected in (('ones', b'111'), ('twos', b'222')):
                        with zipf.open(member) as zopen:
                            self.assertEqual(zopen.read(), expected)

    def test_write(self):
        """Members added from a file on disk with write() can be read back."""
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile_aes.AESZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as zipfp:
                    self.addCleanup(unlink, TESTFN)
                    # TESTFN is rewritten with new content before each add.
                    for content, arcname in ((b'111', 'ones'), (b'222', 'twos')):
                        with open(TESTFN, 'wb') as source:
                            source.write(content)
                        zipfp.write(TESTFN, arcname)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile_aes.AESZipFile(raw, mode='r') as zipf:
                    for member, expected in (('ones', b'111'), ('twos', b'222')):
                        with zipf.open(member) as zopen:
                            self.assertEqual(zopen.read(), expected)

    def test_open_write(self):
        """Members written through open(..., 'w') can be read back."""
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile_aes.AESZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as zipf:
                    for member, content in (('ones', b'111'), ('twos', b'222')):
                        with zipf.open(member, 'w') as zopen:
                            zopen.write(content)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile_aes.AESZipFile(raw) as zipf:
                    self.assertEqual(zipf.read('ones'), b'111')
                    self.assertEqual(zipf.read('twos'), b'222')
2063
2064
2065@requires_zlib
2066class TestsWithMultipleOpens(unittest.TestCase):
2067    @classmethod
2068    def setUpClass(cls):
2069        cls.data1 = b'111' + getrandbytes(10000)
2070        cls.data2 = b'222' + getrandbytes(10000)
2071
2072    def make_test_archive(self, f):
2073        # Create the ZIP archive
2074        with zipfile_aes.AESZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2075            zipfp.writestr('ones', self.data1)
2076            zipfp.writestr('twos', self.data2)
2077
2078    def test_same_file(self):
2079        # Verify that (when the ZipFile is in control of creating file objects)
2080        # multiple open() calls can be made without interfering with each other.
2081        for f in get_files(self):
2082            self.make_test_archive(f)
2083            with zipfile_aes.AESZipFile(f, mode="r") as zipf:
2084                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2085                    data1 = zopen1.read(500)
2086                    data2 = zopen2.read(500)
2087                    data1 += zopen1.read()
2088                    data2 += zopen2.read()
2089                self.assertEqual(data1, data2)
2090                self.assertEqual(data1, self.data1)
2091
2092    def test_different_file(self):
2093        # Verify that (when the ZipFile is in control of creating file objects)
2094        # multiple open() calls can be made without interfering with each other.
2095        for f in get_files(self):
2096            self.make_test_archive(f)
2097            with zipfile_aes.AESZipFile(f, mode="r") as zipf:
2098                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2099                    data1 = zopen1.read(500)
2100                    data2 = zopen2.read(500)
2101                    data1 += zopen1.read()
2102                    data2 += zopen2.read()
2103                self.assertEqual(data1, self.data1)
2104                self.assertEqual(data2, self.data2)
2105
2106    def test_interleaved(self):
2107        # Verify that (when the ZipFile is in control of creating file objects)
2108        # multiple open() calls can be made without interfering with each other.
2109        for f in get_files(self):
2110            self.make_test_archive(f)
2111            with zipfile_aes.AESZipFile(f, mode="r") as zipf:
2112                with zipf.open('ones') as zopen1:
2113                    data1 = zopen1.read(500)
2114                    with zipf.open('twos') as zopen2:
2115                        data2 = zopen2.read(500)
2116                        data1 += zopen1.read()
2117                        data2 += zopen2.read()
2118                self.assertEqual(data1, self.data1)
2119                self.assertEqual(data2, self.data2)
2120
2121    def test_read_after_close(self):
2122        for f in get_files(self):
2123            self.make_test_archive(f)
2124            with contextlib.ExitStack() as stack:
2125                with zipfile_aes.AESZipFile(f, 'r') as zipf:
2126                    zopen1 = stack.enter_context(zipf.open('ones'))
2127                    zopen2 = stack.enter_context(zipf.open('twos'))
2128                data1 = zopen1.read(500)
2129                data2 = zopen2.read(500)
2130                data1 += zopen1.read()
2131                data2 += zopen2.read()
2132            self.assertEqual(data1, self.data1)
2133            self.assertEqual(data2, self.data2)
2134
2135    def test_read_after_write(self):
2136        for f in get_files(self):
2137            with zipfile_aes.AESZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2138                zipf.writestr('ones', self.data1)
2139                zipf.writestr('twos', self.data2)
2140                with zipf.open('ones') as zopen1:
2141                    data1 = zopen1.read(500)
2142            self.assertEqual(data1, self.data1[:500])
2143            with zipfile_aes.AESZipFile(f, 'r') as zipf:
2144                data1 = zipf.read('ones')
2145                data2 = zipf.read('twos')
2146            self.assertEqual(data1, self.data1)
2147            self.assertEqual(data2, self.data2)
2148
2149    def test_write_after_read(self):
2150        for f in get_files(self):
2151            with zipfile_aes.AESZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2152                zipf.writestr('ones', self.data1)
2153                with zipf.open('ones') as zopen1:
2154                    zopen1.read(500)
2155                    zipf.writestr('twos', self.data2)
2156            with zipfile_aes.AESZipFile(f, 'r') as zipf:
2157                data1 = zipf.read('ones')
2158                data2 = zipf.read('twos')
2159            self.assertEqual(data1, self.data1)
2160            self.assertEqual(data2, self.data2)
2161
2162    def test_many_opens(self):
2163        # Verify that read() and open() promptly close the file descriptor,
2164        # and don't rely on the garbage collector to free resources.
2165        self.make_test_archive(TESTFN2)
2166        with zipfile_aes.AESZipFile(TESTFN2, mode="r") as zipf:
2167            for x in range(100):
2168                zipf.read('ones')
2169                with zipf.open('ones') as zopen1:
2170                    pass
2171        with open(os.devnull) as f:
2172            self.assertLess(f.fileno(), 100)
2173
2174    def test_write_while_reading(self):
2175        with zipfile_aes.AESZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2176            zipf.writestr('ones', self.data1)
2177        with zipfile_aes.AESZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2178            with zipf.open('ones', 'r') as r1:
2179                data1 = r1.read(500)
2180                with zipf.open('twos', 'w') as w1:
2181                    w1.write(self.data2)
2182                data1 += r1.read()
2183        self.assertEqual(data1, self.data1)
2184        with zipfile_aes.AESZipFile(TESTFN2) as zipf:
2185            self.assertEqual(zipf.read('twos'), self.data2)
2186
    def tearDown(self):
        # Remove the TESTFN2 archive that the tests in this class write to,
        # using the unlink helper imported from test.support.
        unlink(TESTFN2)
2189
2190
class TestWithDirectory(unittest.TestCase):
    """Tests for writing and extracting directory entries with AESZipFile.

    setUp creates TESTFN2 as a scratch directory for every test; tearDown
    removes it again, together with the TESTFN archive if a test created one.
    """

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        # Extracting zipdir.zip must create directories a and a/b, and an
        # entry a/b/c, below the scratch directory.
        with zipfile_aes.AESZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        # Write the same directory twice (once under its own path, once
        # under the explicit archive name "y") and check the stored entries
        # both before and after reopening the archive.
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        # NOTE(review): 0x10 is presumably the MS-DOS directory attribute
        # bit; the tests only pin the combined external_attr value.
        expected_attr = (mode << 16) | 0x10
        with zipfile_aes.AESZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: assertTrue(value, "y/") would use
            # "y/" as the failure message and pass for any non-empty name.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile_aes.AESZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zinfo = zipf.filelist[1]
            # Same fix as above: actually compare the stored name.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            # Two top-level entries are expected in the extraction target.
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        # A directory entry created through writestr("x/", b'') must carry
        # the expected attributes and extract as a real directory.
        os.mkdir(os.path.join(TESTFN2, "x"))
        expected_attr = (0o40775 << 16) | 0x10
        with zipfile_aes.AESZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile_aes.AESZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        # Drop the scratch directory, and the archive file when one of the
        # write tests created it.
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
2254
2255
class ZipInfoTests(unittest.TestCase):
    """Checks for AESZipInfo.from_file() with different kinds of path."""

    def test_from_file(self):
        # Plain str path to this test module.
        info = zipfile_aes.AESZipInfo.from_file(__file__)
        self.assertEqual(posixpath.basename(info.filename),
                         os.path.basename(__file__))
        self.assertFalse(info.is_dir())
        self.assertEqual(info.file_size, os.path.getsize(__file__))

    def test_from_file_pathlike(self):
        # Same file, passed as a pathlib.Path object.
        source = pathlib.Path(__file__)
        info = zipfile_aes.AESZipInfo.from_file(source)
        self.assertEqual(posixpath.basename(info.filename),
                         os.path.basename(__file__))
        self.assertFalse(info.is_dir())
        self.assertEqual(info.file_size, os.path.getsize(__file__))

    def test_from_file_bytes(self):
        # Same file, passed as a bytes path with an explicit archive name.
        source = os.fsencode(__file__)
        info = zipfile_aes.AESZipInfo.from_file(source, 'test')
        self.assertEqual(posixpath.basename(info.filename), 'test')
        self.assertFalse(info.is_dir())
        self.assertEqual(info.file_size, os.path.getsize(__file__))

    def test_from_file_fileno(self):
        # Same file, passed as an open file descriptor with an explicit
        # archive name.
        with open(__file__, 'rb') as handle:
            descriptor = handle.fileno()
            info = zipfile_aes.AESZipInfo.from_file(descriptor, 'test')
            self.assertEqual(posixpath.basename(info.filename), 'test')
            self.assertFalse(info.is_dir())
            self.assertEqual(info.file_size, os.path.getsize(__file__))

    def test_from_dir(self):
        # A directory yields a stored, zero-length entry whose name ends
        # with a slash.
        here = os.path.dirname(os.path.abspath(__file__))
        info = zipfile_aes.AESZipInfo.from_file(here, 'stdlib_tests')
        self.assertEqual(info.filename, 'stdlib_tests/')
        self.assertTrue(info.is_dir())
        self.assertEqual(info.compress_type, zipfile.ZIP_STORED)
        self.assertEqual(info.file_size, 0)
2291
2292
class CommandLineTest(unittest.TestCase):
    """Exercise the ``python -m pyzipper.zipfile`` command-line interface."""

    def zipfilecmd(self, *args, **kwargs):
        # Run the CLI through assert_python_ok and return its stdout with
        # the platform line separator replaced by b'\n'.
        _rc, stdout, _stderr = script_helper.assert_python_ok(
            '-m', 'pyzipper.zipfile', *args, **kwargs)
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        # Run the CLI through assert_python_failure and hand back its result
        # unchanged (callers unpack it as rc, out, err).
        return script_helper.assert_python_failure(
            '-m', 'pyzipper.zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: nothing on stdout, a usage error on stderr.
        _rc, stdout, stderr = self.zipfilecmd_failure()
        self.assertEqual(stdout, b'')
        for keyword in (b'usage', b'error', b'required'):
            self.assertIn(keyword, stderr.lower())
        # An empty archive name must fail with some message on stderr.
        _rc, stdout, stderr = self.zipfilecmd_failure('-l', '')
        self.assertEqual(stdout, b'')
        self.assertNotEqual(stderr.strip(), b'')

    def test_test_command(self):
        archive = findfile('zipdir.zip')
        for flag in ('-t', '--test'):
            output = self.zipfilecmd(flag, archive)
            self.assertEqual(output.rstrip(), b'Done testing')
        # A tar file handed to the test command must fail with no stdout.
        not_a_zip = findfile('testtar.tar')
        _rc, stdout, _stderr = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(stdout, b'')

    def test_list_command(self):
        archive = findfile('zipdir.zip')
        # Build the expected listing in-process with printdir().
        listing = io.StringIO()
        with zipfile_aes.AESZipFile(archive, 'r') as zf:
            zf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for flag in ('-l', '--list'):
            output = self.zipfilecmd(
                flag, archive, PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(output, expected)

    @requires_zlib
    def test_create_command(self):
        # Input fixtures: one plain file and one directory holding a file.
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as fp:
            fp.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested_path = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested_path, 'w') as fp:
            fp.write('test 2')

        inputs = [TESTFN, TESTFNDIR]
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for flag in ('-c', '--create'):
            try:
                output = self.zipfilecmd(flag, TESTFN2, *inputs)
                self.assertEqual(output, b'')
                with zipfile_aes.AESZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), expected_names)
                    self.assertEqual(zf.read(expected_names[0]), b'test 1')
                    self.assertEqual(zf.read(expected_names[2]), b'test 2')
            finally:
                # Remove the archive so the next flag starts from scratch.
                unlink(TESTFN2)

    def test_extract_command(self):
        archive = findfile('zipdir.zip')
        for flag in ('-e', '--extract'):
            with temp_dir() as extdir:
                output = self.zipfilecmd(flag, archive, extdir)
                self.assertEqual(output, b'')
                # Compare every archive entry with what landed on disk.
                with zipfile_aes.AESZipFile(archive) as zf:
                    for entry in zf.infolist():
                        native_name = entry.filename.replace('/', os.sep)
                        path = os.path.join(extdir, native_name)
                        if entry.is_dir():
                            self.assertTrue(os.path.isdir(path))
                            continue
                        self.assertTrue(os.path.isfile(path))
                        with open(path, 'rb') as extracted:
                            self.assertEqual(extracted.read(),
                                             zf.read(entry))
2371
# Run the whole suite through unittest's command-line runner when this
# module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
2374