1import contextlib
2import importlib.util
3import io
4import itertools
5import os
6import pathlib
7import posixpath
8import string
9import struct
10import subprocess
11import sys
12import time
13import unittest
14import unittest.mock as mock
15import zipfile
16
17
18from tempfile import TemporaryFile
19from random import randint, random, randbytes
20
21from test.support import script_helper
22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd,
23                          requires_zlib, requires_bz2, requires_lzma,
24                          captured_stdout)
25
# Second scratch path derived from TESTFN; the tests below use it as the
# on-disk archive file.
TESTFN2 = TESTFN + "2"
# Scratch directory name derived from TESTFN.
TESTFNDIR = TESTFN + "d"
# Number of text lines generated for the shared payload in setUpClass().
FIXEDTEST_SIZE = 1000
# NOTE(review): presumably the directory holding prebuilt archives -- confirm.
DATAFILES_DIR = 'zipfile_datafiles'

# NOTE(review): looks like (archive member name, content) pairs -- confirm
# against the tests that consume it.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
35
def get_files(test):
    """Yield each kind of archive target the tests should exercise.

    First the TESTFN2 path, then an open temporary file, then an in-memory
    BytesIO.  After the caller is done with each file object, assert through
    *test* that the ZipFile code did not close it.
    """
    yield TESTFN2
    for factory in (TemporaryFile, io.BytesIO):
        with factory() as stream:
            yield stream
            test.assertFalse(stream.closed)
44
45class AbstractTestsWithSourceFile:
46    @classmethod
47    def setUpClass(cls):
48        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
49                              (i, random()), "ascii")
50                        for i in range(FIXEDTEST_SIZE)]
51        cls.data = b''.join(cls.line_gen)
52
53    def setUp(self):
54        # Make a source file with some lines
55        with open(TESTFN, "wb") as fp:
56            fp.write(self.data)
57
58    def make_test_archive(self, f, compression, compresslevel=None):
59        kwargs = {'compression': compression, 'compresslevel': compresslevel}
60        # Create the ZIP archive
61        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
62            zipfp.write(TESTFN, "another.name")
63            zipfp.write(TESTFN, TESTFN)
64            zipfp.writestr("strfile", self.data)
65            with zipfp.open('written-open-w', mode='w') as f:
66                for line in self.line_gen:
67                    f.write(line)
68
69    def zip_test(self, f, compression, compresslevel=None):
70        self.make_test_archive(f, compression, compresslevel)
71
72        # Read the ZIP archive
73        with zipfile.ZipFile(f, "r", compression) as zipfp:
74            self.assertEqual(zipfp.read(TESTFN), self.data)
75            self.assertEqual(zipfp.read("another.name"), self.data)
76            self.assertEqual(zipfp.read("strfile"), self.data)
77
78            # Print the ZIP directory
79            fp = io.StringIO()
80            zipfp.printdir(file=fp)
81            directory = fp.getvalue()
82            lines = directory.splitlines()
83            self.assertEqual(len(lines), 5) # Number of files + header
84
85            self.assertIn('File Name', lines[0])
86            self.assertIn('Modified', lines[0])
87            self.assertIn('Size', lines[0])
88
89            fn, date, time_, size = lines[1].split()
90            self.assertEqual(fn, 'another.name')
91            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
92            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
93            self.assertEqual(size, str(len(self.data)))
94
95            # Check the namelist
96            names = zipfp.namelist()
97            self.assertEqual(len(names), 4)
98            self.assertIn(TESTFN, names)
99            self.assertIn("another.name", names)
100            self.assertIn("strfile", names)
101            self.assertIn("written-open-w", names)
102
103            # Check infolist
104            infos = zipfp.infolist()
105            names = [i.filename for i in infos]
106            self.assertEqual(len(names), 4)
107            self.assertIn(TESTFN, names)
108            self.assertIn("another.name", names)
109            self.assertIn("strfile", names)
110            self.assertIn("written-open-w", names)
111            for i in infos:
112                self.assertEqual(i.file_size, len(self.data))
113
114            # check getinfo
115            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
116                info = zipfp.getinfo(nm)
117                self.assertEqual(info.filename, nm)
118                self.assertEqual(info.file_size, len(self.data))
119
120            # Check that testzip doesn't raise an exception
121            zipfp.testzip()
122
123    def test_basic(self):
124        for f in get_files(self):
125            self.zip_test(f, self.compression)
126
127    def zip_open_test(self, f, compression):
128        self.make_test_archive(f, compression)
129
130        # Read the ZIP archive
131        with zipfile.ZipFile(f, "r", compression) as zipfp:
132            zipdata1 = []
133            with zipfp.open(TESTFN) as zipopen1:
134                while True:
135                    read_data = zipopen1.read(256)
136                    if not read_data:
137                        break
138                    zipdata1.append(read_data)
139
140            zipdata2 = []
141            with zipfp.open("another.name") as zipopen2:
142                while True:
143                    read_data = zipopen2.read(256)
144                    if not read_data:
145                        break
146                    zipdata2.append(read_data)
147
148            self.assertEqual(b''.join(zipdata1), self.data)
149            self.assertEqual(b''.join(zipdata2), self.data)
150
151    def test_open(self):
152        for f in get_files(self):
153            self.zip_open_test(f, self.compression)
154
155    def test_open_with_pathlike(self):
156        path = pathlib.Path(TESTFN2)
157        self.zip_open_test(path, self.compression)
158        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
159            self.assertIsInstance(zipfp.filename, str)
160
161    def zip_random_open_test(self, f, compression):
162        self.make_test_archive(f, compression)
163
164        # Read the ZIP archive
165        with zipfile.ZipFile(f, "r", compression) as zipfp:
166            zipdata1 = []
167            with zipfp.open(TESTFN) as zipopen1:
168                while True:
169                    read_data = zipopen1.read(randint(1, 1024))
170                    if not read_data:
171                        break
172                    zipdata1.append(read_data)
173
174            self.assertEqual(b''.join(zipdata1), self.data)
175
176    def test_random_open(self):
177        for f in get_files(self):
178            self.zip_random_open_test(f, self.compression)
179
180    def zip_read1_test(self, f, compression):
181        self.make_test_archive(f, compression)
182
183        # Read the ZIP archive
184        with zipfile.ZipFile(f, "r") as zipfp, \
185             zipfp.open(TESTFN) as zipopen:
186            zipdata = []
187            while True:
188                read_data = zipopen.read1(-1)
189                if not read_data:
190                    break
191                zipdata.append(read_data)
192
193        self.assertEqual(b''.join(zipdata), self.data)
194
195    def test_read1(self):
196        for f in get_files(self):
197            self.zip_read1_test(f, self.compression)
198
199    def zip_read1_10_test(self, f, compression):
200        self.make_test_archive(f, compression)
201
202        # Read the ZIP archive
203        with zipfile.ZipFile(f, "r") as zipfp, \
204             zipfp.open(TESTFN) as zipopen:
205            zipdata = []
206            while True:
207                read_data = zipopen.read1(10)
208                self.assertLessEqual(len(read_data), 10)
209                if not read_data:
210                    break
211                zipdata.append(read_data)
212
213        self.assertEqual(b''.join(zipdata), self.data)
214
215    def test_read1_10(self):
216        for f in get_files(self):
217            self.zip_read1_10_test(f, self.compression)
218
219    def zip_readline_read_test(self, f, compression):
220        self.make_test_archive(f, compression)
221
222        # Read the ZIP archive
223        with zipfile.ZipFile(f, "r") as zipfp, \
224             zipfp.open(TESTFN) as zipopen:
225            data = b''
226            while True:
227                read = zipopen.readline()
228                if not read:
229                    break
230                data += read
231
232                read = zipopen.read(100)
233                if not read:
234                    break
235                data += read
236
237        self.assertEqual(data, self.data)
238
239    def test_readline_read(self):
240        # Issue #7610: calls to readline() interleaved with calls to read().
241        for f in get_files(self):
242            self.zip_readline_read_test(f, self.compression)
243
244    def zip_readline_test(self, f, compression):
245        self.make_test_archive(f, compression)
246
247        # Read the ZIP archive
248        with zipfile.ZipFile(f, "r") as zipfp:
249            with zipfp.open(TESTFN) as zipopen:
250                for line in self.line_gen:
251                    linedata = zipopen.readline()
252                    self.assertEqual(linedata, line)
253
254    def test_readline(self):
255        for f in get_files(self):
256            self.zip_readline_test(f, self.compression)
257
258    def zip_readlines_test(self, f, compression):
259        self.make_test_archive(f, compression)
260
261        # Read the ZIP archive
262        with zipfile.ZipFile(f, "r") as zipfp:
263            with zipfp.open(TESTFN) as zipopen:
264                ziplines = zipopen.readlines()
265            for line, zipline in zip(self.line_gen, ziplines):
266                self.assertEqual(zipline, line)
267
268    def test_readlines(self):
269        for f in get_files(self):
270            self.zip_readlines_test(f, self.compression)
271
272    def zip_iterlines_test(self, f, compression):
273        self.make_test_archive(f, compression)
274
275        # Read the ZIP archive
276        with zipfile.ZipFile(f, "r") as zipfp:
277            with zipfp.open(TESTFN) as zipopen:
278                for line, zipline in zip(self.line_gen, zipopen):
279                    self.assertEqual(zipline, line)
280
281    def test_iterlines(self):
282        for f in get_files(self):
283            self.zip_iterlines_test(f, self.compression)
284
285    def test_low_compression(self):
286        """Check for cases where compressed data is larger than original."""
287        # Create the ZIP archive
288        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
289            zipfp.writestr("strfile", '12')
290
291        # Get an open object for strfile
292        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
293            with zipfp.open("strfile") as openobj:
294                self.assertEqual(openobj.read(1), b'1')
295                self.assertEqual(openobj.read(1), b'2')
296
297    def test_writestr_compression(self):
298        zipfp = zipfile.ZipFile(TESTFN2, "w")
299        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
300        info = zipfp.getinfo('b.txt')
301        self.assertEqual(info.compress_type, self.compression)
302
303    def test_writestr_compresslevel(self):
304        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
305        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
306        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
307                       compresslevel=2)
308
309        # Compression level follows the constructor.
310        a_info = zipfp.getinfo('a.txt')
311        self.assertEqual(a_info.compress_type, self.compression)
312        self.assertEqual(a_info._compresslevel, 1)
313
314        # Compression level is overridden.
315        b_info = zipfp.getinfo('b.txt')
316        self.assertEqual(b_info.compress_type, self.compression)
317        self.assertEqual(b_info._compresslevel, 2)
318
319    def test_read_return_size(self):
320        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
321        # than requested.
322        for test_size in (1, 4095, 4096, 4097, 16384):
323            file_size = test_size + 1
324            junk = randbytes(file_size)
325            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
326                zipf.writestr('foo', junk)
327                with zipf.open('foo', 'r') as fp:
328                    buf = fp.read(test_size)
329                    self.assertEqual(len(buf), test_size)
330
331    def test_truncated_zipfile(self):
332        fp = io.BytesIO()
333        with zipfile.ZipFile(fp, mode='w') as zipf:
334            zipf.writestr('strfile', self.data, compress_type=self.compression)
335            end_offset = fp.tell()
336        zipfiledata = fp.getvalue()
337
338        fp = io.BytesIO(zipfiledata)
339        with zipfile.ZipFile(fp) as zipf:
340            with zipf.open('strfile') as zipopen:
341                fp.truncate(end_offset - 20)
342                with self.assertRaises(EOFError):
343                    zipopen.read()
344
345        fp = io.BytesIO(zipfiledata)
346        with zipfile.ZipFile(fp) as zipf:
347            with zipf.open('strfile') as zipopen:
348                fp.truncate(end_offset - 20)
349                with self.assertRaises(EOFError):
350                    while zipopen.read(100):
351                        pass
352
353        fp = io.BytesIO(zipfiledata)
354        with zipfile.ZipFile(fp) as zipf:
355            with zipf.open('strfile') as zipopen:
356                fp.truncate(end_offset - 20)
357                with self.assertRaises(EOFError):
358                    while zipopen.read1(100):
359                        pass
360
361    def test_repr(self):
362        fname = 'file.name'
363        for f in get_files(self):
364            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
365                zipfp.write(TESTFN, fname)
366                r = repr(zipfp)
367                self.assertIn("mode='w'", r)
368
369            with zipfile.ZipFile(f, 'r') as zipfp:
370                r = repr(zipfp)
371                if isinstance(f, str):
372                    self.assertIn('filename=%r' % f, r)
373                else:
374                    self.assertIn('file=%r' % f, r)
375                self.assertIn("mode='r'", r)
376                r = repr(zipfp.getinfo(fname))
377                self.assertIn('filename=%r' % fname, r)
378                self.assertIn('filemode=', r)
379                self.assertIn('file_size=', r)
380                if self.compression != zipfile.ZIP_STORED:
381                    self.assertIn('compress_type=', r)
382                    self.assertIn('compress_size=', r)
383                with zipfp.open(fname) as zipopen:
384                    r = repr(zipopen)
385                    self.assertIn('name=%r' % fname, r)
386                    self.assertIn("mode='r'", r)
387                    if self.compression != zipfile.ZIP_STORED:
388                        self.assertIn('compress_type=', r)
389                self.assertIn('[closed]', repr(zipopen))
390            self.assertIn('[closed]', repr(zipfp))
391
392    def test_compresslevel_basic(self):
393        for f in get_files(self):
394            self.zip_test(f, self.compression, compresslevel=9)
395
396    def test_per_file_compresslevel(self):
397        """Check that files within a Zip archive can have different
398        compression levels."""
399        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
400            zipfp.write(TESTFN, 'compress_1')
401            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
402            one_info = zipfp.getinfo('compress_1')
403            nine_info = zipfp.getinfo('compress_9')
404            self.assertEqual(one_info._compresslevel, 1)
405            self.assertEqual(nine_info._compresslevel, 9)
406
    def test_writing_errors(self):
        # Fault-injection test: make each successive write() issued while
        # adding 'file2' fail in turn, and check that the archive produced
        # after every failure still contains an intact 'file1' and nothing
        # else.  The loop ends on the first pass where no write fails.
        class BrokenFile(io.BytesIO):
            # BytesIO whose write() raises OSError once `count` writes have
            # succeeded and count == stop.  Counting is disabled while
            # `count` is None.
            def write(self, data):
                nonlocal count
                if count is not None:
                    if count == stop:
                        raise OSError
                    count += 1
                super().write(data)

        # Index of the write call (counted from 0) that will fail next.
        stop = 0
        while True:
            testfile = BrokenFile()
            count = None
            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
                # 'file1' is written with fault injection switched off.
                with zipfp.open('file1', 'w') as f:
                    f.write(b'data1')
                # Start counting writes for 'file2'.
                count = 0
                try:
                    with zipfp.open('file2', 'w') as f:
                        f.write(b'data2')
                except OSError:
                    # Injected failure hit; fail one write later next time.
                    stop += 1
                else:
                    # No write failed: 'file2' was stored completely.
                    break
                finally:
                    # Disable injection so closing the archive can succeed.
                    count = None
            # Reached only after a failure: only 'file1' may be present.
            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
                self.assertEqual(zipfp.namelist(), ['file1'])
                self.assertEqual(zipfp.read('file1'), b'data1')

        # Final pass without failures: both members must be readable.
        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
            self.assertEqual(zipfp.read('file1'), b'data1')
            self.assertEqual(zipfp.read('file2'), b'data2')
