1import contextlib
2import importlib.util
3import io
4import itertools
5import os
6import pathlib
7import posixpath
8import string
9import struct
10import subprocess
11import sys
12import time
13import unittest
14import unittest.mock as mock
15import zipfile
16
17
18from tempfile import TemporaryFile
19from random import randint, random, getrandbits
20
21from test.support import script_helper
22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd,
23                          requires_zlib, requires_bz2, requires_lzma,
24                          captured_stdout)
25
26TESTFN2 = TESTFN + "2"
27TESTFNDIR = TESTFN + "d"
28FIXEDTEST_SIZE = 1000
29DATAFILES_DIR = 'zipfile_datafiles'
30
31SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
32                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
33                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
34                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
35
36def getrandbytes(size):
37    return getrandbits(8 * size).to_bytes(size, 'little')
38
39def get_files(test):
40    yield TESTFN2
41    with TemporaryFile() as f:
42        yield f
43        test.assertFalse(f.closed)
44    with io.BytesIO() as f:
45        yield f
46        test.assertFalse(f.closed)
47
48class AbstractTestsWithSourceFile:
49    @classmethod
50    def setUpClass(cls):
51        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
52                              (i, random()), "ascii")
53                        for i in range(FIXEDTEST_SIZE)]
54        cls.data = b''.join(cls.line_gen)
55
56    def setUp(self):
57        # Make a source file with some lines
58        with open(TESTFN, "wb") as fp:
59            fp.write(self.data)
60
61    def make_test_archive(self, f, compression, compresslevel=None):
62        kwargs = {'compression': compression, 'compresslevel': compresslevel}
63        # Create the ZIP archive
64        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
65            zipfp.write(TESTFN, "another.name")
66            zipfp.write(TESTFN, TESTFN)
67            zipfp.writestr("strfile", self.data)
68            with zipfp.open('written-open-w', mode='w') as f:
69                for line in self.line_gen:
70                    f.write(line)
71
72    def zip_test(self, f, compression, compresslevel=None):
73        self.make_test_archive(f, compression, compresslevel)
74
75        # Read the ZIP archive
76        with zipfile.ZipFile(f, "r", compression) as zipfp:
77            self.assertEqual(zipfp.read(TESTFN), self.data)
78            self.assertEqual(zipfp.read("another.name"), self.data)
79            self.assertEqual(zipfp.read("strfile"), self.data)
80
81            # Print the ZIP directory
82            fp = io.StringIO()
83            zipfp.printdir(file=fp)
84            directory = fp.getvalue()
85            lines = directory.splitlines()
86            self.assertEqual(len(lines), 5) # Number of files + header
87
88            self.assertIn('File Name', lines[0])
89            self.assertIn('Modified', lines[0])
90            self.assertIn('Size', lines[0])
91
92            fn, date, time_, size = lines[1].split()
93            self.assertEqual(fn, 'another.name')
94            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
95            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
96            self.assertEqual(size, str(len(self.data)))
97
98            # Check the namelist
99            names = zipfp.namelist()
100            self.assertEqual(len(names), 4)
101            self.assertIn(TESTFN, names)
102            self.assertIn("another.name", names)
103            self.assertIn("strfile", names)
104            self.assertIn("written-open-w", names)
105
106            # Check infolist
107            infos = zipfp.infolist()
108            names = [i.filename for i in infos]
109            self.assertEqual(len(names), 4)
110            self.assertIn(TESTFN, names)
111            self.assertIn("another.name", names)
112            self.assertIn("strfile", names)
113            self.assertIn("written-open-w", names)
114            for i in infos:
115                self.assertEqual(i.file_size, len(self.data))
116
117            # check getinfo
118            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
119                info = zipfp.getinfo(nm)
120                self.assertEqual(info.filename, nm)
121                self.assertEqual(info.file_size, len(self.data))
122
123            # Check that testzip doesn't raise an exception
124            zipfp.testzip()
125
126    def test_basic(self):
127        for f in get_files(self):
128            self.zip_test(f, self.compression)
129
130    def zip_open_test(self, f, compression):
131        self.make_test_archive(f, compression)
132
133        # Read the ZIP archive
134        with zipfile.ZipFile(f, "r", compression) as zipfp:
135            zipdata1 = []
136            with zipfp.open(TESTFN) as zipopen1:
137                while True:
138                    read_data = zipopen1.read(256)
139                    if not read_data:
140                        break
141                    zipdata1.append(read_data)
142
143            zipdata2 = []
144            with zipfp.open("another.name") as zipopen2:
145                while True:
146                    read_data = zipopen2.read(256)
147                    if not read_data:
148                        break
149                    zipdata2.append(read_data)
150
151            self.assertEqual(b''.join(zipdata1), self.data)
152            self.assertEqual(b''.join(zipdata2), self.data)
153
154    def test_open(self):
155        for f in get_files(self):
156            self.zip_open_test(f, self.compression)
157
158    def test_open_with_pathlike(self):
159        path = pathlib.Path(TESTFN2)
160        self.zip_open_test(path, self.compression)
161        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
162            self.assertIsInstance(zipfp.filename, str)
163
164    def zip_random_open_test(self, f, compression):
165        self.make_test_archive(f, compression)
166
167        # Read the ZIP archive
168        with zipfile.ZipFile(f, "r", compression) as zipfp:
169            zipdata1 = []
170            with zipfp.open(TESTFN) as zipopen1:
171                while True:
172                    read_data = zipopen1.read(randint(1, 1024))
173                    if not read_data:
174                        break
175                    zipdata1.append(read_data)
176
177            self.assertEqual(b''.join(zipdata1), self.data)
178
179    def test_random_open(self):
180        for f in get_files(self):
181            self.zip_random_open_test(f, self.compression)
182
183    def zip_read1_test(self, f, compression):
184        self.make_test_archive(f, compression)
185
186        # Read the ZIP archive
187        with zipfile.ZipFile(f, "r") as zipfp, \
188             zipfp.open(TESTFN) as zipopen:
189            zipdata = []
190            while True:
191                read_data = zipopen.read1(-1)
192                if not read_data:
193                    break
194                zipdata.append(read_data)
195
196        self.assertEqual(b''.join(zipdata), self.data)
197
198    def test_read1(self):
199        for f in get_files(self):
200            self.zip_read1_test(f, self.compression)
201
202    def zip_read1_10_test(self, f, compression):
203        self.make_test_archive(f, compression)
204
205        # Read the ZIP archive
206        with zipfile.ZipFile(f, "r") as zipfp, \
207             zipfp.open(TESTFN) as zipopen:
208            zipdata = []
209            while True:
210                read_data = zipopen.read1(10)
211                self.assertLessEqual(len(read_data), 10)
212                if not read_data:
213                    break
214                zipdata.append(read_data)
215
216        self.assertEqual(b''.join(zipdata), self.data)
217
218    def test_read1_10(self):
219        for f in get_files(self):
220            self.zip_read1_10_test(f, self.compression)
221
222    def zip_readline_read_test(self, f, compression):
223        self.make_test_archive(f, compression)
224
225        # Read the ZIP archive
226        with zipfile.ZipFile(f, "r") as zipfp, \
227             zipfp.open(TESTFN) as zipopen:
228            data = b''
229            while True:
230                read = zipopen.readline()
231                if not read:
232                    break
233                data += read
234
235                read = zipopen.read(100)
236                if not read:
237                    break
238                data += read
239
240        self.assertEqual(data, self.data)
241
242    def test_readline_read(self):
243        # Issue #7610: calls to readline() interleaved with calls to read().
244        for f in get_files(self):
245            self.zip_readline_read_test(f, self.compression)
246
247    def zip_readline_test(self, f, compression):
248        self.make_test_archive(f, compression)
249
250        # Read the ZIP archive
251        with zipfile.ZipFile(f, "r") as zipfp:
252            with zipfp.open(TESTFN) as zipopen:
253                for line in self.line_gen:
254                    linedata = zipopen.readline()
255                    self.assertEqual(linedata, line)
256
257    def test_readline(self):
258        for f in get_files(self):
259            self.zip_readline_test(f, self.compression)
260
261    def zip_readlines_test(self, f, compression):
262        self.make_test_archive(f, compression)
263
264        # Read the ZIP archive
265        with zipfile.ZipFile(f, "r") as zipfp:
266            with zipfp.open(TESTFN) as zipopen:
267                ziplines = zipopen.readlines()
268            for line, zipline in zip(self.line_gen, ziplines):
269                self.assertEqual(zipline, line)
270
271    def test_readlines(self):
272        for f in get_files(self):
273            self.zip_readlines_test(f, self.compression)
274
275    def zip_iterlines_test(self, f, compression):
276        self.make_test_archive(f, compression)
277
278        # Read the ZIP archive
279        with zipfile.ZipFile(f, "r") as zipfp:
280            with zipfp.open(TESTFN) as zipopen:
281                for line, zipline in zip(self.line_gen, zipopen):
282                    self.assertEqual(zipline, line)
283
284    def test_iterlines(self):
285        for f in get_files(self):
286            self.zip_iterlines_test(f, self.compression)
287
288    def test_low_compression(self):
289        """Check for cases where compressed data is larger than original."""
290        # Create the ZIP archive
291        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
292            zipfp.writestr("strfile", '12')
293
294        # Get an open object for strfile
295        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
296            with zipfp.open("strfile") as openobj:
297                self.assertEqual(openobj.read(1), b'1')
298                self.assertEqual(openobj.read(1), b'2')
299
300    def test_writestr_compression(self):
301        zipfp = zipfile.ZipFile(TESTFN2, "w")
302        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
303        info = zipfp.getinfo('b.txt')
304        self.assertEqual(info.compress_type, self.compression)
305
306    def test_writestr_compresslevel(self):
307        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
308        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
309        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
310                       compresslevel=2)
311
312        # Compression level follows the constructor.
313        a_info = zipfp.getinfo('a.txt')
314        self.assertEqual(a_info.compress_type, self.compression)
315        self.assertEqual(a_info._compresslevel, 1)
316
317        # Compression level is overridden.
318        b_info = zipfp.getinfo('b.txt')
319        self.assertEqual(b_info.compress_type, self.compression)
320        self.assertEqual(b_info._compresslevel, 2)
321
322    def test_read_return_size(self):
323        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
324        # than requested.
325        for test_size in (1, 4095, 4096, 4097, 16384):
326            file_size = test_size + 1
327            junk = getrandbytes(file_size)
328            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
329                zipf.writestr('foo', junk)
330                with zipf.open('foo', 'r') as fp:
331                    buf = fp.read(test_size)
332                    self.assertEqual(len(buf), test_size)
333
334    def test_truncated_zipfile(self):
335        fp = io.BytesIO()
336        with zipfile.ZipFile(fp, mode='w') as zipf:
337            zipf.writestr('strfile', self.data, compress_type=self.compression)
338            end_offset = fp.tell()
339        zipfiledata = fp.getvalue()
340
341        fp = io.BytesIO(zipfiledata)
342        with zipfile.ZipFile(fp) as zipf:
343            with zipf.open('strfile') as zipopen:
344                fp.truncate(end_offset - 20)
345                with self.assertRaises(EOFError):
346                    zipopen.read()
347
348        fp = io.BytesIO(zipfiledata)
349        with zipfile.ZipFile(fp) as zipf:
350            with zipf.open('strfile') as zipopen:
351                fp.truncate(end_offset - 20)
352                with self.assertRaises(EOFError):
353                    while zipopen.read(100):
354                        pass
355
356        fp = io.BytesIO(zipfiledata)
357        with zipfile.ZipFile(fp) as zipf:
358            with zipf.open('strfile') as zipopen:
359                fp.truncate(end_offset - 20)
360                with self.assertRaises(EOFError):
361                    while zipopen.read1(100):
362                        pass
363
364    def test_repr(self):
365        fname = 'file.name'
366        for f in get_files(self):
367            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
368                zipfp.write(TESTFN, fname)
369                r = repr(zipfp)
370                self.assertIn("mode='w'", r)
371
372            with zipfile.ZipFile(f, 'r') as zipfp:
373                r = repr(zipfp)
374                if isinstance(f, str):
375                    self.assertIn('filename=%r' % f, r)
376                else:
377                    self.assertIn('file=%r' % f, r)
378                self.assertIn("mode='r'", r)
379                r = repr(zipfp.getinfo(fname))
380                self.assertIn('filename=%r' % fname, r)
381                self.assertIn('filemode=', r)
382                self.assertIn('file_size=', r)
383                if self.compression != zipfile.ZIP_STORED:
384                    self.assertIn('compress_type=', r)
385                    self.assertIn('compress_size=', r)
386                with zipfp.open(fname) as zipopen:
387                    r = repr(zipopen)
388                    self.assertIn('name=%r' % fname, r)
389                    self.assertIn("mode='r'", r)
390                    if self.compression != zipfile.ZIP_STORED:
391                        self.assertIn('compress_type=', r)
392                self.assertIn('[closed]', repr(zipopen))
393            self.assertIn('[closed]', repr(zipfp))
394
395    def test_compresslevel_basic(self):
396        for f in get_files(self):
397            self.zip_test(f, self.compression, compresslevel=9)
398
399    def test_per_file_compresslevel(self):
400        """Check that files within a Zip archive can have different
401        compression levels."""
402        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
403            zipfp.write(TESTFN, 'compress_1')
404            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
405            one_info = zipfp.getinfo('compress_1')
406            nine_info = zipfp.getinfo('compress_9')
407            self.assertEqual(one_info._compresslevel, 1)
408            self.assertEqual(nine_info._compresslevel, 9)
409
410    def test_writing_errors(self):
411        class BrokenFile(io.BytesIO):
412            def write(self, data):
413                nonlocal count
414                if count is not None:
415                    if count == stop:
416                        raise OSError
417                    count += 1
418                super().write(data)
419
420        stop = 0
421        while True:
422            testfile = BrokenFile()
423            count = None
424            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
425                with zipfp.open('file1', 'w') as f:
426                    f.write(b'data1')
427                count = 0
428                try:
429                    with zipfp.open('file2', 'w') as f:
430                        f.write(b'data2')
431                except OSError:
432                    stop += 1
433                else:
434                    break
435                finally:
436                    count = None
437            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
438                self.assertEqual(zipfp.namelist(), ['file1'])
439                self.assertEqual(zipfp.read('file1'), b'data1')
440
441        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
442            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
443            self.assertEqual(zipfp.read('file1'), b'data1')
444            self.assertEqual(zipfp.read('file2'), b'data2')
445
446
447    def tearDown(self):
448        unlink(TESTFN)
449        unlink(TESTFN2)
450
451
452class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
453                                unittest.TestCase):
454    compression = zipfile.ZIP_STORED
455    test_low_compression = None
456
457    def zip_test_writestr_permissions(self, f, compression):
458        # Make sure that writestr and open(... mode='w') create files with
459        # mode 0600, when they are passed a name rather than a ZipInfo
460        # instance.
461
462        self.make_test_archive(f, compression)
463        with zipfile.ZipFile(f, "r") as zipfp:
464            zinfo = zipfp.getinfo('strfile')
465            self.assertEqual(zinfo.external_attr, 0o600 << 16)
466
467            zinfo2 = zipfp.getinfo('written-open-w')
468            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
469
470    def test_writestr_permissions(self):
471        for f in get_files(self):
472            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
473
474    def test_absolute_arcnames(self):
475        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
476            zipfp.write(TESTFN, "/absolute")
477
478        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
479            self.assertEqual(zipfp.namelist(), ["absolute"])
480
481    def test_append_to_zip_file(self):
482        """Test appending to an existing zipfile."""
483        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
484            zipfp.write(TESTFN, TESTFN)
485
486        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
487            zipfp.writestr("strfile", self.data)
488            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
489
490    def test_append_to_non_zip_file(self):
491        """Test appending to an existing file that is not a zipfile."""
492        # NOTE: this test fails if len(d) < 22 because of the first
493        # line "fpin.seek(-22, 2)" in _EndRecData
494        data = b'I am not a ZipFile!'*10
495        with open(TESTFN2, 'wb') as f:
496            f.write(data)
497
498        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
499            zipfp.write(TESTFN, TESTFN)
500
501        with open(TESTFN2, 'rb') as f:
502            f.seek(len(data))
503            with zipfile.ZipFile(f, "r") as zipfp:
504                self.assertEqual(zipfp.namelist(), [TESTFN])
505                self.assertEqual(zipfp.read(TESTFN), self.data)
506        with open(TESTFN2, 'rb') as f:
507            self.assertEqual(f.read(len(data)), data)
508            zipfiledata = f.read()
509        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
510            self.assertEqual(zipfp.namelist(), [TESTFN])
511            self.assertEqual(zipfp.read(TESTFN), self.data)
512
513    def test_read_concatenated_zip_file(self):
514        with io.BytesIO() as bio:
515            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
516                zipfp.write(TESTFN, TESTFN)
517            zipfiledata = bio.getvalue()
518        data = b'I am not a ZipFile!'*10
519        with open(TESTFN2, 'wb') as f:
520            f.write(data)
521            f.write(zipfiledata)
522
523        with zipfile.ZipFile(TESTFN2) as zipfp:
524            self.assertEqual(zipfp.namelist(), [TESTFN])
525            self.assertEqual(zipfp.read(TESTFN), self.data)
526
527    def test_append_to_concatenated_zip_file(self):
528        with io.BytesIO() as bio:
529            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
530                zipfp.write(TESTFN, TESTFN)
531            zipfiledata = bio.getvalue()
532        data = b'I am not a ZipFile!'*1000000
533        with open(TESTFN2, 'wb') as f:
534            f.write(data)
535            f.write(zipfiledata)
536
537        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
538            self.assertEqual(zipfp.namelist(), [TESTFN])
539            zipfp.writestr('strfile', self.data)
540
541        with open(TESTFN2, 'rb') as f:
542            self.assertEqual(f.read(len(data)), data)
543            zipfiledata = f.read()
544        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
545            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
546            self.assertEqual(zipfp.read(TESTFN), self.data)
547            self.assertEqual(zipfp.read('strfile'), self.data)
548
549    def test_ignores_newline_at_end(self):
550        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
551            zipfp.write(TESTFN, TESTFN)
552        with open(TESTFN2, 'a') as f:
553            f.write("\r\n\00\00\00")
554        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
555            self.assertIsInstance(zipfp, zipfile.ZipFile)
556
557    def test_ignores_stuff_appended_past_comments(self):
558        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
559            zipfp.comment = b"this is a comment"
560            zipfp.write(TESTFN, TESTFN)
561        with open(TESTFN2, 'a') as f:
562            f.write("abcdef\r\n")
563        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
564            self.assertIsInstance(zipfp, zipfile.ZipFile)
565            self.assertEqual(zipfp.comment, b"this is a comment")
566
567    def test_write_default_name(self):
568        """Check that calling ZipFile.write without arcname specified
569        produces the expected result."""
570        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
571            zipfp.write(TESTFN)
572            with open(TESTFN, "rb") as f:
573                self.assertEqual(zipfp.read(TESTFN), f.read())
574
575    def test_write_to_readonly(self):
576        """Check that trying to call write() on a readonly ZipFile object
577        raises a ValueError."""
578        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
579            zipfp.writestr("somefile.txt", "bogus")
580
581        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
582            self.assertRaises(ValueError, zipfp.write, TESTFN)
583
584        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
585            with self.assertRaises(ValueError):
586                zipfp.open(TESTFN, mode='w')
587
588    def test_add_file_before_1980(self):
589        # Set atime and mtime to 1970-01-01
590        os.utime(TESTFN, (0, 0))
591        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
592            self.assertRaises(ValueError, zipfp.write, TESTFN)
593
594        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
595            zipfp.write(TESTFN)
596            zinfo = zipfp.getinfo(TESTFN)
597            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
598
599    def test_add_file_after_2107(self):
600        # Set atime and mtime to 2108-12-30
601        ts = 4386268800
602        try:
603            time.localtime(ts)
604        except OverflowError:
605            self.skipTest(f'time.localtime({ts}) raises OverflowError')
606        try:
607            os.utime(TESTFN, (ts, ts))
608        except OverflowError:
609            self.skipTest('Host fs cannot set timestamp to required value.')
610
611        mtime_ns = os.stat(TESTFN).st_mtime_ns
612        if mtime_ns != (4386268800 * 10**9):
613            # XFS filesystem is limited to 32-bit timestamp, but the syscall
614            # didn't fail. Moreover, there is a VFS bug which returns
615            # a cached timestamp which is different than the value on disk.
616            #
617            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
618            #
619            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
620            # https://bugs.python.org/issue39460#msg360952
621            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
622
623        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
624            self.assertRaises(struct.error, zipfp.write, TESTFN)
625
626        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
627            zipfp.write(TESTFN)
628            zinfo = zipfp.getinfo(TESTFN)
629            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
630
631
632@requires_zlib
633class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
634                                 unittest.TestCase):
635    compression = zipfile.ZIP_DEFLATED
636
637    def test_per_file_compression(self):
638        """Check that files within a Zip archive can have different
639        compression options."""
640        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
641            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
642            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
643            sinfo = zipfp.getinfo('storeme')
644            dinfo = zipfp.getinfo('deflateme')
645            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
646            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
647
648@requires_bz2
649class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
650                               unittest.TestCase):
651    compression = zipfile.ZIP_BZIP2
652
653@requires_lzma
654class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
655                              unittest.TestCase):
656    compression = zipfile.ZIP_LZMA
657
658
659class AbstractTestZip64InSmallFiles:
660    # These tests test the ZIP64 functionality without using large files,
661    # see test_zipfile64 for proper tests.
662
663    @classmethod
664    def setUpClass(cls):
665        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
666                    for i in range(0, FIXEDTEST_SIZE))
667        cls.data = b'\n'.join(line_gen)
668
669    def setUp(self):
670        self._limit = zipfile.ZIP64_LIMIT
671        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
672        zipfile.ZIP64_LIMIT = 1000
673        zipfile.ZIP_FILECOUNT_LIMIT = 9
674
675        # Make a source file with some lines
676        with open(TESTFN, "wb") as fp:
677            fp.write(self.data)
678
679    def zip_test(self, f, compression):
680        # Create the ZIP archive
681        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
682            zipfp.write(TESTFN, "another.name")
683            zipfp.write(TESTFN, TESTFN)
684            zipfp.writestr("strfile", self.data)
685
686        # Read the ZIP archive
687        with zipfile.ZipFile(f, "r", compression) as zipfp:
688            self.assertEqual(zipfp.read(TESTFN), self.data)
689            self.assertEqual(zipfp.read("another.name"), self.data)
690            self.assertEqual(zipfp.read("strfile"), self.data)
691
692            # Print the ZIP directory
693            fp = io.StringIO()
694            zipfp.printdir(fp)
695
696            directory = fp.getvalue()
697            lines = directory.splitlines()
698            self.assertEqual(len(lines), 4) # Number of files + header
699
700            self.assertIn('File Name', lines[0])
701            self.assertIn('Modified', lines[0])
702            self.assertIn('Size', lines[0])
703
704            fn, date, time_, size = lines[1].split()
705            self.assertEqual(fn, 'another.name')
706            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
707            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
708            self.assertEqual(size, str(len(self.data)))
709
710            # Check the namelist
711            names = zipfp.namelist()
712            self.assertEqual(len(names), 3)
713            self.assertIn(TESTFN, names)
714            self.assertIn("another.name", names)
715            self.assertIn("strfile", names)
716
717            # Check infolist
718            infos = zipfp.infolist()
719            names = [i.filename for i in infos]
720            self.assertEqual(len(names), 3)
721            self.assertIn(TESTFN, names)
722            self.assertIn("another.name", names)
723            self.assertIn("strfile", names)
724            for i in infos:
725                self.assertEqual(i.file_size, len(self.data))
726
727            # check getinfo
728            for nm in (TESTFN, "another.name", "strfile"):
729                info = zipfp.getinfo(nm)
730                self.assertEqual(info.filename, nm)
731                self.assertEqual(info.file_size, len(self.data))
732
733            # Check that testzip doesn't raise an exception
734            zipfp.testzip()
735
736    def test_basic(self):
737        for f in get_files(self):
738            self.zip_test(f, self.compression)
739
740    def test_too_many_files(self):
741        # This test checks that more than 64k files can be added to an archive,
742        # and that the resulting archive can be read properly by ZipFile
743        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
744                               allowZip64=True)
745        zipf.debug = 100
746        numfiles = 15
747        for i in range(numfiles):
748            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
749        self.assertEqual(len(zipf.namelist()), numfiles)
750        zipf.close()
751
752        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
753        self.assertEqual(len(zipf2.namelist()), numfiles)
754        for i in range(numfiles):
755            content = zipf2.read("foo%08d" % i).decode('ascii')
756            self.assertEqual(content, "%d" % (i**3 % 57))
757        zipf2.close()
758
759    def test_too_many_files_append(self):
760        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
761                               allowZip64=False)
762        zipf.debug = 100
763        numfiles = 9
764        for i in range(numfiles):
765            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
766        self.assertEqual(len(zipf.namelist()), numfiles)
767        with self.assertRaises(zipfile.LargeZipFile):
768            zipf.writestr("foo%08d" % numfiles, b'')
769        self.assertEqual(len(zipf.namelist()), numfiles)
770        zipf.close()
771
772        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
773                               allowZip64=False)
774        zipf.debug = 100
775        self.assertEqual(len(zipf.namelist()), numfiles)
776        with self.assertRaises(zipfile.LargeZipFile):
777            zipf.writestr("foo%08d" % numfiles, b'')
778        self.assertEqual(len(zipf.namelist()), numfiles)
779        zipf.close()
780
781        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
782                               allowZip64=True)
783        zipf.debug = 100
784        self.assertEqual(len(zipf.namelist()), numfiles)
785        numfiles2 = 15
786        for i in range(numfiles, numfiles2):
787            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
788        self.assertEqual(len(zipf.namelist()), numfiles2)
789        zipf.close()
790
791        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
792        self.assertEqual(len(zipf2.namelist()), numfiles2)
793        for i in range(numfiles2):
794            content = zipf2.read("foo%08d" % i).decode('ascii')
795            self.assertEqual(content, "%d" % (i**3 % 57))
796        zipf2.close()
797
798    def tearDown(self):
799        zipfile.ZIP64_LIMIT = self._limit
800        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
801        unlink(TESTFN)
802        unlink(TESTFN2)
803
804
805class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
806                                  unittest.TestCase):
807    compression = zipfile.ZIP_STORED
808
809    def large_file_exception_test(self, f, compression):
810        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
811            self.assertRaises(zipfile.LargeZipFile,
812                              zipfp.write, TESTFN, "another.name")
813
814    def large_file_exception_test2(self, f, compression):
815        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
816            self.assertRaises(zipfile.LargeZipFile,
817                              zipfp.writestr, "another.name", self.data)
818
819    def test_large_file_exception(self):
820        for f in get_files(self):
821            self.large_file_exception_test(f, zipfile.ZIP_STORED)
822            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
823
824    def test_absolute_arcnames(self):
825        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
826                             allowZip64=True) as zipfp:
827            zipfp.write(TESTFN, "/absolute")
828
829        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
830            self.assertEqual(zipfp.namelist(), ["absolute"])
831
832    def test_append(self):
833        # Test that appending to the Zip64 archive doesn't change
834        # extra fields of existing entries.
835        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
836            zipfp.writestr("strfile", self.data)
837        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
838            zinfo = zipfp.getinfo("strfile")
839            extra = zinfo.extra
840        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
841            zipfp.writestr("strfile2", self.data)
842        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
843            zinfo = zipfp.getinfo("strfile")
844            self.assertEqual(zinfo.extra, extra)
845
846    def make_zip64_file(
847        self, file_size_64_set=False, file_size_extra=False,
848        compress_size_64_set=False, compress_size_extra=False,
849        header_offset_64_set=False, header_offset_extra=False,
850    ):
851        """Generate bytes sequence for a zip with (incomplete) zip64 data.
852
853        The actual values (not the zip 64 0xffffffff values) stored in the file
854        are:
855        file_size: 8
856        compress_size: 8
857        header_offset: 0
858        """
859        actual_size = 8
860        actual_header_offset = 0
861        local_zip64_fields = []
862        central_zip64_fields = []
863
864        file_size = actual_size
865        if file_size_64_set:
866            file_size = 0xffffffff
867            if file_size_extra:
868                local_zip64_fields.append(actual_size)
869                central_zip64_fields.append(actual_size)
870        file_size = struct.pack("<L", file_size)
871
872        compress_size = actual_size
873        if compress_size_64_set:
874            compress_size = 0xffffffff
875            if compress_size_extra:
876                local_zip64_fields.append(actual_size)
877                central_zip64_fields.append(actual_size)
878        compress_size = struct.pack("<L", compress_size)
879
880        header_offset = actual_header_offset
881        if header_offset_64_set:
882            header_offset = 0xffffffff
883            if header_offset_extra:
884                central_zip64_fields.append(actual_header_offset)
885        header_offset = struct.pack("<L", header_offset)
886
887        local_extra = struct.pack(
888            '<HH' + 'Q'*len(local_zip64_fields),
889            0x0001,
890            8*len(local_zip64_fields),
891            *local_zip64_fields
892        )
893
894        central_extra = struct.pack(
895            '<HH' + 'Q'*len(central_zip64_fields),
896            0x0001,
897            8*len(central_zip64_fields),
898            *central_zip64_fields
899        )
900
901        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
902        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
903
904        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
905        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
906
907        filename = b"test.txt"
908        content = b"test1234"
909        filename_length = struct.pack("<H", len(filename))
910        zip64_contents = (
911            # Local file header
912            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
913            + compress_size
914            + file_size
915            + filename_length
916            + local_extra_length
917            + filename
918            + local_extra
919            + content
920            # Central directory:
921            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
922            + compress_size
923            + file_size
924            + filename_length
925            + central_extra_length
926            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
927            + header_offset
928            + filename
929            + central_extra
930            # Zip64 end of central directory
931            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
932            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
933            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
934            + central_dir_size
935            + offset_to_central_dir
936            # Zip64 end of central directory locator
937            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
938            + b"\x00\x00\x00"
939            # end of central directory
940            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
941            + b"\x00\x00\x00\x00"
942        )
943        return zip64_contents
944
945    def test_bad_zip64_extra(self):
946        """Missing zip64 extra records raises an exception.
947
948        There are 4 fields that the zip64 format handles (the disk number is
949        not used in this module and so is ignored here). According to the zip
950        spec:
951              The order of the fields in the zip64 extended
952              information record is fixed, but the fields MUST
953              only appear if the corresponding Local or Central
954              directory record field is set to 0xFFFF or 0xFFFFFFFF.
955
956        If the zip64 extra content doesn't contain enough entries for the
957        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
958        This test mismatches the length of the zip64 extra field and the number
959        of fields set to indicate the presence of zip64 data.
960        """
961        # zip64 file size present, no fields in extra, expecting one, equals
962        # missing file size.
963        missing_file_size_extra = self.make_zip64_file(
964            file_size_64_set=True,
965        )
966        with self.assertRaises(zipfile.BadZipFile) as e:
967            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
968        self.assertIn('file size', str(e.exception).lower())
969
970        # zip64 file size present, zip64 compress size present, one field in
971        # extra, expecting two, equals missing compress size.
972        missing_compress_size_extra = self.make_zip64_file(
973            file_size_64_set=True,
974            file_size_extra=True,
975            compress_size_64_set=True,
976        )
977        with self.assertRaises(zipfile.BadZipFile) as e:
978            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
979        self.assertIn('compress size', str(e.exception).lower())
980
981        # zip64 compress size present, no fields in extra, expecting one,
982        # equals missing compress size.
983        missing_compress_size_extra = self.make_zip64_file(
984            compress_size_64_set=True,
985        )
986        with self.assertRaises(zipfile.BadZipFile) as e:
987            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
988        self.assertIn('compress size', str(e.exception).lower())
989
990        # zip64 file size present, zip64 compress size present, zip64 header
991        # offset present, two fields in extra, expecting three, equals missing
992        # header offset
993        missing_header_offset_extra = self.make_zip64_file(
994            file_size_64_set=True,
995            file_size_extra=True,
996            compress_size_64_set=True,
997            compress_size_extra=True,
998            header_offset_64_set=True,
999        )
1000        with self.assertRaises(zipfile.BadZipFile) as e:
1001            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1002        self.assertIn('header offset', str(e.exception).lower())
1003
1004        # zip64 compress size present, zip64 header offset present, one field
1005        # in extra, expecting two, equals missing header offset
1006        missing_header_offset_extra = self.make_zip64_file(
1007            file_size_64_set=False,
1008            compress_size_64_set=True,
1009            compress_size_extra=True,
1010            header_offset_64_set=True,
1011        )
1012        with self.assertRaises(zipfile.BadZipFile) as e:
1013            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1014        self.assertIn('header offset', str(e.exception).lower())
1015
1016        # zip64 file size present, zip64 header offset present, one field in
1017        # extra, expecting two, equals missing header offset
1018        missing_header_offset_extra = self.make_zip64_file(
1019            file_size_64_set=True,
1020            file_size_extra=True,
1021            compress_size_64_set=False,
1022            header_offset_64_set=True,
1023        )
1024        with self.assertRaises(zipfile.BadZipFile) as e:
1025            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1026        self.assertIn('header offset', str(e.exception).lower())
1027
1028        # zip64 header offset present, no fields in extra, expecting one,
1029        # equals missing header offset
1030        missing_header_offset_extra = self.make_zip64_file(
1031            file_size_64_set=False,
1032            compress_size_64_set=False,
1033            header_offset_64_set=True,
1034        )
1035        with self.assertRaises(zipfile.BadZipFile) as e:
1036            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1037        self.assertIn('header offset', str(e.exception).lower())
1038
1039    def test_generated_valid_zip64_extra(self):
1040        # These values are what is set in the make_zip64_file method.
1041        expected_file_size = 8
1042        expected_compress_size = 8
1043        expected_header_offset = 0
1044        expected_content = b"test1234"
1045
1046        # Loop through the various valid combinations of zip64 masks
1047        # present and extra fields present.
1048        params = (
1049            {"file_size_64_set": True, "file_size_extra": True},
1050            {"compress_size_64_set": True, "compress_size_extra": True},
1051            {"header_offset_64_set": True, "header_offset_extra": True},
1052        )
1053
1054        for r in range(1, len(params) + 1):
1055            for combo in itertools.combinations(params, r):
1056                kwargs = {}
1057                for c in combo:
1058                    kwargs.update(c)
1059                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1060                    zinfo = zf.infolist()[0]
1061                    self.assertEqual(zinfo.file_size, expected_file_size)
1062                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1063                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1064                    self.assertEqual(zf.read(zinfo), expected_content)
1065
1066
1067@requires_zlib
1068class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1069                                   unittest.TestCase):
1070    compression = zipfile.ZIP_DEFLATED
1071
1072@requires_bz2
1073class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1074                                 unittest.TestCase):
1075    compression = zipfile.ZIP_BZIP2
1076
1077@requires_lzma
1078class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1079                                unittest.TestCase):
1080    compression = zipfile.ZIP_LZMA
1081
1082
1083class AbstractWriterTests:
1084
1085    def tearDown(self):
1086        unlink(TESTFN2)
1087
1088    def test_close_after_close(self):
1089        data = b'content'
1090        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1091            w = zipf.open('test', 'w')
1092            w.write(data)
1093            w.close()
1094            self.assertTrue(w.closed)
1095            w.close()
1096            self.assertTrue(w.closed)
1097            self.assertEqual(zipf.read('test'), data)
1098
1099    def test_write_after_close(self):
1100        data = b'content'
1101        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1102            w = zipf.open('test', 'w')
1103            w.write(data)
1104            w.close()
1105            self.assertTrue(w.closed)
1106            self.assertRaises(ValueError, w.write, b'')
1107            self.assertEqual(zipf.read('test'), data)
1108
1109class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
1110    compression = zipfile.ZIP_STORED
1111
1112@requires_zlib
1113class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
1114    compression = zipfile.ZIP_DEFLATED
1115
1116@requires_bz2
1117class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
1118    compression = zipfile.ZIP_BZIP2
1119
1120@requires_lzma
1121class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
1122    compression = zipfile.ZIP_LZMA
1123
1124
1125class PyZipFileTests(unittest.TestCase):
1126    def assertCompiledIn(self, name, namelist):
1127        if name + 'o' not in namelist:
1128            self.assertIn(name + 'c', namelist)
1129
1130    def requiresWriteAccess(self, path):
1131        # effective_ids unavailable on windows
1132        if not os.access(path, os.W_OK,
1133                         effective_ids=os.access in os.supports_effective_ids):
1134            self.skipTest('requires write access to the installed location')
1135        filename = os.path.join(path, 'test_zipfile.try')
1136        try:
1137            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1138            os.close(fd)
1139        except Exception:
1140            self.skipTest('requires write access to the installed location')
1141        unlink(filename)
1142
1143    def test_write_pyfile(self):
1144        self.requiresWriteAccess(os.path.dirname(__file__))
1145        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1146            fn = __file__
1147            if fn.endswith('.pyc'):
1148                path_split = fn.split(os.sep)
1149                if os.altsep is not None:
1150                    path_split.extend(fn.split(os.altsep))
1151                if '__pycache__' in path_split:
1152                    fn = importlib.util.source_from_cache(fn)
1153                else:
1154                    fn = fn[:-1]
1155
1156            zipfp.writepy(fn)
1157
1158            bn = os.path.basename(fn)
1159            self.assertNotIn(bn, zipfp.namelist())
1160            self.assertCompiledIn(bn, zipfp.namelist())
1161
1162        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1163            fn = __file__
1164            if fn.endswith('.pyc'):
1165                fn = fn[:-1]
1166
1167            zipfp.writepy(fn, "testpackage")
1168
1169            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1170            self.assertNotIn(bn, zipfp.namelist())
1171            self.assertCompiledIn(bn, zipfp.namelist())
1172
1173    def test_write_python_package(self):
1174        import email
1175        packagedir = os.path.dirname(email.__file__)
1176        self.requiresWriteAccess(packagedir)
1177
1178        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1179            zipfp.writepy(packagedir)
1180
1181            # Check for a couple of modules at different levels of the
1182            # hierarchy
1183            names = zipfp.namelist()
1184            self.assertCompiledIn('email/__init__.py', names)
1185            self.assertCompiledIn('email/mime/text.py', names)
1186
1187    def test_write_filtered_python_package(self):
1188        import test
1189        packagedir = os.path.dirname(test.__file__)
1190        self.requiresWriteAccess(packagedir)
1191
1192        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1193
1194            # first make sure that the test folder gives error messages
1195            # (on the badsyntax_... files)
1196            with captured_stdout() as reportSIO:
1197                zipfp.writepy(packagedir)
1198            reportStr = reportSIO.getvalue()
1199            self.assertTrue('SyntaxError' in reportStr)
1200
1201            # then check that the filter works on the whole package
1202            with captured_stdout() as reportSIO:
1203                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1204            reportStr = reportSIO.getvalue()
1205            self.assertTrue('SyntaxError' not in reportStr)
1206
1207            # then check that the filter works on individual files
1208            def filter(path):
1209                return not os.path.basename(path).startswith("bad")
1210            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1211                zipfp.writepy(packagedir, filterfunc=filter)
1212            reportStr = reportSIO.getvalue()
1213            if reportStr:
1214                print(reportStr)
1215            self.assertTrue('SyntaxError' not in reportStr)
1216
1217    def test_write_with_optimization(self):
1218        import email
1219        packagedir = os.path.dirname(email.__file__)
1220        self.requiresWriteAccess(packagedir)
1221        optlevel = 1 if __debug__ else 0
1222        ext = '.pyc'
1223
1224        with TemporaryFile() as t, \
1225             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1226            zipfp.writepy(packagedir)
1227
1228            names = zipfp.namelist()
1229            self.assertIn('email/__init__' + ext, names)
1230            self.assertIn('email/mime/text' + ext, names)
1231
1232    def test_write_python_directory(self):
1233        os.mkdir(TESTFN2)
1234        try:
1235            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1236                fp.write("print(42)\n")
1237
1238            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1239                fp.write("print(42 * 42)\n")
1240
1241            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
1242                fp.write("bla bla bla\n")
1243
1244            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1245                zipfp.writepy(TESTFN2)
1246
1247                names = zipfp.namelist()
1248                self.assertCompiledIn('mod1.py', names)
1249                self.assertCompiledIn('mod2.py', names)
1250                self.assertNotIn('mod2.txt', names)
1251
1252        finally:
1253            rmtree(TESTFN2)
1254
1255    def test_write_python_directory_filtered(self):
1256        os.mkdir(TESTFN2)
1257        try:
1258            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1259                fp.write("print(42)\n")
1260
1261            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1262                fp.write("print(42 * 42)\n")
1263
1264            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1265                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1266                                                  not fn.endswith('mod2.py'))
1267
1268                names = zipfp.namelist()
1269                self.assertCompiledIn('mod1.py', names)
1270                self.assertNotIn('mod2.py', names)
1271
1272        finally:
1273            rmtree(TESTFN2)
1274
1275    def test_write_non_pyfile(self):
1276        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1277            with open(TESTFN, 'w') as f:
1278                f.write('most definitely not a python file')
1279            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1280            unlink(TESTFN)
1281
1282    def test_write_pyfile_bad_syntax(self):
1283        os.mkdir(TESTFN2)
1284        try:
1285            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1286                fp.write("Bad syntax in python file\n")
1287
1288            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1289                # syntax errors are printed to stdout
1290                with captured_stdout() as s:
1291                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1292
1293                self.assertIn("SyntaxError", s.getvalue())
1294
1295                # as it will not have compiled the python file, it will
1296                # include the .py file not .pyc
1297                names = zipfp.namelist()
1298                self.assertIn('mod1.py', names)
1299                self.assertNotIn('mod1.pyc', names)
1300
1301        finally:
1302            rmtree(TESTFN2)
1303
1304    def test_write_pathlike(self):
1305        os.mkdir(TESTFN2)
1306        try:
1307            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1308                fp.write("print(42)\n")
1309
1310            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1311                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1312                names = zipfp.namelist()
1313                self.assertCompiledIn('mod1.py', names)
1314        finally:
1315            rmtree(TESTFN2)
1316
1317
1318class ExtractTests(unittest.TestCase):
1319
1320    def make_test_file(self):
1321        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1322            for fpath, fdata in SMALL_TEST_DATA:
1323                zipfp.writestr(fpath, fdata)
1324
1325    def test_extract(self):
1326        with temp_cwd():
1327            self.make_test_file()
1328            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1329                for fpath, fdata in SMALL_TEST_DATA:
1330                    writtenfile = zipfp.extract(fpath)
1331
1332                    # make sure it was written to the right place
1333                    correctfile = os.path.join(os.getcwd(), fpath)
1334                    correctfile = os.path.normpath(correctfile)
1335
1336                    self.assertEqual(writtenfile, correctfile)
1337
1338                    # make sure correct data is in correct file
1339                    with open(writtenfile, "rb") as f:
1340                        self.assertEqual(fdata.encode(), f.read())
1341
1342                    unlink(writtenfile)
1343
1344    def _test_extract_with_target(self, target):
1345        self.make_test_file()
1346        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1347            for fpath, fdata in SMALL_TEST_DATA:
1348                writtenfile = zipfp.extract(fpath, target)
1349
1350                # make sure it was written to the right place
1351                correctfile = os.path.join(target, fpath)
1352                correctfile = os.path.normpath(correctfile)
1353                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1354
1355                # make sure correct data is in correct file
1356                with open(writtenfile, "rb") as f:
1357                    self.assertEqual(fdata.encode(), f.read())
1358
1359                unlink(writtenfile)
1360
1361        unlink(TESTFN2)
1362
1363    def test_extract_with_target(self):
1364        with temp_dir() as extdir:
1365            self._test_extract_with_target(extdir)
1366
1367    def test_extract_with_target_pathlike(self):
1368        with temp_dir() as extdir:
1369            self._test_extract_with_target(pathlib.Path(extdir))
1370
1371    def test_extract_all(self):
1372        with temp_cwd():
1373            self.make_test_file()
1374            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1375                zipfp.extractall()
1376                for fpath, fdata in SMALL_TEST_DATA:
1377                    outfile = os.path.join(os.getcwd(), fpath)
1378
1379                    with open(outfile, "rb") as f:
1380                        self.assertEqual(fdata.encode(), f.read())
1381
1382                    unlink(outfile)
1383
1384    def _test_extract_all_with_target(self, target):
1385        self.make_test_file()
1386        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1387            zipfp.extractall(target)
1388            for fpath, fdata in SMALL_TEST_DATA:
1389                outfile = os.path.join(target, fpath)
1390
1391                with open(outfile, "rb") as f:
1392                    self.assertEqual(fdata.encode(), f.read())
1393
1394                unlink(outfile)
1395
1396        unlink(TESTFN2)
1397
1398    def test_extract_all_with_target(self):
1399        with temp_dir() as extdir:
1400            self._test_extract_all_with_target(extdir)
1401
1402    def test_extract_all_with_target_pathlike(self):
1403        with temp_dir() as extdir:
1404            self._test_extract_all_with_target(pathlib.Path(extdir))
1405
1406    def check_file(self, filename, content):
1407        self.assertTrue(os.path.isfile(filename))
1408        with open(filename, 'rb') as f:
1409            self.assertEqual(f.read(), content)
1410
1411    def test_sanitize_windows_name(self):
1412        san = zipfile.ZipFile._sanitize_windows_name
1413        # Passing pathsep in allows this test to work regardless of platform.
1414        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1415        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1416        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1417
1418    def test_extract_hackers_arcnames_common_cases(self):
1419        common_hacknames = [
1420            ('../foo/bar', 'foo/bar'),
1421            ('foo/../bar', 'foo/bar'),
1422            ('foo/../../bar', 'foo/bar'),
1423            ('foo/bar/..', 'foo/bar'),
1424            ('./../foo/bar', 'foo/bar'),
1425            ('/foo/bar', 'foo/bar'),
1426            ('/foo/../bar', 'foo/bar'),
1427            ('/foo/../../bar', 'foo/bar'),
1428        ]
1429        self._test_extract_hackers_arcnames(common_hacknames)
1430
1431    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1432    def test_extract_hackers_arcnames_windows_only(self):
1433        """Test combination of path fixing and windows name sanitization."""
1434        windows_hacknames = [
1435            (r'..\foo\bar', 'foo/bar'),
1436            (r'..\/foo\/bar', 'foo/bar'),
1437            (r'foo/\..\/bar', 'foo/bar'),
1438            (r'foo\/../\bar', 'foo/bar'),
1439            (r'C:foo/bar', 'foo/bar'),
1440            (r'C:/foo/bar', 'foo/bar'),
1441            (r'C://foo/bar', 'foo/bar'),
1442            (r'C:\foo\bar', 'foo/bar'),
1443            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1444            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1445            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1446            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1447            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1448            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1449            (r'//?/C:/foo/bar', 'foo/bar'),
1450            (r'\\?\C:\foo\bar', 'foo/bar'),
1451            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1452            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1453            ('../../foo../../ba..r', 'foo/ba..r'),
1454        ]
1455        self._test_extract_hackers_arcnames(windows_hacknames)
1456
1457    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1458    def test_extract_hackers_arcnames_posix_only(self):
1459        posix_hacknames = [
1460            ('//foo/bar', 'foo/bar'),
1461            ('../../foo../../ba..r', 'foo../ba..r'),
1462            (r'foo/..\bar', r'foo/..\bar'),
1463        ]
1464        self._test_extract_hackers_arcnames(posix_hacknames)
1465
1466    def _test_extract_hackers_arcnames(self, hacknames):
1467        for arcname, fixedname in hacknames:
1468            content = b'foobar' + arcname.encode()
1469            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1470                zinfo = zipfile.ZipInfo()
1471                # preserve backslashes
1472                zinfo.filename = arcname
1473                zinfo.external_attr = 0o600 << 16
1474                zipfp.writestr(zinfo, content)
1475
1476            arcname = arcname.replace(os.sep, "/")
1477            targetpath = os.path.join('target', 'subdir', 'subsub')
1478            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1479
1480            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1481                writtenfile = zipfp.extract(arcname, targetpath)
1482                self.assertEqual(writtenfile, correctfile,
1483                                 msg='extract %r: %r != %r' %
1484                                 (arcname, writtenfile, correctfile))
1485            self.check_file(correctfile, content)
1486            rmtree('target')
1487
1488            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1489                zipfp.extractall(targetpath)
1490            self.check_file(correctfile, content)
1491            rmtree('target')
1492
1493            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1494
1495            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1496                writtenfile = zipfp.extract(arcname)
1497                self.assertEqual(writtenfile, correctfile,
1498                                 msg="extract %r" % arcname)
1499            self.check_file(correctfile, content)
1500            rmtree(fixedname.split('/')[0])
1501
1502            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1503                zipfp.extractall()
1504            self.check_file(correctfile, content)
1505            rmtree(fixedname.split('/')[0])
1506
1507            unlink(TESTFN2)
1508
1509
1510class OtherTests(unittest.TestCase):
1511    def test_open_via_zip_info(self):
1512        # Create the ZIP archive
1513        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1514            zipfp.writestr("name", "foo")
1515            with self.assertWarns(UserWarning):
1516                zipfp.writestr("name", "bar")
1517            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1518
1519        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1520            infos = zipfp.infolist()
1521            data = b""
1522            for info in infos:
1523                with zipfp.open(info) as zipopen:
1524                    data += zipopen.read()
1525            self.assertIn(data, {b"foobar", b"barfoo"})
1526            data = b""
1527            for info in infos:
1528                data += zipfp.read(info)
1529            self.assertIn(data, {b"foobar", b"barfoo"})
1530
1531    def test_writestr_extended_local_header_issue1202(self):
1532        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1533            for data in 'abcdefghijklmnop':
1534                zinfo = zipfile.ZipInfo(data)
1535                zinfo.flag_bits |= 0x08  # Include an extended local header.
1536                orig_zip.writestr(zinfo, data)
1537
1538    def test_close(self):
1539        """Check that the zipfile is closed after the 'with' block."""
1540        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1541            for fpath, fdata in SMALL_TEST_DATA:
1542                zipfp.writestr(fpath, fdata)
1543                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1544        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1545
1546        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1547            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1548        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1549
1550    def test_close_on_exception(self):
1551        """Check that the zipfile is closed if an exception is raised in the
1552        'with' block."""
1553        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1554            for fpath, fdata in SMALL_TEST_DATA:
1555                zipfp.writestr(fpath, fdata)
1556
1557        try:
1558            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1559                raise zipfile.BadZipFile()
1560        except zipfile.BadZipFile:
1561            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1562
1563    def test_unsupported_version(self):
1564        # File has an extract_version of 120
1565        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1566                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1567                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1568                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1569                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1570
1571        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1572                          io.BytesIO(data), 'r')
1573
1574    @requires_zlib
1575    def test_read_unicode_filenames(self):
1576        # bug #10801
1577        fname = findfile('zip_cp437_header.zip')
1578        with zipfile.ZipFile(fname) as zipfp:
1579            for name in zipfp.namelist():
1580                zipfp.open(name).close()
1581
1582    def test_write_unicode_filenames(self):
1583        with zipfile.ZipFile(TESTFN, "w") as zf:
1584            zf.writestr("foo.txt", "Test for unicode filename")
1585            zf.writestr("\xf6.txt", "Test for unicode filename")
1586            self.assertIsInstance(zf.infolist()[0].filename, str)
1587
1588        with zipfile.ZipFile(TESTFN, "r") as zf:
1589            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1590            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1591
1592    def test_read_after_write_unicode_filenames(self):
1593        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1594            zipfp.writestr('приклад', b'sample')
1595            self.assertEqual(zipfp.read('приклад'), b'sample')
1596
1597    def test_exclusive_create_zip_file(self):
1598        """Test exclusive creating a new zipfile."""
1599        unlink(TESTFN2)
1600        filename = 'testfile.txt'
1601        content = b'hello, world. this is some content.'
1602        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1603            zipfp.writestr(filename, content)
1604        with self.assertRaises(FileExistsError):
1605            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1606        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1607            self.assertEqual(zipfp.namelist(), [filename])
1608            self.assertEqual(zipfp.read(filename), content)
1609
1610    def test_create_non_existent_file_for_append(self):
1611        if os.path.exists(TESTFN):
1612            os.unlink(TESTFN)
1613
1614        filename = 'testfile.txt'
1615        content = b'hello, world. this is some content.'
1616
1617        try:
1618            with zipfile.ZipFile(TESTFN, 'a') as zf:
1619                zf.writestr(filename, content)
1620        except OSError:
1621            self.fail('Could not append data to a non-existent zip file.')
1622
1623        self.assertTrue(os.path.exists(TESTFN))
1624
1625        with zipfile.ZipFile(TESTFN, 'r') as zf:
1626            self.assertEqual(zf.read(filename), content)
1627
1628    def test_close_erroneous_file(self):
1629        # This test checks that the ZipFile constructor closes the file object
1630        # it opens if there's an error in the file.  If it doesn't, the
1631        # traceback holds a reference to the ZipFile object and, indirectly,
1632        # the file object.
1633        # On Windows, this causes the os.unlink() call to fail because the
1634        # underlying file is still open.  This is SF bug #412214.
1635        #
1636        with open(TESTFN, "w") as fp:
1637            fp.write("this is not a legal zip file\n")
1638        try:
1639            zf = zipfile.ZipFile(TESTFN)
1640        except zipfile.BadZipFile:
1641            pass
1642
1643    def test_is_zip_erroneous_file(self):
1644        """Check that is_zipfile() correctly identifies non-zip files."""
1645        # - passing a filename
1646        with open(TESTFN, "w") as fp:
1647            fp.write("this is not a legal zip file\n")
1648        self.assertFalse(zipfile.is_zipfile(TESTFN))
1649        # - passing a path-like object
1650        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1651        # - passing a file object
1652        with open(TESTFN, "rb") as fp:
1653            self.assertFalse(zipfile.is_zipfile(fp))
1654        # - passing a file-like object
1655        fp = io.BytesIO()
1656        fp.write(b"this is not a legal zip file\n")
1657        self.assertFalse(zipfile.is_zipfile(fp))
1658        fp.seek(0, 0)
1659        self.assertFalse(zipfile.is_zipfile(fp))
1660
1661    def test_damaged_zipfile(self):
1662        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1663        # - Create a valid zip file
1664        fp = io.BytesIO()
1665        with zipfile.ZipFile(fp, mode="w") as zipf:
1666            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1667        zipfiledata = fp.getvalue()
1668
1669        # - Now create copies of it missing the last N bytes and make sure
1670        #   a BadZipFile exception is raised when we try to open it
1671        for N in range(len(zipfiledata)):
1672            fp = io.BytesIO(zipfiledata[:N])
1673            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1674
1675    def test_is_zip_valid_file(self):
1676        """Check that is_zipfile() correctly identifies zip files."""
1677        # - passing a filename
1678        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1679            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1680
1681        self.assertTrue(zipfile.is_zipfile(TESTFN))
1682        # - passing a file object
1683        with open(TESTFN, "rb") as fp:
1684            self.assertTrue(zipfile.is_zipfile(fp))
1685            fp.seek(0, 0)
1686            zip_contents = fp.read()
1687        # - passing a file-like object
1688        fp = io.BytesIO()
1689        fp.write(zip_contents)
1690        self.assertTrue(zipfile.is_zipfile(fp))
1691        fp.seek(0, 0)
1692        self.assertTrue(zipfile.is_zipfile(fp))
1693
1694    def test_non_existent_file_raises_OSError(self):
1695        # make sure we don't raise an AttributeError when a partially-constructed
1696        # ZipFile instance is finalized; this tests for regression on SF tracker
1697        # bug #403871.
1698
1699        # The bug we're testing for caused an AttributeError to be raised
1700        # when a ZipFile instance was created for a file that did not
1701        # exist; the .fp member was not initialized but was needed by the
1702        # __del__() method.  Since the AttributeError is in the __del__(),
1703        # it is ignored, but the user should be sufficiently annoyed by
1704        # the message on the output that regression will be noticed
1705        # quickly.
1706        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1707
1708    def test_empty_file_raises_BadZipFile(self):
1709        f = open(TESTFN, 'w')
1710        f.close()
1711        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1712
1713        with open(TESTFN, 'w') as fp:
1714            fp.write("short file")
1715        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1716
1717    def test_closed_zip_raises_ValueError(self):
1718        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1719        data = io.BytesIO()
1720        with zipfile.ZipFile(data, mode="w") as zipf:
1721            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1722
1723        # This is correct; calling .read on a closed ZipFile should raise
1724        # a ValueError, and so should calling .testzip.  An earlier
1725        # version of .testzip would swallow this exception (and any other)
1726        # and report that the first file in the archive was corrupt.
1727        self.assertRaises(ValueError, zipf.read, "foo.txt")
1728        self.assertRaises(ValueError, zipf.open, "foo.txt")
1729        self.assertRaises(ValueError, zipf.testzip)
1730        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1731        with open(TESTFN, 'w') as f:
1732            f.write('zipfile test data')
1733        self.assertRaises(ValueError, zipf.write, TESTFN)
1734
1735    def test_bad_constructor_mode(self):
1736        """Check that bad modes passed to ZipFile constructor are caught."""
1737        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1738
1739    def test_bad_open_mode(self):
1740        """Check that bad modes passed to ZipFile.open are caught."""
1741        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1742            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1743
1744        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1745            # read the data to make sure the file is there
1746            zipf.read("foo.txt")
1747            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1748            # universal newlines support is removed
1749            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1750            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1751
1752    def test_read0(self):
1753        """Check that calling read(0) on a ZipExtFile object returns an empty
1754        string and doesn't advance file pointer."""
1755        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1756            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1757            # read the data to make sure the file is there
1758            with zipf.open("foo.txt") as f:
1759                for i in range(FIXEDTEST_SIZE):
1760                    self.assertEqual(f.read(0), b'')
1761
1762                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1763
1764    def test_open_non_existent_item(self):
1765        """Check that attempting to call open() for an item that doesn't
1766        exist in the archive raises a RuntimeError."""
1767        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1768            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1769
1770    def test_bad_compression_mode(self):
1771        """Check that bad compression methods passed to ZipFile.open are
1772        caught."""
1773        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1774
1775    def test_unsupported_compression(self):
1776        # data is declared as shrunk, but actually deflated
1777        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1778                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1779                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1780                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1781                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1782                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1783        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1784            self.assertRaises(NotImplementedError, zipf.open, 'x')
1785
1786    def test_null_byte_in_filename(self):
1787        """Check that a filename containing a null byte is properly
1788        terminated."""
1789        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1790            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1791            self.assertEqual(zipf.namelist(), ['foo.txt'])
1792
1793    def test_struct_sizes(self):
1794        """Check that ZIP internal structure sizes are calculated correctly."""
1795        self.assertEqual(zipfile.sizeEndCentDir, 22)
1796        self.assertEqual(zipfile.sizeCentralDir, 46)
1797        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1798        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1799
1800    def test_comments(self):
1801        """Check that comments on the archive are handled properly."""
1802
1803        # check default comment is empty
1804        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1805            self.assertEqual(zipf.comment, b'')
1806            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1807
1808        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1809            self.assertEqual(zipfr.comment, b'')
1810
1811        # check a simple short comment
1812        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1813        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1814            zipf.comment = comment
1815            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1816        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1817            self.assertEqual(zipf.comment, comment)
1818
1819        # check a comment of max length
1820        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1821        comment2 = comment2.encode("ascii")
1822        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1823            zipf.comment = comment2
1824            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1825
1826        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1827            self.assertEqual(zipfr.comment, comment2)
1828
1829        # check a comment that is too long is truncated
1830        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1831            with self.assertWarns(UserWarning):
1832                zipf.comment = comment2 + b'oops'
1833            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1834        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1835            self.assertEqual(zipfr.comment, comment2)
1836
1837        # check that comments are correctly modified in append mode
1838        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1839            zipf.comment = b"original comment"
1840            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1841        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1842            zipf.comment = b"an updated comment"
1843        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1844            self.assertEqual(zipf.comment, b"an updated comment")
1845
1846        # check that comments are correctly shortened in append mode
1847        # and the file is indeed truncated
1848        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1849            zipf.comment = b"original comment that's longer"
1850            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1851        original_zip_size = os.path.getsize(TESTFN)
1852        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1853            zipf.comment = b"shorter comment"
1854        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
1855        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1856            self.assertEqual(zipf.comment, b"shorter comment")
1857
1858    def test_unicode_comment(self):
1859        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1860            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1861            with self.assertRaises(TypeError):
1862                zipf.comment = "this is an error"
1863
1864    def test_change_comment_in_empty_archive(self):
1865        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1866            self.assertFalse(zipf.filelist)
1867            zipf.comment = b"this is a comment"
1868        with zipfile.ZipFile(TESTFN, "r") as zipf:
1869            self.assertEqual(zipf.comment, b"this is a comment")
1870
1871    def test_change_comment_in_nonempty_archive(self):
1872        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1873            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1874        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1875            self.assertTrue(zipf.filelist)
1876            zipf.comment = b"this is a comment"
1877        with zipfile.ZipFile(TESTFN, "r") as zipf:
1878            self.assertEqual(zipf.comment, b"this is a comment")
1879
1880    def test_empty_zipfile(self):
1881        # Check that creating a file in 'w' or 'a' mode and closing without
1882        # adding any files to the archives creates a valid empty ZIP file
1883        zipf = zipfile.ZipFile(TESTFN, mode="w")
1884        zipf.close()
1885        try:
1886            zipf = zipfile.ZipFile(TESTFN, mode="r")
1887        except zipfile.BadZipFile:
1888            self.fail("Unable to create empty ZIP file in 'w' mode")
1889
1890        zipf = zipfile.ZipFile(TESTFN, mode="a")
1891        zipf.close()
1892        try:
1893            zipf = zipfile.ZipFile(TESTFN, mode="r")
1894        except:
1895            self.fail("Unable to create empty ZIP file in 'a' mode")
1896
1897    def test_open_empty_file(self):
1898        # Issue 1710703: Check that opening a file with less than 22 bytes
1899        # raises a BadZipFile exception (rather than the previously unhelpful
1900        # OSError)
1901        f = open(TESTFN, 'w')
1902        f.close()
1903        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1904
1905    def test_create_zipinfo_before_1980(self):
1906        self.assertRaises(ValueError,
1907                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1908
1909    def test_zipfile_with_short_extra_field(self):
1910        """If an extra field in the header is less than 4 bytes, skip it."""
1911        zipdata = (
1912            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1913            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1914            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1915            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1916            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1917            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1918            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1919        )
1920        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1921            # testzip returns the name of the first corrupt file, or None
1922            self.assertIsNone(zipf.testzip())
1923
1924    def test_open_conflicting_handles(self):
1925        # It's only possible to open one writable file handle at a time
1926        msg1 = b"It's fun to charter an accountant!"
1927        msg2 = b"And sail the wide accountant sea"
1928        msg3 = b"To find, explore the funds offshore"
1929        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1930            with zipf.open('foo', mode='w') as w2:
1931                w2.write(msg1)
1932            with zipf.open('bar', mode='w') as w1:
1933                with self.assertRaises(ValueError):
1934                    zipf.open('handle', mode='w')
1935                with self.assertRaises(ValueError):
1936                    zipf.open('foo', mode='r')
1937                with self.assertRaises(ValueError):
1938                    zipf.writestr('str', 'abcde')
1939                with self.assertRaises(ValueError):
1940                    zipf.write(__file__, 'file')
1941                with self.assertRaises(ValueError):
1942                    zipf.close()
1943                w1.write(msg2)
1944            with zipf.open('baz', mode='w') as w2:
1945                w2.write(msg3)
1946
1947        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1948            self.assertEqual(zipf.read('foo'), msg1)
1949            self.assertEqual(zipf.read('bar'), msg2)
1950            self.assertEqual(zipf.read('baz'), msg3)
1951            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1952
1953    def test_seek_tell(self):
1954        # Test seek functionality
1955        txt = b"Where's Bruce?"
1956        bloc = txt.find(b"Bruce")
1957        # Check seek on a file
1958        with zipfile.ZipFile(TESTFN, "w") as zipf:
1959            zipf.writestr("foo.txt", txt)
1960        with zipfile.ZipFile(TESTFN, "r") as zipf:
1961            with zipf.open("foo.txt", "r") as fp:
1962                fp.seek(bloc, os.SEEK_SET)
1963                self.assertEqual(fp.tell(), bloc)
1964                fp.seek(-bloc, os.SEEK_CUR)
1965                self.assertEqual(fp.tell(), 0)
1966                fp.seek(bloc, os.SEEK_CUR)
1967                self.assertEqual(fp.tell(), bloc)
1968                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1969                fp.seek(0, os.SEEK_END)
1970                self.assertEqual(fp.tell(), len(txt))
1971                fp.seek(0, os.SEEK_SET)
1972                self.assertEqual(fp.tell(), 0)
1973        # Check seek on memory file
1974        data = io.BytesIO()
1975        with zipfile.ZipFile(data, mode="w") as zipf:
1976            zipf.writestr("foo.txt", txt)
1977        with zipfile.ZipFile(data, mode="r") as zipf:
1978            with zipf.open("foo.txt", "r") as fp:
1979                fp.seek(bloc, os.SEEK_SET)
1980                self.assertEqual(fp.tell(), bloc)
1981                fp.seek(-bloc, os.SEEK_CUR)
1982                self.assertEqual(fp.tell(), 0)
1983                fp.seek(bloc, os.SEEK_CUR)
1984                self.assertEqual(fp.tell(), bloc)
1985                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1986                fp.seek(0, os.SEEK_END)
1987                self.assertEqual(fp.tell(), len(txt))
1988                fp.seek(0, os.SEEK_SET)
1989                self.assertEqual(fp.tell(), 0)
1990
1991    @requires_bz2
1992    def test_decompress_without_3rd_party_library(self):
1993        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1994        zip_file = io.BytesIO(data)
1995        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
1996            zf.writestr('a.txt', b'a')
1997        with mock.patch('zipfile.bz2', None):
1998            with zipfile.ZipFile(zip_file) as zf:
1999                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2000
2001    def tearDown(self):
2002        unlink(TESTFN)
2003        unlink(TESTFN2)
2004
2005
2006class AbstractBadCrcTests:
2007    def test_testzip_with_bad_crc(self):
2008        """Tests that files with bad CRCs return their name from testzip."""
2009        zipdata = self.zip_with_bad_crc
2010
2011        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2012            # testzip returns the name of the first corrupt file, or None
2013            self.assertEqual('afile', zipf.testzip())
2014
2015    def test_read_with_bad_crc(self):
2016        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
2017        zipdata = self.zip_with_bad_crc
2018
2019        # Using ZipFile.read()
2020        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2021            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
2022
2023        # Using ZipExtFile.read()
2024        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2025            with zipf.open('afile', 'r') as corrupt_file:
2026                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
2027
2028        # Same with small reads (in order to exercise the buffering logic)
2029        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2030            with zipf.open('afile', 'r') as corrupt_file:
2031                corrupt_file.MIN_READ_SIZE = 2
2032                with self.assertRaises(zipfile.BadZipFile):
2033                    while corrupt_file.read(2):
2034                        pass
2035
2036
2037class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2038    compression = zipfile.ZIP_STORED
2039    zip_with_bad_crc = (
2040        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
2041        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
2042        b'ilehello,AworldP'
2043        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
2044        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
2045        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
2046        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
2047        b'\0\0/\0\0\0\0\0')
2048
2049@requires_zlib
2050class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2051    compression = zipfile.ZIP_DEFLATED
2052    zip_with_bad_crc = (
2053        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
2054        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2055        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
2056        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
2057        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
2058        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
2059        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
2060        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2061
2062@requires_bz2
2063class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2064    compression = zipfile.ZIP_BZIP2
2065    zip_with_bad_crc = (
2066        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
2067        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2068        b'ileBZh91AY&SY\xd4\xa8\xca'
2069        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
2070        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
2071        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
2072        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
2073        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
2074        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
2075        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
2076        b'\x00\x00\x00\x00')
2077
2078@requires_lzma
2079class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2080    compression = zipfile.ZIP_LZMA
2081    zip_with_bad_crc = (
2082        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2083        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2084        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
2085        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
2086        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2087        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
2088        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
2089        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
2090        b'\x00>\x00\x00\x00\x00\x00')
2091
2092
2093class DecryptionTests(unittest.TestCase):
2094    """Check that ZIP decryption works. Since the library does not
2095    support encryption at the moment, we use a pre-generated encrypted
2096    ZIP file."""
2097
2098    data = (
2099        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
2100        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
2101        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
2102        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
2103        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
2104        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
2105        b'\x00\x00L\x00\x00\x00\x00\x00' )
2106    data2 = (
2107        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
2108        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
2109        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
2110        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
2111        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
2112        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
2113        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
2114        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
2115
2116    plain = b'zipfile.py encryption test'
2117    plain2 = b'\x00'*512
2118
2119    def setUp(self):
2120        with open(TESTFN, "wb") as fp:
2121            fp.write(self.data)
2122        self.zip = zipfile.ZipFile(TESTFN, "r")
2123        with open(TESTFN2, "wb") as fp:
2124            fp.write(self.data2)
2125        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2126
2127    def tearDown(self):
2128        self.zip.close()
2129        os.unlink(TESTFN)
2130        self.zip2.close()
2131        os.unlink(TESTFN2)
2132
2133    def test_no_password(self):
2134        # Reading the encrypted file without password
2135        # must generate a RunTime exception
2136        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2137        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2138
2139    def test_bad_password(self):
2140        self.zip.setpassword(b"perl")
2141        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2142        self.zip2.setpassword(b"perl")
2143        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2144
2145    @requires_zlib
2146    def test_good_password(self):
2147        self.zip.setpassword(b"python")
2148        self.assertEqual(self.zip.read("test.txt"), self.plain)
2149        self.zip2.setpassword(b"12345")
2150        self.assertEqual(self.zip2.read("zero"), self.plain2)
2151
2152    def test_unicode_password(self):
2153        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
2154        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
2155        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
2156        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
2157
2158    def test_seek_tell(self):
2159        self.zip.setpassword(b"python")
2160        txt = self.plain
2161        test_word = b'encryption'
2162        bloc = txt.find(test_word)
2163        bloc_len = len(test_word)
2164        with self.zip.open("test.txt", "r") as fp:
2165            fp.seek(bloc, os.SEEK_SET)
2166            self.assertEqual(fp.tell(), bloc)
2167            fp.seek(-bloc, os.SEEK_CUR)
2168            self.assertEqual(fp.tell(), 0)
2169            fp.seek(bloc, os.SEEK_CUR)
2170            self.assertEqual(fp.tell(), bloc)
2171            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2172
2173            # Make sure that the second read after seeking back beyond
2174            # _readbuffer returns the same content (ie. rewind to the start of
2175            # the file to read forward to the required position).
2176            old_read_size = fp.MIN_READ_SIZE
2177            fp.MIN_READ_SIZE = 1
2178            fp._readbuffer = b''
2179            fp._offset = 0
2180            fp.seek(0, os.SEEK_SET)
2181            self.assertEqual(fp.tell(), 0)
2182            fp.seek(bloc, os.SEEK_CUR)
2183            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2184            fp.MIN_READ_SIZE = old_read_size
2185
2186            fp.seek(0, os.SEEK_END)
2187            self.assertEqual(fp.tell(), len(txt))
2188            fp.seek(0, os.SEEK_SET)
2189            self.assertEqual(fp.tell(), 0)
2190
2191            # Read the file completely to definitely call any eof integrity
2192            # checks (crc) and make sure they still pass.
2193            fp.read()
2194
2195
2196class AbstractTestsWithRandomBinaryFiles:
2197    @classmethod
2198    def setUpClass(cls):
2199        datacount = randint(16, 64)*1024 + randint(1, 1024)
2200        cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
2201                            for i in range(datacount))
2202
2203    def setUp(self):
2204        # Make a source file with some lines
2205        with open(TESTFN, "wb") as fp:
2206            fp.write(self.data)
2207
2208    def tearDown(self):
2209        unlink(TESTFN)
2210        unlink(TESTFN2)
2211
2212    def make_test_archive(self, f, compression):
2213        # Create the ZIP archive
2214        with zipfile.ZipFile(f, "w", compression) as zipfp:
2215            zipfp.write(TESTFN, "another.name")
2216            zipfp.write(TESTFN, TESTFN)
2217
2218    def zip_test(self, f, compression):
2219        self.make_test_archive(f, compression)
2220
2221        # Read the ZIP archive
2222        with zipfile.ZipFile(f, "r", compression) as zipfp:
2223            testdata = zipfp.read(TESTFN)
2224            self.assertEqual(len(testdata), len(self.data))
2225            self.assertEqual(testdata, self.data)
2226            self.assertEqual(zipfp.read("another.name"), self.data)
2227
2228    def test_read(self):
2229        for f in get_files(self):
2230            self.zip_test(f, self.compression)
2231
2232    def zip_open_test(self, f, compression):
2233        self.make_test_archive(f, compression)
2234
2235        # Read the ZIP archive
2236        with zipfile.ZipFile(f, "r", compression) as zipfp:
2237            zipdata1 = []
2238            with zipfp.open(TESTFN) as zipopen1:
2239                while True:
2240                    read_data = zipopen1.read(256)
2241                    if not read_data:
2242                        break
2243                    zipdata1.append(read_data)
2244
2245            zipdata2 = []
2246            with zipfp.open("another.name") as zipopen2:
2247                while True:
2248                    read_data = zipopen2.read(256)
2249                    if not read_data:
2250                        break
2251                    zipdata2.append(read_data)
2252
2253            testdata1 = b''.join(zipdata1)
2254            self.assertEqual(len(testdata1), len(self.data))
2255            self.assertEqual(testdata1, self.data)
2256
2257            testdata2 = b''.join(zipdata2)
2258            self.assertEqual(len(testdata2), len(self.data))
2259            self.assertEqual(testdata2, self.data)
2260
2261    def test_open(self):
2262        for f in get_files(self):
2263            self.zip_open_test(f, self.compression)
2264
2265    def zip_random_open_test(self, f, compression):
2266        self.make_test_archive(f, compression)
2267
2268        # Read the ZIP archive
2269        with zipfile.ZipFile(f, "r", compression) as zipfp:
2270            zipdata1 = []
2271            with zipfp.open(TESTFN) as zipopen1:
2272                while True:
2273                    read_data = zipopen1.read(randint(1, 1024))
2274                    if not read_data:
2275                        break
2276                    zipdata1.append(read_data)
2277
2278            testdata = b''.join(zipdata1)
2279            self.assertEqual(len(testdata), len(self.data))
2280            self.assertEqual(testdata, self.data)
2281
2282    def test_random_open(self):
2283        for f in get_files(self):
2284            self.zip_random_open_test(f, self.compression)
2285
2286
2287class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2288                                       unittest.TestCase):
2289    compression = zipfile.ZIP_STORED
2290
2291@requires_zlib
2292class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2293                                        unittest.TestCase):
2294    compression = zipfile.ZIP_DEFLATED
2295
2296@requires_bz2
2297class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2298                                      unittest.TestCase):
2299    compression = zipfile.ZIP_BZIP2
2300
2301@requires_lzma
2302class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2303                                     unittest.TestCase):
2304    compression = zipfile.ZIP_LZMA
2305
2306
2307# Provide the tell() method but not seek()
2308class Tellable:
2309    def __init__(self, fp):
2310        self.fp = fp
2311        self.offset = 0
2312
2313    def write(self, data):
2314        n = self.fp.write(data)
2315        self.offset += n
2316        return n
2317
2318    def tell(self):
2319        return self.offset
2320
2321    def flush(self):
2322        self.fp.flush()
2323
2324class Unseekable:
2325    def __init__(self, fp):
2326        self.fp = fp
2327
2328    def write(self, data):
2329        return self.fp.write(data)
2330
2331    def flush(self):
2332        self.fp.flush()
2333
2334class UnseekableTests(unittest.TestCase):
2335    def test_writestr(self):
2336        for wrapper in (lambda f: f), Tellable, Unseekable:
2337            with self.subTest(wrapper=wrapper):
2338                f = io.BytesIO()
2339                f.write(b'abc')
2340                bf = io.BufferedWriter(f)
2341                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2342                    zipfp.writestr('ones', b'111')
2343                    zipfp.writestr('twos', b'222')
2344                self.assertEqual(f.getvalue()[:5], b'abcPK')
2345                with zipfile.ZipFile(f, mode='r') as zipf:
2346                    with zipf.open('ones') as zopen:
2347                        self.assertEqual(zopen.read(), b'111')
2348                    with zipf.open('twos') as zopen:
2349                        self.assertEqual(zopen.read(), b'222')
2350
2351    def test_write(self):
2352        for wrapper in (lambda f: f), Tellable, Unseekable:
2353            with self.subTest(wrapper=wrapper):
2354                f = io.BytesIO()
2355                f.write(b'abc')
2356                bf = io.BufferedWriter(f)
2357                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2358                    self.addCleanup(unlink, TESTFN)
2359                    with open(TESTFN, 'wb') as f2:
2360                        f2.write(b'111')
2361                    zipfp.write(TESTFN, 'ones')
2362                    with open(TESTFN, 'wb') as f2:
2363                        f2.write(b'222')
2364                    zipfp.write(TESTFN, 'twos')
2365                self.assertEqual(f.getvalue()[:5], b'abcPK')
2366                with zipfile.ZipFile(f, mode='r') as zipf:
2367                    with zipf.open('ones') as zopen:
2368                        self.assertEqual(zopen.read(), b'111')
2369                    with zipf.open('twos') as zopen:
2370                        self.assertEqual(zopen.read(), b'222')
2371
2372    def test_open_write(self):
2373        for wrapper in (lambda f: f), Tellable, Unseekable:
2374            with self.subTest(wrapper=wrapper):
2375                f = io.BytesIO()
2376                f.write(b'abc')
2377                bf = io.BufferedWriter(f)
2378                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
2379                    with zipf.open('ones', 'w') as zopen:
2380                        zopen.write(b'111')
2381                    with zipf.open('twos', 'w') as zopen:
2382                        zopen.write(b'222')
2383                self.assertEqual(f.getvalue()[:5], b'abcPK')
2384                with zipfile.ZipFile(f) as zipf:
2385                    self.assertEqual(zipf.read('ones'), b'111')
2386                    self.assertEqual(zipf.read('twos'), b'222')
2387
2388
2389@requires_zlib
2390class TestsWithMultipleOpens(unittest.TestCase):
2391    @classmethod
2392    def setUpClass(cls):
2393        cls.data1 = b'111' + getrandbytes(10000)
2394        cls.data2 = b'222' + getrandbytes(10000)
2395
2396    def make_test_archive(self, f):
2397        # Create the ZIP archive
2398        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2399            zipfp.writestr('ones', self.data1)
2400            zipfp.writestr('twos', self.data2)
2401
2402    def test_same_file(self):
2403        # Verify that (when the ZipFile is in control of creating file objects)
2404        # multiple open() calls can be made without interfering with each other.
2405        for f in get_files(self):
2406            self.make_test_archive(f)
2407            with zipfile.ZipFile(f, mode="r") as zipf:
2408                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2409                    data1 = zopen1.read(500)
2410                    data2 = zopen2.read(500)
2411                    data1 += zopen1.read()
2412                    data2 += zopen2.read()
2413                self.assertEqual(data1, data2)
2414                self.assertEqual(data1, self.data1)
2415
2416    def test_different_file(self):
2417        # Verify that (when the ZipFile is in control of creating file objects)
2418        # multiple open() calls can be made without interfering with each other.
2419        for f in get_files(self):
2420            self.make_test_archive(f)
2421            with zipfile.ZipFile(f, mode="r") as zipf:
2422                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2423                    data1 = zopen1.read(500)
2424                    data2 = zopen2.read(500)
2425                    data1 += zopen1.read()
2426                    data2 += zopen2.read()
2427                self.assertEqual(data1, self.data1)
2428                self.assertEqual(data2, self.data2)
2429
2430    def test_interleaved(self):
2431        # Verify that (when the ZipFile is in control of creating file objects)
2432        # multiple open() calls can be made without interfering with each other.
2433        for f in get_files(self):
2434            self.make_test_archive(f)
2435            with zipfile.ZipFile(f, mode="r") as zipf:
2436                with zipf.open('ones') as zopen1:
2437                    data1 = zopen1.read(500)
2438                    with zipf.open('twos') as zopen2:
2439                        data2 = zopen2.read(500)
2440                        data1 += zopen1.read()
2441                        data2 += zopen2.read()
2442                self.assertEqual(data1, self.data1)
2443                self.assertEqual(data2, self.data2)
2444
2445    def test_read_after_close(self):
2446        for f in get_files(self):
2447            self.make_test_archive(f)
2448            with contextlib.ExitStack() as stack:
2449                with zipfile.ZipFile(f, 'r') as zipf:
2450                    zopen1 = stack.enter_context(zipf.open('ones'))
2451                    zopen2 = stack.enter_context(zipf.open('twos'))
2452                data1 = zopen1.read(500)
2453                data2 = zopen2.read(500)
2454                data1 += zopen1.read()
2455                data2 += zopen2.read()
2456            self.assertEqual(data1, self.data1)
2457            self.assertEqual(data2, self.data2)
2458
2459    def test_read_after_write(self):
2460        for f in get_files(self):
2461            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2462                zipf.writestr('ones', self.data1)
2463                zipf.writestr('twos', self.data2)
2464                with zipf.open('ones') as zopen1:
2465                    data1 = zopen1.read(500)
2466            self.assertEqual(data1, self.data1[:500])
2467            with zipfile.ZipFile(f, 'r') as zipf:
2468                data1 = zipf.read('ones')
2469                data2 = zipf.read('twos')
2470            self.assertEqual(data1, self.data1)
2471            self.assertEqual(data2, self.data2)
2472
2473    def test_write_after_read(self):
2474        for f in get_files(self):
2475            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2476                zipf.writestr('ones', self.data1)
2477                with zipf.open('ones') as zopen1:
2478                    zopen1.read(500)
2479                    zipf.writestr('twos', self.data2)
2480            with zipfile.ZipFile(f, 'r') as zipf:
2481                data1 = zipf.read('ones')
2482                data2 = zipf.read('twos')
2483            self.assertEqual(data1, self.data1)
2484            self.assertEqual(data2, self.data2)
2485
2486    def test_many_opens(self):
2487        # Verify that read() and open() promptly close the file descriptor,
2488        # and don't rely on the garbage collector to free resources.
2489        self.make_test_archive(TESTFN2)
2490        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
2491            for x in range(100):
2492                zipf.read('ones')
2493                with zipf.open('ones') as zopen1:
2494                    pass
2495        with open(os.devnull) as f:
2496            self.assertLess(f.fileno(), 100)
2497
2498    def test_write_while_reading(self):
2499        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2500            zipf.writestr('ones', self.data1)
2501        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2502            with zipf.open('ones', 'r') as r1:
2503                data1 = r1.read(500)
2504                with zipf.open('twos', 'w') as w1:
2505                    w1.write(self.data2)
2506                data1 += r1.read()
2507        self.assertEqual(data1, self.data1)
2508        with zipfile.ZipFile(TESTFN2) as zipf:
2509            self.assertEqual(zipf.read('twos'), self.data2)
2510
2511    def tearDown(self):
2512        unlink(TESTFN2)
2513
2514
2515class TestWithDirectory(unittest.TestCase):
2516    def setUp(self):
2517        os.mkdir(TESTFN2)
2518
2519    def test_extract_dir(self):
2520        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
2521            zipf.extractall(TESTFN2)
2522        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2523        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2524        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
2525
2526    def test_bug_6050(self):
2527        # Extraction should succeed if directories already exist
2528        os.mkdir(os.path.join(TESTFN2, "a"))
2529        self.test_extract_dir()
2530
2531    def test_write_dir(self):
2532        dirpath = os.path.join(TESTFN2, "x")
2533        os.mkdir(dirpath)
2534        mode = os.stat(dirpath).st_mode & 0xFFFF
2535        with zipfile.ZipFile(TESTFN, "w") as zipf:
2536            zipf.write(dirpath)
2537            zinfo = zipf.filelist[0]
2538            self.assertTrue(zinfo.filename.endswith("/x/"))
2539            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2540            zipf.write(dirpath, "y")
2541            zinfo = zipf.filelist[1]
2542            self.assertTrue(zinfo.filename, "y/")
2543            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2544        with zipfile.ZipFile(TESTFN, "r") as zipf:
2545            zinfo = zipf.filelist[0]
2546            self.assertTrue(zinfo.filename.endswith("/x/"))
2547            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2548            zinfo = zipf.filelist[1]
2549            self.assertTrue(zinfo.filename, "y/")
2550            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2551            target = os.path.join(TESTFN2, "target")
2552            os.mkdir(target)
2553            zipf.extractall(target)
2554            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
2555            self.assertEqual(len(os.listdir(target)), 2)
2556
2557    def test_writestr_dir(self):
2558        os.mkdir(os.path.join(TESTFN2, "x"))
2559        with zipfile.ZipFile(TESTFN, "w") as zipf:
2560            zipf.writestr("x/", b'')
2561            zinfo = zipf.filelist[0]
2562            self.assertEqual(zinfo.filename, "x/")
2563            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2564        with zipfile.ZipFile(TESTFN, "r") as zipf:
2565            zinfo = zipf.filelist[0]
2566            self.assertTrue(zinfo.filename.endswith("x/"))
2567            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2568            target = os.path.join(TESTFN2, "target")
2569            os.mkdir(target)
2570            zipf.extractall(target)
2571            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
2572            self.assertEqual(os.listdir(target), ["x"])
2573
2574    def tearDown(self):
2575        rmtree(TESTFN2)
2576        if os.path.exists(TESTFN):
2577            unlink(TESTFN)
2578
2579
2580class ZipInfoTests(unittest.TestCase):
2581    def test_from_file(self):
2582        zi = zipfile.ZipInfo.from_file(__file__)
2583        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2584        self.assertFalse(zi.is_dir())
2585        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2586
2587    def test_from_file_pathlike(self):
2588        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2589        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2590        self.assertFalse(zi.is_dir())
2591        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2592
2593    def test_from_file_bytes(self):
2594        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2595        self.assertEqual(posixpath.basename(zi.filename), 'test')
2596        self.assertFalse(zi.is_dir())
2597        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2598
2599    def test_from_file_fileno(self):
2600        with open(__file__, 'rb') as f:
2601            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2602            self.assertEqual(posixpath.basename(zi.filename), 'test')
2603            self.assertFalse(zi.is_dir())
2604            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2605
2606    def test_from_dir(self):
2607        dirpath = os.path.dirname(os.path.abspath(__file__))
2608        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2609        self.assertEqual(zi.filename, 'stdlib_tests/')
2610        self.assertTrue(zi.is_dir())
2611        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2612        self.assertEqual(zi.file_size, 0)
2613
2614
2615class CommandLineTest(unittest.TestCase):
2616
2617    def zipfilecmd(self, *args, **kwargs):
2618        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
2619                                                      **kwargs)
2620        return out.replace(os.linesep.encode(), b'\n')
2621
2622    def zipfilecmd_failure(self, *args):
2623        return script_helper.assert_python_failure('-m', 'zipfile', *args)
2624
2625    def test_bad_use(self):
2626        rc, out, err = self.zipfilecmd_failure()
2627        self.assertEqual(out, b'')
2628        self.assertIn(b'usage', err.lower())
2629        self.assertIn(b'error', err.lower())
2630        self.assertIn(b'required', err.lower())
2631        rc, out, err = self.zipfilecmd_failure('-l', '')
2632        self.assertEqual(out, b'')
2633        self.assertNotEqual(err.strip(), b'')
2634
2635    def test_test_command(self):
2636        zip_name = findfile('zipdir.zip')
2637        for opt in '-t', '--test':
2638            out = self.zipfilecmd(opt, zip_name)
2639            self.assertEqual(out.rstrip(), b'Done testing')
2640        zip_name = findfile('testtar.tar')
2641        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
2642        self.assertEqual(out, b'')
2643
2644    def test_list_command(self):
2645        zip_name = findfile('zipdir.zip')
2646        t = io.StringIO()
2647        with zipfile.ZipFile(zip_name, 'r') as tf:
2648            tf.printdir(t)
2649        expected = t.getvalue().encode('ascii', 'backslashreplace')
2650        for opt in '-l', '--list':
2651            out = self.zipfilecmd(opt, zip_name,
2652                                  PYTHONIOENCODING='ascii:backslashreplace')
2653            self.assertEqual(out, expected)
2654
2655    @requires_zlib
2656    def test_create_command(self):
2657        self.addCleanup(unlink, TESTFN)
2658        with open(TESTFN, 'w') as f:
2659            f.write('test 1')
2660        os.mkdir(TESTFNDIR)
2661        self.addCleanup(rmtree, TESTFNDIR)
2662        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f:
2663            f.write('test 2')
2664        files = [TESTFN, TESTFNDIR]
2665        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
2666        for opt in '-c', '--create':
2667            try:
2668                out = self.zipfilecmd(opt, TESTFN2, *files)
2669                self.assertEqual(out, b'')
2670                with zipfile.ZipFile(TESTFN2) as zf:
2671                    self.assertEqual(zf.namelist(), namelist)
2672                    self.assertEqual(zf.read(namelist[0]), b'test 1')
2673                    self.assertEqual(zf.read(namelist[2]), b'test 2')
2674            finally:
2675                unlink(TESTFN2)
2676
2677    def test_extract_command(self):
2678        zip_name = findfile('zipdir.zip')
2679        for opt in '-e', '--extract':
2680            with temp_dir() as extdir:
2681                out = self.zipfilecmd(opt, zip_name, extdir)
2682                self.assertEqual(out, b'')
2683                with zipfile.ZipFile(zip_name) as zf:
2684                    for zi in zf.infolist():
2685                        path = os.path.join(extdir,
2686                                    zi.filename.replace('/', os.sep))
2687                        if zi.is_dir():
2688                            self.assertTrue(os.path.isdir(path))
2689                        else:
2690                            self.assertTrue(os.path.isfile(path))
2691                            with open(path, 'rb') as f:
2692                                self.assertEqual(f.read(), zf.read(zi))
2693
2694
2695class TestExecutablePrependedZip(unittest.TestCase):
2696    """Test our ability to open zip files with an executable prepended."""
2697
2698    def setUp(self):
2699        self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
2700        self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')
2701
2702    def _test_zip_works(self, name):
2703        # bpo28494 sanity check: ensure is_zipfile works on these.
2704        self.assertTrue(zipfile.is_zipfile(name),
2705                        f'is_zipfile failed on {name}')
2706        # Ensure we can operate on these via ZipFile.
2707        with zipfile.ZipFile(name) as zipfp:
2708            for n in zipfp.namelist():
2709                data = zipfp.read(n)
2710                self.assertIn(b'FAVORITE_NUMBER', data)
2711
2712    def test_read_zip_with_exe_prepended(self):
2713        self._test_zip_works(self.exe_zip)
2714
2715    def test_read_zip64_with_exe_prepended(self):
2716        self._test_zip_works(self.exe_zip64)
2717
2718    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2719    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2720                         'Test relies on #!/bin/bash working.')
2721    def test_execute_zip2(self):
2722        output = subprocess.check_output([self.exe_zip, sys.executable])
2723        self.assertIn(b'number in executable: 5', output)
2724
2725    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2726    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2727                         'Test relies on #!/bin/bash working.')
2728    def test_execute_zip64(self):
2729        output = subprocess.check_output([self.exe_zip64, sys.executable])
2730        self.assertIn(b'number in executable: 5', output)
2731
2732
2733# Poor man's technique to consume a (smallish) iterable.
2734consume = tuple
2735
2736
2737# from jaraco.itertools 5.0
2738class jaraco:
2739    class itertools:
2740        class Counter:
2741            def __init__(self, i):
2742                self.count = 0
2743                self._orig_iter = iter(i)
2744
2745            def __iter__(self):
2746                return self
2747
2748            def __next__(self):
2749                result = next(self._orig_iter)
2750                self.count += 1
2751                return result
2752
2753
2754def add_dirs(zf):
2755    """
2756    Given a writable zip file zf, inject directory entries for
2757    any directories implied by the presence of children.
2758    """
2759    for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
2760        zf.writestr(name, b"")
2761    return zf
2762
2763
2764def build_alpharep_fixture():
2765    """
2766    Create a zip file with this structure:
2767
2768    .
2769    ├── a.txt
2770    ├── b
2771    │   ├── c.txt
2772    │   ├── d
2773    │   │   └── e.txt
2774    │   └── f.txt
2775    └── g
2776        └── h
2777            └── i.txt
2778
2779    This fixture has the following key characteristics:
2780
2781    - a file at the root (a)
2782    - a file two levels deep (b/d/e)
2783    - multiple files in a directory (b/c, b/f)
2784    - a directory containing only a directory (g/h)
2785
2786    "alpha" because it uses alphabet
2787    "rep" because it's a representative example
2788    """
2789    data = io.BytesIO()
2790    zf = zipfile.ZipFile(data, "w")
2791    zf.writestr("a.txt", b"content of a")
2792    zf.writestr("b/c.txt", b"content of c")
2793    zf.writestr("b/d/e.txt", b"content of e")
2794    zf.writestr("b/f.txt", b"content of f")
2795    zf.writestr("g/h/i.txt", b"content of i")
2796    zf.filename = "alpharep.zip"
2797    return zf
2798
2799
2800class TestPath(unittest.TestCase):
2801    def setUp(self):
2802        self.fixtures = contextlib.ExitStack()
2803        self.addCleanup(self.fixtures.close)
2804
2805    def zipfile_alpharep(self):
2806        with self.subTest():
2807            yield build_alpharep_fixture()
2808        with self.subTest():
2809            yield add_dirs(build_alpharep_fixture())
2810
2811    def zipfile_ondisk(self):
2812        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
2813        for alpharep in self.zipfile_alpharep():
2814            buffer = alpharep.fp
2815            alpharep.close()
2816            path = tmpdir / alpharep.filename
2817            with path.open("wb") as strm:
2818                strm.write(buffer.getvalue())
2819            yield path
2820
2821    def test_iterdir_and_types(self):
2822        for alpharep in self.zipfile_alpharep():
2823            root = zipfile.Path(alpharep)
2824            assert root.is_dir()
2825            a, b, g = root.iterdir()
2826            assert a.is_file()
2827            assert b.is_dir()
2828            assert g.is_dir()
2829            c, f, d = b.iterdir()
2830            assert c.is_file() and f.is_file()
2831            e, = d.iterdir()
2832            assert e.is_file()
2833            h, = g.iterdir()
2834            i, = h.iterdir()
2835            assert i.is_file()
2836
2837    def test_subdir_is_dir(self):
2838        for alpharep in self.zipfile_alpharep():
2839            root = zipfile.Path(alpharep)
2840            assert (root / 'b').is_dir()
2841            assert (root / 'b/').is_dir()
2842            assert (root / 'g').is_dir()
2843            assert (root / 'g/').is_dir()
2844
2845    def test_open(self):
2846        for alpharep in self.zipfile_alpharep():
2847            root = zipfile.Path(alpharep)
2848            a, b, g = root.iterdir()
2849            with a.open() as strm:
2850                data = strm.read()
2851            assert data == b"content of a"
2852
2853    def test_read(self):
2854        for alpharep in self.zipfile_alpharep():
2855            root = zipfile.Path(alpharep)
2856            a, b, g = root.iterdir()
2857            assert a.read_text() == "content of a"
2858            assert a.read_bytes() == b"content of a"
2859
2860    def test_joinpath(self):
2861        for alpharep in self.zipfile_alpharep():
2862            root = zipfile.Path(alpharep)
2863            a = root.joinpath("a")
2864            assert a.is_file()
2865            e = root.joinpath("b").joinpath("d").joinpath("e.txt")
2866            assert e.read_text() == "content of e"
2867
2868    def test_traverse_truediv(self):
2869        for alpharep in self.zipfile_alpharep():
2870            root = zipfile.Path(alpharep)
2871            a = root / "a"
2872            assert a.is_file()
2873            e = root / "b" / "d" / "e.txt"
2874            assert e.read_text() == "content of e"
2875
2876    def test_pathlike_construction(self):
2877        """
2878        zipfile.Path should be constructable from a path-like object
2879        """
2880        for zipfile_ondisk in self.zipfile_ondisk():
2881            pathlike = pathlib.Path(str(zipfile_ondisk))
2882            zipfile.Path(pathlike)
2883
2884    def test_traverse_pathlike(self):
2885        for alpharep in self.zipfile_alpharep():
2886            root = zipfile.Path(alpharep)
2887            root / pathlib.Path("a")
2888
2889    def test_parent(self):
2890        for alpharep in self.zipfile_alpharep():
2891            root = zipfile.Path(alpharep)
2892            assert (root / 'a').parent.at == ''
2893            assert (root / 'a' / 'b').parent.at == 'a/'
2894
2895    def test_dir_parent(self):
2896        for alpharep in self.zipfile_alpharep():
2897            root = zipfile.Path(alpharep)
2898            assert (root / 'b').parent.at == ''
2899            assert (root / 'b/').parent.at == ''
2900
2901    def test_missing_dir_parent(self):
2902        for alpharep in self.zipfile_alpharep():
2903            root = zipfile.Path(alpharep)
2904            assert (root / 'missing dir/').parent.at == ''
2905
2906    def test_mutability(self):
2907        """
2908        If the underlying zipfile is changed, the Path object should
2909        reflect that change.
2910        """
2911        for alpharep in self.zipfile_alpharep():
2912            root = zipfile.Path(alpharep)
2913            a, b, g = root.iterdir()
2914            alpharep.writestr('foo.txt', 'foo')
2915            alpharep.writestr('bar/baz.txt', 'baz')
2916            assert any(
2917                child.name == 'foo.txt'
2918                for child in root.iterdir())
2919            assert (root / 'foo.txt').read_text() == 'foo'
2920            baz, = (root / 'bar').iterdir()
2921            assert baz.read_text() == 'baz'
2922
2923    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
2924
2925    def huge_zipfile(self):
2926        """Create a read-only zipfile with a huge number of entries entries."""
2927        strm = io.BytesIO()
2928        zf = zipfile.ZipFile(strm, "w")
2929        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
2930            zf.writestr(entry, entry)
2931        zf.mode = 'r'
2932        return zf
2933
2934    def test_joinpath_constant_time(self):
2935        """
2936        Ensure joinpath on items in zipfile is linear time.
2937        """
2938        root = zipfile.Path(self.huge_zipfile())
2939        entries = jaraco.itertools.Counter(root.iterdir())
2940        for entry in entries:
2941            entry.joinpath('suffix')
2942        # Check the file iterated all items
2943        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
2944
2945    # @func_timeout.func_set_timeout(3)
2946    def test_implied_dirs_performance(self):
2947        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
2948        zipfile.CompleteDirs._implied_dirs(data)
2949
2950
2951if __name__ == "__main__":
2952    unittest.main()
2953