1import contextlib
2import importlib.util
3import io
4import itertools
5import os
6import pathlib
7import posixpath
8import string
9import struct
10import subprocess
11import sys
12import time
13import unittest
14import unittest.mock as mock
15import zipfile
16import functools
17
18
19from tempfile import TemporaryFile
20from random import randint, random, randbytes
21
22from test.support import script_helper
23from test.support import (findfile, requires_zlib, requires_bz2,
24                          requires_lzma, captured_stdout)
25from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd
26
27
# Second scratch path; the tests below use it as the on-disk archive.
TESTFN2 = TESTFN + "2"
# Scratch name derived from TESTFN (no use is visible in this part of the
# file; presumably a directory -- confirm against the remaining tests).
TESTFNDIR = TESTFN + "d"
# Number of lines generated for the fixture data in the setUpClass methods.
FIXEDTEST_SIZE = 1000
# NOTE(review): looks like the name of a directory of sample files -- it is
# not referenced in this part of the file, confirm before relying on it.
DATAFILES_DIR = 'zipfile_datafiles'

# (member name, contents) pairs describing a small nested tree; the
# consumers are outside this part of the file.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
37
def get_files(test):
    """Yield each kind of target the archive tests write to.

    The targets are, in order, the TESTFN2 path, an anonymous temporary
    file and an in-memory buffer.  Once the consumer resumes the generator
    after using a file object, *test* asserts that the object was left
    open; the object is then closed here.
    """
    yield TESTFN2
    for factory in (TemporaryFile, io.BytesIO):
        with factory() as fileobj:
            yield fileobj
            test.assertFalse(fileobj.closed)