442
443
444    def tearDown(self):
445        unlink(TESTFN)
446        unlink(TESTFN2)
447
448
449class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
450                                unittest.TestCase):
451    compression = zipfile.ZIP_STORED
452    test_low_compression = None
453
454    def zip_test_writestr_permissions(self, f, compression):
455        # Make sure that writestr and open(... mode='w') create files with
456        # mode 0600, when they are passed a name rather than a ZipInfo
457        # instance.
458
459        self.make_test_archive(f, compression)
460        with zipfile.ZipFile(f, "r") as zipfp:
461            zinfo = zipfp.getinfo('strfile')
462            self.assertEqual(zinfo.external_attr, 0o600 << 16)
463
464            zinfo2 = zipfp.getinfo('written-open-w')
465            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
466
467    def test_writestr_permissions(self):
468        for f in get_files(self):
469            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
470
471    def test_absolute_arcnames(self):
472        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
473            zipfp.write(TESTFN, "/absolute")
474
475        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
476            self.assertEqual(zipfp.namelist(), ["absolute"])
477
478    def test_append_to_zip_file(self):
479        """Test appending to an existing zipfile."""
480        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
481            zipfp.write(TESTFN, TESTFN)
482
483        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
484            zipfp.writestr("strfile", self.data)
485            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
486
487    def test_append_to_non_zip_file(self):
488        """Test appending to an existing file that is not a zipfile."""
489        # NOTE: this test fails if len(d) < 22 because of the first
490        # line "fpin.seek(-22, 2)" in _EndRecData
491        data = b'I am not a ZipFile!'*10
492        with open(TESTFN2, 'wb') as f:
493            f.write(data)
494
495        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
496            zipfp.write(TESTFN, TESTFN)
497
498        with open(TESTFN2, 'rb') as f:
499            f.seek(len(data))
500            with zipfile.ZipFile(f, "r") as zipfp:
501                self.assertEqual(zipfp.namelist(), [TESTFN])
502                self.assertEqual(zipfp.read(TESTFN), self.data)
503        with open(TESTFN2, 'rb') as f:
504            self.assertEqual(f.read(len(data)), data)
505            zipfiledata = f.read()
506        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
507            self.assertEqual(zipfp.namelist(), [TESTFN])
508            self.assertEqual(zipfp.read(TESTFN), self.data)
509
510    def test_read_concatenated_zip_file(self):
511        with io.BytesIO() as bio:
512            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
513                zipfp.write(TESTFN, TESTFN)
514            zipfiledata = bio.getvalue()
515        data = b'I am not a ZipFile!'*10
516        with open(TESTFN2, 'wb') as f:
517            f.write(data)
518            f.write(zipfiledata)
519
520        with zipfile.ZipFile(TESTFN2) as zipfp:
521            self.assertEqual(zipfp.namelist(), [TESTFN])
522            self.assertEqual(zipfp.read(TESTFN), self.data)
523
524    def test_append_to_concatenated_zip_file(self):
525        with io.BytesIO() as bio:
526            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
527                zipfp.write(TESTFN, TESTFN)
528            zipfiledata = bio.getvalue()
529        data = b'I am not a ZipFile!'*1000000
530        with open(TESTFN2, 'wb') as f:
531            f.write(data)
532            f.write(zipfiledata)
533
534        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
535            self.assertEqual(zipfp.namelist(), [TESTFN])
536            zipfp.writestr('strfile', self.data)
537
538        with open(TESTFN2, 'rb') as f:
539            self.assertEqual(f.read(len(data)), data)
540            zipfiledata = f.read()
541        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
542            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
543            self.assertEqual(zipfp.read(TESTFN), self.data)
544            self.assertEqual(zipfp.read('strfile'), self.data)
545
546    def test_ignores_newline_at_end(self):
547        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
548            zipfp.write(TESTFN, TESTFN)
549        with open(TESTFN2, 'a') as f:
550            f.write("\r\n\00\00\00")
551        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
552            self.assertIsInstance(zipfp, zipfile.ZipFile)
553
554    def test_ignores_stuff_appended_past_comments(self):
555        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
556            zipfp.comment = b"this is a comment"
557            zipfp.write(TESTFN, TESTFN)
558        with open(TESTFN2, 'a') as f:
559            f.write("abcdef\r\n")
560        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
561            self.assertIsInstance(zipfp, zipfile.ZipFile)
562            self.assertEqual(zipfp.comment, b"this is a comment")
563
564    def test_write_default_name(self):
565        """Check that calling ZipFile.write without arcname specified
566        produces the expected result."""
567        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
568            zipfp.write(TESTFN)
569            with open(TESTFN, "rb") as f:
570                self.assertEqual(zipfp.read(TESTFN), f.read())
571
572    def test_io_on_closed_zipextfile(self):
573        fname = "somefile.txt"
574        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
575            zipfp.writestr(fname, "bogus")
576
577        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
578            with zipfp.open(fname) as fid:
579                fid.close()
580                self.assertRaises(ValueError, fid.read)
581                self.assertRaises(ValueError, fid.seek, 0)
582                self.assertRaises(ValueError, fid.tell)
583                self.assertRaises(ValueError, fid.readable)
584                self.assertRaises(ValueError, fid.seekable)
585
586    def test_write_to_readonly(self):
587        """Check that trying to call write() on a readonly ZipFile object
588        raises a ValueError."""
589        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
590            zipfp.writestr("somefile.txt", "bogus")
591
592        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
593            self.assertRaises(ValueError, zipfp.write, TESTFN)
594
595        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
596            with self.assertRaises(ValueError):
597                zipfp.open(TESTFN, mode='w')
598
599    def test_add_file_before_1980(self):
600        # Set atime and mtime to 1970-01-01
601        os.utime(TESTFN, (0, 0))
602        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
603            self.assertRaises(ValueError, zipfp.write, TESTFN)
604
605        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
606            zipfp.write(TESTFN)
607            zinfo = zipfp.getinfo(TESTFN)
608            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
609
610    def test_add_file_after_2107(self):
611        # Set atime and mtime to 2108-12-30
612        ts = 4386268800
613        try:
614            time.localtime(ts)
615        except OverflowError:
616            self.skipTest(f'time.localtime({ts}) raises OverflowError')
617        try:
618            os.utime(TESTFN, (ts, ts))
619        except OverflowError:
620            self.skipTest('Host fs cannot set timestamp to required value.')
621
622        mtime_ns = os.stat(TESTFN).st_mtime_ns
623        if mtime_ns != (4386268800 * 10**9):
624            # XFS filesystem is limited to 32-bit timestamp, but the syscall
625            # didn't fail. Moreover, there is a VFS bug which returns
626            # a cached timestamp which is different than the value on disk.
627            #
628            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
629            #
630            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
631            # https://bugs.python.org/issue39460#msg360952
632            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
633
634        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
635            self.assertRaises(struct.error, zipfp.write, TESTFN)
636
637        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
638            zipfp.write(TESTFN)
639            zinfo = zipfp.getinfo(TESTFN)
640            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
641
642
@requires_zlib()
class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
                                 unittest.TestCase):
    # Runs the shared source-file tests with deflate compression.
    compression = zipfile.ZIP_DEFLATED

    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        members = (('storeme', zipfile.ZIP_STORED),
                   ('deflateme', zipfile.ZIP_DEFLATED))
        with zipfile.ZipFile(TESTFN2, "w") as archive:
            for arcname, method in members:
                archive.write(TESTFN, arcname, method)
            # Each member must report the method it was written with.
            for arcname, method in members:
                self.assertEqual(archive.getinfo(arcname).compress_type, method)
658
# Runs the shared source-file tests with bzip2 compression.
@requires_bz2()
class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
                               unittest.TestCase):
    compression = zipfile.ZIP_BZIP2
663
# Runs the shared source-file tests with LZMA compression.
@requires_lzma()
class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
                              unittest.TestCase):
    compression = zipfile.ZIP_LZMA
668
669
670class AbstractTestZip64InSmallFiles:
671    # These tests test the ZIP64 functionality without using large files,
672    # see test_zipfile64 for proper tests.
673
674    @classmethod
675    def setUpClass(cls):
676        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
677                    for i in range(0, FIXEDTEST_SIZE))
678        cls.data = b'\n'.join(line_gen)
679
680    def setUp(self):
681        self._limit = zipfile.ZIP64_LIMIT
682        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
683        zipfile.ZIP64_LIMIT = 1000
684        zipfile.ZIP_FILECOUNT_LIMIT = 9
685
686        # Make a source file with some lines
687        with open(TESTFN, "wb") as fp:
688            fp.write(self.data)
689
690    def zip_test(self, f, compression):
691        # Create the ZIP archive
692        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
693            zipfp.write(TESTFN, "another.name")
694            zipfp.write(TESTFN, TESTFN)
695            zipfp.writestr("strfile", self.data)
696
697        # Read the ZIP archive
698        with zipfile.ZipFile(f, "r", compression) as zipfp:
699            self.assertEqual(zipfp.read(TESTFN), self.data)
700            self.assertEqual(zipfp.read("another.name"), self.data)
701            self.assertEqual(zipfp.read("strfile"), self.data)
702
703            # Print the ZIP directory
704            fp = io.StringIO()
705            zipfp.printdir(fp)
706
707            directory = fp.getvalue()
708            lines = directory.splitlines()
709            self.assertEqual(len(lines), 4) # Number of files + header
710
711            self.assertIn('File Name', lines[0])
712            self.assertIn('Modified', lines[0])
713            self.assertIn('Size', lines[0])
714
715            fn, date, time_, size = lines[1].split()
716            self.assertEqual(fn, 'another.name')
717            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
718            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
719            self.assertEqual(size, str(len(self.data)))
720
721            # Check the namelist
722            names = zipfp.namelist()
723            self.assertEqual(len(names), 3)
724            self.assertIn(TESTFN, names)
725            self.assertIn("another.name", names)
726            self.assertIn("strfile", names)
727
728            # Check infolist
729            infos = zipfp.infolist()
730            names = [i.filename for i in infos]
731            self.assertEqual(len(names), 3)
732            self.assertIn(TESTFN, names)
733            self.assertIn("another.name", names)
734            self.assertIn("strfile", names)
735            for i in infos:
736                self.assertEqual(i.file_size, len(self.data))
737
738            # check getinfo
739            for nm in (TESTFN, "another.name", "strfile"):
740                info = zipfp.getinfo(nm)
741                self.assertEqual(info.filename, nm)
742                self.assertEqual(info.file_size, len(self.data))
743
744            # Check that testzip doesn't raise an exception
745            zipfp.testzip()
746
747    def test_basic(self):
748        for f in get_files(self):
749            self.zip_test(f, self.compression)
750
751    def test_too_many_files(self):
752        # This test checks that more than 64k files can be added to an archive,
753        # and that the resulting archive can be read properly by ZipFile
754        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
755                               allowZip64=True)
756        zipf.debug = 100
757        numfiles = 15
758        for i in range(numfiles):
759            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
760        self.assertEqual(len(zipf.namelist()), numfiles)
761        zipf.close()
762
763        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
764        self.assertEqual(len(zipf2.namelist()), numfiles)
765        for i in range(numfiles):
766            content = zipf2.read("foo%08d" % i).decode('ascii')
767            self.assertEqual(content, "%d" % (i**3 % 57))
768        zipf2.close()
769
770    def test_too_many_files_append(self):
771        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
772                               allowZip64=False)
773        zipf.debug = 100
774        numfiles = 9
775        for i in range(numfiles):
776            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
777        self.assertEqual(len(zipf.namelist()), numfiles)
778        with self.assertRaises(zipfile.LargeZipFile):
779            zipf.writestr("foo%08d" % numfiles, b'')
780        self.assertEqual(len(zipf.namelist()), numfiles)
781        zipf.close()
782
783        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
784                               allowZip64=False)
785        zipf.debug = 100
786        self.assertEqual(len(zipf.namelist()), numfiles)
787        with self.assertRaises(zipfile.LargeZipFile):
788            zipf.writestr("foo%08d" % numfiles, b'')
789        self.assertEqual(len(zipf.namelist()), numfiles)
790        zipf.close()
791
792        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
793                               allowZip64=True)
794        zipf.debug = 100
795        self.assertEqual(len(zipf.namelist()), numfiles)
796        numfiles2 = 15
797        for i in range(numfiles, numfiles2):
798            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
799        self.assertEqual(len(zipf.namelist()), numfiles2)
800        zipf.close()
801
802        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
803        self.assertEqual(len(zipf2.namelist()), numfiles2)
804        for i in range(numfiles2):
805            content = zipf2.read("foo%08d" % i).decode('ascii')
806            self.assertEqual(content, "%d" % (i**3 % 57))
807        zipf2.close()
808
809    def tearDown(self):
810        zipfile.ZIP64_LIMIT = self._limit
811        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
812        unlink(TESTFN)
813        unlink(TESTFN2)
814
815
816class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
817                                  unittest.TestCase):
818    compression = zipfile.ZIP_STORED
819
820    def large_file_exception_test(self, f, compression):
821        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
822            self.assertRaises(zipfile.LargeZipFile,
823                              zipfp.write, TESTFN, "another.name")
824
825    def large_file_exception_test2(self, f, compression):
826        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
827            self.assertRaises(zipfile.LargeZipFile,
828                              zipfp.writestr, "another.name", self.data)
829
830    def test_large_file_exception(self):
831        for f in get_files(self):
832            self.large_file_exception_test(f, zipfile.ZIP_STORED)
833            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
834
835    def test_absolute_arcnames(self):
836        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
837                             allowZip64=True) as zipfp:
838            zipfp.write(TESTFN, "/absolute")
839
840        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
841            self.assertEqual(zipfp.namelist(), ["absolute"])
842
843    def test_append(self):
844        # Test that appending to the Zip64 archive doesn't change
845        # extra fields of existing entries.
846        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
847            zipfp.writestr("strfile", self.data)
848        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
849            zinfo = zipfp.getinfo("strfile")
850            extra = zinfo.extra
851        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
852            zipfp.writestr("strfile2", self.data)
853        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
854            zinfo = zipfp.getinfo("strfile")
855            self.assertEqual(zinfo.extra, extra)
856
857    def make_zip64_file(
858        self, file_size_64_set=False, file_size_extra=False,
859        compress_size_64_set=False, compress_size_extra=False,
860        header_offset_64_set=False, header_offset_extra=False,
861    ):
862        """Generate bytes sequence for a zip with (incomplete) zip64 data.
863
864        The actual values (not the zip 64 0xffffffff values) stored in the file
865        are:
866        file_size: 8
867        compress_size: 8
868        header_offset: 0
869        """
870        actual_size = 8
871        actual_header_offset = 0
872        local_zip64_fields = []
873        central_zip64_fields = []
874
875        file_size = actual_size
876        if file_size_64_set:
877            file_size = 0xffffffff
878            if file_size_extra:
879                local_zip64_fields.append(actual_size)
880                central_zip64_fields.append(actual_size)
881        file_size = struct.pack("<L", file_size)
882
883        compress_size = actual_size
884        if compress_size_64_set:
885            compress_size = 0xffffffff
886            if compress_size_extra:
887                local_zip64_fields.append(actual_size)
888                central_zip64_fields.append(actual_size)
889        compress_size = struct.pack("<L", compress_size)
890
891        header_offset = actual_header_offset
892        if header_offset_64_set:
893            header_offset = 0xffffffff
894            if header_offset_extra:
895                central_zip64_fields.append(actual_header_offset)
896        header_offset = struct.pack("<L", header_offset)
897
898        local_extra = struct.pack(
899            '<HH' + 'Q'*len(local_zip64_fields),
900            0x0001,
901            8*len(local_zip64_fields),
902            *local_zip64_fields
903        )
904
905        central_extra = struct.pack(
906            '<HH' + 'Q'*len(central_zip64_fields),
907            0x0001,
908            8*len(central_zip64_fields),
909            *central_zip64_fields
910        )
911
912        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
913        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
914
915        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
916        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
917
918        filename = b"test.txt"
919        content = b"test1234"
920        filename_length = struct.pack("<H", len(filename))
921        zip64_contents = (
922            # Local file header
923            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
924            + compress_size
925            + file_size
926            + filename_length
927            + local_extra_length
928            + filename
929            + local_extra
930            + content
931            # Central directory:
932            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
933            + compress_size
934            + file_size
935            + filename_length
936            + central_extra_length
937            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
938            + header_offset
939            + filename
940            + central_extra
941            # Zip64 end of central directory
942            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
943            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
944            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
945            + central_dir_size
946            + offset_to_central_dir
947            # Zip64 end of central directory locator
948            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
949            + b"\x00\x00\x00"
950            # end of central directory
951            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
952            + b"\x00\x00\x00\x00"
953        )
954        return zip64_contents
955
956    def test_bad_zip64_extra(self):
957        """Missing zip64 extra records raises an exception.
958
959        There are 4 fields that the zip64 format handles (the disk number is
960        not used in this module and so is ignored here). According to the zip
961        spec:
962              The order of the fields in the zip64 extended
963              information record is fixed, but the fields MUST
964              only appear if the corresponding Local or Central
965              directory record field is set to 0xFFFF or 0xFFFFFFFF.
966
967        If the zip64 extra content doesn't contain enough entries for the
968        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
969        This test mismatches the length of the zip64 extra field and the number
970        of fields set to indicate the presence of zip64 data.
971        """
972        # zip64 file size present, no fields in extra, expecting one, equals
973        # missing file size.
974        missing_file_size_extra = self.make_zip64_file(
975            file_size_64_set=True,
976        )
977        with self.assertRaises(zipfile.BadZipFile) as e:
978            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
979        self.assertIn('file size', str(e.exception).lower())
980
981        # zip64 file size present, zip64 compress size present, one field in
982        # extra, expecting two, equals missing compress size.
983        missing_compress_size_extra = self.make_zip64_file(
984            file_size_64_set=True,
985            file_size_extra=True,
986            compress_size_64_set=True,
987        )
988        with self.assertRaises(zipfile.BadZipFile) as e:
989            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
990        self.assertIn('compress size', str(e.exception).lower())
991
992        # zip64 compress size present, no fields in extra, expecting one,
993        # equals missing compress size.
994        missing_compress_size_extra = self.make_zip64_file(
995            compress_size_64_set=True,
996        )
997        with self.assertRaises(zipfile.BadZipFile) as e:
998            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
999        self.assertIn('compress size', str(e.exception).lower())
1000
1001        # zip64 file size present, zip64 compress size present, zip64 header
1002        # offset present, two fields in extra, expecting three, equals missing
1003        # header offset
1004        missing_header_offset_extra = self.make_zip64_file(
1005            file_size_64_set=True,
1006            file_size_extra=True,
1007            compress_size_64_set=True,
1008            compress_size_extra=True,
1009            header_offset_64_set=True,
1010        )
1011        with self.assertRaises(zipfile.BadZipFile) as e:
1012            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1013        self.assertIn('header offset', str(e.exception).lower())
1014
1015        # zip64 compress size present, zip64 header offset present, one field
1016        # in extra, expecting two, equals missing header offset
1017        missing_header_offset_extra = self.make_zip64_file(
1018            file_size_64_set=False,
1019            compress_size_64_set=True,
1020            compress_size_extra=True,
1021            header_offset_64_set=True,
1022        )
1023        with self.assertRaises(zipfile.BadZipFile) as e:
1024            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1025        self.assertIn('header offset', str(e.exception).lower())
1026
1027        # zip64 file size present, zip64 header offset present, one field in
1028        # extra, expecting two, equals missing header offset
1029        missing_header_offset_extra = self.make_zip64_file(
1030            file_size_64_set=True,
1031            file_size_extra=True,
1032            compress_size_64_set=False,
1033            header_offset_64_set=True,
1034        )
1035        with self.assertRaises(zipfile.BadZipFile) as e:
1036            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1037        self.assertIn('header offset', str(e.exception).lower())
1038
1039        # zip64 header offset present, no fields in extra, expecting one,
1040        # equals missing header offset
1041        missing_header_offset_extra = self.make_zip64_file(
1042            file_size_64_set=False,
1043            compress_size_64_set=False,
1044            header_offset_64_set=True,
1045        )
1046        with self.assertRaises(zipfile.BadZipFile) as e:
1047            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1048        self.assertIn('header offset', str(e.exception).lower())
1049
1050    def test_generated_valid_zip64_extra(self):
1051        # These values are what is set in the make_zip64_file method.
1052        expected_file_size = 8
1053        expected_compress_size = 8
1054        expected_header_offset = 0
1055        expected_content = b"test1234"
1056
1057        # Loop through the various valid combinations of zip64 masks
1058        # present and extra fields present.
1059        params = (
1060            {"file_size_64_set": True, "file_size_extra": True},
1061            {"compress_size_64_set": True, "compress_size_extra": True},
1062            {"header_offset_64_set": True, "header_offset_extra": True},
1063        )
1064
1065        for r in range(1, len(params) + 1):
1066            for combo in itertools.combinations(params, r):
1067                kwargs = {}
1068                for c in combo:
1069                    kwargs.update(c)
1070                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1071                    zinfo = zf.infolist()[0]
1072                    self.assertEqual(zinfo.file_size, expected_file_size)
1073                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1074                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1075                    self.assertEqual(zf.read(zinfo), expected_content)
1076
1077
# Runs the tests inherited from AbstractTestZip64InSmallFiles with
# ZIP_DEFLATED compression; the class is guarded by requires_zlib().
@requires_zlib()
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                   unittest.TestCase):
    # Compression method picked up by the inherited tests.
    compression = zipfile.ZIP_DEFLATED
1082
# Runs the tests inherited from AbstractTestZip64InSmallFiles with
# ZIP_BZIP2 compression; the class is guarded by requires_bz2().
@requires_bz2()
class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                 unittest.TestCase):
    # Compression method picked up by the inherited tests.
    compression = zipfile.ZIP_BZIP2