46
class AbstractTestsWithSourceFile:
    # Round-trip tests shared by every compression method.  Concrete
    # subclasses also inherit from unittest.TestCase and set ``compression``
    # to the zipfile compression constant under test.

    @classmethod
    def setUpClass(cls):
        # Keep the individual lines as well as the joined payload so the
        # line-oriented tests can compare line by line.
        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
                              (i, random()), "ascii")
                        for i in range(FIXEDTEST_SIZE)]
        cls.data = b''.join(cls.line_gen)

    def setUp(self):
        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def make_test_archive(self, f, compression, compresslevel=None):
        """Write the four-member fixture archive to *f*.

        Every member holds ``self.data``; the members are added through
        write() (twice), writestr() and open(..., mode='w').
        """
        kwargs = {'compression': compression, 'compresslevel': compresslevel}
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)
            # Use a distinct name so the archive target *f* is not shadowed.
            with zipfp.open('written-open-w', mode='w') as member:
                for line in self.line_gen:
                    member.write(line)

    def zip_test(self, f, compression, compresslevel=None):
        """Build the fixture archive in *f* and check its contents and
        directory listing."""
        self.make_test_archive(f, compression, compresslevel)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = io.StringIO()
            zipfp.printdir(file=fp)
            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 5) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 4)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            self.assertIn("written-open-w", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 4)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            self.assertIn("written-open-w", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_basic(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def zip_open_test(self, f, compression):
        """Read two members back through open() in fixed 256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while read_data := zipopen1.read(256):
                    zipdata1.append(read_data)

            zipdata2 = []
            with zipfp.open("another.name") as zipopen2:
                while read_data := zipopen2.read(256):
                    zipdata2.append(read_data)

            self.assertEqual(b''.join(zipdata1), self.data)
            self.assertEqual(b''.join(zipdata2), self.data)

    def test_open(self):
        for f in get_files(self):
            self.zip_open_test(f, self.compression)

    def test_open_with_pathlike(self):
        path = pathlib.Path(TESTFN2)
        self.zip_open_test(path, self.compression)
        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
            # The path-like object must have been converted to a plain str.
            self.assertIsInstance(zipfp.filename, str)

    def zip_random_open_test(self, f, compression):
        """Read a member back through open() in randomly sized chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while read_data := zipopen1.read(randint(1, 1024)):
                    zipdata1.append(read_data)

            self.assertEqual(b''.join(zipdata1), self.data)

    def test_random_open(self):
        for f in get_files(self):
            self.zip_random_open_test(f, self.compression)

    def zip_read1_test(self, f, compression):
        """Drain a member with read1(-1) and compare with the source data."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp, \
             zipfp.open(TESTFN) as zipopen:
            zipdata = []
            while read_data := zipopen.read1(-1):
                zipdata.append(read_data)

        self.assertEqual(b''.join(zipdata), self.data)

    def test_read1(self):
        for f in get_files(self):
            self.zip_read1_test(f, self.compression)

    def zip_read1_10_test(self, f, compression):
        """Drain a member with read1(10); no chunk may exceed 10 bytes."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp, \
             zipfp.open(TESTFN) as zipopen:
            zipdata = []
            while True:
                read_data = zipopen.read1(10)
                self.assertLessEqual(len(read_data), 10)
                if not read_data:
                    break
                zipdata.append(read_data)

        self.assertEqual(b''.join(zipdata), self.data)

    def test_read1_10(self):
        for f in get_files(self):
            self.zip_read1_10_test(f, self.compression)

    def zip_readline_read_test(self, f, compression):
        """Alternate readline() and read(100) until the member is drained."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp, \
             zipfp.open(TESTFN) as zipopen:
            data = b''
            while True:
                read = zipopen.readline()
                if not read:
                    break
                data += read

                read = zipopen.read(100)
                if not read:
                    break
                data += read

        self.assertEqual(data, self.data)

    def test_readline_read(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in get_files(self):
            self.zip_readline_read_test(f, self.compression)

    def zip_readline_test(self, f, compression):
        """Read a member line by line with readline()."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                for line in self.line_gen:
                    linedata = zipopen.readline()
                    self.assertEqual(linedata, line)
                # Nothing may follow the expected lines.
                self.assertEqual(zipopen.readline(), b'')

    def test_readline(self):
        for f in get_files(self):
            self.zip_readline_test(f, self.compression)

    def zip_readlines_test(self, f, compression):
        """Read a member with readlines() and compare every line."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                ziplines = zipopen.readlines()
            # zip() stops at the shorter input, so a truncated (or empty)
            # result would otherwise pass unnoticed.
            self.assertEqual(len(ziplines), len(self.line_gen))
            for line, zipline in zip(self.line_gen, ziplines):
                self.assertEqual(zipline, line)

    def test_readlines(self):
        for f in get_files(self):
            self.zip_readlines_test(f, self.compression)

    def zip_iterlines_test(self, f, compression):
        """Iterate over a member and compare every line."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                ziplines = list(zipopen)
            # zip() stops at the shorter input, so check the count first.
            self.assertEqual(len(ziplines), len(self.line_gen))
            for line, zipline in zip(self.line_gen, ziplines):
                self.assertEqual(zipline, line)

    def test_iterlines(self):
        for f in get_files(self):
            self.zip_iterlines_test(f, self.compression)

    def test_low_compression(self):
        """Check for cases where compressed data is larger than original."""
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
            zipfp.writestr("strfile", '12')

        # Get an open object for strfile
        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
            with zipfp.open("strfile") as openobj:
                self.assertEqual(openobj.read(1), b'1')
                self.assertEqual(openobj.read(1), b'2')

    def test_writestr_compression(self):
        # The archive is closed by the with block even if an assertion
        # fails, so tearDown can always remove TESTFN2.
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.writestr("b.txt", "hello world",
                           compress_type=self.compression)
            info = zipfp.getinfo('b.txt')
            self.assertEqual(info.compress_type, self.compression)

    def test_writestr_compresslevel(self):
        # The archive is closed by the with block even if an assertion
        # fails, so tearDown can always remove TESTFN2.
        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
            zipfp.writestr("a.txt", "hello world",
                           compress_type=self.compression)
            zipfp.writestr("b.txt", "hello world",
                           compress_type=self.compression,
                           compresslevel=2)

            # Compression level follows the constructor.
            a_info = zipfp.getinfo('a.txt')
            self.assertEqual(a_info.compress_type, self.compression)
            self.assertEqual(a_info._compresslevel, 1)

            # Compression level is overridden.
            b_info = zipfp.getinfo('b.txt')
            self.assertEqual(b_info.compress_type, self.compression)
            self.assertEqual(b_info._compresslevel, 2)

    def test_read_return_size(self):
        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
        # than requested.
        for test_size in (1, 4095, 4096, 4097, 16384):
            file_size = test_size + 1
            junk = randbytes(file_size)
            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
                zipf.writestr('foo', junk)
                with zipf.open('foo', 'r') as fp:
                    buf = fp.read(test_size)
                    self.assertEqual(len(buf), test_size)

    def test_truncated_zipfile(self):
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode='w') as zipf:
            zipf.writestr('strfile', self.data, compress_type=self.compression)
            # Position right after the member data, before close() appends
            # the central directory.
            end_offset = fp.tell()
        zipfiledata = fp.getvalue()

        # Each reader drains the member in a different way; all of them
        # must report the missing tail as EOFError.
        def read_all(zipopen):
            zipopen.read()

        def read_chunks(zipopen):
            while zipopen.read(100):
                pass

        def read1_chunks(zipopen):
            while zipopen.read1(100):
                pass

        for drain in (read_all, read_chunks, read1_chunks):
            with self.subTest(reader=drain.__name__):
                fp = io.BytesIO(zipfiledata)
                with zipfile.ZipFile(fp) as zipf:
                    with zipf.open('strfile') as zipopen:
                        # Cut the underlying file short after the member
                        # has already been opened.
                        fp.truncate(end_offset - 20)
                        with self.assertRaises(EOFError):
                            drain(zipopen)

    def test_repr(self):
        fname = 'file.name'
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
                zipfp.write(TESTFN, fname)
                r = repr(zipfp)
                self.assertIn("mode='w'", r)

            with zipfile.ZipFile(f, 'r') as zipfp:
                r = repr(zipfp)
                # Paths are reported as filename=, file objects as file=.
                if isinstance(f, str):
                    self.assertIn('filename=%r' % f, r)
                else:
                    self.assertIn('file=%r' % f, r)
                self.assertIn("mode='r'", r)
                r = repr(zipfp.getinfo(fname))
                self.assertIn('filename=%r' % fname, r)
                self.assertIn('filemode=', r)
                self.assertIn('file_size=', r)
                if self.compression != zipfile.ZIP_STORED:
                    self.assertIn('compress_type=', r)
                    self.assertIn('compress_size=', r)
                with zipfp.open(fname) as zipopen:
                    r = repr(zipopen)
                    self.assertIn('name=%r' % fname, r)
                    self.assertIn("mode='r'", r)
                    if self.compression != zipfile.ZIP_STORED:
                        self.assertIn('compress_type=', r)
                self.assertIn('[closed]', repr(zipopen))
            self.assertIn('[closed]', repr(zipfp))

    def test_compresslevel_basic(self):
        for f in get_files(self):
            self.zip_test(f, self.compression, compresslevel=9)

    def test_per_file_compresslevel(self):
        """Check that files within a Zip archive can have different
        compression levels."""
        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
            zipfp.write(TESTFN, 'compress_1')
            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
            one_info = zipfp.getinfo('compress_1')
            nine_info = zipfp.getinfo('compress_9')
            self.assertEqual(one_info._compresslevel, 1)
            self.assertEqual(nine_info._compresslevel, 9)

    def test_writing_errors(self):
        # A file object whose write() raises OSError once ``count`` write
        # calls have succeeded since it was armed (count is not None).
        class BrokenFile(io.BytesIO):
            def write(self, data):
                nonlocal count
                if count is not None:
                    if count == stop:
                        raise OSError
                    count += 1
                super().write(data)

        # Let one more write succeed on every round until adding the second
        # member goes through without an error.
        stop = 0
        while True:
            testfile = BrokenFile()
            count = None
            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
                with zipfp.open('file1', 'w') as f:
                    f.write(b'data1')
                # Arm the failure only for the second member.
                count = 0
                try:
                    with zipfp.open('file2', 'w') as f:
                        f.write(b'data2')
                except OSError:
                    stop += 1
                else:
                    break
                finally:
                    # Disarm so closing the archive can write normally.
                    count = None
            # After a failed write only the first member may be listed.
            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
                self.assertEqual(zipfp.namelist(), ['file1'])
                self.assertEqual(zipfp.read('file1'), b'data1')

        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
            self.assertEqual(zipfp.read('file1'), b'data1')
            self.assertEqual(zipfp.read('file2'), b'data2')


    def tearDown(self):
        # Remove the source file and the on-disk archive.
        unlink(TESTFN)
        unlink(TESTFN2)
449
450
class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
                                unittest.TestCase):
    # Runs the shared tests without compression and adds checks that do
    # not depend on the compression method.
    compression = zipfile.ZIP_STORED
    # Setting the name to None disables the inherited test in this class.
    test_low_compression = None

    def zip_test_writestr_permissions(self, f, compression):
        # Members created from a plain name rather than a ZipInfo instance,
        # whether by writestr() or by open(... mode='w'), must end up with
        # mode 0600.
        expected_attr = 0o600 << 16

        self.make_test_archive(f, compression)
        with zipfile.ZipFile(f, "r") as archive:
            for member in ('strfile', 'written-open-w'):
                self.assertEqual(archive.getinfo(member).external_attr,
                                 expected_attr)

    def test_writestr_permissions(self):
        for target in get_files(self):
            self.zip_test_writestr_permissions(target, zipfile.ZIP_STORED)

    def test_absolute_arcnames(self):
        # The leading slash of an absolute archive name is dropped.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as archive:
            archive.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as archive:
            stored_names = archive.namelist()
            self.assertEqual(stored_names, ["absolute"])

    def test_append_to_zip_file(self):
        """Test appending to an existing zipfile."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as archive:
            archive.write(TESTFN, TESTFN)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as archive:
            archive.writestr("strfile", self.data)
            expected_names = [TESTFN, "strfile"]
            self.assertEqual(archive.namelist(), expected_names)

    def test_append_to_non_zip_file(self):
        """Test appending to an existing file that is not a zipfile."""
        # NOTE: the leading payload must be at least 22 bytes long, because
        # _EndRecData starts with "fpin.seek(-22, 2)".
        prefix = b'I am not a ZipFile!'*10
        with open(TESTFN2, 'wb') as raw:
            raw.write(prefix)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as archive:
            archive.write(TESTFN, TESTFN)

        # The archive is readable from a file object positioned after the
        # foreign payload ...
        with open(TESTFN2, 'rb') as raw:
            raw.seek(len(prefix))
            with zipfile.ZipFile(raw, "r") as archive:
                self.assertEqual(archive.namelist(), [TESTFN])
                self.assertEqual(archive.read(TESTFN), self.data)
        # ... the payload itself is untouched, and what follows it is a
        # complete archive on its own.
        with open(TESTFN2, 'rb') as raw:
            self.assertEqual(raw.read(len(prefix)), prefix)
            appended = raw.read()
        with io.BytesIO(appended) as bio, zipfile.ZipFile(bio) as archive:
            self.assertEqual(archive.namelist(), [TESTFN])
            self.assertEqual(archive.read(TESTFN), self.data)

    def test_read_concatenated_zip_file(self):
        # Build an archive in memory, then store it behind foreign bytes.
        with io.BytesIO() as bio:
            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as archive:
                archive.write(TESTFN, TESTFN)
            archive_bytes = bio.getvalue()
        prefix = b'I am not a ZipFile!'*10
        with open(TESTFN2, 'wb') as raw:
            raw.write(prefix)
            raw.write(archive_bytes)

        with zipfile.ZipFile(TESTFN2) as archive:
            self.assertEqual(archive.namelist(), [TESTFN])
            self.assertEqual(archive.read(TESTFN), self.data)

    def test_append_to_concatenated_zip_file(self):
        # Build an archive in memory, then store it behind foreign bytes.
        with io.BytesIO() as bio:
            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as archive:
                archive.write(TESTFN, TESTFN)
            archive_bytes = bio.getvalue()
        prefix = b'I am not a ZipFile!'*1000000
        with open(TESTFN2, 'wb') as raw:
            raw.write(prefix)
            raw.write(archive_bytes)

        with zipfile.ZipFile(TESTFN2, 'a') as archive:
            self.assertEqual(archive.namelist(), [TESTFN])
            archive.writestr('strfile', self.data)

        # The foreign bytes must survive the append unchanged.
        with open(TESTFN2, 'rb') as raw:
            self.assertEqual(raw.read(len(prefix)), prefix)
            archive_bytes = raw.read()
        with io.BytesIO(archive_bytes) as bio, \
             zipfile.ZipFile(bio) as archive:
            self.assertEqual(archive.namelist(), [TESTFN, 'strfile'])
            for member in (TESTFN, 'strfile'):
                self.assertEqual(archive.read(member), self.data)

    def test_ignores_newline_at_end(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as archive:
            archive.write(TESTFN, TESTFN)
        # Trailing junk after the end of the archive.
        with open(TESTFN2, 'a', encoding='utf-8') as trailer:
            trailer.write("\r\n\00\00\00")
        with zipfile.ZipFile(TESTFN2, "r") as archive:
            self.assertIsInstance(archive, zipfile.ZipFile)

    def test_ignores_stuff_appended_past_comments(self):
        comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as archive:
            archive.comment = comment
            archive.write(TESTFN, TESTFN)
        # Trailing junk after the archive comment.
        with open(TESTFN2, 'a', encoding='utf-8') as trailer:
            trailer.write("abcdef\r\n")
        with zipfile.ZipFile(TESTFN2, "r") as archive:
            self.assertIsInstance(archive, zipfile.ZipFile)
            self.assertEqual(archive.comment, comment)

    def test_write_default_name(self):
        """Check that calling ZipFile.write without arcname specified
        produces the expected result."""
        with zipfile.ZipFile(TESTFN2, "w") as archive:
            archive.write(TESTFN)
            with open(TESTFN, "rb") as source:
                self.assertEqual(archive.read(TESTFN), source.read())

    def test_io_on_closed_zipextfile(self):
        fname = "somefile.txt"
        with zipfile.ZipFile(TESTFN2, mode="w") as archive:
            archive.writestr(fname, "bogus")

        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open(fname) as member:
                member.close()
                # Every one of these calls must be refused once closed.
                closed_calls = (
                    (member.read, ()),
                    (member.seek, (0,)),
                    (member.tell, ()),
                    (member.readable, ()),
                    (member.seekable, ()),
                )
                for method, args in closed_calls:
                    self.assertRaises(ValueError, method, *args)

    def test_write_to_readonly(self):
        """Check that trying to call write() on a readonly ZipFile object
        raises a ValueError."""
        with zipfile.ZipFile(TESTFN2, mode="w") as archive:
            archive.writestr("somefile.txt", "bogus")

        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with self.assertRaises(ValueError):
                archive.write(TESTFN)

        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            self.assertRaises(ValueError, archive.open, TESTFN, mode='w')

    def test_add_file_before_1980(self):
        # Set atime and mtime to 1970-01-01
        os.utime(TESTFN, (0, 0))
        with zipfile.ZipFile(TESTFN2, "w") as archive:
            with self.assertRaises(ValueError):
                archive.write(TESTFN)

        # With strict timestamps disabled the date is clamped instead.
        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as archive:
            archive.write(TESTFN)
            stored_date = archive.getinfo(TESTFN).date_time
            self.assertEqual(stored_date, (1980, 1, 1, 0, 0, 0))

    def test_add_file_after_2107(self):
        # Set atime and mtime to 2108-12-30
        ts = 4386268800
        try:
            time.localtime(ts)
        except OverflowError:
            self.skipTest(f'time.localtime({ts}) raises OverflowError')
        try:
            os.utime(TESTFN, (ts, ts))
        except OverflowError:
            self.skipTest('Host fs cannot set timestamp to required value.')

        # XFS filesystem is limited to 32-bit timestamp, but the syscall
        # didn't fail. Moreover, there is a VFS bug which returns
        # a cached timestamp which is different than the value on disk.
        #
        # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
        #
        # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
        # https://bugs.python.org/issue39460#msg360952
        mtime_ns = os.stat(TESTFN).st_mtime_ns
        expected_ns = ts * 10**9
        if mtime_ns != expected_ns:
            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")

        with zipfile.ZipFile(TESTFN2, "w") as archive:
            with self.assertRaises(struct.error):
                archive.write(TESTFN)

        # With strict timestamps disabled the date is clamped instead.
        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as archive:
            archive.write(TESTFN)
            stored_date = archive.getinfo(TESTFN).date_time
            self.assertEqual(stored_date, (2107, 12, 31, 23, 59, 59))
643
644
@requires_zlib()
class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
                                 unittest.TestCase):
    # Runs the shared tests with deflate compression.
    compression = zipfile.ZIP_DEFLATED

    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        members = (('storeme', zipfile.ZIP_STORED),
                   ('deflateme', zipfile.ZIP_DEFLATED))
        with zipfile.ZipFile(TESTFN2, "w") as archive:
            for arcname, method in members:
                archive.write(TESTFN, arcname, method)
            # Look both members up before asserting anything.
            found = [(archive.getinfo(arcname), method)
                     for arcname, method in members]
            for info, method in found:
                self.assertEqual(info.compress_type, method)
660
@requires_bz2()
class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
                               unittest.TestCase):
    # Runs the tests inherited from AbstractTestsWithSourceFile with
    # bzip2 compression.
    compression = zipfile.ZIP_BZIP2
665
@requires_lzma()
class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
                              unittest.TestCase):
    # Runs the tests inherited from AbstractTestsWithSourceFile with
    # LZMA compression.
    compression = zipfile.ZIP_LZMA
670
671
class AbstractTestZip64InSmallFiles:
    # These tests test the ZIP64 functionality without using large files,
    # see test_zipfile64 for proper tests.
    #
    # Concrete subclasses also inherit from unittest.TestCase and set
    # ``compression``.

    @classmethod
    def setUpClass(cls):
        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
                    for i in range(0, FIXEDTEST_SIZE))
        cls.data = b'\n'.join(line_gen)

    def setUp(self):
        # Make a source file with some lines.  This is done before the
        # module limits are patched: tearDown does not run when setUp
        # fails, so a failed write must not leave the limits lowered.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

        # Lower the limits so that small archives exercise the ZIP64 code
        # paths; tearDown restores the saved values.
        self._limit = zipfile.ZIP64_LIMIT
        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
        zipfile.ZIP64_LIMIT = 1000
        zipfile.ZIP_FILECOUNT_LIMIT = 9

    def zip_test(self, f, compression):
        """Write a three-member archive to *f* and check its contents and
        directory listing."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = io.StringIO()
            zipfp.printdir(fp)

            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 4) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_basic(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def test_too_many_files(self):
        # This test checks that more files than the file-count limit
        # (64k normally, lowered to 9 in setUp) can be added to an archive,
        # and that the resulting archive can be read properly by ZipFile.
        # The with blocks close the archives even when an assertion fails.
        numfiles = 15
        with zipfile.ZipFile(TESTFN, "w", self.compression,
                             allowZip64=True) as zipf:
            zipf.debug = 100
            for i in range(numfiles):
                zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
            self.assertEqual(len(zipf.namelist()), numfiles)

        with zipfile.ZipFile(TESTFN, "r", self.compression) as zipf2:
            self.assertEqual(len(zipf2.namelist()), numfiles)
            for i in range(numfiles):
                content = zipf2.read("foo%08d" % i).decode('ascii')
                self.assertEqual(content, "%d" % (i**3 % 57))

    def test_too_many_files_append(self):
        # Without ZIP64, filling the archive up to the limit works but one
        # more member is refused and leaves the archive unchanged.
        numfiles = 9
        with zipfile.ZipFile(TESTFN, "w", self.compression,
                             allowZip64=False) as zipf:
            zipf.debug = 100
            for i in range(numfiles):
                zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        # The same holds when the full archive is reopened for appending.
        with zipfile.ZipFile(TESTFN, "a", self.compression,
                             allowZip64=False) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        # With ZIP64 allowed, appending past the limit succeeds.
        numfiles2 = 15
        with zipfile.ZipFile(TESTFN, "a", self.compression,
                             allowZip64=True) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            for i in range(numfiles, numfiles2):
                zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
            self.assertEqual(len(zipf.namelist()), numfiles2)

        with zipfile.ZipFile(TESTFN, "r", self.compression) as zipf2:
            self.assertEqual(len(zipf2.namelist()), numfiles2)
            for i in range(numfiles2):
                content = zipf2.read("foo%08d" % i).decode('ascii')
                self.assertEqual(content, "%d" % (i**3 % 57))

    def tearDown(self):
        # Restore the module limits patched in setUp, then clean up files.
        zipfile.ZIP64_LIMIT = self._limit
        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
        unlink(TESTFN)
        unlink(TESTFN2)
816
817
818class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
819                                  unittest.TestCase):
820    compression = zipfile.ZIP_STORED
821
822    def large_file_exception_test(self, f, compression):
823        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
824            self.assertRaises(zipfile.LargeZipFile,
825                              zipfp.write, TESTFN, "another.name")
826
827    def large_file_exception_test2(self, f, compression):
828        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
829            self.assertRaises(zipfile.LargeZipFile,
830                              zipfp.writestr, "another.name", self.data)
831
832    def test_large_file_exception(self):
833        for f in get_files(self):
834            self.large_file_exception_test(f, zipfile.ZIP_STORED)
835            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
836
837    def test_absolute_arcnames(self):
838        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
839                             allowZip64=True) as zipfp:
840            zipfp.write(TESTFN, "/absolute")
841
842        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
843            self.assertEqual(zipfp.namelist(), ["absolute"])
844
845    def test_append(self):
846        # Test that appending to the Zip64 archive doesn't change
847        # extra fields of existing entries.
848        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
849            zipfp.writestr("strfile", self.data)
850        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
851            zinfo = zipfp.getinfo("strfile")
852            extra = zinfo.extra
853        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
854            zipfp.writestr("strfile2", self.data)
855        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
856            zinfo = zipfp.getinfo("strfile")
857            self.assertEqual(zinfo.extra, extra)
858
859    def make_zip64_file(
860        self, file_size_64_set=False, file_size_extra=False,
861        compress_size_64_set=False, compress_size_extra=False,
862        header_offset_64_set=False, header_offset_extra=False,
863    ):
864        """Generate bytes sequence for a zip with (incomplete) zip64 data.
865
866        The actual values (not the zip 64 0xffffffff values) stored in the file
867        are:
868        file_size: 8
869        compress_size: 8
870        header_offset: 0
871        """
872        actual_size = 8
873        actual_header_offset = 0
874        local_zip64_fields = []
875        central_zip64_fields = []
876
877        file_size = actual_size
878        if file_size_64_set:
879            file_size = 0xffffffff
880            if file_size_extra:
881                local_zip64_fields.append(actual_size)
882                central_zip64_fields.append(actual_size)
883        file_size = struct.pack("<L", file_size)
884
885        compress_size = actual_size
886        if compress_size_64_set:
887            compress_size = 0xffffffff
888            if compress_size_extra:
889                local_zip64_fields.append(actual_size)
890                central_zip64_fields.append(actual_size)
891        compress_size = struct.pack("<L", compress_size)
892
893        header_offset = actual_header_offset
894        if header_offset_64_set:
895            header_offset = 0xffffffff
896            if header_offset_extra:
897                central_zip64_fields.append(actual_header_offset)
898        header_offset = struct.pack("<L", header_offset)
899
900        local_extra = struct.pack(
901            '<HH' + 'Q'*len(local_zip64_fields),
902            0x0001,
903            8*len(local_zip64_fields),
904            *local_zip64_fields
905        )
906
907        central_extra = struct.pack(
908            '<HH' + 'Q'*len(central_zip64_fields),
909            0x0001,
910            8*len(central_zip64_fields),
911            *central_zip64_fields
912        )
913
914        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
915        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
916
917        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
918        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
919
920        filename = b"test.txt"
921        content = b"test1234"
922        filename_length = struct.pack("<H", len(filename))
923        zip64_contents = (
924            # Local file header
925            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
926            + compress_size
927            + file_size
928            + filename_length
929            + local_extra_length
930            + filename
931            + local_extra
932            + content
933            # Central directory:
934            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
935            + compress_size
936            + file_size
937            + filename_length
938            + central_extra_length
939            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
940            + header_offset
941            + filename
942            + central_extra
943            # Zip64 end of central directory
944            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
945            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
946            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
947            + central_dir_size
948            + offset_to_central_dir
949            # Zip64 end of central directory locator
950            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
951            + b"\x00\x00\x00"
952            # end of central directory
953            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
954            + b"\x00\x00\x00\x00"
955        )
956        return zip64_contents
957
958    def test_bad_zip64_extra(self):
959        """Missing zip64 extra records raises an exception.
960
961        There are 4 fields that the zip64 format handles (the disk number is
962        not used in this module and so is ignored here). According to the zip
963        spec:
964              The order of the fields in the zip64 extended
965              information record is fixed, but the fields MUST
966              only appear if the corresponding Local or Central
967              directory record field is set to 0xFFFF or 0xFFFFFFFF.
968
969        If the zip64 extra content doesn't contain enough entries for the
970        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
971        This test mismatches the length of the zip64 extra field and the number
972        of fields set to indicate the presence of zip64 data.
973        """
974        # zip64 file size present, no fields in extra, expecting one, equals
975        # missing file size.
976        missing_file_size_extra = self.make_zip64_file(
977            file_size_64_set=True,
978        )
979        with self.assertRaises(zipfile.BadZipFile) as e:
980            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
981        self.assertIn('file size', str(e.exception).lower())
982
983        # zip64 file size present, zip64 compress size present, one field in
984        # extra, expecting two, equals missing compress size.
985        missing_compress_size_extra = self.make_zip64_file(
986            file_size_64_set=True,
987            file_size_extra=True,
988            compress_size_64_set=True,
989        )
990        with self.assertRaises(zipfile.BadZipFile) as e:
991            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
992        self.assertIn('compress size', str(e.exception).lower())
993
994        # zip64 compress size present, no fields in extra, expecting one,
995        # equals missing compress size.
996        missing_compress_size_extra = self.make_zip64_file(
997            compress_size_64_set=True,
998        )
999        with self.assertRaises(zipfile.BadZipFile) as e:
1000            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1001        self.assertIn('compress size', str(e.exception).lower())
1002
1003        # zip64 file size present, zip64 compress size present, zip64 header
1004        # offset present, two fields in extra, expecting three, equals missing
1005        # header offset
1006        missing_header_offset_extra = self.make_zip64_file(
1007            file_size_64_set=True,
1008            file_size_extra=True,
1009            compress_size_64_set=True,
1010            compress_size_extra=True,
1011            header_offset_64_set=True,
1012        )
1013        with self.assertRaises(zipfile.BadZipFile) as e:
1014            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1015        self.assertIn('header offset', str(e.exception).lower())
1016
1017        # zip64 compress size present, zip64 header offset present, one field
1018        # in extra, expecting two, equals missing header offset
1019        missing_header_offset_extra = self.make_zip64_file(
1020            file_size_64_set=False,
1021            compress_size_64_set=True,
1022            compress_size_extra=True,
1023            header_offset_64_set=True,
1024        )
1025        with self.assertRaises(zipfile.BadZipFile) as e:
1026            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1027        self.assertIn('header offset', str(e.exception).lower())
1028
1029        # zip64 file size present, zip64 header offset present, one field in
1030        # extra, expecting two, equals missing header offset
1031        missing_header_offset_extra = self.make_zip64_file(
1032            file_size_64_set=True,
1033            file_size_extra=True,
1034            compress_size_64_set=False,
1035            header_offset_64_set=True,
1036        )
1037        with self.assertRaises(zipfile.BadZipFile) as e:
1038            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1039        self.assertIn('header offset', str(e.exception).lower())
1040
1041        # zip64 header offset present, no fields in extra, expecting one,
1042        # equals missing header offset
1043        missing_header_offset_extra = self.make_zip64_file(
1044            file_size_64_set=False,
1045            compress_size_64_set=False,
1046            header_offset_64_set=True,
1047        )
1048        with self.assertRaises(zipfile.BadZipFile) as e:
1049            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1050        self.assertIn('header offset', str(e.exception).lower())
1051
1052    def test_generated_valid_zip64_extra(self):
1053        # These values are what is set in the make_zip64_file method.
1054        expected_file_size = 8
1055        expected_compress_size = 8
1056        expected_header_offset = 0
1057        expected_content = b"test1234"
1058
1059        # Loop through the various valid combinations of zip64 masks
1060        # present and extra fields present.
1061        params = (
1062            {"file_size_64_set": True, "file_size_extra": True},
1063            {"compress_size_64_set": True, "compress_size_extra": True},
1064            {"header_offset_64_set": True, "header_offset_extra": True},
1065        )
1066
1067        for r in range(1, len(params) + 1):
1068            for combo in itertools.combinations(params, r):
1069                kwargs = {}
1070                for c in combo:
1071                    kwargs.update(c)
1072                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1073                    zinfo = zf.infolist()[0]
1074                    self.assertEqual(zinfo.file_size, expected_file_size)
1075                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1076                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1077                    self.assertEqual(zf.read(zinfo), expected_content)
1078
1079
# requires_zlib() comes from test.support; presumably it skips this class
# when the zlib module is unavailable -- confirm against test.support.
@requires_zlib()
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                   unittest.TestCase):
    # Runs the tests inherited from AbstractTestZip64InSmallFiles with
    # ZIP_DEFLATED as the compression method.
    compression = zipfile.ZIP_DEFLATED