1087
# Runs the tests inherited from AbstractTestZip64InSmallFiles with
# ZIP_LZMA compression; the class is guarded by requires_lzma().
@requires_lzma()
class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                unittest.TestCase):
    # Compression method picked up by the inherited tests.
    compression = zipfile.ZIP_LZMA
1092
1093
class AbstractWriterTests:
    # Checks for the writable file object returned by
    # ZipFile.open(name, 'w').  Concrete subclasses provide the
    # ``compression`` attribute and mix in unittest.TestCase.

    def tearDown(self):
        unlink(TESTFN2)

    def test_close_after_close(self):
        payload = b'content'
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
            writer = zipf.open('test', 'w')
            writer.write(payload)
            # Closing a second time must leave the writer closed and the
            # stored data intact.
            for _ in range(2):
                writer.close()
                self.assertTrue(writer.closed)
            self.assertEqual(zipf.read('test'), payload)

    def test_write_after_close(self):
        payload = b'content'
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
            writer = zipf.open('test', 'w')
            writer.write(payload)
            writer.close()
            self.assertTrue(writer.closed)
            # Writing to a closed member is rejected, even with no data.
            with self.assertRaises(ValueError):
                writer.write(b'')
            self.assertEqual(zipf.read('test'), payload)
1119
class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests checks with ZIP_STORED (no compression).
    compression = zipfile.ZIP_STORED
1122
# Runs the AbstractWriterTests checks with ZIP_DEFLATED compression;
# the class is guarded by requires_zlib().
@requires_zlib()
class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
    compression = zipfile.ZIP_DEFLATED
1126
# Runs the AbstractWriterTests checks with ZIP_BZIP2 compression;
# the class is guarded by requires_bz2().
@requires_bz2()
class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
    compression = zipfile.ZIP_BZIP2
1130
# Runs the AbstractWriterTests checks with ZIP_LZMA compression;
# the class is guarded by requires_lzma().
@requires_lzma()
class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
    compression = zipfile.ZIP_LZMA
1134
1135
1136class PyZipFileTests(unittest.TestCase):
1137    def assertCompiledIn(self, name, namelist):
1138        if name + 'o' not in namelist:
1139            self.assertIn(name + 'c', namelist)
1140
1141    def requiresWriteAccess(self, path):
1142        # effective_ids unavailable on windows
1143        if not os.access(path, os.W_OK,
1144                         effective_ids=os.access in os.supports_effective_ids):
1145            self.skipTest('requires write access to the installed location')
1146        filename = os.path.join(path, 'test_zipfile.try')
1147        try:
1148            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1149            os.close(fd)
1150        except Exception:
1151            self.skipTest('requires write access to the installed location')
1152        unlink(filename)
1153
1154    def test_write_pyfile(self):
1155        self.requiresWriteAccess(os.path.dirname(__file__))
1156        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1157            fn = __file__
1158            if fn.endswith('.pyc'):
1159                path_split = fn.split(os.sep)
1160                if os.altsep is not None:
1161                    path_split.extend(fn.split(os.altsep))
1162                if '__pycache__' in path_split:
1163                    fn = importlib.util.source_from_cache(fn)
1164                else:
1165                    fn = fn[:-1]
1166
1167            zipfp.writepy(fn)
1168
1169            bn = os.path.basename(fn)
1170            self.assertNotIn(bn, zipfp.namelist())
1171            self.assertCompiledIn(bn, zipfp.namelist())
1172
1173        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1174            fn = __file__
1175            if fn.endswith('.pyc'):
1176                fn = fn[:-1]
1177
1178            zipfp.writepy(fn, "testpackage")
1179
1180            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1181            self.assertNotIn(bn, zipfp.namelist())
1182            self.assertCompiledIn(bn, zipfp.namelist())
1183
1184    def test_write_python_package(self):
1185        import email
1186        packagedir = os.path.dirname(email.__file__)
1187        self.requiresWriteAccess(packagedir)
1188
1189        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1190            zipfp.writepy(packagedir)
1191
1192            # Check for a couple of modules at different levels of the
1193            # hierarchy
1194            names = zipfp.namelist()
1195            self.assertCompiledIn('email/__init__.py', names)
1196            self.assertCompiledIn('email/mime/text.py', names)
1197
1198    def test_write_filtered_python_package(self):
1199        import test
1200        packagedir = os.path.dirname(test.__file__)
1201        self.requiresWriteAccess(packagedir)
1202
1203        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1204
1205            # first make sure that the test folder gives error messages
1206            # (on the badsyntax_... files)
1207            with captured_stdout() as reportSIO:
1208                zipfp.writepy(packagedir)
1209            reportStr = reportSIO.getvalue()
1210            self.assertTrue('SyntaxError' in reportStr)
1211
1212            # then check that the filter works on the whole package
1213            with captured_stdout() as reportSIO:
1214                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1215            reportStr = reportSIO.getvalue()
1216            self.assertTrue('SyntaxError' not in reportStr)
1217
1218            # then check that the filter works on individual files
1219            def filter(path):
1220                return not os.path.basename(path).startswith("bad")
1221            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1222                zipfp.writepy(packagedir, filterfunc=filter)
1223            reportStr = reportSIO.getvalue()
1224            if reportStr:
1225                print(reportStr)
1226            self.assertTrue('SyntaxError' not in reportStr)
1227
1228    def test_write_with_optimization(self):
1229        import email
1230        packagedir = os.path.dirname(email.__file__)
1231        self.requiresWriteAccess(packagedir)
1232        optlevel = 1 if __debug__ else 0
1233        ext = '.pyc'
1234
1235        with TemporaryFile() as t, \
1236             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1237            zipfp.writepy(packagedir)
1238
1239            names = zipfp.namelist()
1240            self.assertIn('email/__init__' + ext, names)
1241            self.assertIn('email/mime/text' + ext, names)
1242
1243    def test_write_python_directory(self):
1244        os.mkdir(TESTFN2)
1245        try:
1246            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1247                fp.write("print(42)\n")
1248
1249            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1250                fp.write("print(42 * 42)\n")
1251
1252            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
1253                fp.write("bla bla bla\n")
1254
1255            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1256                zipfp.writepy(TESTFN2)
1257
1258                names = zipfp.namelist()
1259                self.assertCompiledIn('mod1.py', names)
1260                self.assertCompiledIn('mod2.py', names)
1261                self.assertNotIn('mod2.txt', names)
1262
1263        finally:
1264            rmtree(TESTFN2)
1265
1266    def test_write_python_directory_filtered(self):
1267        os.mkdir(TESTFN2)
1268        try:
1269            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1270                fp.write("print(42)\n")
1271
1272            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1273                fp.write("print(42 * 42)\n")
1274
1275            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1276                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1277                                                  not fn.endswith('mod2.py'))
1278
1279                names = zipfp.namelist()
1280                self.assertCompiledIn('mod1.py', names)
1281                self.assertNotIn('mod2.py', names)
1282
1283        finally:
1284            rmtree(TESTFN2)
1285
1286    def test_write_non_pyfile(self):
1287        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1288            with open(TESTFN, 'w') as f:
1289                f.write('most definitely not a python file')
1290            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1291            unlink(TESTFN)
1292
1293    def test_write_pyfile_bad_syntax(self):
1294        os.mkdir(TESTFN2)
1295        try:
1296            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1297                fp.write("Bad syntax in python file\n")
1298
1299            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1300                # syntax errors are printed to stdout
1301                with captured_stdout() as s:
1302                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1303
1304                self.assertIn("SyntaxError", s.getvalue())
1305
1306                # as it will not have compiled the python file, it will
1307                # include the .py file not .pyc
1308                names = zipfp.namelist()
1309                self.assertIn('mod1.py', names)
1310                self.assertNotIn('mod1.pyc', names)
1311
1312        finally:
1313            rmtree(TESTFN2)
1314
1315    def test_write_pathlike(self):
1316        os.mkdir(TESTFN2)
1317        try:
1318            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1319                fp.write("print(42)\n")
1320
1321            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1322                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1323                names = zipfp.namelist()
1324                self.assertCompiledIn('mod1.py', names)
1325        finally:
1326            rmtree(TESTFN2)
1327
1328
1329class ExtractTests(unittest.TestCase):
1330
1331    def make_test_file(self):
1332        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1333            for fpath, fdata in SMALL_TEST_DATA:
1334                zipfp.writestr(fpath, fdata)
1335
1336    def test_extract(self):
1337        with temp_cwd():
1338            self.make_test_file()
1339            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1340                for fpath, fdata in SMALL_TEST_DATA:
1341                    writtenfile = zipfp.extract(fpath)
1342
1343                    # make sure it was written to the right place
1344                    correctfile = os.path.join(os.getcwd(), fpath)
1345                    correctfile = os.path.normpath(correctfile)
1346
1347                    self.assertEqual(writtenfile, correctfile)
1348
1349                    # make sure correct data is in correct file
1350                    with open(writtenfile, "rb") as f:
1351                        self.assertEqual(fdata.encode(), f.read())
1352
1353                    unlink(writtenfile)
1354
1355    def _test_extract_with_target(self, target):
1356        self.make_test_file()
1357        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1358            for fpath, fdata in SMALL_TEST_DATA:
1359                writtenfile = zipfp.extract(fpath, target)
1360
1361                # make sure it was written to the right place
1362                correctfile = os.path.join(target, fpath)
1363                correctfile = os.path.normpath(correctfile)
1364                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1365
1366                # make sure correct data is in correct file
1367                with open(writtenfile, "rb") as f:
1368                    self.assertEqual(fdata.encode(), f.read())
1369
1370                unlink(writtenfile)
1371
1372        unlink(TESTFN2)
1373
1374    def test_extract_with_target(self):
1375        with temp_dir() as extdir:
1376            self._test_extract_with_target(extdir)
1377
1378    def test_extract_with_target_pathlike(self):
1379        with temp_dir() as extdir:
1380            self._test_extract_with_target(pathlib.Path(extdir))
1381
1382    def test_extract_all(self):
1383        with temp_cwd():
1384            self.make_test_file()
1385            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1386                zipfp.extractall()
1387                for fpath, fdata in SMALL_TEST_DATA:
1388                    outfile = os.path.join(os.getcwd(), fpath)
1389
1390                    with open(outfile, "rb") as f:
1391                        self.assertEqual(fdata.encode(), f.read())
1392
1393                    unlink(outfile)
1394
1395    def _test_extract_all_with_target(self, target):
1396        self.make_test_file()
1397        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1398            zipfp.extractall(target)
1399            for fpath, fdata in SMALL_TEST_DATA:
1400                outfile = os.path.join(target, fpath)
1401
1402                with open(outfile, "rb") as f:
1403                    self.assertEqual(fdata.encode(), f.read())
1404
1405                unlink(outfile)
1406
1407        unlink(TESTFN2)
1408
1409    def test_extract_all_with_target(self):
1410        with temp_dir() as extdir:
1411            self._test_extract_all_with_target(extdir)
1412
1413    def test_extract_all_with_target_pathlike(self):
1414        with temp_dir() as extdir:
1415            self._test_extract_all_with_target(pathlib.Path(extdir))
1416
1417    def check_file(self, filename, content):
1418        self.assertTrue(os.path.isfile(filename))
1419        with open(filename, 'rb') as f:
1420            self.assertEqual(f.read(), content)
1421
1422    def test_sanitize_windows_name(self):
1423        san = zipfile.ZipFile._sanitize_windows_name
1424        # Passing pathsep in allows this test to work regardless of platform.
1425        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1426        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1427        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1428
1429    def test_extract_hackers_arcnames_common_cases(self):
1430        common_hacknames = [
1431            ('../foo/bar', 'foo/bar'),
1432            ('foo/../bar', 'foo/bar'),
1433            ('foo/../../bar', 'foo/bar'),
1434            ('foo/bar/..', 'foo/bar'),
1435            ('./../foo/bar', 'foo/bar'),
1436            ('/foo/bar', 'foo/bar'),
1437            ('/foo/../bar', 'foo/bar'),
1438            ('/foo/../../bar', 'foo/bar'),
1439        ]
1440        self._test_extract_hackers_arcnames(common_hacknames)
1441
1442    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1443    def test_extract_hackers_arcnames_windows_only(self):
1444        """Test combination of path fixing and windows name sanitization."""
1445        windows_hacknames = [
1446            (r'..\foo\bar', 'foo/bar'),
1447            (r'..\/foo\/bar', 'foo/bar'),
1448            (r'foo/\..\/bar', 'foo/bar'),
1449            (r'foo\/../\bar', 'foo/bar'),
1450            (r'C:foo/bar', 'foo/bar'),
1451            (r'C:/foo/bar', 'foo/bar'),
1452            (r'C://foo/bar', 'foo/bar'),
1453            (r'C:\foo\bar', 'foo/bar'),
1454            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1455            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1456            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1457            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1458            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1459            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1460            (r'//?/C:/foo/bar', 'foo/bar'),
1461            (r'\\?\C:\foo\bar', 'foo/bar'),
1462            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1463            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1464            ('../../foo../../ba..r', 'foo/ba..r'),
1465        ]
1466        self._test_extract_hackers_arcnames(windows_hacknames)
1467
1468    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1469    def test_extract_hackers_arcnames_posix_only(self):
1470        posix_hacknames = [
1471            ('//foo/bar', 'foo/bar'),
1472            ('../../foo../../ba..r', 'foo../ba..r'),
1473            (r'foo/..\bar', r'foo/..\bar'),
1474        ]
1475        self._test_extract_hackers_arcnames(posix_hacknames)
1476
1477    def _test_extract_hackers_arcnames(self, hacknames):
1478        for arcname, fixedname in hacknames:
1479            content = b'foobar' + arcname.encode()
1480            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1481                zinfo = zipfile.ZipInfo()
1482                # preserve backslashes
1483                zinfo.filename = arcname
1484                zinfo.external_attr = 0o600 << 16
1485                zipfp.writestr(zinfo, content)
1486
1487            arcname = arcname.replace(os.sep, "/")
1488            targetpath = os.path.join('target', 'subdir', 'subsub')
1489            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1490
1491            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1492                writtenfile = zipfp.extract(arcname, targetpath)
1493                self.assertEqual(writtenfile, correctfile,
1494                                 msg='extract %r: %r != %r' %
1495                                 (arcname, writtenfile, correctfile))
1496            self.check_file(correctfile, content)
1497            rmtree('target')
1498
1499            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1500                zipfp.extractall(targetpath)
1501            self.check_file(correctfile, content)
1502            rmtree('target')
1503
1504            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1505
1506            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1507                writtenfile = zipfp.extract(arcname)
1508                self.assertEqual(writtenfile, correctfile,
1509                                 msg="extract %r" % arcname)
1510            self.check_file(correctfile, content)
1511            rmtree(fixedname.split('/')[0])
1512
1513            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1514                zipfp.extractall()
1515            self.check_file(correctfile, content)
1516            rmtree(fixedname.split('/')[0])
1517
1518            unlink(TESTFN2)
1519
1520
1521class OtherTests(unittest.TestCase):
1522    def test_open_via_zip_info(self):
1523        # Create the ZIP archive
1524        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1525            zipfp.writestr("name", "foo")
1526            with self.assertWarns(UserWarning):
1527                zipfp.writestr("name", "bar")
1528            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1529
1530        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1531            infos = zipfp.infolist()
1532            data = b""
1533            for info in infos:
1534                with zipfp.open(info) as zipopen:
1535                    data += zipopen.read()
1536            self.assertIn(data, {b"foobar", b"barfoo"})
1537            data = b""
1538            for info in infos:
1539                data += zipfp.read(info)
1540            self.assertIn(data, {b"foobar", b"barfoo"})
1541
1542    def test_writestr_extended_local_header_issue1202(self):
1543        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1544            for data in 'abcdefghijklmnop':
1545                zinfo = zipfile.ZipInfo(data)
1546                zinfo.flag_bits |= 0x08  # Include an extended local header.
1547                orig_zip.writestr(zinfo, data)
1548
1549    def test_close(self):
1550        """Check that the zipfile is closed after the 'with' block."""
1551        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1552            for fpath, fdata in SMALL_TEST_DATA:
1553                zipfp.writestr(fpath, fdata)
1554                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1555        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1556
1557        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1558            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1559        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1560
1561    def test_close_on_exception(self):
1562        """Check that the zipfile is closed if an exception is raised in the
1563        'with' block."""
1564        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1565            for fpath, fdata in SMALL_TEST_DATA:
1566                zipfp.writestr(fpath, fdata)
1567
1568        try:
1569            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1570                raise zipfile.BadZipFile()
1571        except zipfile.BadZipFile:
1572            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1573
1574    def test_unsupported_version(self):
1575        # File has an extract_version of 120
1576        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1577                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1578                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1579                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1580                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1581
1582        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1583                          io.BytesIO(data), 'r')
1584
1585    @requires_zlib()
1586    def test_read_unicode_filenames(self):
1587        # bug #10801
1588        fname = findfile('zip_cp437_header.zip')
1589        with zipfile.ZipFile(fname) as zipfp:
1590            for name in zipfp.namelist():
1591                zipfp.open(name).close()
1592
1593    def test_write_unicode_filenames(self):
1594        with zipfile.ZipFile(TESTFN, "w") as zf:
1595            zf.writestr("foo.txt", "Test for unicode filename")
1596            zf.writestr("\xf6.txt", "Test for unicode filename")
1597            self.assertIsInstance(zf.infolist()[0].filename, str)
1598
1599        with zipfile.ZipFile(TESTFN, "r") as zf:
1600            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1601            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1602
1603    def test_read_after_write_unicode_filenames(self):
1604        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1605            zipfp.writestr('приклад', b'sample')
1606            self.assertEqual(zipfp.read('приклад'), b'sample')
1607
1608    def test_exclusive_create_zip_file(self):
1609        """Test exclusive creating a new zipfile."""
1610        unlink(TESTFN2)
1611        filename = 'testfile.txt'
1612        content = b'hello, world. this is some content.'
1613        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1614            zipfp.writestr(filename, content)
1615        with self.assertRaises(FileExistsError):
1616            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1617        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1618            self.assertEqual(zipfp.namelist(), [filename])
1619            self.assertEqual(zipfp.read(filename), content)
1620
1621    def test_create_non_existent_file_for_append(self):
1622        if os.path.exists(TESTFN):
1623            os.unlink(TESTFN)
1624
1625        filename = 'testfile.txt'
1626        content = b'hello, world. this is some content.'
1627
1628        try:
1629            with zipfile.ZipFile(TESTFN, 'a') as zf:
1630                zf.writestr(filename, content)
1631        except OSError:
1632            self.fail('Could not append data to a non-existent zip file.')
1633
1634        self.assertTrue(os.path.exists(TESTFN))
1635
1636        with zipfile.ZipFile(TESTFN, 'r') as zf:
1637            self.assertEqual(zf.read(filename), content)
1638
1639    def test_close_erroneous_file(self):
1640        # This test checks that the ZipFile constructor closes the file object
1641        # it opens if there's an error in the file.  If it doesn't, the
1642        # traceback holds a reference to the ZipFile object and, indirectly,
1643        # the file object.
1644        # On Windows, this causes the os.unlink() call to fail because the
1645        # underlying file is still open.  This is SF bug #412214.
1646        #
1647        with open(TESTFN, "w") as fp:
1648            fp.write("this is not a legal zip file\n")
1649        try:
1650            zf = zipfile.ZipFile(TESTFN)
1651        except zipfile.BadZipFile:
1652            pass
1653
1654    def test_is_zip_erroneous_file(self):
1655        """Check that is_zipfile() correctly identifies non-zip files."""
1656        # - passing a filename
1657        with open(TESTFN, "w") as fp:
1658            fp.write("this is not a legal zip file\n")
1659        self.assertFalse(zipfile.is_zipfile(TESTFN))
1660        # - passing a path-like object
1661        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1662        # - passing a file object
1663        with open(TESTFN, "rb") as fp:
1664            self.assertFalse(zipfile.is_zipfile(fp))
1665        # - passing a file-like object
1666        fp = io.BytesIO()
1667        fp.write(b"this is not a legal zip file\n")
1668        self.assertFalse(zipfile.is_zipfile(fp))
1669        fp.seek(0, 0)
1670        self.assertFalse(zipfile.is_zipfile(fp))
1671
1672    def test_damaged_zipfile(self):
1673        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1674        # - Create a valid zip file
1675        fp = io.BytesIO()
1676        with zipfile.ZipFile(fp, mode="w") as zipf:
1677            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1678        zipfiledata = fp.getvalue()
1679
1680        # - Now create copies of it missing the last N bytes and make sure
1681        #   a BadZipFile exception is raised when we try to open it
1682        for N in range(len(zipfiledata)):
1683            fp = io.BytesIO(zipfiledata[:N])
1684            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1685
1686    def test_is_zip_valid_file(self):
1687        """Check that is_zipfile() correctly identifies zip files."""
1688        # - passing a filename
1689        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1690            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1691
1692        self.assertTrue(zipfile.is_zipfile(TESTFN))
1693        # - passing a file object
1694        with open(TESTFN, "rb") as fp:
1695            self.assertTrue(zipfile.is_zipfile(fp))
1696            fp.seek(0, 0)
1697            zip_contents = fp.read()
1698        # - passing a file-like object
1699        fp = io.BytesIO()
1700        fp.write(zip_contents)
1701        self.assertTrue(zipfile.is_zipfile(fp))
1702        fp.seek(0, 0)
1703        self.assertTrue(zipfile.is_zipfile(fp))
1704
1705    def test_non_existent_file_raises_OSError(self):
1706        # make sure we don't raise an AttributeError when a partially-constructed
1707        # ZipFile instance is finalized; this tests for regression on SF tracker
1708        # bug #403871.
1709
1710        # The bug we're testing for caused an AttributeError to be raised
1711        # when a ZipFile instance was created for a file that did not
1712        # exist; the .fp member was not initialized but was needed by the
1713        # __del__() method.  Since the AttributeError is in the __del__(),
1714        # it is ignored, but the user should be sufficiently annoyed by
1715        # the message on the output that regression will be noticed
1716        # quickly.
1717        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1718
1719    def test_empty_file_raises_BadZipFile(self):
1720        f = open(TESTFN, 'w')
1721        f.close()
1722        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1723
1724        with open(TESTFN, 'w') as fp:
1725            fp.write("short file")
1726        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1727
1728    def test_closed_zip_raises_ValueError(self):
1729        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1730        data = io.BytesIO()
1731        with zipfile.ZipFile(data, mode="w") as zipf:
1732            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1733
1734        # This is correct; calling .read on a closed ZipFile should raise
1735        # a ValueError, and so should calling .testzip.  An earlier
1736        # version of .testzip would swallow this exception (and any other)
1737        # and report that the first file in the archive was corrupt.
1738        self.assertRaises(ValueError, zipf.read, "foo.txt")
1739        self.assertRaises(ValueError, zipf.open, "foo.txt")
1740        self.assertRaises(ValueError, zipf.testzip)
1741        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1742        with open(TESTFN, 'w') as f:
1743            f.write('zipfile test data')
1744        self.assertRaises(ValueError, zipf.write, TESTFN)
1745
1746    def test_bad_constructor_mode(self):
1747        """Check that bad modes passed to ZipFile constructor are caught."""
1748        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1749
1750    def test_bad_open_mode(self):
1751        """Check that bad modes passed to ZipFile.open are caught."""
1752        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1753            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1754
1755        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1756            # read the data to make sure the file is there
1757            zipf.read("foo.txt")
1758            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1759            # universal newlines support is removed
1760            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1761            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1762
1763    def test_read0(self):
1764        """Check that calling read(0) on a ZipExtFile object returns an empty
1765        string and doesn't advance file pointer."""
1766        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1767            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1768            # read the data to make sure the file is there
1769            with zipf.open("foo.txt") as f:
1770                for i in range(FIXEDTEST_SIZE):
1771                    self.assertEqual(f.read(0), b'')
1772
1773                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1774
1775    def test_open_non_existent_item(self):
1776        """Check that attempting to call open() for an item that doesn't
1777        exist in the archive raises a RuntimeError."""
1778        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1779            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1780
1781    def test_bad_compression_mode(self):
1782        """Check that bad compression methods passed to ZipFile.open are
1783        caught."""
1784        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1785
1786    def test_unsupported_compression(self):
1787        # data is declared as shrunk, but actually deflated
1788        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1789                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1790                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1791                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1792                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1793                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1794        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1795            self.assertRaises(NotImplementedError, zipf.open, 'x')
1796
1797    def test_null_byte_in_filename(self):
1798        """Check that a filename containing a null byte is properly
1799        terminated."""
1800        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1801            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1802            self.assertEqual(zipf.namelist(), ['foo.txt'])
1803
1804    def test_struct_sizes(self):
1805        """Check that ZIP internal structure sizes are calculated correctly."""
1806        self.assertEqual(zipfile.sizeEndCentDir, 22)
1807        self.assertEqual(zipfile.sizeCentralDir, 46)
1808        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1809        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1810
1811    def test_comments(self):
1812        """Check that comments on the archive are handled properly."""
1813
1814        # check default comment is empty
1815        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1816            self.assertEqual(zipf.comment, b'')
1817            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1818
1819        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1820            self.assertEqual(zipfr.comment, b'')
1821
1822        # check a simple short comment
1823        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1824        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1825            zipf.comment = comment
1826            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1827        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1828            self.assertEqual(zipf.comment, comment)
1829
1830        # check a comment of max length
1831        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1832        comment2 = comment2.encode("ascii")
1833        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1834            zipf.comment = comment2
1835            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1836
1837        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1838            self.assertEqual(zipfr.comment, comment2)
1839
1840        # check a comment that is too long is truncated
1841        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1842            with self.assertWarns(UserWarning):
1843                zipf.comment = comment2 + b'oops'
1844            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1845        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1846            self.assertEqual(zipfr.comment, comment2)
1847
1848        # check that comments are correctly modified in append mode
1849        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1850            zipf.comment = b"original comment"
1851            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1852        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1853            zipf.comment = b"an updated comment"
1854        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1855            self.assertEqual(zipf.comment, b"an updated comment")
1856
1857        # check that comments are correctly shortened in append mode
1858        # and the file is indeed truncated
1859        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1860            zipf.comment = b"original comment that's longer"
1861            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1862        original_zip_size = os.path.getsize(TESTFN)
1863        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1864            zipf.comment = b"shorter comment"
1865        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
1866        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1867            self.assertEqual(zipf.comment, b"shorter comment")
1868
1869    def test_unicode_comment(self):
1870        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1871            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1872            with self.assertRaises(TypeError):
1873                zipf.comment = "this is an error"
1874
1875    def test_change_comment_in_empty_archive(self):
1876        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1877            self.assertFalse(zipf.filelist)
1878            zipf.comment = b"this is a comment"
1879        with zipfile.ZipFile(TESTFN, "r") as zipf:
1880            self.assertEqual(zipf.comment, b"this is a comment")
1881
1882    def test_change_comment_in_nonempty_archive(self):
1883        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1884            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1885        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1886            self.assertTrue(zipf.filelist)
1887            zipf.comment = b"this is a comment"
1888        with zipfile.ZipFile(TESTFN, "r") as zipf:
1889            self.assertEqual(zipf.comment, b"this is a comment")
1890
1891    def test_empty_zipfile(self):
1892        # Check that creating a file in 'w' or 'a' mode and closing without
1893        # adding any files to the archives creates a valid empty ZIP file
1894        zipf = zipfile.ZipFile(TESTFN, mode="w")
1895        zipf.close()
1896        try:
1897            zipf = zipfile.ZipFile(TESTFN, mode="r")
1898        except zipfile.BadZipFile:
1899            self.fail("Unable to create empty ZIP file in 'w' mode")
1900
1901        zipf = zipfile.ZipFile(TESTFN, mode="a")
1902        zipf.close()
1903        try:
1904            zipf = zipfile.ZipFile(TESTFN, mode="r")
1905        except:
1906            self.fail("Unable to create empty ZIP file in 'a' mode")
1907
1908    def test_open_empty_file(self):
1909        # Issue 1710703: Check that opening a file with less than 22 bytes
1910        # raises a BadZipFile exception (rather than the previously unhelpful
1911        # OSError)
1912        f = open(TESTFN, 'w')
1913        f.close()
1914        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1915
1916    def test_create_zipinfo_before_1980(self):
1917        self.assertRaises(ValueError,
1918                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1919
1920    def test_create_empty_zipinfo_repr(self):
1921        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
1922        zi = zipfile.ZipInfo(filename="empty")
1923        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
1924
1925    def test_create_empty_zipinfo_default_attributes(self):
1926        """Ensure all required attributes are set."""
1927        zi = zipfile.ZipInfo()
1928        self.assertEqual(zi.orig_filename, "NoName")
1929        self.assertEqual(zi.filename, "NoName")
1930        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
1931        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
1932        self.assertEqual(zi.comment, b"")
1933        self.assertEqual(zi.extra, b"")
1934        self.assertIn(zi.create_system, (0, 3))
1935        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
1936        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
1937        self.assertEqual(zi.reserved, 0)
1938        self.assertEqual(zi.flag_bits, 0)
1939        self.assertEqual(zi.volume, 0)
1940        self.assertEqual(zi.internal_attr, 0)
1941        self.assertEqual(zi.external_attr, 0)
1942
1943        # Before bpo-26185, both were missing
1944        self.assertEqual(zi.file_size, 0)
1945        self.assertEqual(zi.compress_size, 0)
1946
1947    def test_zipfile_with_short_extra_field(self):
1948        """If an extra field in the header is less than 4 bytes, skip it."""
1949        zipdata = (
1950            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1951            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1952            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1953            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1954            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1955            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1956            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1957        )
1958        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1959            # testzip returns the name of the first corrupt file, or None
1960            self.assertIsNone(zipf.testzip())
1961
1962    def test_open_conflicting_handles(self):
1963        # It's only possible to open one writable file handle at a time
1964        msg1 = b"It's fun to charter an accountant!"
1965        msg2 = b"And sail the wide accountant sea"
1966        msg3 = b"To find, explore the funds offshore"
1967        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1968            with zipf.open('foo', mode='w') as w2:
1969                w2.write(msg1)
1970            with zipf.open('bar', mode='w') as w1:
1971                with self.assertRaises(ValueError):
1972                    zipf.open('handle', mode='w')
1973                with self.assertRaises(ValueError):
1974                    zipf.open('foo', mode='r')
1975                with self.assertRaises(ValueError):
1976                    zipf.writestr('str', 'abcde')
1977                with self.assertRaises(ValueError):
1978                    zipf.write(__file__, 'file')
1979                with self.assertRaises(ValueError):
1980                    zipf.close()
1981                w1.write(msg2)
1982            with zipf.open('baz', mode='w') as w2:
1983                w2.write(msg3)
1984
1985        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1986            self.assertEqual(zipf.read('foo'), msg1)
1987            self.assertEqual(zipf.read('bar'), msg2)
1988            self.assertEqual(zipf.read('baz'), msg3)
1989            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1990
1991    def test_seek_tell(self):
1992        # Test seek functionality
1993        txt = b"Where's Bruce?"
1994        bloc = txt.find(b"Bruce")
1995        # Check seek on a file
1996        with zipfile.ZipFile(TESTFN, "w") as zipf:
1997            zipf.writestr("foo.txt", txt)
1998        with zipfile.ZipFile(TESTFN, "r") as zipf:
1999            with zipf.open("foo.txt", "r") as fp:
2000                fp.seek(bloc, os.SEEK_SET)
2001                self.assertEqual(fp.tell(), bloc)
2002                fp.seek(-bloc, os.SEEK_CUR)
2003                self.assertEqual(fp.tell(), 0)
2004                fp.seek(bloc, os.SEEK_CUR)
2005                self.assertEqual(fp.tell(), bloc)
2006                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2007                fp.seek(0, os.SEEK_END)
2008                self.assertEqual(fp.tell(), len(txt))
2009                fp.seek(0, os.SEEK_SET)
2010                self.assertEqual(fp.tell(), 0)
2011        # Check seek on memory file
2012        data = io.BytesIO()
2013        with zipfile.ZipFile(data, mode="w") as zipf:
2014            zipf.writestr("foo.txt", txt)
2015        with zipfile.ZipFile(data, mode="r") as zipf:
2016            with zipf.open("foo.txt", "r") as fp:
2017                fp.seek(bloc, os.SEEK_SET)
2018                self.assertEqual(fp.tell(), bloc)
2019                fp.seek(-bloc, os.SEEK_CUR)
2020                self.assertEqual(fp.tell(), 0)
2021                fp.seek(bloc, os.SEEK_CUR)
2022                self.assertEqual(fp.tell(), bloc)
2023                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2024                fp.seek(0, os.SEEK_END)
2025                self.assertEqual(fp.tell(), len(txt))
2026                fp.seek(0, os.SEEK_SET)
2027                self.assertEqual(fp.tell(), 0)
2028
2029    @requires_bz2()
2030    def test_decompress_without_3rd_party_library(self):
2031        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2032        zip_file = io.BytesIO(data)
2033        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2034            zf.writestr('a.txt', b'a')
2035        with mock.patch('zipfile.bz2', None):
2036            with zipfile.ZipFile(zip_file) as zf:
2037                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2038
2039    def tearDown(self):
2040        unlink(TESTFN)
2041        unlink(TESTFN2)
2042
2043
class AbstractBadCrcTests:
    # Mixin: concrete subclasses supply ``zip_with_bad_crc``, the raw bytes
    # of an archive whose member 'afile' is expected to fail its CRC check.

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        source = io.BytesIO(self.zip_with_bad_crc)

        with zipfile.ZipFile(source, mode="r") as archive:
            # testzip returns the name of the first corrupt file, or None
            first_corrupt = archive.testzip()
            self.assertEqual('afile', first_corrupt)

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        raw_archive = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(raw_archive), mode="r") as archive:
            with self.assertRaises(zipfile.BadZipFile):
                archive.read('afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(raw_archive), mode="r") as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipFile):
                    member.read()

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(raw_archive), mode="r") as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while member.read(2):
                        pass