1084
# requires_bz2() comes from test.support; presumably it skips this class
# when the bz2 module is unavailable -- confirm against test.support.
@requires_bz2()
class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                 unittest.TestCase):
    # Runs the tests inherited from AbstractTestZip64InSmallFiles with
    # ZIP_BZIP2 as the compression method.
    compression = zipfile.ZIP_BZIP2
1089
# requires_lzma() comes from test.support; presumably it skips this class
# when the lzma module is unavailable -- confirm against test.support.
@requires_lzma()
class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                unittest.TestCase):
    # Runs the tests inherited from AbstractTestZip64InSmallFiles with
    # ZIP_LZMA as the compression method.
    compression = zipfile.ZIP_LZMA
1094
1095
class AbstractWriterTests:
    """Checks on the writable handle returned by ZipFile.open(name, 'w').

    This is a mixin: concrete subclasses provide ``compression`` and also
    inherit from unittest.TestCase.
    """

    def tearDown(self):
        unlink(TESTFN2)

    def test_close_after_close(self):
        """Closing the handle a second time leaves it closed and the data
        readable."""
        payload = b'content'
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as archive:
            writer = archive.open('test', 'w')
            writer.write(payload)
            # First pass closes the handle, second pass closes it again.
            for _ in range(2):
                writer.close()
                self.assertTrue(writer.closed)
            self.assertEqual(archive.read('test'), payload)

    def test_write_after_close(self):
        """Writing to a closed handle raises ValueError and the data written
        earlier is still readable."""
        payload = b'content'
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as archive:
            writer = archive.open('test', 'w')
            writer.write(payload)
            writer.close()
            self.assertTrue(writer.closed)
            with self.assertRaises(ValueError):
                writer.write(b'')
            self.assertEqual(archive.read('test'), payload)
1121
class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests checks with ZIP_STORED.
    compression = zipfile.ZIP_STORED
1124
# requires_zlib() comes from test.support; presumably it skips this class
# when the zlib module is unavailable -- confirm against test.support.
@requires_zlib()
class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests checks with ZIP_DEFLATED.
    compression = zipfile.ZIP_DEFLATED
1128
# requires_bz2() comes from test.support; presumably it skips this class
# when the bz2 module is unavailable -- confirm against test.support.
@requires_bz2()
class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests checks with ZIP_BZIP2.
    compression = zipfile.ZIP_BZIP2
1132
# requires_lzma() comes from test.support; presumably it skips this class
# when the lzma module is unavailable -- confirm against test.support.
@requires_lzma()
class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests checks with ZIP_LZMA.
    compression = zipfile.ZIP_LZMA
1136
1137
1138class PyZipFileTests(unittest.TestCase):
1139    def assertCompiledIn(self, name, namelist):
1140        if name + 'o' not in namelist:
1141            self.assertIn(name + 'c', namelist)
1142
1143    def requiresWriteAccess(self, path):
1144        # effective_ids unavailable on windows
1145        if not os.access(path, os.W_OK,
1146                         effective_ids=os.access in os.supports_effective_ids):
1147            self.skipTest('requires write access to the installed location')
1148        filename = os.path.join(path, 'test_zipfile.try')
1149        try:
1150            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1151            os.close(fd)
1152        except Exception:
1153            self.skipTest('requires write access to the installed location')
1154        unlink(filename)
1155
1156    def test_write_pyfile(self):
1157        self.requiresWriteAccess(os.path.dirname(__file__))
1158        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1159            fn = __file__
1160            if fn.endswith('.pyc'):
1161                path_split = fn.split(os.sep)
1162                if os.altsep is not None:
1163                    path_split.extend(fn.split(os.altsep))
1164                if '__pycache__' in path_split:
1165                    fn = importlib.util.source_from_cache(fn)
1166                else:
1167                    fn = fn[:-1]
1168
1169            zipfp.writepy(fn)
1170
1171            bn = os.path.basename(fn)
1172            self.assertNotIn(bn, zipfp.namelist())
1173            self.assertCompiledIn(bn, zipfp.namelist())
1174
1175        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1176            fn = __file__
1177            if fn.endswith('.pyc'):
1178                fn = fn[:-1]
1179
1180            zipfp.writepy(fn, "testpackage")
1181
1182            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1183            self.assertNotIn(bn, zipfp.namelist())
1184            self.assertCompiledIn(bn, zipfp.namelist())
1185
1186    def test_write_python_package(self):
1187        import email
1188        packagedir = os.path.dirname(email.__file__)
1189        self.requiresWriteAccess(packagedir)
1190
1191        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1192            zipfp.writepy(packagedir)
1193
1194            # Check for a couple of modules at different levels of the
1195            # hierarchy
1196            names = zipfp.namelist()
1197            self.assertCompiledIn('email/__init__.py', names)
1198            self.assertCompiledIn('email/mime/text.py', names)
1199
1200    def test_write_filtered_python_package(self):
1201        import test
1202        packagedir = os.path.dirname(test.__file__)
1203        self.requiresWriteAccess(packagedir)
1204
1205        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1206
1207            # first make sure that the test folder gives error messages
1208            # (on the badsyntax_... files)
1209            with captured_stdout() as reportSIO:
1210                zipfp.writepy(packagedir)
1211            reportStr = reportSIO.getvalue()
1212            self.assertTrue('SyntaxError' in reportStr)
1213
1214            # then check that the filter works on the whole package
1215            with captured_stdout() as reportSIO:
1216                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1217            reportStr = reportSIO.getvalue()
1218            self.assertTrue('SyntaxError' not in reportStr)
1219
1220            # then check that the filter works on individual files
1221            def filter(path):
1222                return not os.path.basename(path).startswith("bad")
1223            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1224                zipfp.writepy(packagedir, filterfunc=filter)
1225            reportStr = reportSIO.getvalue()
1226            if reportStr:
1227                print(reportStr)
1228            self.assertTrue('SyntaxError' not in reportStr)
1229
1230    def test_write_with_optimization(self):
1231        import email
1232        packagedir = os.path.dirname(email.__file__)
1233        self.requiresWriteAccess(packagedir)
1234        optlevel = 1 if __debug__ else 0
1235        ext = '.pyc'
1236
1237        with TemporaryFile() as t, \
1238             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1239            zipfp.writepy(packagedir)
1240
1241            names = zipfp.namelist()
1242            self.assertIn('email/__init__' + ext, names)
1243            self.assertIn('email/mime/text' + ext, names)
1244
1245    def test_write_python_directory(self):
1246        os.mkdir(TESTFN2)
1247        try:
1248            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1249                fp.write("print(42)\n")
1250
1251            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1252                fp.write("print(42 * 42)\n")
1253
1254            with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp:
1255                fp.write("bla bla bla\n")
1256
1257            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1258                zipfp.writepy(TESTFN2)
1259
1260                names = zipfp.namelist()
1261                self.assertCompiledIn('mod1.py', names)
1262                self.assertCompiledIn('mod2.py', names)
1263                self.assertNotIn('mod2.txt', names)
1264
1265        finally:
1266            rmtree(TESTFN2)
1267
1268    def test_write_python_directory_filtered(self):
1269        os.mkdir(TESTFN2)
1270        try:
1271            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1272                fp.write("print(42)\n")
1273
1274            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1275                fp.write("print(42 * 42)\n")
1276
1277            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1278                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1279                                                  not fn.endswith('mod2.py'))
1280
1281                names = zipfp.namelist()
1282                self.assertCompiledIn('mod1.py', names)
1283                self.assertNotIn('mod2.py', names)
1284
1285        finally:
1286            rmtree(TESTFN2)
1287
1288    def test_write_non_pyfile(self):
1289        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1290            with open(TESTFN, 'w', encoding='utf-8') as f:
1291                f.write('most definitely not a python file')
1292            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1293            unlink(TESTFN)
1294
1295    def test_write_pyfile_bad_syntax(self):
1296        os.mkdir(TESTFN2)
1297        try:
1298            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1299                fp.write("Bad syntax in python file\n")
1300
1301            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1302                # syntax errors are printed to stdout
1303                with captured_stdout() as s:
1304                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1305
1306                self.assertIn("SyntaxError", s.getvalue())
1307
1308                # as it will not have compiled the python file, it will
1309                # include the .py file not .pyc
1310                names = zipfp.namelist()
1311                self.assertIn('mod1.py', names)
1312                self.assertNotIn('mod1.pyc', names)
1313
1314        finally:
1315            rmtree(TESTFN2)
1316
1317    def test_write_pathlike(self):
1318        os.mkdir(TESTFN2)
1319        try:
1320            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1321                fp.write("print(42)\n")
1322
1323            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1324                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1325                names = zipfp.namelist()
1326                self.assertCompiledIn('mod1.py', names)
1327        finally:
1328            rmtree(TESTFN2)
1329
1330
1331class ExtractTests(unittest.TestCase):
1332
1333    def make_test_file(self):
1334        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1335            for fpath, fdata in SMALL_TEST_DATA:
1336                zipfp.writestr(fpath, fdata)
1337
1338    def test_extract(self):
1339        with temp_cwd():
1340            self.make_test_file()
1341            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1342                for fpath, fdata in SMALL_TEST_DATA:
1343                    writtenfile = zipfp.extract(fpath)
1344
1345                    # make sure it was written to the right place
1346                    correctfile = os.path.join(os.getcwd(), fpath)
1347                    correctfile = os.path.normpath(correctfile)
1348
1349                    self.assertEqual(writtenfile, correctfile)
1350
1351                    # make sure correct data is in correct file
1352                    with open(writtenfile, "rb") as f:
1353                        self.assertEqual(fdata.encode(), f.read())
1354
1355                    unlink(writtenfile)
1356
1357    def _test_extract_with_target(self, target):
1358        self.make_test_file()
1359        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1360            for fpath, fdata in SMALL_TEST_DATA:
1361                writtenfile = zipfp.extract(fpath, target)
1362
1363                # make sure it was written to the right place
1364                correctfile = os.path.join(target, fpath)
1365                correctfile = os.path.normpath(correctfile)
1366                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1367
1368                # make sure correct data is in correct file
1369                with open(writtenfile, "rb") as f:
1370                    self.assertEqual(fdata.encode(), f.read())
1371
1372                unlink(writtenfile)
1373
1374        unlink(TESTFN2)
1375
1376    def test_extract_with_target(self):
1377        with temp_dir() as extdir:
1378            self._test_extract_with_target(extdir)
1379
1380    def test_extract_with_target_pathlike(self):
1381        with temp_dir() as extdir:
1382            self._test_extract_with_target(pathlib.Path(extdir))
1383
1384    def test_extract_all(self):
1385        with temp_cwd():
1386            self.make_test_file()
1387            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1388                zipfp.extractall()
1389                for fpath, fdata in SMALL_TEST_DATA:
1390                    outfile = os.path.join(os.getcwd(), fpath)
1391
1392                    with open(outfile, "rb") as f:
1393                        self.assertEqual(fdata.encode(), f.read())
1394
1395                    unlink(outfile)
1396
1397    def _test_extract_all_with_target(self, target):
1398        self.make_test_file()
1399        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1400            zipfp.extractall(target)
1401            for fpath, fdata in SMALL_TEST_DATA:
1402                outfile = os.path.join(target, fpath)
1403
1404                with open(outfile, "rb") as f:
1405                    self.assertEqual(fdata.encode(), f.read())
1406
1407                unlink(outfile)
1408
1409        unlink(TESTFN2)
1410
1411    def test_extract_all_with_target(self):
1412        with temp_dir() as extdir:
1413            self._test_extract_all_with_target(extdir)
1414
1415    def test_extract_all_with_target_pathlike(self):
1416        with temp_dir() as extdir:
1417            self._test_extract_all_with_target(pathlib.Path(extdir))
1418
1419    def check_file(self, filename, content):
1420        self.assertTrue(os.path.isfile(filename))
1421        with open(filename, 'rb') as f:
1422            self.assertEqual(f.read(), content)
1423
1424    def test_sanitize_windows_name(self):
1425        san = zipfile.ZipFile._sanitize_windows_name
1426        # Passing pathsep in allows this test to work regardless of platform.
1427        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1428        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1429        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1430
1431    def test_extract_hackers_arcnames_common_cases(self):
1432        common_hacknames = [
1433            ('../foo/bar', 'foo/bar'),
1434            ('foo/../bar', 'foo/bar'),
1435            ('foo/../../bar', 'foo/bar'),
1436            ('foo/bar/..', 'foo/bar'),
1437            ('./../foo/bar', 'foo/bar'),
1438            ('/foo/bar', 'foo/bar'),
1439            ('/foo/../bar', 'foo/bar'),
1440            ('/foo/../../bar', 'foo/bar'),
1441        ]
1442        self._test_extract_hackers_arcnames(common_hacknames)
1443
1444    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1445    def test_extract_hackers_arcnames_windows_only(self):
1446        """Test combination of path fixing and windows name sanitization."""
1447        windows_hacknames = [
1448            (r'..\foo\bar', 'foo/bar'),
1449            (r'..\/foo\/bar', 'foo/bar'),
1450            (r'foo/\..\/bar', 'foo/bar'),
1451            (r'foo\/../\bar', 'foo/bar'),
1452            (r'C:foo/bar', 'foo/bar'),
1453            (r'C:/foo/bar', 'foo/bar'),
1454            (r'C://foo/bar', 'foo/bar'),
1455            (r'C:\foo\bar', 'foo/bar'),
1456            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1457            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1458            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1459            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1460            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1461            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1462            (r'//?/C:/foo/bar', 'foo/bar'),
1463            (r'\\?\C:\foo\bar', 'foo/bar'),
1464            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1465            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1466            ('../../foo../../ba..r', 'foo/ba..r'),
1467        ]
1468        self._test_extract_hackers_arcnames(windows_hacknames)
1469
1470    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1471    def test_extract_hackers_arcnames_posix_only(self):
1472        posix_hacknames = [
1473            ('//foo/bar', 'foo/bar'),
1474            ('../../foo../../ba..r', 'foo../ba..r'),
1475            (r'foo/..\bar', r'foo/..\bar'),
1476        ]
1477        self._test_extract_hackers_arcnames(posix_hacknames)
1478
1479    def _test_extract_hackers_arcnames(self, hacknames):
1480        for arcname, fixedname in hacknames:
1481            content = b'foobar' + arcname.encode()
1482            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1483                zinfo = zipfile.ZipInfo()
1484                # preserve backslashes
1485                zinfo.filename = arcname
1486                zinfo.external_attr = 0o600 << 16
1487                zipfp.writestr(zinfo, content)
1488
1489            arcname = arcname.replace(os.sep, "/")
1490            targetpath = os.path.join('target', 'subdir', 'subsub')
1491            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1492
1493            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1494                writtenfile = zipfp.extract(arcname, targetpath)
1495                self.assertEqual(writtenfile, correctfile,
1496                                 msg='extract %r: %r != %r' %
1497                                 (arcname, writtenfile, correctfile))
1498            self.check_file(correctfile, content)
1499            rmtree('target')
1500
1501            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1502                zipfp.extractall(targetpath)
1503            self.check_file(correctfile, content)
1504            rmtree('target')
1505
1506            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1507
1508            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1509                writtenfile = zipfp.extract(arcname)
1510                self.assertEqual(writtenfile, correctfile,
1511                                 msg="extract %r" % arcname)
1512            self.check_file(correctfile, content)
1513            rmtree(fixedname.split('/')[0])
1514
1515            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1516                zipfp.extractall()
1517            self.check_file(correctfile, content)
1518            rmtree(fixedname.split('/')[0])
1519
1520            unlink(TESTFN2)
1521
1522
1523class OtherTests(unittest.TestCase):
1524    def test_open_via_zip_info(self):
1525        # Create the ZIP archive
1526        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1527            zipfp.writestr("name", "foo")
1528            with self.assertWarns(UserWarning):
1529                zipfp.writestr("name", "bar")
1530            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1531
1532        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1533            infos = zipfp.infolist()
1534            data = b""
1535            for info in infos:
1536                with zipfp.open(info) as zipopen:
1537                    data += zipopen.read()
1538            self.assertIn(data, {b"foobar", b"barfoo"})
1539            data = b""
1540            for info in infos:
1541                data += zipfp.read(info)
1542            self.assertIn(data, {b"foobar", b"barfoo"})
1543
1544    def test_writestr_extended_local_header_issue1202(self):
1545        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1546            for data in 'abcdefghijklmnop':
1547                zinfo = zipfile.ZipInfo(data)
1548                zinfo.flag_bits |= zipfile._MASK_USE_DATA_DESCRIPTOR  # Include an extended local header.
1549                orig_zip.writestr(zinfo, data)
1550
1551    def test_close(self):
1552        """Check that the zipfile is closed after the 'with' block."""
1553        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1554            for fpath, fdata in SMALL_TEST_DATA:
1555                zipfp.writestr(fpath, fdata)
1556                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1557        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1558
1559        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1560            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1561        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1562
1563    def test_close_on_exception(self):
1564        """Check that the zipfile is closed if an exception is raised in the
1565        'with' block."""
1566        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1567            for fpath, fdata in SMALL_TEST_DATA:
1568                zipfp.writestr(fpath, fdata)
1569
1570        try:
1571            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1572                raise zipfile.BadZipFile()
1573        except zipfile.BadZipFile:
1574            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1575
1576    def test_unsupported_version(self):
1577        # File has an extract_version of 120
1578        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1579                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1580                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1581                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1582                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1583
1584        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1585                          io.BytesIO(data), 'r')
1586
1587    @requires_zlib()
1588    def test_read_unicode_filenames(self):
1589        # bug #10801
1590        fname = findfile('zip_cp437_header.zip')
1591        with zipfile.ZipFile(fname) as zipfp:
1592            for name in zipfp.namelist():
1593                zipfp.open(name).close()
1594
1595    def test_write_unicode_filenames(self):
1596        with zipfile.ZipFile(TESTFN, "w") as zf:
1597            zf.writestr("foo.txt", "Test for unicode filename")
1598            zf.writestr("\xf6.txt", "Test for unicode filename")
1599            self.assertIsInstance(zf.infolist()[0].filename, str)
1600
1601        with zipfile.ZipFile(TESTFN, "r") as zf:
1602            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1603            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1604
1605    def test_read_after_write_unicode_filenames(self):
1606        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1607            zipfp.writestr('приклад', b'sample')
1608            self.assertEqual(zipfp.read('приклад'), b'sample')
1609
1610    def test_exclusive_create_zip_file(self):
1611        """Test exclusive creating a new zipfile."""
1612        unlink(TESTFN2)
1613        filename = 'testfile.txt'
1614        content = b'hello, world. this is some content.'
1615        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1616            zipfp.writestr(filename, content)
1617        with self.assertRaises(FileExistsError):
1618            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1619        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1620            self.assertEqual(zipfp.namelist(), [filename])
1621            self.assertEqual(zipfp.read(filename), content)
1622
1623    def test_create_non_existent_file_for_append(self):
1624        if os.path.exists(TESTFN):
1625            os.unlink(TESTFN)
1626
1627        filename = 'testfile.txt'
1628        content = b'hello, world. this is some content.'
1629
1630        try:
1631            with zipfile.ZipFile(TESTFN, 'a') as zf:
1632                zf.writestr(filename, content)
1633        except OSError:
1634            self.fail('Could not append data to a non-existent zip file.')
1635
1636        self.assertTrue(os.path.exists(TESTFN))
1637
1638        with zipfile.ZipFile(TESTFN, 'r') as zf:
1639            self.assertEqual(zf.read(filename), content)
1640
1641    def test_close_erroneous_file(self):
1642        # This test checks that the ZipFile constructor closes the file object
1643        # it opens if there's an error in the file.  If it doesn't, the
1644        # traceback holds a reference to the ZipFile object and, indirectly,
1645        # the file object.
1646        # On Windows, this causes the os.unlink() call to fail because the
1647        # underlying file is still open.  This is SF bug #412214.
1648        #
1649        with open(TESTFN, "w", encoding="utf-8") as fp:
1650            fp.write("this is not a legal zip file\n")
1651        try:
1652            zf = zipfile.ZipFile(TESTFN)
1653        except zipfile.BadZipFile:
1654            pass
1655
1656    def test_is_zip_erroneous_file(self):
1657        """Check that is_zipfile() correctly identifies non-zip files."""
1658        # - passing a filename
1659        with open(TESTFN, "w", encoding='utf-8') as fp:
1660            fp.write("this is not a legal zip file\n")
1661        self.assertFalse(zipfile.is_zipfile(TESTFN))
1662        # - passing a path-like object
1663        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1664        # - passing a file object
1665        with open(TESTFN, "rb") as fp:
1666            self.assertFalse(zipfile.is_zipfile(fp))
1667        # - passing a file-like object
1668        fp = io.BytesIO()
1669        fp.write(b"this is not a legal zip file\n")
1670        self.assertFalse(zipfile.is_zipfile(fp))
1671        fp.seek(0, 0)
1672        self.assertFalse(zipfile.is_zipfile(fp))
1673
1674    def test_damaged_zipfile(self):
1675        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1676        # - Create a valid zip file
1677        fp = io.BytesIO()
1678        with zipfile.ZipFile(fp, mode="w") as zipf:
1679            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1680        zipfiledata = fp.getvalue()
1681
1682        # - Now create copies of it missing the last N bytes and make sure
1683        #   a BadZipFile exception is raised when we try to open it
1684        for N in range(len(zipfiledata)):
1685            fp = io.BytesIO(zipfiledata[:N])
1686            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1687
1688    def test_is_zip_valid_file(self):
1689        """Check that is_zipfile() correctly identifies zip files."""
1690        # - passing a filename
1691        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1692            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1693
1694        self.assertTrue(zipfile.is_zipfile(TESTFN))
1695        # - passing a file object
1696        with open(TESTFN, "rb") as fp:
1697            self.assertTrue(zipfile.is_zipfile(fp))
1698            fp.seek(0, 0)
1699            zip_contents = fp.read()
1700        # - passing a file-like object
1701        fp = io.BytesIO()
1702        fp.write(zip_contents)
1703        self.assertTrue(zipfile.is_zipfile(fp))
1704        fp.seek(0, 0)
1705        self.assertTrue(zipfile.is_zipfile(fp))
1706
1707    def test_non_existent_file_raises_OSError(self):
1708        # make sure we don't raise an AttributeError when a partially-constructed
1709        # ZipFile instance is finalized; this tests for regression on SF tracker
1710        # bug #403871.
1711
1712        # The bug we're testing for caused an AttributeError to be raised
1713        # when a ZipFile instance was created for a file that did not
1714        # exist; the .fp member was not initialized but was needed by the
1715        # __del__() method.  Since the AttributeError is in the __del__(),
1716        # it is ignored, but the user should be sufficiently annoyed by
1717        # the message on the output that regression will be noticed
1718        # quickly.
1719        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1720
1721    def test_empty_file_raises_BadZipFile(self):
1722        f = open(TESTFN, 'w', encoding='utf-8')
1723        f.close()
1724        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1725
1726        with open(TESTFN, 'w', encoding='utf-8') as fp:
1727            fp.write("short file")
1728        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1729
1730    def test_closed_zip_raises_ValueError(self):
1731        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1732        data = io.BytesIO()
1733        with zipfile.ZipFile(data, mode="w") as zipf:
1734            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1735
1736        # This is correct; calling .read on a closed ZipFile should raise
1737        # a ValueError, and so should calling .testzip.  An earlier
1738        # version of .testzip would swallow this exception (and any other)
1739        # and report that the first file in the archive was corrupt.
1740        self.assertRaises(ValueError, zipf.read, "foo.txt")
1741        self.assertRaises(ValueError, zipf.open, "foo.txt")
1742        self.assertRaises(ValueError, zipf.testzip)
1743        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1744        with open(TESTFN, 'w', encoding='utf-8') as f:
1745            f.write('zipfile test data')
1746        self.assertRaises(ValueError, zipf.write, TESTFN)
1747
1748    def test_bad_constructor_mode(self):
1749        """Check that bad modes passed to ZipFile constructor are caught."""
1750        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1751
1752    def test_bad_open_mode(self):
1753        """Check that bad modes passed to ZipFile.open are caught."""
1754        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1755            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1756
1757        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1758            # read the data to make sure the file is there
1759            zipf.read("foo.txt")
1760            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1761            # universal newlines support is removed
1762            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1763            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1764
1765    def test_read0(self):
1766        """Check that calling read(0) on a ZipExtFile object returns an empty
1767        string and doesn't advance file pointer."""
1768        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1769            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1770            # read the data to make sure the file is there
1771            with zipf.open("foo.txt") as f:
1772                for i in range(FIXEDTEST_SIZE):
1773                    self.assertEqual(f.read(0), b'')
1774
1775                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1776
1777    def test_open_non_existent_item(self):
1778        """Check that attempting to call open() for an item that doesn't
1779        exist in the archive raises a RuntimeError."""
1780        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1781            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1782
1783    def test_bad_compression_mode(self):
1784        """Check that bad compression methods passed to ZipFile.open are
1785        caught."""
1786        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1787
1788    def test_unsupported_compression(self):
1789        # data is declared as shrunk, but actually deflated
1790        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1791                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1792                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1793                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1794                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1795                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1796        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1797            self.assertRaises(NotImplementedError, zipf.open, 'x')
1798
1799    def test_null_byte_in_filename(self):
1800        """Check that a filename containing a null byte is properly
1801        terminated."""
1802        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1803            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1804            self.assertEqual(zipf.namelist(), ['foo.txt'])
1805
1806    def test_struct_sizes(self):
1807        """Check that ZIP internal structure sizes are calculated correctly."""
1808        self.assertEqual(zipfile.sizeEndCentDir, 22)
1809        self.assertEqual(zipfile.sizeCentralDir, 46)
1810        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1811        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1812
1813    def test_comments(self):
1814        """Check that comments on the archive are handled properly."""
1815
1816        # check default comment is empty
1817        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1818            self.assertEqual(zipf.comment, b'')
1819            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1820
1821        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1822            self.assertEqual(zipfr.comment, b'')
1823
1824        # check a simple short comment
1825        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1826        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1827            zipf.comment = comment
1828            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1829        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1830            self.assertEqual(zipf.comment, comment)
1831
1832        # check a comment of max length
1833        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1834        comment2 = comment2.encode("ascii")
1835        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1836            zipf.comment = comment2
1837            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1838
1839        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1840            self.assertEqual(zipfr.comment, comment2)
1841
1842        # check a comment that is too long is truncated
1843        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1844            with self.assertWarns(UserWarning):
1845                zipf.comment = comment2 + b'oops'
1846            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1847        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1848            self.assertEqual(zipfr.comment, comment2)
1849
1850        # check that comments are correctly modified in append mode
1851        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1852            zipf.comment = b"original comment"
1853            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1854        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1855            zipf.comment = b"an updated comment"
1856        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1857            self.assertEqual(zipf.comment, b"an updated comment")
1858
1859        # check that comments are correctly shortened in append mode
1860        # and the file is indeed truncated
1861        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1862            zipf.comment = b"original comment that's longer"
1863            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1864        original_zip_size = os.path.getsize(TESTFN)
1865        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1866            zipf.comment = b"shorter comment"
1867        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
1868        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1869            self.assertEqual(zipf.comment, b"shorter comment")
1870
1871    def test_unicode_comment(self):
1872        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1873            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1874            with self.assertRaises(TypeError):
1875                zipf.comment = "this is an error"
1876
1877    def test_change_comment_in_empty_archive(self):
1878        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1879            self.assertFalse(zipf.filelist)
1880            zipf.comment = b"this is a comment"
1881        with zipfile.ZipFile(TESTFN, "r") as zipf:
1882            self.assertEqual(zipf.comment, b"this is a comment")
1883
1884    def test_change_comment_in_nonempty_archive(self):
1885        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1886            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1887        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1888            self.assertTrue(zipf.filelist)
1889            zipf.comment = b"this is a comment"
1890        with zipfile.ZipFile(TESTFN, "r") as zipf:
1891            self.assertEqual(zipf.comment, b"this is a comment")
1892
1893    def test_empty_zipfile(self):
1894        # Check that creating a file in 'w' or 'a' mode and closing without
1895        # adding any files to the archives creates a valid empty ZIP file
1896        zipf = zipfile.ZipFile(TESTFN, mode="w")
1897        zipf.close()
1898        try:
1899            zipf = zipfile.ZipFile(TESTFN, mode="r")
1900        except zipfile.BadZipFile:
1901            self.fail("Unable to create empty ZIP file in 'w' mode")
1902
1903        zipf = zipfile.ZipFile(TESTFN, mode="a")
1904        zipf.close()
1905        try:
1906            zipf = zipfile.ZipFile(TESTFN, mode="r")
1907        except:
1908            self.fail("Unable to create empty ZIP file in 'a' mode")
1909
1910    def test_open_empty_file(self):
1911        # Issue 1710703: Check that opening a file with less than 22 bytes
1912        # raises a BadZipFile exception (rather than the previously unhelpful
1913        # OSError)
1914        f = open(TESTFN, 'w', encoding='utf-8')
1915        f.close()
1916        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1917
1918    def test_create_zipinfo_before_1980(self):
1919        self.assertRaises(ValueError,
1920                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1921
1922    def test_create_empty_zipinfo_repr(self):
1923        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
1924        zi = zipfile.ZipInfo(filename="empty")
1925        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
1926
1927    def test_create_empty_zipinfo_default_attributes(self):
1928        """Ensure all required attributes are set."""
1929        zi = zipfile.ZipInfo()
1930        self.assertEqual(zi.orig_filename, "NoName")
1931        self.assertEqual(zi.filename, "NoName")
1932        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
1933        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
1934        self.assertEqual(zi.comment, b"")
1935        self.assertEqual(zi.extra, b"")
1936        self.assertIn(zi.create_system, (0, 3))
1937        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
1938        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
1939        self.assertEqual(zi.reserved, 0)
1940        self.assertEqual(zi.flag_bits, 0)
1941        self.assertEqual(zi.volume, 0)
1942        self.assertEqual(zi.internal_attr, 0)
1943        self.assertEqual(zi.external_attr, 0)
1944
1945        # Before bpo-26185, both were missing
1946        self.assertEqual(zi.file_size, 0)
1947        self.assertEqual(zi.compress_size, 0)
1948
1949    def test_zipfile_with_short_extra_field(self):
1950        """If an extra field in the header is less than 4 bytes, skip it."""
1951        zipdata = (
1952            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1953            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1954            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1955            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1956            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1957            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1958            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1959        )
1960        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1961            # testzip returns the name of the first corrupt file, or None
1962            self.assertIsNone(zipf.testzip())
1963
1964    def test_open_conflicting_handles(self):
1965        # It's only possible to open one writable file handle at a time
1966        msg1 = b"It's fun to charter an accountant!"
1967        msg2 = b"And sail the wide accountant sea"
1968        msg3 = b"To find, explore the funds offshore"
1969        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1970            with zipf.open('foo', mode='w') as w2:
1971                w2.write(msg1)
1972            with zipf.open('bar', mode='w') as w1:
1973                with self.assertRaises(ValueError):
1974                    zipf.open('handle', mode='w')
1975                with self.assertRaises(ValueError):
1976                    zipf.open('foo', mode='r')
1977                with self.assertRaises(ValueError):
1978                    zipf.writestr('str', 'abcde')
1979                with self.assertRaises(ValueError):
1980                    zipf.write(__file__, 'file')
1981                with self.assertRaises(ValueError):
1982                    zipf.close()
1983                w1.write(msg2)
1984            with zipf.open('baz', mode='w') as w2:
1985                w2.write(msg3)
1986
1987        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1988            self.assertEqual(zipf.read('foo'), msg1)
1989            self.assertEqual(zipf.read('bar'), msg2)
1990            self.assertEqual(zipf.read('baz'), msg3)
1991            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1992
1993    def test_seek_tell(self):
1994        # Test seek functionality
1995        txt = b"Where's Bruce?"
1996        bloc = txt.find(b"Bruce")
1997        # Check seek on a file
1998        with zipfile.ZipFile(TESTFN, "w") as zipf:
1999            zipf.writestr("foo.txt", txt)
2000        with zipfile.ZipFile(TESTFN, "r") as zipf:
2001            with zipf.open("foo.txt", "r") as fp:
2002                fp.seek(bloc, os.SEEK_SET)
2003                self.assertEqual(fp.tell(), bloc)
2004                fp.seek(-bloc, os.SEEK_CUR)
2005                self.assertEqual(fp.tell(), 0)
2006                fp.seek(bloc, os.SEEK_CUR)
2007                self.assertEqual(fp.tell(), bloc)
2008                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2009                fp.seek(0, os.SEEK_END)
2010                self.assertEqual(fp.tell(), len(txt))
2011                fp.seek(0, os.SEEK_SET)
2012                self.assertEqual(fp.tell(), 0)
2013        # Check seek on memory file
2014        data = io.BytesIO()
2015        with zipfile.ZipFile(data, mode="w") as zipf:
2016            zipf.writestr("foo.txt", txt)
2017        with zipfile.ZipFile(data, mode="r") as zipf:
2018            with zipf.open("foo.txt", "r") as fp:
2019                fp.seek(bloc, os.SEEK_SET)
2020                self.assertEqual(fp.tell(), bloc)
2021                fp.seek(-bloc, os.SEEK_CUR)
2022                self.assertEqual(fp.tell(), 0)
2023                fp.seek(bloc, os.SEEK_CUR)
2024                self.assertEqual(fp.tell(), bloc)
2025                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2026                fp.seek(0, os.SEEK_END)
2027                self.assertEqual(fp.tell(), len(txt))
2028                fp.seek(0, os.SEEK_SET)
2029                self.assertEqual(fp.tell(), 0)
2030
2031    @requires_bz2()
2032    def test_decompress_without_3rd_party_library(self):
2033        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2034        zip_file = io.BytesIO(data)
2035        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2036            zf.writestr('a.txt', b'a')
2037        with mock.patch('zipfile.bz2', None):
2038            with zipfile.ZipFile(zip_file) as zf:
2039                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2040
2041    def tearDown(self):
2042        unlink(TESTFN)
2043        unlink(TESTFN2)
2044
2045
class AbstractBadCrcTests:
    """Mixin with CRC-mismatch checks.

    Concrete subclasses supply ``zip_with_bad_crc``: the raw bytes of an
    archive in which the member 'afile' fails CRC verification.
    """

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        source = io.BytesIO(self.zip_with_bad_crc)

        with zipfile.ZipFile(source, mode="r") as archive:
            # testzip returns the name of the first corrupt file, or None
            first_corrupt = archive.testzip()
            self.assertEqual('afile', first_corrupt)

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        raw = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive:
            with self.assertRaises(zipfile.BadZipFile):
                archive.read('afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive, \
                archive.open('afile', 'r') as member:
            with self.assertRaises(zipfile.BadZipFile):
                member.read()

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive, \
                archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                while member.read(2):
                    pass
2075
2076
class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks against a ZIP_STORED fixture."""
    compression = zipfile.ZIP_STORED
    # Raw bytes of a pre-built archive.  The inherited tests expect its member
    # 'afile' to fail CRC verification.
    zip_with_bad_crc = (
        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
        b'ilehello,AworldP'
        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
        b'\0\0/\0\0\0\0\0')
2088
@requires_zlib()
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks against a ZIP_DEFLATED fixture."""
    compression = zipfile.ZIP_DEFLATED
    # Raw bytes of a pre-built archive.  The inherited tests expect its member
    # 'afile' to fail CRC verification; the header CRC-32 field holds the
    # placeholder bytes b'FAKE'.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2101
@requires_bz2()
class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks against a ZIP_BZIP2 fixture."""
    compression = zipfile.ZIP_BZIP2
    # Raw bytes of a pre-built archive.  The inherited tests expect its member
    # 'afile' to fail CRC verification; the header CRC-32 field holds the
    # placeholder bytes b'FAKE'.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ileBZh91AY&SY\xd4\xa8\xca'
        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
        b'\x00\x00\x00\x00')
2117
@requires_lzma()
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks against a ZIP_LZMA fixture."""
    compression = zipfile.ZIP_LZMA
    # Raw bytes of a pre-built archive.  The inherited tests expect its member
    # 'afile' to fail CRC verification; the header CRC-32 field holds the
    # placeholder bytes b'FAKE'.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
2131
2132
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works.

    Since the library does not support encryption at the moment, we use
    pre-generated encrypted ZIP files: ``data`` contains the member
    "test.txt" and ``data2`` contains the member "zero".
    """

    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00' )
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected decrypted contents of "test.txt" (data) and "zero" (data2).
    plain = b'zipfile.py encryption test'
    plain2 = b'\x00'*512

    def _open_fixture(self, path, payload):
        """Write *payload* to *path* and return it opened as a ZipFile.

        Each cleanup is registered as soon as the resource it releases
        may exist, so nothing is leaked when a later step of setUp()
        fails (tearDown() is not run in that case, cleanups are).
        Cleanups run in reverse order: the archive is closed before its
        file is removed.
        """
        self.addCleanup(unlink, path)
        with open(path, "wb") as fp:
            fp.write(payload)
        archive = zipfile.ZipFile(path, "r")
        self.addCleanup(archive.close)
        return archive

    def setUp(self):
        self.zip = self._open_fixture(TESTFN, self.data)
        self.zip2 = self._open_fixture(TESTFN2, self.data2)

    def test_no_password(self):
        # Reading the encrypted file without password
        # must raise RuntimeError
        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
        self.assertRaises(RuntimeError, self.zip2.read, "zero")

    def test_bad_password(self):
        # A wrong password must be rejected for both archives.
        self.zip.setpassword(b"perl")
        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
        self.zip2.setpassword(b"perl")
        self.assertRaises(RuntimeError, self.zip2.read, "zero")

    @requires_zlib()
    def test_good_password(self):
        # The right password yields the expected plaintext.
        self.zip.setpassword(b"python")
        self.assertEqual(self.zip.read("test.txt"), self.plain)
        self.zip2.setpassword(b"12345")
        self.assertEqual(self.zip2.read("zero"), self.plain2)

    def test_unicode_password(self):
        # Every entry point accepting a password must reject str.
        expected_msg = "pwd: expected bytes, got str"

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.setpassword("unicode")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.read("test.txt", "python")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.open("test.txt", pwd="python")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.extract("test.txt", pwd="python")

        with self.assertRaisesRegex(TypeError, expected_msg):
            # Bypass setpassword(): the check must also happen on open().
            self.zip.pwd = "python"
            self.zip.open("test.txt")

    def test_seek_tell(self):
        self.zip.setpassword(b"python")
        txt = self.plain
        test_word = b'encryption'
        bloc = txt.find(test_word)
        bloc_len = len(test_word)
        with self.zip.open("test.txt", "r") as fp:
            fp.seek(bloc, os.SEEK_SET)
            self.assertEqual(fp.tell(), bloc)
            fp.seek(-bloc, os.SEEK_CUR)
            self.assertEqual(fp.tell(), 0)
            fp.seek(bloc, os.SEEK_CUR)
            self.assertEqual(fp.tell(), bloc)
            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])

            # Make sure that the second read after seeking back beyond
            # _readbuffer returns the same content (ie. rewind to the start of
            # the file to read forward to the required position).
            old_read_size = fp.MIN_READ_SIZE
            fp.MIN_READ_SIZE = 1
            fp._readbuffer = b''
            fp._offset = 0
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)
            fp.seek(bloc, os.SEEK_CUR)
            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
            fp.MIN_READ_SIZE = old_read_size

            fp.seek(0, os.SEEK_END)
            self.assertEqual(fp.tell(), len(txt))
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)

            # Read the file completely to definitely call any eof integrity
            # checks (crc) and make sure they still pass.
            fp.read()
2247
2248
class AbstractTestsWithRandomBinaryFiles:
    """Mixin that round-trips a random binary payload through ZipFile.

    Concrete subclasses provide the ``compression`` attribute used by the
    ``test_*`` methods.
    """

    @classmethod
    def setUpClass(cls):
        # The payload is a run of packed little-endian floats whose count
        # is itself random.
        float_count = randint(16, 64)*1024 + randint(1, 1024)
        packed = []
        for _ in range(float_count):
            packed.append(struct.pack('<f', random()*randint(-1000, 1000)))
        cls.data = b''.join(packed)

    def setUp(self):
        # Dump the random payload into the source file that gets archived.
        with open(TESTFN, "wb") as src:
            src.write(self.data)

    def tearDown(self):
        for path in (TESTFN, TESTFN2):
            unlink(path)

    @staticmethod
    def _drain(member, next_size):
        """Read *member* to EOF in chunks of ``next_size()`` bytes.

        ``next_size`` is called once per read(), including the final read
        that returns the empty bytes object.
        """
        return b''.join(iter(lambda: member.read(next_size()), b''))

    def make_test_archive(self, f, compression):
        """Store the source file in *f* under two different member names."""
        with zipfile.ZipFile(f, "w", compression) as archive:
            for arcname in ("another.name", TESTFN):
                archive.write(TESTFN, arcname)

    def zip_test(self, f, compression):
        """Write the archive, then check both members via ZipFile.read()."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            roundtrip = archive.read(TESTFN)
            self.assertEqual(len(roundtrip), len(self.data))
            self.assertEqual(roundtrip, self.data)
            self.assertEqual(archive.read("another.name"), self.data)

    def test_read(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def zip_open_test(self, f, compression):
        """Write the archive, then read both members in 256-byte chunks."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            with archive.open(TESTFN) as member:
                first = self._drain(member, lambda: 256)

            with archive.open("another.name") as member:
                second = self._drain(member, lambda: 256)

            for payload in (first, second):
                self.assertEqual(len(payload), len(self.data))
                self.assertEqual(payload, self.data)

    def test_open(self):
        for f in get_files(self):
            self.zip_open_test(f, self.compression)

    def zip_random_open_test(self, f, compression):
        """Write the archive, then read one member in random-sized chunks."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            with archive.open(TESTFN) as member:
                payload = self._drain(member, lambda: randint(1, 1024))

            self.assertEqual(len(payload), len(self.data))
            self.assertEqual(payload, self.data)

    def test_random_open(self):
        for f in get_files(self):
            self.zip_random_open_test(f, self.compression)
2338
2339
class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                       unittest.TestCase):
    # Run the inherited random-binary-file tests with ZIP_STORED.
    compression = zipfile.ZIP_STORED
2343
@requires_zlib()
class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                        unittest.TestCase):
    # Run the inherited random-binary-file tests with ZIP_DEFLATED.
    compression = zipfile.ZIP_DEFLATED
2348
@requires_bz2()
class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                      unittest.TestCase):
    # Run the inherited random-binary-file tests with ZIP_BZIP2.
    compression = zipfile.ZIP_BZIP2