2073
2074
class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks on a ZIP_STORED fixture."""
    # Compression method this fixture stands for; the AbstractBadCrcTests
    # methods do not read this attribute.
    compression = zipfile.ZIP_STORED
    # Pre-built archive holding the member 'afile'; the inherited tests
    # expect its CRC check to fail.  The bytes are a fixture: do not reflow
    # or "normalize" the escapes.
    zip_with_bad_crc = (
        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
        b'ilehello,AworldP'
        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
        b'\0\0/\0\0\0\0\0')
2086
@requires_zlib()
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks on a ZIP_DEFLATED fixture."""
    # Compression method this fixture stands for; the AbstractBadCrcTests
    # methods do not read this attribute.
    compression = zipfile.ZIP_DEFLATED
    # Pre-built archive holding the member 'afile'; the inherited tests
    # expect its CRC check to fail.  The literal bytes b'FAKE' appear in
    # both headers -- presumably occupying the CRC field.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2099
@requires_bz2()
class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks on a ZIP_BZIP2 fixture."""
    # Compression method this fixture stands for; the AbstractBadCrcTests
    # methods do not read this attribute.
    compression = zipfile.ZIP_BZIP2
    # Pre-built archive holding the member 'afile'; the inherited tests
    # expect its CRC check to fail.  The literal bytes b'FAKE' appear in
    # both headers -- presumably occupying the CRC field.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ileBZh91AY&SY\xd4\xa8\xca'
        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
        b'\x00\x00\x00\x00')
2115
@requires_lzma()
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks on a ZIP_LZMA fixture."""
    # Compression method this fixture stands for; the AbstractBadCrcTests
    # methods do not read this attribute.
    compression = zipfile.ZIP_LZMA
    # Pre-built archive holding the member 'afile'; the inherited tests
    # expect its CRC check to fail.  The literal bytes b'FAKE' appear in
    # both headers -- presumably occupying the CRC field.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
2129
2130
2131class DecryptionTests(unittest.TestCase):
2132    """Check that ZIP decryption works. Since the library does not
2133    support encryption at the moment, we use a pre-generated encrypted
2134    ZIP file."""
2135
    # Pre-generated encrypted archive with the single member "test.txt".
    # test_good_password decrypts it with b"python" and expects `plain`.
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00' )
    # Second pre-generated encrypted archive, with the member "zero".
    # test_good_password decrypts it with b"12345" and expects `plain2`;
    # that test is guarded by requires_zlib().
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected decrypted content of "test.txt" in `data`.
    plain = b'zipfile.py encryption test'
    # Expected decrypted content of "zero" in `data2`: 512 NUL bytes.
    plain2 = b'\x00'*512