2353
@requires_lzma()
class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                     unittest.TestCase):
    # Run the inherited random-binary-file tests with ZIP_LZMA.
    compression = zipfile.ZIP_LZMA
2358
2359
class Tellable:
    """Write-only wrapper providing the tell() method but not seek()."""

    def __init__(self, fp):
        self.fp = fp
        # Running total of what the wrapped write() reported; this is the
        # value returned by tell().
        self.offset = 0

    def flush(self):
        self.fp.flush()

    def tell(self):
        return self.offset

    def write(self, data):
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written
2376
class Unseekable:
    """Write-only wrapper exposing only write() and flush()."""

    def __init__(self, fp):
        self.fp = fp

    def flush(self):
        self.fp.flush()

    def write(self, data):
        written = self.fp.write(data)
        return written
2386
class UnseekableTests(unittest.TestCase):
    """Write archives to streams wrapped by Tellable / Unseekable."""

    @staticmethod
    def _wrappers():
        # Identity (the plain buffered stream), then the two limited wrappers.
        return (lambda f: f), Tellable, Unseekable

    @staticmethod
    def _new_stream():
        """Return ``(raw, buffered)``: a BytesIO already holding b'abc' and
        a BufferedWriter layered on top of it.

        Callers keep ``buffered`` referenced while they inspect ``raw``,
        exactly as the stream stays alive for the whole sub-test.
        """
        raw = io.BytesIO()
        raw.write(b'abc')
        return raw, io.BufferedWriter(raw)

    def _check_members_via_open(self, raw):
        # The archive must start right after the pre-existing 3 bytes.
        self.assertEqual(raw.getvalue()[:5], b'abcPK')
        with zipfile.ZipFile(raw, mode='r') as archive:
            for name, expected in (('ones', b'111'), ('twos', b'222')):
                with archive.open(name) as member:
                    self.assertEqual(member.read(), expected)

    def test_writestr(self):
        for wrap in self._wrappers():
            with self.subTest(wrapper=wrap):
                raw, buffered = self._new_stream()
                with zipfile.ZipFile(wrap(buffered), 'w',
                                     zipfile.ZIP_STORED) as archive:
                    archive.writestr('ones', b'111')
                    archive.writestr('twos', b'222')
                self._check_members_via_open(raw)

    def test_write(self):
        for wrap in self._wrappers():
            with self.subTest(wrapper=wrap):
                raw, buffered = self._new_stream()
                with zipfile.ZipFile(wrap(buffered), 'w',
                                     zipfile.ZIP_STORED) as archive:
                    self.addCleanup(unlink, TESTFN)
                    for arcname, payload in (('ones', b'111'),
                                             ('twos', b'222')):
                        # The same source path is rewritten for each member.
                        with open(TESTFN, 'wb') as src:
                            src.write(payload)
                        archive.write(TESTFN, arcname)
                self._check_members_via_open(raw)

    def test_open_write(self):
        for wrap in self._wrappers():
            with self.subTest(wrapper=wrap):
                raw, buffered = self._new_stream()
                with zipfile.ZipFile(wrap(buffered), 'w',
                                     zipfile.ZIP_STORED) as archive:
                    for name, payload in (('ones', b'111'), ('twos', b'222')):
                        with archive.open(name, 'w') as member:
                            member.write(payload)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw) as archive:
                    self.assertEqual(archive.read('ones'), b'111')
                    self.assertEqual(archive.read('twos'), b'222')