2156
2157    def setUp(self):
2158        with open(TESTFN, "wb") as fp:
2159            fp.write(self.data)
2160        self.zip = zipfile.ZipFile(TESTFN, "r")
2161        with open(TESTFN2, "wb") as fp:
2162            fp.write(self.data2)
2163        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2164
2165    def tearDown(self):
2166        self.zip.close()
2167        os.unlink(TESTFN)
2168        self.zip2.close()
2169        os.unlink(TESTFN2)
2170
2171    def test_no_password(self):
2172        # Reading the encrypted file without password
2173        # must generate a RunTime exception
2174        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2175        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2176
2177    def test_bad_password(self):
2178        self.zip.setpassword(b"perl")
2179        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2180        self.zip2.setpassword(b"perl")
2181        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2182
2183    @requires_zlib()
2184    def test_good_password(self):
2185        self.zip.setpassword(b"python")
2186        self.assertEqual(self.zip.read("test.txt"), self.plain)
2187        self.zip2.setpassword(b"12345")
2188        self.assertEqual(self.zip2.read("zero"), self.plain2)
2189
2190    def test_unicode_password(self):
2191        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
2192        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
2193        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
2194        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
2195
2196    def test_seek_tell(self):
2197        self.zip.setpassword(b"python")
2198        txt = self.plain
2199        test_word = b'encryption'
2200        bloc = txt.find(test_word)
2201        bloc_len = len(test_word)
2202        with self.zip.open("test.txt", "r") as fp:
2203            fp.seek(bloc, os.SEEK_SET)
2204            self.assertEqual(fp.tell(), bloc)
2205            fp.seek(-bloc, os.SEEK_CUR)
2206            self.assertEqual(fp.tell(), 0)
2207            fp.seek(bloc, os.SEEK_CUR)
2208            self.assertEqual(fp.tell(), bloc)
2209            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2210
2211            # Make sure that the second read after seeking back beyond
2212            # _readbuffer returns the same content (ie. rewind to the start of
2213            # the file to read forward to the required position).
2214            old_read_size = fp.MIN_READ_SIZE
2215            fp.MIN_READ_SIZE = 1
2216            fp._readbuffer = b''
2217            fp._offset = 0
2218            fp.seek(0, os.SEEK_SET)
2219            self.assertEqual(fp.tell(), 0)
2220            fp.seek(bloc, os.SEEK_CUR)
2221            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2222            fp.MIN_READ_SIZE = old_read_size
2223
2224            fp.seek(0, os.SEEK_END)
2225            self.assertEqual(fp.tell(), len(txt))
2226            fp.seek(0, os.SEEK_SET)
2227            self.assertEqual(fp.tell(), 0)
2228
2229            # Read the file completely to definitely call any eof integrity
2230            # checks (crc) and make sure they still pass.
2231            fp.read()
2232
2233
class AbstractTestsWithRandomBinaryFiles:
    """Mixin: round-trip randomly generated binary data through an archive.

    Concrete classes combine this with unittest.TestCase and define a
    ``compression`` attribute, which is handed to zipfile.ZipFile.
    """

    @classmethod
    def setUpClass(cls):
        # One packed little-endian float ('<f') per item; the number of
        # items is itself random.
        count = randint(16, 64)*1024 + randint(1, 1024)
        packed = [struct.pack('<f', random()*randint(-1000, 1000))
                  for _ in range(count)]
        cls.data = b''.join(packed)

    def setUp(self):
        # Write the random binary payload to the source file.
        with open(TESTFN, "wb") as source:
            source.write(self.data)

    def tearDown(self):
        for path in (TESTFN, TESTFN2):
            unlink(path)

    def make_test_archive(self, f, compression):
        """Create an archive in *f* holding the source file under two names."""
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for arcname in ("another.name", TESTFN):
                zipfp.write(TESTFN, arcname)

    @staticmethod
    def _read_all(stream, next_size):
        """Read *stream* until it returns no data; return the joined bytes.

        *next_size* is called before every read to get the chunk size.
        """
        pieces = []
        while piece := stream.read(next_size()):
            pieces.append(piece)
        return b''.join(pieces)

    def zip_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Both members must read back as the original payload.
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            content = zipfp.read(TESTFN)
            self.assertEqual(len(content), len(self.data))
            self.assertEqual(content, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_read(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def zip_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read each member through open() in fixed 256-byte chunks, then
        # compare what was collected with the original payload.
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            contents = []
            for name in (TESTFN, "another.name"):
                with zipfp.open(name) as member:
                    contents.append(self._read_all(member, lambda: 256))

            for content in contents:
                self.assertEqual(len(content), len(self.data))
                self.assertEqual(content, self.data)

    def test_open(self):
        for f in get_files(self):
            self.zip_open_test(f, self.compression)

    def zip_random_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Same as zip_open_test, but every read asks for a random size.
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as member:
                content = self._read_all(member, lambda: randint(1, 1024))

            self.assertEqual(len(content), len(self.data))
            self.assertEqual(content, self.data)

    def test_random_open(self):
        for f in get_files(self):
            self.zip_random_open_test(f, self.compression)
2323
2324
class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                       unittest.TestCase):
    # Runs the inherited random-binary tests with ZIP_STORED; the mixin
    # reads this attribute as self.compression.
    compression = zipfile.ZIP_STORED
2328
# requires_zlib comes from test.support; presumably it skips this class when
# the zlib module is unavailable (confirm against test.support).
@requires_zlib()
class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                        unittest.TestCase):
    # Runs the inherited random-binary tests with ZIP_DEFLATED; the mixin
    # reads this attribute as self.compression.
    compression = zipfile.ZIP_DEFLATED
2333
# requires_bz2 comes from test.support; presumably it skips this class when
# the bz2 module is unavailable (confirm against test.support).
@requires_bz2()
class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                      unittest.TestCase):
    # Runs the inherited random-binary tests with ZIP_BZIP2; the mixin
    # reads this attribute as self.compression.
    compression = zipfile.ZIP_BZIP2
2338
# requires_lzma comes from test.support; presumably it skips this class when
# the lzma module is unavailable (confirm against test.support).
@requires_lzma()
class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                     unittest.TestCase):
    # Runs the inherited random-binary tests with ZIP_LZMA; the mixin
    # reads this attribute as self.compression.
    compression = zipfile.ZIP_LZMA
2343
2344
class Tellable:
    """Write-only wrapper that provides tell() but not seek().

    The reported position starts at 0 and is advanced by whatever the
    wrapped object's write() returns; the wrapped object is never asked
    for its own position.
    """

    def __init__(self, fp):
        self.fp, self.offset = fp, 0

    def write(self, data):
        """Forward *data* to the wrapped object; return its write() result."""
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        """Return the running total of the counts returned by write()."""
        return self.offset

    def flush(self):
        self.fp.flush()
2361
class Unseekable:
    """Write-only wrapper that provides neither tell() nor seek()."""

    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        """Forward *data* to the wrapped object; return its write() result."""
        written = self.fp.write(data)
        return written

    def flush(self):
        self.fp.flush()
2371
class UnseekableTests(unittest.TestCase):
    """Write archives through streams lacking seek() and possibly tell().

    Each test runs once per wrapper: the identity function (plain
    BufferedWriter), Tellable and Unseekable.
    """

    @staticmethod
    def _new_stream():
        """Return (raw, buffered).

        *raw* is a BytesIO that already contains b'abc'; *buffered* is an
        io.BufferedWriter layered over it.
        """
        raw = io.BytesIO()
        raw.write(b'abc')
        return raw, io.BufferedWriter(raw)

    def test_writestr(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw, buffered = self._new_stream()
                with zipfile.ZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as zipfp:
                    for arcname, payload in members:
                        zipfp.writestr(arcname, payload)
                # The archive must start right after the pre-existing bytes.
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw, mode='r') as zipf:
                    for arcname, payload in members:
                        with zipf.open(arcname) as member:
                            self.assertEqual(member.read(), payload)

    def test_write(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw, buffered = self._new_stream()
                with zipfile.ZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as zipfp:
                    self.addCleanup(unlink, TESTFN)
                    # TESTFN is rewritten with new content before each add.
                    for arcname, payload in members:
                        with open(TESTFN, 'wb') as source:
                            source.write(payload)
                        zipfp.write(TESTFN, arcname)
                # The archive must start right after the pre-existing bytes.
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw, mode='r') as zipf:
                    for arcname, payload in members:
                        with zipf.open(arcname) as member:
                            self.assertEqual(member.read(), payload)

    def test_open_write(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw, buffered = self._new_stream()
                with zipfile.ZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as zipf:
                    for arcname, payload in members:
                        with zipf.open(arcname, 'w') as member:
                            member.write(payload)
                # The archive must start right after the pre-existing bytes.
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw) as zipf:
                    for arcname, payload in members:
                        self.assertEqual(zipf.read(arcname), payload)
2425
2426
@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    """Check that several handles into one archive do not interfere."""

    @classmethod
    def setUpClass(cls):
        # Two payloads with distinct 3-byte prefixes and 10000 random bytes.
        cls.data1, cls.data2 = (b'111' + randbytes(10000),
                                b'222' + randbytes(10000))

    def make_test_archive(self, f):
        """Create a deflated archive in *f* with members 'ones' and 'twos'."""
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
            for arcname, payload in (('ones', self.data1),
                                     ('twos', self.data2)):
                zipfp.writestr(arcname, payload)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as first, zipf.open('ones') as second:
                    head1 = first.read(500)
                    head2 = second.read(500)
                    data1 = head1 + first.read()
                    data2 = head2 + second.read()
                self.assertEqual(data1, data2)
                self.assertEqual(data1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as first, zipf.open('twos') as second:
                    head1 = first.read(500)
                    head2 = second.read(500)
                    data1 = head1 + first.read()
                    data2 = head2 + second.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as first:
                    head1 = first.read(500)
                    # The second member is opened while the first is only
                    # partially read.
                    with zipf.open('twos') as second:
                        head2 = second.read(500)
                        data1 = head1 + first.read()
                        data2 = head2 + second.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_read_after_close(self):
        for f in get_files(self):
            self.make_test_archive(f)
            with contextlib.ExitStack() as stack:
                # The member handles are owned by the ExitStack, so they
                # stay open after the ZipFile's own with-block has exited.
                with zipfile.ZipFile(f, 'r') as zipf:
                    first = stack.enter_context(zipf.open('ones'))
                    second = stack.enter_context(zipf.open('twos'))
                head1 = first.read(500)
                head2 = second.read(500)
                data1 = head1 + first.read()
                data2 = head2 + second.read()
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_read_after_write(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                zipf.writestr('twos', self.data2)
                # Read back from the archive that is still open for writing.
                with zipf.open('ones') as member:
                    head = member.read(500)
            self.assertEqual(head, self.data1[:500])
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_write_after_read(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                with zipf.open('ones') as member:
                    member.read(500)
                    # Add a member while a read handle is still open.
                    zipf.writestr('twos', self.data2)
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            for _ in range(100):
                zipf.read('ones')
                with zipf.open('ones'):
                    pass
        # A freshly opened file must still receive a low descriptor number.
        with open(os.devnull) as probe:
            self.assertLess(probe.fileno(), 100)

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
            with zipf.open('ones', 'r') as reader:
                head = reader.read(500)
                # Write a complete new member between two reads of 'ones'.
                with zipf.open('twos', 'w') as writer:
                    writer.write(self.data2)
                data1 = head + reader.read()
        self.assertEqual(data1, self.data1)
        with zipfile.ZipFile(TESTFN2) as zipf:
            self.assertEqual(zipf.read('twos'), self.data2)

    def tearDown(self):
        unlink(TESTFN2)
2551
2552
class TestWithDirectory(unittest.TestCase):
    """Directory entries: writing them, reading them back, extracting them.

    Every test starts with TESTFN2 freshly created as a directory, and
    tearDown removes it together with TESTFN if a test created that file.
    """

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        # Expected external_attr: the directory's mode bits in the upper
        # half plus 0x10 (presumably the MS-DOS directory attribute).
        expected_attr = (mode << 16) | 0x10
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # This used to be assertTrue(zinfo.filename, "y/"), which only
            # checked that the name was non-empty: the second argument of
            # assertTrue is the failure message, not an expected value.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        os.mkdir(os.path.join(TESTFN2, "x"))
        # Attributes expected for a directory entry created by writestr().
        expected_attr = (0o40775 << 16) | 0x10
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
2616
2617
class ZipInfoTests(unittest.TestCase):
    """Tests for the ZipInfo.from_file() alternate constructor."""

    def test_from_file(self):
        # Without an explicit arcname the member name is derived from the
        # path.  Compare against this module's actual file name instead of
        # a hard-coded one, so the test survives the module being renamed.
        zi = zipfile.ZipInfo.from_file(__file__)
        self.assertEqual(posixpath.basename(zi.filename),
                         os.path.basename(__file__))
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_pathlike(self):
        # Same as test_from_file, but the path is given as a pathlib.Path.
        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
        self.assertEqual(posixpath.basename(zi.filename),
                         os.path.basename(__file__))
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_bytes(self):
        # A bytes path is accepted when an explicit arcname is supplied.
        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
        self.assertEqual(posixpath.basename(zi.filename), 'test')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_fileno(self):
        # An open file descriptor is accepted in place of a path.
        with open(__file__, 'rb') as f:
            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
            self.assertEqual(posixpath.basename(zi.filename), 'test')
            self.assertFalse(zi.is_dir())
            self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_dir(self):
        # A directory gets a trailing slash appended to its arcname and is
        # described as a stored, zero-length entry.
        dirpath = os.path.dirname(os.path.abspath(__file__))
        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
        self.assertEqual(zi.filename, 'stdlib_tests/')
        self.assertTrue(zi.is_dir())
        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
        self.assertEqual(zi.file_size, 0)
2651
2652
class CommandLineTest(unittest.TestCase):
    """Drive the ``python -m zipfile`` command line interface."""

    def zipfilecmd(self, *args, **kwargs):
        """Run ``-m zipfile`` via assert_python_ok and return its stdout.

        Every occurrence of os.linesep in the output is replaced with a
        single newline byte.
        """
        _rc, stdout, _err = script_helper.assert_python_ok(
            '-m', 'zipfile', *args, **kwargs)
        native_newline = os.linesep.encode()
        return stdout.replace(native_newline, b'\n')

    def zipfilecmd_failure(self, *args):
        """Run ``-m zipfile`` via assert_python_failure; return its result."""
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: a usage error on stderr, nothing on stdout.
        rc, out, err = self.zipfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        for word in (b'usage', b'error', b'required'):
            self.assertIn(word, lowered)
        # An empty archive name must fail with some message on stderr.
        rc, out, err = self.zipfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        archive = findfile('zipdir.zip')
        for opt in ('-t', '--test'):
            out = self.zipfilecmd(opt, archive)
            self.assertEqual(out.rstrip(), b'Done testing')
        # A tar file is not a zip archive, so testing it must fail.
        not_a_zip = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(out, b'')

    def test_list_command(self):
        archive = findfile('zipdir.zip')
        # The command's output must match what printdir() produces.
        listing = io.StringIO()
        with zipfile.ZipFile(archive, 'r') as tf:
            tf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for opt in ('-l', '--list'):
            out = self.zipfilecmd(opt, archive,
                                  PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(out, expected)

    @requires_zlib()
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as f:
            f.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f:
            f.write('test 2')
        sources = [TESTFN, TESTFNDIR]
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for opt in ('-c', '--create'):
            try:
                out = self.zipfilecmd(opt, TESTFN2, *sources)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), expected_names)
                    self.assertEqual(zf.read(expected_names[0]), b'test 1')
                    self.assertEqual(zf.read(expected_names[2]), b'test 2')
            finally:
                # Remove the archive so the next option starts clean.
                unlink(TESTFN2)

    def test_extract_command(self):
        archive = findfile('zipdir.zip')
        for opt in ('-e', '--extract'):
            with temp_dir() as extdir:
                out = self.zipfilecmd(opt, archive, extdir)
                self.assertEqual(out, b'')
                # Every entry of the archive must exist below extdir with
                # the right type and, for files, the right content.
                with zipfile.ZipFile(archive) as zf:
                    for zi in zf.infolist():
                        relative = zi.filename.replace('/', os.sep)
                        path = os.path.join(extdir, relative)
                        if zi.is_dir():
                            self.assertTrue(os.path.isdir(path))
                        else:
                            self.assertTrue(os.path.isfile(path))
                            with open(path, 'rb') as f:
                                self.assertEqual(f.read(), zf.read(zi))
2731
2732
class TestExecutablePrependedZip(unittest.TestCase):
    """Test our ability to open zip files with an executable prepended."""

    def setUp(self):
        # Both data files are looked up in the 'ziptestdata' subdirectory.
        self.exe_zip, self.exe_zip64 = (
            findfile('exe_with_zip', subdir='ziptestdata'),
            findfile('exe_with_z64', subdir='ziptestdata'),
        )

    def _test_zip_works(self, name):
        """Check that *name* is recognised and readable as a zip archive."""
        # bpo28494 sanity check: ensure is_zipfile works on these.
        recognised = zipfile.is_zipfile(name)
        self.assertTrue(recognised, f'is_zipfile failed on {name}')
        # Ensure we can operate on these via ZipFile.
        with zipfile.ZipFile(name) as zipfp:
            for member in zipfp.namelist():
                self.assertIn(b'FAVORITE_NUMBER', zipfp.read(member))

    def test_read_zip_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip)

    def test_read_zip64_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip64)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip2(self):
        # Run the data file itself, passing the interpreter path to it.
        command = [self.exe_zip, sys.executable]
        output = subprocess.check_output(command)
        self.assertIn(b'number in executable: 5', output)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip64(self):
        # Run the data file itself, passing the interpreter path to it.
        command = [self.exe_zip64, sys.executable]
        output = subprocess.check_output(command)
        self.assertIn(b'number in executable: 5', output)
2769
2770
# Poor man's technique to consume a (smallish) iterable: tuple() iterates
# its argument to exhaustion, at the cost of keeping every item in memory.
consume = tuple
2773
2774
# from jaraco.itertools 5.0
class jaraco:
    """Namespace holding a local copy of a jaraco.itertools helper."""

    class itertools:
        class Counter:
            """Iterator wrapper that counts the items it has handed out.

            ``count`` starts at 0 and grows by one for every item returned
            by next(); it is not incremented when the wrapped iterator
            raises instead of producing an item.
            """

            def __init__(self, i):
                self._orig_iter = iter(i)
                self.count = 0

            def __iter__(self):
                return self

            def __next__(self):
                # next() may raise (e.g. StopIteration); in that case the
                # counter below is never reached.
                item = next(self._orig_iter)
                self.count = self.count + 1
                return item
2790
2791
def add_dirs(zf):
    """
    Add an empty entry to the writable zip file zf for every directory
    that zipfile.CompleteDirs._implied_dirs reports for its current
    names, i.e. directories implied by the presence of children.

    Returns zf itself.
    """
    implied = zipfile.CompleteDirs._implied_dirs(zf.namelist())
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf
2800
2801
def build_alpharep_fixture():
    """
    Build and return a zip file, still open for writing and backed by
    an in-memory buffer, with this structure:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    Key characteristics of the layout:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)

    Only the files are written; no directory entries are added here.
    The returned object's filename is set to "alpharep.zip".

    "alpha" because it uses alphabet
    "rep" because it's a representative example
    """
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
    )
    zf = zipfile.ZipFile(io.BytesIO(), "w")
    for arcname, payload in members:
        zf.writestr(arcname, payload)
    zf.filename = "alpharep.zip"
    return zf
2836
2837
2838class TestPath(unittest.TestCase):
    def setUp(self):
        """Create an ExitStack for per-test fixtures; close it on cleanup."""
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)
2842
    def zipfile_alpharep(self):
        """Yield the alpharep fixture twice, each from inside a subTest.

        The first archive is build_alpharep_fixture() as is; the second is
        a fresh fixture passed through add_dirs().
        """
        # The generator is suspended at each yield, so the subTest context
        # has been entered but not yet exited while the consumer's loop
        # body runs.
        # NOTE(review): an exception raised in the consumer's loop body is
        # not thrown into this generator, so it is probably not attributed
        # to the subTest - confirm.
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())
2848
2849    def zipfile_ondisk(self):
2850        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
2851        for alpharep in self.zipfile_alpharep():
2852            buffer = alpharep.fp
2853            alpharep.close()
2854            path = tmpdir / alpharep.filename
2855            with path.open("wb") as strm:
2856                strm.write(buffer.getvalue())
2857            yield path
2858
2859    def test_iterdir_and_types(self):
2860        for alpharep in self.zipfile_alpharep():
2861            root = zipfile.Path(alpharep)
2862            assert root.is_dir()
2863            a, b, g = root.iterdir()
2864            assert a.is_file()
2865            assert b.is_dir()
2866            assert g.is_dir()
2867            c, f, d = b.iterdir()
2868            assert c.is_file() and f.is_file()
2869            e, = d.iterdir()
2870            assert e.is_file()
2871            h, = g.iterdir()
2872            i, = h.iterdir()
2873            assert i.is_file()
2874
2875    def test_subdir_is_dir(self):
2876        for alpharep in self.zipfile_alpharep():
2877            root = zipfile.Path(alpharep)
2878            assert (root / 'b').is_dir()
2879            assert (root / 'b/').is_dir()
2880            assert (root / 'g').is_dir()
2881            assert (root / 'g/').is_dir()
2882
2883    def test_open(self):
2884        for alpharep in self.zipfile_alpharep():
2885            root = zipfile.Path(alpharep)
2886            a, b, g = root.iterdir()
2887            with a.open() as strm:
2888                data = strm.read()
2889            assert data == "content of a"
2890
2891    def test_read(self):
2892        for alpharep in self.zipfile_alpharep():
2893            root = zipfile.Path(alpharep)
2894            a, b, g = root.iterdir()
2895            assert a.read_text() == "content of a"
2896            assert a.read_bytes() == b"content of a"
2897
2898    def test_joinpath(self):
2899        for alpharep in self.zipfile_alpharep():
2900            root = zipfile.Path(alpharep)
2901            a = root.joinpath("a")
2902            assert a.is_file()
2903            e = root.joinpath("b").joinpath("d").joinpath("e.txt")
2904            assert e.read_text() == "content of e"
2905
2906    def test_traverse_truediv(self):
2907        for alpharep in self.zipfile_alpharep():
2908            root = zipfile.Path(alpharep)
2909            a = root / "a"
2910            assert a.is_file()
2911            e = root / "b" / "d" / "e.txt"
2912            assert e.read_text() == "content of e"
2913
    def test_pathlike_construction(self):
        """
        zipfile.Path should be constructable from a path-like object
        """
        for zipfile_ondisk in self.zipfile_ondisk():
            pathlike = pathlib.Path(str(zipfile_ondisk))
            # No assertion: the test passes as long as construction does
            # not raise.
            zipfile.Path(pathlike)
2921
    def test_traverse_pathlike(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            # No assertion: the test passes as long as joining with a
            # pathlib.Path operand does not raise.
            root / pathlib.Path("a")
2926
2927    def test_parent(self):
2928        for alpharep in self.zipfile_alpharep():
2929            root = zipfile.Path(alpharep)
2930            assert (root / 'a').parent.at == ''
2931            assert (root / 'a' / 'b').parent.at == 'a/'
2932
2933    def test_dir_parent(self):
2934        for alpharep in self.zipfile_alpharep():
2935            root = zipfile.Path(alpharep)
2936            assert (root / 'b').parent.at == ''
2937            assert (root / 'b/').parent.at == ''
2938
2939    def test_missing_dir_parent(self):
2940        for alpharep in self.zipfile_alpharep():
2941            root = zipfile.Path(alpharep)
2942            assert (root / 'missing dir/').parent.at == ''
2943
2944    def test_mutability(self):
2945        """
2946        If the underlying zipfile is changed, the Path object should
2947        reflect that change.
2948        """
2949        for alpharep in self.zipfile_alpharep():
2950            root = zipfile.Path(alpharep)
2951            a, b, g = root.iterdir()
2952            alpharep.writestr('foo.txt', 'foo')
2953            alpharep.writestr('bar/baz.txt', 'baz')
2954            assert any(
2955                child.name == 'foo.txt'
2956                for child in root.iterdir())
2957            assert (root / 'foo.txt').read_text() == 'foo'
2958            baz, = (root / 'bar').iterdir()
2959            assert baz.read_text() == 'baz'
2960
    # Number of entries written by huge_zipfile() (2 ** 13 == 8192).
    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
2962
2963    def huge_zipfile(self):
2964        """Create a read-only zipfile with a huge number of entries entries."""
2965        strm = io.BytesIO()
2966        zf = zipfile.ZipFile(strm, "w")
2967        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
2968            zf.writestr(entry, entry)
2969        zf.mode = 'r'
2970        return zf
2971
2972    def test_joinpath_constant_time(self):
2973        """
2974        Ensure joinpath on items in zipfile is linear time.
2975        """
2976        root = zipfile.Path(self.huge_zipfile())
2977        entries = jaraco.itertools.Counter(root.iterdir())
2978        for entry in entries:
2979            entry.joinpath('suffix')
2980        # Check the file iterated all items
2981        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
2982
2983    # @func_timeout.func_set_timeout(3)
2984    def test_implied_dirs_performance(self):
2985        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
2986        zipfile.CompleteDirs._implied_dirs(data)
2987
2988
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2991