2440
2441
@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    """Open several members of one archive, or one member several times."""

    @classmethod
    def setUpClass(cls):
        # Two payloads with distinct 3-byte prefixes and random tails.
        cls.data1 = b'111' + randbytes(10000)
        cls.data2 = b'222' + randbytes(10000)

    def tearDown(self):
        unlink(TESTFN2)

    def make_test_archive(self, f):
        """Write the deflated members 'ones' and 'twos' into *f*."""
        members = (('ones', self.data1), ('twos', self.data2))
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
            for name, payload in members:
                archive.writestr(name, payload)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as archive:
                with archive.open('ones') as first, \
                     archive.open('ones') as second:
                    # Alternate between the two handles on purpose.
                    head1 = first.read(500)
                    head2 = second.read(500)
                    got1 = head1 + first.read()
                    got2 = head2 + second.read()
                self.assertEqual(got1, got2)
                self.assertEqual(got1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as archive:
                with archive.open('ones') as ones, \
                     archive.open('twos') as twos:
                    head1 = ones.read(500)
                    head2 = twos.read(500)
                    got1 = head1 + ones.read()
                    got2 = head2 + twos.read()
                self.assertEqual(got1, self.data1)
                self.assertEqual(got2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as archive:
                with archive.open('ones') as ones:
                    head1 = ones.read(500)
                    # Open the second member while the first is half-read.
                    with archive.open('twos') as twos:
                        head2 = twos.read(500)
                        got1 = head1 + ones.read()
                        got2 = head2 + twos.read()
                self.assertEqual(got1, self.data1)
                self.assertEqual(got2, self.data2)

    def test_read_after_close(self):
        for f in get_files(self):
            self.make_test_archive(f)
            with contextlib.ExitStack() as members:
                with zipfile.ZipFile(f, 'r') as archive:
                    ones = members.enter_context(archive.open('ones'))
                    twos = members.enter_context(archive.open('twos'))
                # The ZipFile is closed here; the member handles are not.
                head1 = ones.read(500)
                head2 = twos.read(500)
                got1 = head1 + ones.read()
                got2 = head2 + twos.read()
            self.assertEqual(got1, self.data1)
            self.assertEqual(got2, self.data2)

    def test_read_after_write(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                archive.writestr('twos', self.data2)
                # Read back from the archive that is still open for writing.
                with archive.open('ones') as ones:
                    prefix = ones.read(500)
            self.assertEqual(prefix, self.data1[:500])
            with zipfile.ZipFile(f, 'r') as archive:
                got1 = archive.read('ones')
                got2 = archive.read('twos')
            self.assertEqual(got1, self.data1)
            self.assertEqual(got2, self.data2)

    def test_write_after_read(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                with archive.open('ones') as ones:
                    ones.read(500)
                    # Add a member while a read handle is still open.
                    archive.writestr('twos', self.data2)
            with zipfile.ZipFile(f, 'r') as archive:
                got1 = archive.read('ones')
                got2 = archive.read('twos')
            self.assertEqual(got1, self.data1)
            self.assertEqual(got2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            for _ in range(100):
                archive.read('ones')
                with archive.open('ones'):
                    pass
        with open(os.devnull, "rb") as probe:
            self.assertLess(probe.fileno(), 100)

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as archive:
            with archive.open('ones', 'r') as reader:
                head = reader.read(500)
                with archive.open('twos', 'w') as writer:
                    writer.write(self.data2)
                got1 = head + reader.read()
        self.assertEqual(got1, self.data1)
        with zipfile.ZipFile(TESTFN2) as archive:
            self.assertEqual(archive.read('twos'), self.data2)
2566
2567
class TestWithDirectory(unittest.TestCase):
    """Directory entries: extracting them and writing them.

    TESTFN2 is used as a scratch directory and TESTFN as the archive
    path; both are removed in tearDown().
    """

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: with assertTrue the "y/" would be
            # taken as the failure message and the check could never fail.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        # Re-open the archive and check the entries survived the round trip.
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        # Only some tests create the archive file.
        if os.path.exists(TESTFN):
            unlink(TESTFN)
2631
2632
class ZipInfoTests(unittest.TestCase):
    """ZipInfo.from_file() called with the different kinds of path."""

    def _check_this_file(self, info, expected_name):
        # Shared checks for a ZipInfo built from this module's own file.
        self.assertEqual(posixpath.basename(info.filename), expected_name)
        self.assertFalse(info.is_dir())
        self.assertEqual(info.file_size, os.path.getsize(__file__))

    def test_from_file(self):
        info = zipfile.ZipInfo.from_file(__file__)
        self._check_this_file(info, 'test_zipfile.py')

    def test_from_file_pathlike(self):
        info = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
        self._check_this_file(info, 'test_zipfile.py')

    def test_from_file_bytes(self):
        encoded_path = os.fsencode(__file__)
        info = zipfile.ZipInfo.from_file(encoded_path, 'test')
        self._check_this_file(info, 'test')

    def test_from_file_fileno(self):
        with open(__file__, 'rb') as handle:
            info = zipfile.ZipInfo.from_file(handle.fileno(), 'test')
            self._check_this_file(info, 'test')

    def test_from_dir(self):
        here = os.path.dirname(os.path.abspath(__file__))
        info = zipfile.ZipInfo.from_file(here, 'stdlib_tests')
        self.assertEqual(info.filename, 'stdlib_tests/')
        self.assertTrue(info.is_dir())
        self.assertEqual(info.compress_type, zipfile.ZIP_STORED)
        self.assertEqual(info.file_size, 0)
2666
2667
class CommandLineTest(unittest.TestCase):
    """Exercise the ``python -m zipfile`` command-line interface."""

    def zipfilecmd(self, *args, **kwargs):
        """Run ``-m zipfile`` expecting success and return its stdout with
        the platform line separator replaced by a bare newline."""
        _rc, stdout, _err = script_helper.assert_python_ok(
            '-m', 'zipfile', *args, **kwargs)
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        """Run ``-m zipfile`` expecting failure; return (rc, out, err)."""
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: usage error on stderr, nothing on stdout.
        rc, out, err = self.zipfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        self.assertIn(b'usage', lowered)
        self.assertIn(b'error', lowered)
        self.assertIn(b'required', lowered)
        # Listing an empty archive name must fail with some diagnostic.
        rc, out, err = self.zipfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        good_zip = findfile('zipdir.zip')
        for opt in ('-t', '--test'):
            out = self.zipfilecmd(opt, good_zip)
            self.assertEqual(out.rstrip(), b'Done testing')
        # A tar file is passed where a zip archive is expected.
        not_a_zip = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(out, b'')

    def test_list_command(self):
        zip_name = findfile('zipdir.zip')
        # Build the expected listing with the library's own printdir().
        listing = io.StringIO()
        with zipfile.ZipFile(zip_name, 'r') as archive:
            archive.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for opt in ('-l', '--list'):
            out = self.zipfilecmd(opt, zip_name,
                                  PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(out, expected)

    @requires_zlib()
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w', encoding='utf-8') as src:
            src.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested, 'w', encoding='utf-8') as src:
            src.write('test 2')
        inputs = [TESTFN, TESTFNDIR]
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for opt in ('-c', '--create'):
            try:
                out = self.zipfilecmd(opt, TESTFN2, *inputs)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(TESTFN2) as archive:
                    self.assertEqual(archive.namelist(), expected_names)
                    self.assertEqual(archive.read(expected_names[0]),
                                     b'test 1')
                    self.assertEqual(archive.read(expected_names[2]),
                                     b'test 2')
            finally:
                # Each option starts from a missing output archive.
                unlink(TESTFN2)

    def test_extract_command(self):
        zip_name = findfile('zipdir.zip')
        for opt in ('-e', '--extract'):
            with temp_dir() as extdir:
                out = self.zipfilecmd(opt, zip_name, extdir)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(zip_name) as archive:
                    for info in archive.infolist():
                        relative = info.filename.replace('/', os.sep)
                        extracted = os.path.join(extdir, relative)
                        if info.is_dir():
                            self.assertTrue(os.path.isdir(extracted))
                            continue
                        self.assertTrue(os.path.isfile(extracted))
                        with open(extracted, 'rb') as on_disk:
                            self.assertEqual(on_disk.read(),
                                             archive.read(info))
2746
2747
class TestExecutablePrependedZip(unittest.TestCase):
    """Test our ability to open zip files with an executable prepended."""

    def setUp(self):
        # Both data files are looked up in the 'ziptestdata' subdirectory.
        locate = functools.partial(findfile, subdir='ziptestdata')
        self.exe_zip = locate('exe_with_zip')
        self.exe_zip64 = locate('exe_with_z64')

    def _test_zip_works(self, name):
        # bpo28494 sanity check: ensure is_zipfile works on these.
        recognized = zipfile.is_zipfile(name)
        self.assertTrue(recognized, f'is_zipfile failed on {name}')
        # Ensure we can operate on these via ZipFile.
        with zipfile.ZipFile(name) as archive:
            for member in archive.namelist():
                self.assertIn(b'FAVORITE_NUMBER', archive.read(member))

    def _run_prepended(self, exe_path):
        # Execute the data file itself, passing the interpreter path as
        # its only argument, and return what it printed.
        return subprocess.check_output([exe_path, sys.executable])

    def test_read_zip_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip)

    def test_read_zip64_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip64)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip2(self):
        printed = self._run_prepended(self.exe_zip)
        self.assertIn(b'number in executable: 5', printed)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip64(self):
        printed = self._run_prepended(self.exe_zip64)
        self.assertIn(b'number in executable: 5', printed)
2784
2785
# Poor man's technique to consume a (smallish) iterable: building a tuple
# from it forces every item to be produced (and keeps them all in memory,
# hence "smallish").
consume = tuple
2788
2789
# from jaraco.itertools 5.0
class jaraco:
    """Namespace mirroring the ``jaraco.itertools`` module layout."""

    class itertools:
        class Counter:
            """Iterator wrapper that counts the items it has handed out."""

            def __init__(self, i):
                self._orig_iter = iter(i)
                self.count = 0

            def __iter__(self):
                return self

            def __next__(self):
                # Fetch first: an exhausted source must not bump the count.
                item = next(self._orig_iter)
                self.count = self.count + 1
                return item
2805
2806
def add_dirs(zf):
    """
    Given a writable zip file zf, add an empty entry for every
    directory that is implied by the existing member names but has
    no entry of its own, then return zf.
    """
    implied = zipfile.CompleteDirs._implied_dirs(zf.namelist())
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf
2815
2816
def build_alpharep_fixture():
    """
    Return an in-memory ZipFile, still open for writing and named
    "alpharep.zip", with this structure:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    Key characteristics of the fixture:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)

    Only file entries are written; the directories are implied.

    "alpha" because it uses alphabet
    "rep" because it's a representative example
    """
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
    )
    archive = zipfile.ZipFile(io.BytesIO(), "w")
    for name, payload in members:
        archive.writestr(name, payload)
    archive.filename = "alpharep.zip"
    return archive
2851
2852
def pass_alpharep(meth):
    """
    Decorate a test method so that it is called once for every
    fixture yielded by ``self.zipfile_alpharep()``, receiving that
    fixture as the ``alpharep`` keyword argument.
    """

    def run_for_each_fixture(self):
        for fixture in self.zipfile_alpharep():
            meth(self, alpharep=fixture)

    # Carry over the name, docstring, etc. of the decorated method.
    return functools.wraps(meth)(run_for_each_fixture)
2865
2866
2867class TestPath(unittest.TestCase):
2868    def setUp(self):
2869        self.fixtures = contextlib.ExitStack()
2870        self.addCleanup(self.fixtures.close)
2871
2872    def zipfile_alpharep(self):
2873        with self.subTest():
2874            yield build_alpharep_fixture()
2875        with self.subTest():
2876            yield add_dirs(build_alpharep_fixture())
2877
2878    def zipfile_ondisk(self, alpharep):
2879        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
2880        buffer = alpharep.fp
2881        alpharep.close()
2882        path = tmpdir / alpharep.filename
2883        with path.open("wb") as strm:
2884            strm.write(buffer.getvalue())
2885        return path
2886
2887    @pass_alpharep
2888    def test_iterdir_and_types(self, alpharep):
2889        root = zipfile.Path(alpharep)
2890        assert root.is_dir()
2891        a, b, g = root.iterdir()
2892        assert a.is_file()
2893        assert b.is_dir()
2894        assert g.is_dir()
2895        c, f, d = b.iterdir()
2896        assert c.is_file() and f.is_file()
2897        (e,) = d.iterdir()
2898        assert e.is_file()
2899        (h,) = g.iterdir()
2900        (i,) = h.iterdir()
2901        assert i.is_file()
2902
2903    @pass_alpharep
2904    def test_is_file_missing(self, alpharep):
2905        root = zipfile.Path(alpharep)
2906        assert not root.joinpath('missing.txt').is_file()
2907
2908    @pass_alpharep
2909    def test_iterdir_on_file(self, alpharep):
2910        root = zipfile.Path(alpharep)
2911        a, b, g = root.iterdir()
2912        with self.assertRaises(ValueError):
2913            a.iterdir()
2914
2915    @pass_alpharep
2916    def test_subdir_is_dir(self, alpharep):
2917        root = zipfile.Path(alpharep)
2918        assert (root / 'b').is_dir()
2919        assert (root / 'b/').is_dir()
2920        assert (root / 'g').is_dir()
2921        assert (root / 'g/').is_dir()
2922
2923    @pass_alpharep
2924    def test_open(self, alpharep):
2925        root = zipfile.Path(alpharep)
2926        a, b, g = root.iterdir()
2927        with a.open(encoding="utf-8") as strm:
2928            data = strm.read()
2929        assert data == "content of a"
2930
2931    def test_open_write(self):
2932        """
2933        If the zipfile is open for write, it should be possible to
2934        write bytes or text to it.
2935        """
2936        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
2937        with zf.joinpath('file.bin').open('wb') as strm:
2938            strm.write(b'binary contents')
2939        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
2940            strm.write('text file')
2941
2942    def test_open_extant_directory(self):
2943        """
2944        Attempting to open a directory raises IsADirectoryError.
2945        """
2946        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2947        with self.assertRaises(IsADirectoryError):
2948            zf.joinpath('b').open()
2949
2950    @pass_alpharep
2951    def test_open_binary_invalid_args(self, alpharep):
2952        root = zipfile.Path(alpharep)
2953        with self.assertRaises(ValueError):
2954            root.joinpath('a.txt').open('rb', encoding='utf-8')
2955        with self.assertRaises(ValueError):
2956            root.joinpath('a.txt').open('rb', 'utf-8')
2957
2958    def test_open_missing_directory(self):
2959        """
2960        Attempting to open a missing directory raises FileNotFoundError.
2961        """
2962        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2963        with self.assertRaises(FileNotFoundError):
2964            zf.joinpath('z').open()
2965
2966    @pass_alpharep
2967    def test_read(self, alpharep):
2968        root = zipfile.Path(alpharep)
2969        a, b, g = root.iterdir()
2970        assert a.read_text(encoding="utf-8") == "content of a"
2971        assert a.read_bytes() == b"content of a"
2972
2973    @pass_alpharep
2974    def test_joinpath(self, alpharep):
2975        root = zipfile.Path(alpharep)
2976        a = root.joinpath("a.txt")
2977        assert a.is_file()
2978        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
2979        assert e.read_text(encoding="utf-8") == "content of e"
2980
2981    @pass_alpharep
2982    def test_joinpath_multiple(self, alpharep):
2983        root = zipfile.Path(alpharep)
2984        e = root.joinpath("b", "d", "e.txt")
2985        assert e.read_text(encoding="utf-8") == "content of e"
2986
2987    @pass_alpharep
2988    def test_traverse_truediv(self, alpharep):
2989        root = zipfile.Path(alpharep)
2990        a = root / "a.txt"
2991        assert a.is_file()
2992        e = root / "b" / "d" / "e.txt"
2993        assert e.read_text(encoding="utf-8") == "content of e"
2994
2995    @pass_alpharep
2996    def test_traverse_simplediv(self, alpharep):
2997        """
2998        Disable the __future__.division when testing traversal.
2999        """
3000        code = compile(
3001            source="zipfile.Path(alpharep) / 'a'",
3002            filename="(test)",
3003            mode="eval",
3004            dont_inherit=True,
3005        )
3006        eval(code)
3007
3008    @pass_alpharep
3009    def test_pathlike_construction(self, alpharep):
3010        """
3011        zipfile.Path should be constructable from a path-like object
3012        """
3013        zipfile_ondisk = self.zipfile_ondisk(alpharep)
3014        pathlike = pathlib.Path(str(zipfile_ondisk))
3015        zipfile.Path(pathlike)
3016
3017    @pass_alpharep
3018    def test_traverse_pathlike(self, alpharep):
3019        root = zipfile.Path(alpharep)
3020        root / pathlib.Path("a")
3021
3022    @pass_alpharep
3023    def test_parent(self, alpharep):
3024        root = zipfile.Path(alpharep)
3025        assert (root / 'a').parent.at == ''
3026        assert (root / 'a' / 'b').parent.at == 'a/'
3027
3028    @pass_alpharep
3029    def test_dir_parent(self, alpharep):
3030        root = zipfile.Path(alpharep)
3031        assert (root / 'b').parent.at == ''
3032        assert (root / 'b/').parent.at == ''
3033
3034    @pass_alpharep
3035    def test_missing_dir_parent(self, alpharep):
3036        root = zipfile.Path(alpharep)
3037        assert (root / 'missing dir/').parent.at == ''
3038
3039    @pass_alpharep
3040    def test_mutability(self, alpharep):
3041        """
3042        If the underlying zipfile is changed, the Path object should
3043        reflect that change.
3044        """
3045        root = zipfile.Path(alpharep)
3046        a, b, g = root.iterdir()
3047        alpharep.writestr('foo.txt', 'foo')
3048        alpharep.writestr('bar/baz.txt', 'baz')
3049        assert any(child.name == 'foo.txt' for child in root.iterdir())
3050        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
3051        (baz,) = (root / 'bar').iterdir()
3052        assert baz.read_text(encoding="utf-8") == 'baz'
3053
    # Number of entries that huge_zipfile() writes into its archive.
    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
3055
3056    def huge_zipfile(self):
3057        """Create a read-only zipfile with a huge number of entries entries."""
3058        strm = io.BytesIO()
3059        zf = zipfile.ZipFile(strm, "w")
3060        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
3061            zf.writestr(entry, entry)
3062        zf.mode = 'r'
3063        return zf
3064
3065    def test_joinpath_constant_time(self):
3066        """
3067        Ensure joinpath on items in zipfile is linear time.
3068        """
3069        root = zipfile.Path(self.huge_zipfile())
3070        entries = jaraco.itertools.Counter(root.iterdir())
3071        for entry in entries:
3072            entry.joinpath('suffix')
3073        # Check the file iterated all items
3074        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
3075
3076    # @func_timeout.func_set_timeout(3)
3077    def test_implied_dirs_performance(self):
3078        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
3079        zipfile.CompleteDirs._implied_dirs(data)
3080
3081    @pass_alpharep
3082    def test_read_does_not_close(self, alpharep):
3083        alpharep = self.zipfile_ondisk(alpharep)
3084        with zipfile.ZipFile(alpharep) as file:
3085            for rep in range(2):
3086                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
3087
3088    @pass_alpharep
3089    def test_subclass(self, alpharep):
3090        class Subclass(zipfile.Path):
3091            pass
3092
3093        root = Subclass(alpharep)
3094        assert isinstance(root / 'b', Subclass)
3095
3096    @pass_alpharep
3097    def test_filename(self, alpharep):
3098        root = zipfile.Path(alpharep)
3099        assert root.filename == pathlib.Path('alpharep.zip')
3100
3101    @pass_alpharep
3102    def test_root_name(self, alpharep):
3103        """
3104        The name of the root should be the name of the zipfile
3105        """
3106        root = zipfile.Path(alpharep)
3107        assert root.name == 'alpharep.zip' == root.filename.name
3108
3109    @pass_alpharep
3110    def test_suffix(self, alpharep):
3111        """
3112        The suffix of the root should be the suffix of the zipfile.
3113        The suffix of each nested file is the final component's last suffix, if any.
3114        Includes the leading period, just like pathlib.Path.
3115        """
3116        root = zipfile.Path(alpharep)
3117        assert root.suffix == '.zip' == root.filename.suffix
3118
3119        b = root / "b.txt"
3120        assert b.suffix == ".txt"
3121
3122        c = root / "c" / "filename.tar.gz"
3123        assert c.suffix == ".gz"
3124
3125        d = root / "d"
3126        assert d.suffix == ""
3127
3128    @pass_alpharep
3129    def test_suffixes(self, alpharep):
3130        """
3131        The suffix of the root should be the suffix of the zipfile.
3132        The suffix of each nested file is the final component's last suffix, if any.
3133        Includes the leading period, just like pathlib.Path.
3134        """
3135        root = zipfile.Path(alpharep)
3136        assert root.suffixes == ['.zip'] == root.filename.suffixes
3137
3138        b = root / 'b.txt'
3139        assert b.suffixes == ['.txt']
3140
3141        c = root / 'c' / 'filename.tar.gz'
3142        assert c.suffixes == ['.tar', '.gz']
3143
3144        d = root / 'd'
3145        assert d.suffixes == []
3146
3147        e = root / '.hgrc'
3148        assert e.suffixes == []
3149
3150    @pass_alpharep
3151    def test_stem(self, alpharep):
3152        """
3153        The final path component, without its suffix
3154        """
3155        root = zipfile.Path(alpharep)
3156        assert root.stem == 'alpharep' == root.filename.stem
3157
3158        b = root / "b.txt"
3159        assert b.stem == "b"
3160
3161        c = root / "c" / "filename.tar.gz"
3162        assert c.stem == "filename.tar"
3163
3164        d = root / "d"
3165        assert d.stem == "d"
3166
3167    @pass_alpharep
3168    def test_root_parent(self, alpharep):
3169        root = zipfile.Path(alpharep)
3170        assert root.parent == pathlib.Path('.')
3171        root.root.filename = 'foo/bar.zip'
3172        assert root.parent == pathlib.Path('foo')
3173
3174    @pass_alpharep
3175    def test_root_unnamed(self, alpharep):
3176        """
3177        It is an error to attempt to get the name
3178        or parent of an unnamed zipfile.
3179        """
3180        alpharep.filename = None
3181        root = zipfile.Path(alpharep)
3182        with self.assertRaises(TypeError):
3183            root.name
3184        with self.assertRaises(TypeError):
3185            root.parent
3186
3187        # .name and .parent should still work on subs
3188        sub = root / "b"
3189        assert sub.name == "b"
3190        assert sub.parent
3191
3192    @pass_alpharep
3193    def test_inheritance(self, alpharep):
3194        cls = type('PathChild', (zipfile.Path,), {})
3195        for alpharep in self.zipfile_alpharep():
3196            file = cls(alpharep).joinpath('some dir').parent
3197            assert isinstance(file, cls)
3198
3199
# Run this module's tests when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
3202