1import contextlib
2import importlib.util
3import io
4import itertools
5import os
6import pathlib
7import posixpath
8import string
9import struct
10import subprocess
11import sys
12import time
13import unittest
14import unittest.mock as mock
15import zipfile
16import functools
17
18
19from tempfile import TemporaryFile
20from random import randint, random, randbytes
21
22from test.support import script_helper
23from test.support import (findfile, requires_zlib, requires_bz2,
24                          requires_lzma, captured_stdout)
25from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd
26
27
28TESTFN2 = TESTFN + "2"
29TESTFNDIR = TESTFN + "d"
30FIXEDTEST_SIZE = 1000
31DATAFILES_DIR = 'zipfile_datafiles'
32
33SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
34                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
35                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
36                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
37
38def get_files(test):
39    yield TESTFN2
40    with TemporaryFile() as f:
41        yield f
42        test.assertFalse(f.closed)
43    with io.BytesIO() as f:
44        yield f
45        test.assertFalse(f.closed)
46
47class AbstractTestsWithSourceFile:
48    @classmethod
49    def setUpClass(cls):
50        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
51                              (i, random()), "ascii")
52                        for i in range(FIXEDTEST_SIZE)]
53        cls.data = b''.join(cls.line_gen)
54
55    def setUp(self):
56        # Make a source file with some lines
57        with open(TESTFN, "wb") as fp:
58            fp.write(self.data)
59
60    def make_test_archive(self, f, compression, compresslevel=None):
61        kwargs = {'compression': compression, 'compresslevel': compresslevel}
62        # Create the ZIP archive
63        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
64            zipfp.write(TESTFN, "another.name")
65            zipfp.write(TESTFN, TESTFN)
66            zipfp.writestr("strfile", self.data)
67            with zipfp.open('written-open-w', mode='w') as f:
68                for line in self.line_gen:
69                    f.write(line)
70
71    def zip_test(self, f, compression, compresslevel=None):
72        self.make_test_archive(f, compression, compresslevel)
73
74        # Read the ZIP archive
75        with zipfile.ZipFile(f, "r", compression) as zipfp:
76            self.assertEqual(zipfp.read(TESTFN), self.data)
77            self.assertEqual(zipfp.read("another.name"), self.data)
78            self.assertEqual(zipfp.read("strfile"), self.data)
79
80            # Print the ZIP directory
81            fp = io.StringIO()
82            zipfp.printdir(file=fp)
83            directory = fp.getvalue()
84            lines = directory.splitlines()
85            self.assertEqual(len(lines), 5) # Number of files + header
86
87            self.assertIn('File Name', lines[0])
88            self.assertIn('Modified', lines[0])
89            self.assertIn('Size', lines[0])
90
91            fn, date, time_, size = lines[1].split()
92            self.assertEqual(fn, 'another.name')
93            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
94            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
95            self.assertEqual(size, str(len(self.data)))
96
97            # Check the namelist
98            names = zipfp.namelist()
99            self.assertEqual(len(names), 4)
100            self.assertIn(TESTFN, names)
101            self.assertIn("another.name", names)
102            self.assertIn("strfile", names)
103            self.assertIn("written-open-w", names)
104
105            # Check infolist
106            infos = zipfp.infolist()
107            names = [i.filename for i in infos]
108            self.assertEqual(len(names), 4)
109            self.assertIn(TESTFN, names)
110            self.assertIn("another.name", names)
111            self.assertIn("strfile", names)
112            self.assertIn("written-open-w", names)
113            for i in infos:
114                self.assertEqual(i.file_size, len(self.data))
115
116            # check getinfo
117            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
118                info = zipfp.getinfo(nm)
119                self.assertEqual(info.filename, nm)
120                self.assertEqual(info.file_size, len(self.data))
121
122            # Check that testzip doesn't raise an exception
123            zipfp.testzip()
124
125    def test_basic(self):
126        for f in get_files(self):
127            self.zip_test(f, self.compression)
128
129    def zip_open_test(self, f, compression):
130        self.make_test_archive(f, compression)
131
132        # Read the ZIP archive
133        with zipfile.ZipFile(f, "r", compression) as zipfp:
134            zipdata1 = []
135            with zipfp.open(TESTFN) as zipopen1:
136                while True:
137                    read_data = zipopen1.read(256)
138                    if not read_data:
139                        break
140                    zipdata1.append(read_data)
141
142            zipdata2 = []
143            with zipfp.open("another.name") as zipopen2:
144                while True:
145                    read_data = zipopen2.read(256)
146                    if not read_data:
147                        break
148                    zipdata2.append(read_data)
149
150            self.assertEqual(b''.join(zipdata1), self.data)
151            self.assertEqual(b''.join(zipdata2), self.data)
152
153    def test_open(self):
154        for f in get_files(self):
155            self.zip_open_test(f, self.compression)
156
157    def test_open_with_pathlike(self):
158        path = pathlib.Path(TESTFN2)
159        self.zip_open_test(path, self.compression)
160        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
161            self.assertIsInstance(zipfp.filename, str)
162
163    def zip_random_open_test(self, f, compression):
164        self.make_test_archive(f, compression)
165
166        # Read the ZIP archive
167        with zipfile.ZipFile(f, "r", compression) as zipfp:
168            zipdata1 = []
169            with zipfp.open(TESTFN) as zipopen1:
170                while True:
171                    read_data = zipopen1.read(randint(1, 1024))
172                    if not read_data:
173                        break
174                    zipdata1.append(read_data)
175
176            self.assertEqual(b''.join(zipdata1), self.data)
177
178    def test_random_open(self):
179        for f in get_files(self):
180            self.zip_random_open_test(f, self.compression)
181
182    def zip_read1_test(self, f, compression):
183        self.make_test_archive(f, compression)
184
185        # Read the ZIP archive
186        with zipfile.ZipFile(f, "r") as zipfp, \
187             zipfp.open(TESTFN) as zipopen:
188            zipdata = []
189            while True:
190                read_data = zipopen.read1(-1)
191                if not read_data:
192                    break
193                zipdata.append(read_data)
194
195        self.assertEqual(b''.join(zipdata), self.data)
196
197    def test_read1(self):
198        for f in get_files(self):
199            self.zip_read1_test(f, self.compression)
200
201    def zip_read1_10_test(self, f, compression):
202        self.make_test_archive(f, compression)
203
204        # Read the ZIP archive
205        with zipfile.ZipFile(f, "r") as zipfp, \
206             zipfp.open(TESTFN) as zipopen:
207            zipdata = []
208            while True:
209                read_data = zipopen.read1(10)
210                self.assertLessEqual(len(read_data), 10)
211                if not read_data:
212                    break
213                zipdata.append(read_data)
214
215        self.assertEqual(b''.join(zipdata), self.data)
216
217    def test_read1_10(self):
218        for f in get_files(self):
219            self.zip_read1_10_test(f, self.compression)
220
221    def zip_readline_read_test(self, f, compression):
222        self.make_test_archive(f, compression)
223
224        # Read the ZIP archive
225        with zipfile.ZipFile(f, "r") as zipfp, \
226             zipfp.open(TESTFN) as zipopen:
227            data = b''
228            while True:
229                read = zipopen.readline()
230                if not read:
231                    break
232                data += read
233
234                read = zipopen.read(100)
235                if not read:
236                    break
237                data += read
238
239        self.assertEqual(data, self.data)
240
241    def test_readline_read(self):
242        # Issue #7610: calls to readline() interleaved with calls to read().
243        for f in get_files(self):
244            self.zip_readline_read_test(f, self.compression)
245
246    def zip_readline_test(self, f, compression):
247        self.make_test_archive(f, compression)
248
249        # Read the ZIP archive
250        with zipfile.ZipFile(f, "r") as zipfp:
251            with zipfp.open(TESTFN) as zipopen:
252                for line in self.line_gen:
253                    linedata = zipopen.readline()
254                    self.assertEqual(linedata, line)
255
256    def test_readline(self):
257        for f in get_files(self):
258            self.zip_readline_test(f, self.compression)
259
260    def zip_readlines_test(self, f, compression):
261        self.make_test_archive(f, compression)
262
263        # Read the ZIP archive
264        with zipfile.ZipFile(f, "r") as zipfp:
265            with zipfp.open(TESTFN) as zipopen:
266                ziplines = zipopen.readlines()
267            for line, zipline in zip(self.line_gen, ziplines):
268                self.assertEqual(zipline, line)
269
270    def test_readlines(self):
271        for f in get_files(self):
272            self.zip_readlines_test(f, self.compression)
273
274    def zip_iterlines_test(self, f, compression):
275        self.make_test_archive(f, compression)
276
277        # Read the ZIP archive
278        with zipfile.ZipFile(f, "r") as zipfp:
279            with zipfp.open(TESTFN) as zipopen:
280                for line, zipline in zip(self.line_gen, zipopen):
281                    self.assertEqual(zipline, line)
282
283    def test_iterlines(self):
284        for f in get_files(self):
285            self.zip_iterlines_test(f, self.compression)
286
287    def test_low_compression(self):
288        """Check for cases where compressed data is larger than original."""
289        # Create the ZIP archive
290        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
291            zipfp.writestr("strfile", '12')
292
293        # Get an open object for strfile
294        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
295            with zipfp.open("strfile") as openobj:
296                self.assertEqual(openobj.read(1), b'1')
297                self.assertEqual(openobj.read(1), b'2')
298
299    def test_writestr_compression(self):
300        zipfp = zipfile.ZipFile(TESTFN2, "w")
301        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
302        info = zipfp.getinfo('b.txt')
303        self.assertEqual(info.compress_type, self.compression)
304
305    def test_writestr_compresslevel(self):
306        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
307        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
308        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
309                       compresslevel=2)
310
311        # Compression level follows the constructor.
312        a_info = zipfp.getinfo('a.txt')
313        self.assertEqual(a_info.compress_type, self.compression)
314        self.assertEqual(a_info._compresslevel, 1)
315
316        # Compression level is overridden.
317        b_info = zipfp.getinfo('b.txt')
318        self.assertEqual(b_info.compress_type, self.compression)
319        self.assertEqual(b_info._compresslevel, 2)
320
321    def test_read_return_size(self):
322        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
323        # than requested.
324        for test_size in (1, 4095, 4096, 4097, 16384):
325            file_size = test_size + 1
326            junk = randbytes(file_size)
327            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
328                zipf.writestr('foo', junk)
329                with zipf.open('foo', 'r') as fp:
330                    buf = fp.read(test_size)
331                    self.assertEqual(len(buf), test_size)
332
333    def test_truncated_zipfile(self):
334        fp = io.BytesIO()
335        with zipfile.ZipFile(fp, mode='w') as zipf:
336            zipf.writestr('strfile', self.data, compress_type=self.compression)
337            end_offset = fp.tell()
338        zipfiledata = fp.getvalue()
339
340        fp = io.BytesIO(zipfiledata)
341        with zipfile.ZipFile(fp) as zipf:
342            with zipf.open('strfile') as zipopen:
343                fp.truncate(end_offset - 20)
344                with self.assertRaises(EOFError):
345                    zipopen.read()
346
347        fp = io.BytesIO(zipfiledata)
348        with zipfile.ZipFile(fp) as zipf:
349            with zipf.open('strfile') as zipopen:
350                fp.truncate(end_offset - 20)
351                with self.assertRaises(EOFError):
352                    while zipopen.read(100):
353                        pass
354
355        fp = io.BytesIO(zipfiledata)
356        with zipfile.ZipFile(fp) as zipf:
357            with zipf.open('strfile') as zipopen:
358                fp.truncate(end_offset - 20)
359                with self.assertRaises(EOFError):
360                    while zipopen.read1(100):
361                        pass
362
363    def test_repr(self):
364        fname = 'file.name'
365        for f in get_files(self):
366            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
367                zipfp.write(TESTFN, fname)
368                r = repr(zipfp)
369                self.assertIn("mode='w'", r)
370
371            with zipfile.ZipFile(f, 'r') as zipfp:
372                r = repr(zipfp)
373                if isinstance(f, str):
374                    self.assertIn('filename=%r' % f, r)
375                else:
376                    self.assertIn('file=%r' % f, r)
377                self.assertIn("mode='r'", r)
378                r = repr(zipfp.getinfo(fname))
379                self.assertIn('filename=%r' % fname, r)
380                self.assertIn('filemode=', r)
381                self.assertIn('file_size=', r)
382                if self.compression != zipfile.ZIP_STORED:
383                    self.assertIn('compress_type=', r)
384                    self.assertIn('compress_size=', r)
385                with zipfp.open(fname) as zipopen:
386                    r = repr(zipopen)
387                    self.assertIn('name=%r' % fname, r)
388                    self.assertIn("mode='r'", r)
389                    if self.compression != zipfile.ZIP_STORED:
390                        self.assertIn('compress_type=', r)
391                self.assertIn('[closed]', repr(zipopen))
392            self.assertIn('[closed]', repr(zipfp))
393
394    def test_compresslevel_basic(self):
395        for f in get_files(self):
396            self.zip_test(f, self.compression, compresslevel=9)
397
398    def test_per_file_compresslevel(self):
399        """Check that files within a Zip archive can have different
400        compression levels."""
401        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
402            zipfp.write(TESTFN, 'compress_1')
403            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
404            one_info = zipfp.getinfo('compress_1')
405            nine_info = zipfp.getinfo('compress_9')
406            self.assertEqual(one_info._compresslevel, 1)
407            self.assertEqual(nine_info._compresslevel, 9)
408
409    def test_writing_errors(self):
410        class BrokenFile(io.BytesIO):
411            def write(self, data):
412                nonlocal count
413                if count is not None:
414                    if count == stop:
415                        raise OSError
416                    count += 1
417                super().write(data)
418
419        stop = 0
420        while True:
421            testfile = BrokenFile()
422            count = None
423            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
424                with zipfp.open('file1', 'w') as f:
425                    f.write(b'data1')
426                count = 0
427                try:
428                    with zipfp.open('file2', 'w') as f:
429                        f.write(b'data2')
430                except OSError:
431                    stop += 1
432                else:
433                    break
434                finally:
435                    count = None
436            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
437                self.assertEqual(zipfp.namelist(), ['file1'])
438                self.assertEqual(zipfp.read('file1'), b'data1')
439
440        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
441            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
442            self.assertEqual(zipfp.read('file1'), b'data1')
443            self.assertEqual(zipfp.read('file2'), b'data2')
444
445
446    def tearDown(self):
447        unlink(TESTFN)
448        unlink(TESTFN2)
449
450
451class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
452                                unittest.TestCase):
453    compression = zipfile.ZIP_STORED
454    test_low_compression = None
455
456    def zip_test_writestr_permissions(self, f, compression):
457        # Make sure that writestr and open(... mode='w') create files with
458        # mode 0600, when they are passed a name rather than a ZipInfo
459        # instance.
460
461        self.make_test_archive(f, compression)
462        with zipfile.ZipFile(f, "r") as zipfp:
463            zinfo = zipfp.getinfo('strfile')
464            self.assertEqual(zinfo.external_attr, 0o600 << 16)
465
466            zinfo2 = zipfp.getinfo('written-open-w')
467            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
468
469    def test_writestr_permissions(self):
470        for f in get_files(self):
471            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
472
473    def test_absolute_arcnames(self):
474        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
475            zipfp.write(TESTFN, "/absolute")
476
477        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
478            self.assertEqual(zipfp.namelist(), ["absolute"])
479
480    def test_append_to_zip_file(self):
481        """Test appending to an existing zipfile."""
482        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
483            zipfp.write(TESTFN, TESTFN)
484
485        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
486            zipfp.writestr("strfile", self.data)
487            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
488
489    def test_append_to_non_zip_file(self):
490        """Test appending to an existing file that is not a zipfile."""
491        # NOTE: this test fails if len(d) < 22 because of the first
492        # line "fpin.seek(-22, 2)" in _EndRecData
493        data = b'I am not a ZipFile!'*10
494        with open(TESTFN2, 'wb') as f:
495            f.write(data)
496
497        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
498            zipfp.write(TESTFN, TESTFN)
499
500        with open(TESTFN2, 'rb') as f:
501            f.seek(len(data))
502            with zipfile.ZipFile(f, "r") as zipfp:
503                self.assertEqual(zipfp.namelist(), [TESTFN])
504                self.assertEqual(zipfp.read(TESTFN), self.data)
505        with open(TESTFN2, 'rb') as f:
506            self.assertEqual(f.read(len(data)), data)
507            zipfiledata = f.read()
508        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
509            self.assertEqual(zipfp.namelist(), [TESTFN])
510            self.assertEqual(zipfp.read(TESTFN), self.data)
511
512    def test_read_concatenated_zip_file(self):
513        with io.BytesIO() as bio:
514            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
515                zipfp.write(TESTFN, TESTFN)
516            zipfiledata = bio.getvalue()
517        data = b'I am not a ZipFile!'*10
518        with open(TESTFN2, 'wb') as f:
519            f.write(data)
520            f.write(zipfiledata)
521
522        with zipfile.ZipFile(TESTFN2) as zipfp:
523            self.assertEqual(zipfp.namelist(), [TESTFN])
524            self.assertEqual(zipfp.read(TESTFN), self.data)
525
526    def test_append_to_concatenated_zip_file(self):
527        with io.BytesIO() as bio:
528            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
529                zipfp.write(TESTFN, TESTFN)
530            zipfiledata = bio.getvalue()
531        data = b'I am not a ZipFile!'*1000000
532        with open(TESTFN2, 'wb') as f:
533            f.write(data)
534            f.write(zipfiledata)
535
536        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
537            self.assertEqual(zipfp.namelist(), [TESTFN])
538            zipfp.writestr('strfile', self.data)
539
540        with open(TESTFN2, 'rb') as f:
541            self.assertEqual(f.read(len(data)), data)
542            zipfiledata = f.read()
543        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
544            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
545            self.assertEqual(zipfp.read(TESTFN), self.data)
546            self.assertEqual(zipfp.read('strfile'), self.data)
547
548    def test_ignores_newline_at_end(self):
549        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
550            zipfp.write(TESTFN, TESTFN)
551        with open(TESTFN2, 'a', encoding='utf-8') as f:
552            f.write("\r\n\00\00\00")
553        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
554            self.assertIsInstance(zipfp, zipfile.ZipFile)
555
556    def test_ignores_stuff_appended_past_comments(self):
557        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
558            zipfp.comment = b"this is a comment"
559            zipfp.write(TESTFN, TESTFN)
560        with open(TESTFN2, 'a', encoding='utf-8') as f:
561            f.write("abcdef\r\n")
562        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
563            self.assertIsInstance(zipfp, zipfile.ZipFile)
564            self.assertEqual(zipfp.comment, b"this is a comment")
565
566    def test_write_default_name(self):
567        """Check that calling ZipFile.write without arcname specified
568        produces the expected result."""
569        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
570            zipfp.write(TESTFN)
571            with open(TESTFN, "rb") as f:
572                self.assertEqual(zipfp.read(TESTFN), f.read())
573
574    def test_io_on_closed_zipextfile(self):
575        fname = "somefile.txt"
576        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
577            zipfp.writestr(fname, "bogus")
578
579        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
580            with zipfp.open(fname) as fid:
581                fid.close()
582                self.assertRaises(ValueError, fid.read)
583                self.assertRaises(ValueError, fid.seek, 0)
584                self.assertRaises(ValueError, fid.tell)
585                self.assertRaises(ValueError, fid.readable)
586                self.assertRaises(ValueError, fid.seekable)
587
588    def test_write_to_readonly(self):
589        """Check that trying to call write() on a readonly ZipFile object
590        raises a ValueError."""
591        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
592            zipfp.writestr("somefile.txt", "bogus")
593
594        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
595            self.assertRaises(ValueError, zipfp.write, TESTFN)
596
597        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
598            with self.assertRaises(ValueError):
599                zipfp.open(TESTFN, mode='w')
600
601    def test_add_file_before_1980(self):
602        # Set atime and mtime to 1970-01-01
603        os.utime(TESTFN, (0, 0))
604        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
605            self.assertRaises(ValueError, zipfp.write, TESTFN)
606
607        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
608            zipfp.write(TESTFN)
609            zinfo = zipfp.getinfo(TESTFN)
610            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
611
612    def test_add_file_after_2107(self):
613        # Set atime and mtime to 2108-12-30
614        ts = 4386268800
615        try:
616            time.localtime(ts)
617        except OverflowError:
618            self.skipTest(f'time.localtime({ts}) raises OverflowError')
619        try:
620            os.utime(TESTFN, (ts, ts))
621        except OverflowError:
622            self.skipTest('Host fs cannot set timestamp to required value.')
623
624        mtime_ns = os.stat(TESTFN).st_mtime_ns
625        if mtime_ns != (4386268800 * 10**9):
626            # XFS filesystem is limited to 32-bit timestamp, but the syscall
627            # didn't fail. Moreover, there is a VFS bug which returns
628            # a cached timestamp which is different than the value on disk.
629            #
630            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
631            #
632            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
633            # https://bugs.python.org/issue39460#msg360952
634            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
635
636        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
637            self.assertRaises(struct.error, zipfp.write, TESTFN)
638
639        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
640            zipfp.write(TESTFN)
641            zinfo = zipfp.getinfo(TESTFN)
642            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
643
644
645@requires_zlib()
646class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
647                                 unittest.TestCase):
648    compression = zipfile.ZIP_DEFLATED
649
650    def test_per_file_compression(self):
651        """Check that files within a Zip archive can have different
652        compression options."""
653        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
654            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
655            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
656            sinfo = zipfp.getinfo('storeme')
657            dinfo = zipfp.getinfo('deflateme')
658            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
659            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
660
661@requires_bz2()
662class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
663                               unittest.TestCase):
664    compression = zipfile.ZIP_BZIP2
665
666@requires_lzma()
667class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
668                              unittest.TestCase):
669    compression = zipfile.ZIP_LZMA
670
671
672class AbstractTestZip64InSmallFiles:
673    # These tests test the ZIP64 functionality without using large files,
674    # see test_zipfile64 for proper tests.
675
676    @classmethod
677    def setUpClass(cls):
678        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
679                    for i in range(0, FIXEDTEST_SIZE))
680        cls.data = b'\n'.join(line_gen)
681
682    def setUp(self):
683        self._limit = zipfile.ZIP64_LIMIT
684        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
685        zipfile.ZIP64_LIMIT = 1000
686        zipfile.ZIP_FILECOUNT_LIMIT = 9
687
688        # Make a source file with some lines
689        with open(TESTFN, "wb") as fp:
690            fp.write(self.data)
691
692    def zip_test(self, f, compression):
693        # Create the ZIP archive
694        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
695            zipfp.write(TESTFN, "another.name")
696            zipfp.write(TESTFN, TESTFN)
697            zipfp.writestr("strfile", self.data)
698
699        # Read the ZIP archive
700        with zipfile.ZipFile(f, "r", compression) as zipfp:
701            self.assertEqual(zipfp.read(TESTFN), self.data)
702            self.assertEqual(zipfp.read("another.name"), self.data)
703            self.assertEqual(zipfp.read("strfile"), self.data)
704
705            # Print the ZIP directory
706            fp = io.StringIO()
707            zipfp.printdir(fp)
708
709            directory = fp.getvalue()
710            lines = directory.splitlines()
711            self.assertEqual(len(lines), 4) # Number of files + header
712
713            self.assertIn('File Name', lines[0])
714            self.assertIn('Modified', lines[0])
715            self.assertIn('Size', lines[0])
716
717            fn, date, time_, size = lines[1].split()
718            self.assertEqual(fn, 'another.name')
719            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
720            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
721            self.assertEqual(size, str(len(self.data)))
722
723            # Check the namelist
724            names = zipfp.namelist()
725            self.assertEqual(len(names), 3)
726            self.assertIn(TESTFN, names)
727            self.assertIn("another.name", names)
728            self.assertIn("strfile", names)
729
730            # Check infolist
731            infos = zipfp.infolist()
732            names = [i.filename for i in infos]
733            self.assertEqual(len(names), 3)
734            self.assertIn(TESTFN, names)
735            self.assertIn("another.name", names)
736            self.assertIn("strfile", names)
737            for i in infos:
738                self.assertEqual(i.file_size, len(self.data))
739
740            # check getinfo
741            for nm in (TESTFN, "another.name", "strfile"):
742                info = zipfp.getinfo(nm)
743                self.assertEqual(info.filename, nm)
744                self.assertEqual(info.file_size, len(self.data))
745
746            # Check that testzip doesn't raise an exception
747            zipfp.testzip()
748
749    def test_basic(self):
750        for f in get_files(self):
751            self.zip_test(f, self.compression)
752
753    def test_too_many_files(self):
754        # This test checks that more than 64k files can be added to an archive,
755        # and that the resulting archive can be read properly by ZipFile
756        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
757                               allowZip64=True)
758        zipf.debug = 100
759        numfiles = 15
760        for i in range(numfiles):
761            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
762        self.assertEqual(len(zipf.namelist()), numfiles)
763        zipf.close()
764
765        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
766        self.assertEqual(len(zipf2.namelist()), numfiles)
767        for i in range(numfiles):
768            content = zipf2.read("foo%08d" % i).decode('ascii')
769            self.assertEqual(content, "%d" % (i**3 % 57))
770        zipf2.close()
771
772    def test_too_many_files_append(self):
773        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
774                               allowZip64=False)
775        zipf.debug = 100
776        numfiles = 9
777        for i in range(numfiles):
778            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
779        self.assertEqual(len(zipf.namelist()), numfiles)
780        with self.assertRaises(zipfile.LargeZipFile):
781            zipf.writestr("foo%08d" % numfiles, b'')
782        self.assertEqual(len(zipf.namelist()), numfiles)
783        zipf.close()
784
785        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
786                               allowZip64=False)
787        zipf.debug = 100
788        self.assertEqual(len(zipf.namelist()), numfiles)
789        with self.assertRaises(zipfile.LargeZipFile):
790            zipf.writestr("foo%08d" % numfiles, b'')
791        self.assertEqual(len(zipf.namelist()), numfiles)
792        zipf.close()
793
794        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
795                               allowZip64=True)
796        zipf.debug = 100
797        self.assertEqual(len(zipf.namelist()), numfiles)
798        numfiles2 = 15
799        for i in range(numfiles, numfiles2):
800            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
801        self.assertEqual(len(zipf.namelist()), numfiles2)
802        zipf.close()
803
804        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
805        self.assertEqual(len(zipf2.namelist()), numfiles2)
806        for i in range(numfiles2):
807            content = zipf2.read("foo%08d" % i).decode('ascii')
808            self.assertEqual(content, "%d" % (i**3 % 57))
809        zipf2.close()
810
811    def tearDown(self):
812        zipfile.ZIP64_LIMIT = self._limit
813        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
814        unlink(TESTFN)
815        unlink(TESTFN2)
816
817
818class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
819                                  unittest.TestCase):
820    compression = zipfile.ZIP_STORED
821
822    def large_file_exception_test(self, f, compression):
823        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
824            self.assertRaises(zipfile.LargeZipFile,
825                              zipfp.write, TESTFN, "another.name")
826
827    def large_file_exception_test2(self, f, compression):
828        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
829            self.assertRaises(zipfile.LargeZipFile,
830                              zipfp.writestr, "another.name", self.data)
831
832    def test_large_file_exception(self):
833        for f in get_files(self):
834            self.large_file_exception_test(f, zipfile.ZIP_STORED)
835            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
836
837    def test_absolute_arcnames(self):
838        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
839                             allowZip64=True) as zipfp:
840            zipfp.write(TESTFN, "/absolute")
841
842        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
843            self.assertEqual(zipfp.namelist(), ["absolute"])
844
845    def test_append(self):
846        # Test that appending to the Zip64 archive doesn't change
847        # extra fields of existing entries.
848        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
849            zipfp.writestr("strfile", self.data)
850        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
851            zinfo = zipfp.getinfo("strfile")
852            extra = zinfo.extra
853        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
854            zipfp.writestr("strfile2", self.data)
855        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
856            zinfo = zipfp.getinfo("strfile")
857            self.assertEqual(zinfo.extra, extra)
858
859    def make_zip64_file(
860        self, file_size_64_set=False, file_size_extra=False,
861        compress_size_64_set=False, compress_size_extra=False,
862        header_offset_64_set=False, header_offset_extra=False,
863    ):
864        """Generate bytes sequence for a zip with (incomplete) zip64 data.
865
866        The actual values (not the zip 64 0xffffffff values) stored in the file
867        are:
868        file_size: 8
869        compress_size: 8
870        header_offset: 0
871        """
872        actual_size = 8
873        actual_header_offset = 0
874        local_zip64_fields = []
875        central_zip64_fields = []
876
877        file_size = actual_size
878        if file_size_64_set:
879            file_size = 0xffffffff
880            if file_size_extra:
881                local_zip64_fields.append(actual_size)
882                central_zip64_fields.append(actual_size)
883        file_size = struct.pack("<L", file_size)
884
885        compress_size = actual_size
886        if compress_size_64_set:
887            compress_size = 0xffffffff
888            if compress_size_extra:
889                local_zip64_fields.append(actual_size)
890                central_zip64_fields.append(actual_size)
891        compress_size = struct.pack("<L", compress_size)
892
893        header_offset = actual_header_offset
894        if header_offset_64_set:
895            header_offset = 0xffffffff
896            if header_offset_extra:
897                central_zip64_fields.append(actual_header_offset)
898        header_offset = struct.pack("<L", header_offset)
899
900        local_extra = struct.pack(
901            '<HH' + 'Q'*len(local_zip64_fields),
902            0x0001,
903            8*len(local_zip64_fields),
904            *local_zip64_fields
905        )
906
907        central_extra = struct.pack(
908            '<HH' + 'Q'*len(central_zip64_fields),
909            0x0001,
910            8*len(central_zip64_fields),
911            *central_zip64_fields
912        )
913
914        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
915        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
916
917        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
918        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
919
920        filename = b"test.txt"
921        content = b"test1234"
922        filename_length = struct.pack("<H", len(filename))
923        zip64_contents = (
924            # Local file header
925            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
926            + compress_size
927            + file_size
928            + filename_length
929            + local_extra_length
930            + filename
931            + local_extra
932            + content
933            # Central directory:
934            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
935            + compress_size
936            + file_size
937            + filename_length
938            + central_extra_length
939            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
940            + header_offset
941            + filename
942            + central_extra
943            # Zip64 end of central directory
944            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
945            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
946            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
947            + central_dir_size
948            + offset_to_central_dir
949            # Zip64 end of central directory locator
950            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
951            + b"\x00\x00\x00"
952            # end of central directory
953            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
954            + b"\x00\x00\x00\x00"
955        )
956        return zip64_contents
957
958    def test_bad_zip64_extra(self):
959        """Missing zip64 extra records raises an exception.
960
961        There are 4 fields that the zip64 format handles (the disk number is
962        not used in this module and so is ignored here). According to the zip
963        spec:
964              The order of the fields in the zip64 extended
965              information record is fixed, but the fields MUST
966              only appear if the corresponding Local or Central
967              directory record field is set to 0xFFFF or 0xFFFFFFFF.
968
969        If the zip64 extra content doesn't contain enough entries for the
970        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
971        This test mismatches the length of the zip64 extra field and the number
972        of fields set to indicate the presence of zip64 data.
973        """
974        # zip64 file size present, no fields in extra, expecting one, equals
975        # missing file size.
976        missing_file_size_extra = self.make_zip64_file(
977            file_size_64_set=True,
978        )
979        with self.assertRaises(zipfile.BadZipFile) as e:
980            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
981        self.assertIn('file size', str(e.exception).lower())
982
983        # zip64 file size present, zip64 compress size present, one field in
984        # extra, expecting two, equals missing compress size.
985        missing_compress_size_extra = self.make_zip64_file(
986            file_size_64_set=True,
987            file_size_extra=True,
988            compress_size_64_set=True,
989        )
990        with self.assertRaises(zipfile.BadZipFile) as e:
991            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
992        self.assertIn('compress size', str(e.exception).lower())
993
994        # zip64 compress size present, no fields in extra, expecting one,
995        # equals missing compress size.
996        missing_compress_size_extra = self.make_zip64_file(
997            compress_size_64_set=True,
998        )
999        with self.assertRaises(zipfile.BadZipFile) as e:
1000            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1001        self.assertIn('compress size', str(e.exception).lower())
1002
1003        # zip64 file size present, zip64 compress size present, zip64 header
1004        # offset present, two fields in extra, expecting three, equals missing
1005        # header offset
1006        missing_header_offset_extra = self.make_zip64_file(
1007            file_size_64_set=True,
1008            file_size_extra=True,
1009            compress_size_64_set=True,
1010            compress_size_extra=True,
1011            header_offset_64_set=True,
1012        )
1013        with self.assertRaises(zipfile.BadZipFile) as e:
1014            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1015        self.assertIn('header offset', str(e.exception).lower())
1016
1017        # zip64 compress size present, zip64 header offset present, one field
1018        # in extra, expecting two, equals missing header offset
1019        missing_header_offset_extra = self.make_zip64_file(
1020            file_size_64_set=False,
1021            compress_size_64_set=True,
1022            compress_size_extra=True,
1023            header_offset_64_set=True,
1024        )
1025        with self.assertRaises(zipfile.BadZipFile) as e:
1026            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1027        self.assertIn('header offset', str(e.exception).lower())
1028
1029        # zip64 file size present, zip64 header offset present, one field in
1030        # extra, expecting two, equals missing header offset
1031        missing_header_offset_extra = self.make_zip64_file(
1032            file_size_64_set=True,
1033            file_size_extra=True,
1034            compress_size_64_set=False,
1035            header_offset_64_set=True,
1036        )
1037        with self.assertRaises(zipfile.BadZipFile) as e:
1038            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1039        self.assertIn('header offset', str(e.exception).lower())
1040
1041        # zip64 header offset present, no fields in extra, expecting one,
1042        # equals missing header offset
1043        missing_header_offset_extra = self.make_zip64_file(
1044            file_size_64_set=False,
1045            compress_size_64_set=False,
1046            header_offset_64_set=True,
1047        )
1048        with self.assertRaises(zipfile.BadZipFile) as e:
1049            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1050        self.assertIn('header offset', str(e.exception).lower())
1051
1052    def test_generated_valid_zip64_extra(self):
1053        # These values are what is set in the make_zip64_file method.
1054        expected_file_size = 8
1055        expected_compress_size = 8
1056        expected_header_offset = 0
1057        expected_content = b"test1234"
1058
1059        # Loop through the various valid combinations of zip64 masks
1060        # present and extra fields present.
1061        params = (
1062            {"file_size_64_set": True, "file_size_extra": True},
1063            {"compress_size_64_set": True, "compress_size_extra": True},
1064            {"header_offset_64_set": True, "header_offset_extra": True},
1065        )
1066
1067        for r in range(1, len(params) + 1):
1068            for combo in itertools.combinations(params, r):
1069                kwargs = {}
1070                for c in combo:
1071                    kwargs.update(c)
1072                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1073                    zinfo = zf.infolist()[0]
1074                    self.assertEqual(zinfo.file_size, expected_file_size)
1075                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1076                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1077                    self.assertEqual(zf.read(zinfo), expected_content)
1078
1079
1080@requires_zlib()
1081class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1082                                   unittest.TestCase):
1083    compression = zipfile.ZIP_DEFLATED
1084
1085@requires_bz2()
1086class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1087                                 unittest.TestCase):
1088    compression = zipfile.ZIP_BZIP2
1089
1090@requires_lzma()
1091class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1092                                unittest.TestCase):
1093    compression = zipfile.ZIP_LZMA
1094
1095
1096class AbstractWriterTests:
1097
1098    def tearDown(self):
1099        unlink(TESTFN2)
1100
1101    def test_close_after_close(self):
1102        data = b'content'
1103        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1104            w = zipf.open('test', 'w')
1105            w.write(data)
1106            w.close()
1107            self.assertTrue(w.closed)
1108            w.close()
1109            self.assertTrue(w.closed)
1110            self.assertEqual(zipf.read('test'), data)
1111
1112    def test_write_after_close(self):
1113        data = b'content'
1114        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1115            w = zipf.open('test', 'w')
1116            w.write(data)
1117            w.close()
1118            self.assertTrue(w.closed)
1119            self.assertRaises(ValueError, w.write, b'')
1120            self.assertEqual(zipf.read('test'), data)
1121
1122class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
1123    compression = zipfile.ZIP_STORED
1124
1125@requires_zlib()
1126class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
1127    compression = zipfile.ZIP_DEFLATED
1128
1129@requires_bz2()
1130class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
1131    compression = zipfile.ZIP_BZIP2
1132
1133@requires_lzma()
1134class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
1135    compression = zipfile.ZIP_LZMA
1136
1137
1138class PyZipFileTests(unittest.TestCase):
1139    def assertCompiledIn(self, name, namelist):
1140        if name + 'o' not in namelist:
1141            self.assertIn(name + 'c', namelist)
1142
1143    def requiresWriteAccess(self, path):
1144        # effective_ids unavailable on windows
1145        if not os.access(path, os.W_OK,
1146                         effective_ids=os.access in os.supports_effective_ids):
1147            self.skipTest('requires write access to the installed location')
1148        filename = os.path.join(path, 'test_zipfile.try')
1149        try:
1150            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1151            os.close(fd)
1152        except Exception:
1153            self.skipTest('requires write access to the installed location')
1154        unlink(filename)
1155
1156    def test_write_pyfile(self):
1157        self.requiresWriteAccess(os.path.dirname(__file__))
1158        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1159            fn = __file__
1160            if fn.endswith('.pyc'):
1161                path_split = fn.split(os.sep)
1162                if os.altsep is not None:
1163                    path_split.extend(fn.split(os.altsep))
1164                if '__pycache__' in path_split:
1165                    fn = importlib.util.source_from_cache(fn)
1166                else:
1167                    fn = fn[:-1]
1168
1169            zipfp.writepy(fn)
1170
1171            bn = os.path.basename(fn)
1172            self.assertNotIn(bn, zipfp.namelist())
1173            self.assertCompiledIn(bn, zipfp.namelist())
1174
1175        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1176            fn = __file__
1177            if fn.endswith('.pyc'):
1178                fn = fn[:-1]
1179
1180            zipfp.writepy(fn, "testpackage")
1181
1182            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1183            self.assertNotIn(bn, zipfp.namelist())
1184            self.assertCompiledIn(bn, zipfp.namelist())
1185
1186    def test_write_python_package(self):
1187        import email
1188        packagedir = os.path.dirname(email.__file__)
1189        self.requiresWriteAccess(packagedir)
1190
1191        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1192            zipfp.writepy(packagedir)
1193
1194            # Check for a couple of modules at different levels of the
1195            # hierarchy
1196            names = zipfp.namelist()
1197            self.assertCompiledIn('email/__init__.py', names)
1198            self.assertCompiledIn('email/mime/text.py', names)
1199
1200    def test_write_filtered_python_package(self):
1201        import test
1202        packagedir = os.path.dirname(test.__file__)
1203        self.requiresWriteAccess(packagedir)
1204
1205        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1206
1207            # first make sure that the test folder gives error messages
1208            # (on the badsyntax_... files)
1209            with captured_stdout() as reportSIO:
1210                zipfp.writepy(packagedir)
1211            reportStr = reportSIO.getvalue()
1212            self.assertTrue('SyntaxError' in reportStr)
1213
1214            # then check that the filter works on the whole package
1215            with captured_stdout() as reportSIO:
1216                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1217            reportStr = reportSIO.getvalue()
1218            self.assertTrue('SyntaxError' not in reportStr)
1219
1220            # then check that the filter works on individual files
1221            def filter(path):
1222                return not os.path.basename(path).startswith("bad")
1223            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1224                zipfp.writepy(packagedir, filterfunc=filter)
1225            reportStr = reportSIO.getvalue()
1226            if reportStr:
1227                print(reportStr)
1228            self.assertTrue('SyntaxError' not in reportStr)
1229
1230    def test_write_with_optimization(self):
1231        import email
1232        packagedir = os.path.dirname(email.__file__)
1233        self.requiresWriteAccess(packagedir)
1234        optlevel = 1 if __debug__ else 0
1235        ext = '.pyc'
1236
1237        with TemporaryFile() as t, \
1238             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1239            zipfp.writepy(packagedir)
1240
1241            names = zipfp.namelist()
1242            self.assertIn('email/__init__' + ext, names)
1243            self.assertIn('email/mime/text' + ext, names)
1244
1245    def test_write_python_directory(self):
1246        os.mkdir(TESTFN2)
1247        try:
1248            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1249                fp.write("print(42)\n")
1250
1251            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1252                fp.write("print(42 * 42)\n")
1253
1254            with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp:
1255                fp.write("bla bla bla\n")
1256
1257            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1258                zipfp.writepy(TESTFN2)
1259
1260                names = zipfp.namelist()
1261                self.assertCompiledIn('mod1.py', names)
1262                self.assertCompiledIn('mod2.py', names)
1263                self.assertNotIn('mod2.txt', names)
1264
1265        finally:
1266            rmtree(TESTFN2)
1267
1268    def test_write_python_directory_filtered(self):
1269        os.mkdir(TESTFN2)
1270        try:
1271            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1272                fp.write("print(42)\n")
1273
1274            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1275                fp.write("print(42 * 42)\n")
1276
1277            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1278                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1279                                                  not fn.endswith('mod2.py'))
1280
1281                names = zipfp.namelist()
1282                self.assertCompiledIn('mod1.py', names)
1283                self.assertNotIn('mod2.py', names)
1284
1285        finally:
1286            rmtree(TESTFN2)
1287
1288    def test_write_non_pyfile(self):
1289        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1290            with open(TESTFN, 'w', encoding='utf-8') as f:
1291                f.write('most definitely not a python file')
1292            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1293            unlink(TESTFN)
1294
1295    def test_write_pyfile_bad_syntax(self):
1296        os.mkdir(TESTFN2)
1297        try:
1298            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1299                fp.write("Bad syntax in python file\n")
1300
1301            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1302                # syntax errors are printed to stdout
1303                with captured_stdout() as s:
1304                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1305
1306                self.assertIn("SyntaxError", s.getvalue())
1307
1308                # as it will not have compiled the python file, it will
1309                # include the .py file not .pyc
1310                names = zipfp.namelist()
1311                self.assertIn('mod1.py', names)
1312                self.assertNotIn('mod1.pyc', names)
1313
1314        finally:
1315            rmtree(TESTFN2)
1316
1317    def test_write_pathlike(self):
1318        os.mkdir(TESTFN2)
1319        try:
1320            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1321                fp.write("print(42)\n")
1322
1323            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1324                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1325                names = zipfp.namelist()
1326                self.assertCompiledIn('mod1.py', names)
1327        finally:
1328            rmtree(TESTFN2)
1329
1330
1331class ExtractTests(unittest.TestCase):
1332
1333    def make_test_file(self):
1334        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1335            for fpath, fdata in SMALL_TEST_DATA:
1336                zipfp.writestr(fpath, fdata)
1337
1338    def test_extract(self):
1339        with temp_cwd():
1340            self.make_test_file()
1341            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1342                for fpath, fdata in SMALL_TEST_DATA:
1343                    writtenfile = zipfp.extract(fpath)
1344
1345                    # make sure it was written to the right place
1346                    correctfile = os.path.join(os.getcwd(), fpath)
1347                    correctfile = os.path.normpath(correctfile)
1348
1349                    self.assertEqual(writtenfile, correctfile)
1350
1351                    # make sure correct data is in correct file
1352                    with open(writtenfile, "rb") as f:
1353                        self.assertEqual(fdata.encode(), f.read())
1354
1355                    unlink(writtenfile)
1356
1357    def _test_extract_with_target(self, target):
1358        self.make_test_file()
1359        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1360            for fpath, fdata in SMALL_TEST_DATA:
1361                writtenfile = zipfp.extract(fpath, target)
1362
1363                # make sure it was written to the right place
1364                correctfile = os.path.join(target, fpath)
1365                correctfile = os.path.normpath(correctfile)
1366                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1367
1368                # make sure correct data is in correct file
1369                with open(writtenfile, "rb") as f:
1370                    self.assertEqual(fdata.encode(), f.read())
1371
1372                unlink(writtenfile)
1373
1374        unlink(TESTFN2)
1375
1376    def test_extract_with_target(self):
1377        with temp_dir() as extdir:
1378            self._test_extract_with_target(extdir)
1379
1380    def test_extract_with_target_pathlike(self):
1381        with temp_dir() as extdir:
1382            self._test_extract_with_target(pathlib.Path(extdir))
1383
1384    def test_extract_all(self):
1385        with temp_cwd():
1386            self.make_test_file()
1387            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1388                zipfp.extractall()
1389                for fpath, fdata in SMALL_TEST_DATA:
1390                    outfile = os.path.join(os.getcwd(), fpath)
1391
1392                    with open(outfile, "rb") as f:
1393                        self.assertEqual(fdata.encode(), f.read())
1394
1395                    unlink(outfile)
1396
1397    def _test_extract_all_with_target(self, target):
1398        self.make_test_file()
1399        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1400            zipfp.extractall(target)
1401            for fpath, fdata in SMALL_TEST_DATA:
1402                outfile = os.path.join(target, fpath)
1403
1404                with open(outfile, "rb") as f:
1405                    self.assertEqual(fdata.encode(), f.read())
1406
1407                unlink(outfile)
1408
1409        unlink(TESTFN2)
1410
1411    def test_extract_all_with_target(self):
1412        with temp_dir() as extdir:
1413            self._test_extract_all_with_target(extdir)
1414
1415    def test_extract_all_with_target_pathlike(self):
1416        with temp_dir() as extdir:
1417            self._test_extract_all_with_target(pathlib.Path(extdir))
1418
1419    def check_file(self, filename, content):
1420        self.assertTrue(os.path.isfile(filename))
1421        with open(filename, 'rb') as f:
1422            self.assertEqual(f.read(), content)
1423
1424    def test_sanitize_windows_name(self):
1425        san = zipfile.ZipFile._sanitize_windows_name
1426        # Passing pathsep in allows this test to work regardless of platform.
1427        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1428        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1429        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1430
1431    def test_extract_hackers_arcnames_common_cases(self):
1432        common_hacknames = [
1433            ('../foo/bar', 'foo/bar'),
1434            ('foo/../bar', 'foo/bar'),
1435            ('foo/../../bar', 'foo/bar'),
1436            ('foo/bar/..', 'foo/bar'),
1437            ('./../foo/bar', 'foo/bar'),
1438            ('/foo/bar', 'foo/bar'),
1439            ('/foo/../bar', 'foo/bar'),
1440            ('/foo/../../bar', 'foo/bar'),
1441        ]
1442        self._test_extract_hackers_arcnames(common_hacknames)
1443
1444    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1445    def test_extract_hackers_arcnames_windows_only(self):
1446        """Test combination of path fixing and windows name sanitization."""
1447        windows_hacknames = [
1448            (r'..\foo\bar', 'foo/bar'),
1449            (r'..\/foo\/bar', 'foo/bar'),
1450            (r'foo/\..\/bar', 'foo/bar'),
1451            (r'foo\/../\bar', 'foo/bar'),
1452            (r'C:foo/bar', 'foo/bar'),
1453            (r'C:/foo/bar', 'foo/bar'),
1454            (r'C://foo/bar', 'foo/bar'),
1455            (r'C:\foo\bar', 'foo/bar'),
1456            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1457            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1458            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1459            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1460            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1461            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1462            (r'//?/C:/foo/bar', 'foo/bar'),
1463            (r'\\?\C:\foo\bar', 'foo/bar'),
1464            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1465            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1466            ('../../foo../../ba..r', 'foo/ba..r'),
1467        ]
1468        self._test_extract_hackers_arcnames(windows_hacknames)
1469
1470    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1471    def test_extract_hackers_arcnames_posix_only(self):
1472        posix_hacknames = [
1473            ('//foo/bar', 'foo/bar'),
1474            ('../../foo../../ba..r', 'foo../ba..r'),
1475            (r'foo/..\bar', r'foo/..\bar'),
1476        ]
1477        self._test_extract_hackers_arcnames(posix_hacknames)
1478
1479    def _test_extract_hackers_arcnames(self, hacknames):
1480        for arcname, fixedname in hacknames:
1481            content = b'foobar' + arcname.encode()
1482            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1483                zinfo = zipfile.ZipInfo()
1484                # preserve backslashes
1485                zinfo.filename = arcname
1486                zinfo.external_attr = 0o600 << 16
1487                zipfp.writestr(zinfo, content)
1488
1489            arcname = arcname.replace(os.sep, "/")
1490            targetpath = os.path.join('target', 'subdir', 'subsub')
1491            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1492
1493            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1494                writtenfile = zipfp.extract(arcname, targetpath)
1495                self.assertEqual(writtenfile, correctfile,
1496                                 msg='extract %r: %r != %r' %
1497                                 (arcname, writtenfile, correctfile))
1498            self.check_file(correctfile, content)
1499            rmtree('target')
1500
1501            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1502                zipfp.extractall(targetpath)
1503            self.check_file(correctfile, content)
1504            rmtree('target')
1505
1506            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1507
1508            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1509                writtenfile = zipfp.extract(arcname)
1510                self.assertEqual(writtenfile, correctfile,
1511                                 msg="extract %r" % arcname)
1512            self.check_file(correctfile, content)
1513            rmtree(fixedname.split('/')[0])
1514
1515            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1516                zipfp.extractall()
1517            self.check_file(correctfile, content)
1518            rmtree(fixedname.split('/')[0])
1519
1520            unlink(TESTFN2)
1521
1522
1523class OtherTests(unittest.TestCase):
1524    def test_open_via_zip_info(self):
1525        # Create the ZIP archive
1526        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1527            zipfp.writestr("name", "foo")
1528            with self.assertWarns(UserWarning):
1529                zipfp.writestr("name", "bar")
1530            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1531
1532        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1533            infos = zipfp.infolist()
1534            data = b""
1535            for info in infos:
1536                with zipfp.open(info) as zipopen:
1537                    data += zipopen.read()
1538            self.assertIn(data, {b"foobar", b"barfoo"})
1539            data = b""
1540            for info in infos:
1541                data += zipfp.read(info)
1542            self.assertIn(data, {b"foobar", b"barfoo"})
1543
1544    def test_writestr_extended_local_header_issue1202(self):
1545        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1546            for data in 'abcdefghijklmnop':
1547                zinfo = zipfile.ZipInfo(data)
1548                zinfo.flag_bits |= 0x08  # Include an extended local header.
1549                orig_zip.writestr(zinfo, data)
1550
1551    def test_close(self):
1552        """Check that the zipfile is closed after the 'with' block."""
1553        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1554            for fpath, fdata in SMALL_TEST_DATA:
1555                zipfp.writestr(fpath, fdata)
1556                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1557        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1558
1559        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1560            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1561        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1562
1563    def test_close_on_exception(self):
1564        """Check that the zipfile is closed if an exception is raised in the
1565        'with' block."""
1566        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1567            for fpath, fdata in SMALL_TEST_DATA:
1568                zipfp.writestr(fpath, fdata)
1569
1570        try:
1571            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1572                raise zipfile.BadZipFile()
1573        except zipfile.BadZipFile:
1574            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1575
1576    def test_unsupported_version(self):
1577        # File has an extract_version of 120
1578        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1579                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1580                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1581                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1582                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1583
1584        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1585                          io.BytesIO(data), 'r')
1586
1587    @requires_zlib()
1588    def test_read_unicode_filenames(self):
1589        # bug #10801
1590        fname = findfile('zip_cp437_header.zip')
1591        with zipfile.ZipFile(fname) as zipfp:
1592            for name in zipfp.namelist():
1593                zipfp.open(name).close()
1594
1595    def test_write_unicode_filenames(self):
1596        with zipfile.ZipFile(TESTFN, "w") as zf:
1597            zf.writestr("foo.txt", "Test for unicode filename")
1598            zf.writestr("\xf6.txt", "Test for unicode filename")
1599            self.assertIsInstance(zf.infolist()[0].filename, str)
1600
1601        with zipfile.ZipFile(TESTFN, "r") as zf:
1602            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1603            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1604
1605    def test_read_after_write_unicode_filenames(self):
1606        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1607            zipfp.writestr('приклад', b'sample')
1608            self.assertEqual(zipfp.read('приклад'), b'sample')
1609
1610    def test_exclusive_create_zip_file(self):
1611        """Test exclusive creating a new zipfile."""
1612        unlink(TESTFN2)
1613        filename = 'testfile.txt'
1614        content = b'hello, world. this is some content.'
1615        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1616            zipfp.writestr(filename, content)
1617        with self.assertRaises(FileExistsError):
1618            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1619        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1620            self.assertEqual(zipfp.namelist(), [filename])
1621            self.assertEqual(zipfp.read(filename), content)
1622
1623    def test_create_non_existent_file_for_append(self):
1624        if os.path.exists(TESTFN):
1625            os.unlink(TESTFN)
1626
1627        filename = 'testfile.txt'
1628        content = b'hello, world. this is some content.'
1629
1630        try:
1631            with zipfile.ZipFile(TESTFN, 'a') as zf:
1632                zf.writestr(filename, content)
1633        except OSError:
1634            self.fail('Could not append data to a non-existent zip file.')
1635
1636        self.assertTrue(os.path.exists(TESTFN))
1637
1638        with zipfile.ZipFile(TESTFN, 'r') as zf:
1639            self.assertEqual(zf.read(filename), content)
1640
1641    def test_close_erroneous_file(self):
1642        # This test checks that the ZipFile constructor closes the file object
1643        # it opens if there's an error in the file.  If it doesn't, the
1644        # traceback holds a reference to the ZipFile object and, indirectly,
1645        # the file object.
1646        # On Windows, this causes the os.unlink() call to fail because the
1647        # underlying file is still open.  This is SF bug #412214.
1648        #
1649        with open(TESTFN, "w", encoding="utf-8") as fp:
1650            fp.write("this is not a legal zip file\n")
1651        try:
1652            zf = zipfile.ZipFile(TESTFN)
1653        except zipfile.BadZipFile:
1654            pass
1655
1656    def test_is_zip_erroneous_file(self):
1657        """Check that is_zipfile() correctly identifies non-zip files."""
1658        # - passing a filename
1659        with open(TESTFN, "w", encoding='utf-8') as fp:
1660            fp.write("this is not a legal zip file\n")
1661        self.assertFalse(zipfile.is_zipfile(TESTFN))
1662        # - passing a path-like object
1663        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1664        # - passing a file object
1665        with open(TESTFN, "rb") as fp:
1666            self.assertFalse(zipfile.is_zipfile(fp))
1667        # - passing a file-like object
1668        fp = io.BytesIO()
1669        fp.write(b"this is not a legal zip file\n")
1670        self.assertFalse(zipfile.is_zipfile(fp))
1671        fp.seek(0, 0)
1672        self.assertFalse(zipfile.is_zipfile(fp))
1673
1674    def test_damaged_zipfile(self):
1675        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1676        # - Create a valid zip file
1677        fp = io.BytesIO()
1678        with zipfile.ZipFile(fp, mode="w") as zipf:
1679            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1680        zipfiledata = fp.getvalue()
1681
1682        # - Now create copies of it missing the last N bytes and make sure
1683        #   a BadZipFile exception is raised when we try to open it
1684        for N in range(len(zipfiledata)):
1685            fp = io.BytesIO(zipfiledata[:N])
1686            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1687
1688    def test_is_zip_valid_file(self):
1689        """Check that is_zipfile() correctly identifies zip files."""
1690        # - passing a filename
1691        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1692            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1693
1694        self.assertTrue(zipfile.is_zipfile(TESTFN))
1695        # - passing a file object
1696        with open(TESTFN, "rb") as fp:
1697            self.assertTrue(zipfile.is_zipfile(fp))
1698            fp.seek(0, 0)
1699            zip_contents = fp.read()
1700        # - passing a file-like object
1701        fp = io.BytesIO()
1702        fp.write(zip_contents)
1703        self.assertTrue(zipfile.is_zipfile(fp))
1704        fp.seek(0, 0)
1705        self.assertTrue(zipfile.is_zipfile(fp))
1706
1707    def test_non_existent_file_raises_OSError(self):
1708        # make sure we don't raise an AttributeError when a partially-constructed
1709        # ZipFile instance is finalized; this tests for regression on SF tracker
1710        # bug #403871.
1711
1712        # The bug we're testing for caused an AttributeError to be raised
1713        # when a ZipFile instance was created for a file that did not
1714        # exist; the .fp member was not initialized but was needed by the
1715        # __del__() method.  Since the AttributeError is in the __del__(),
1716        # it is ignored, but the user should be sufficiently annoyed by
1717        # the message on the output that regression will be noticed
1718        # quickly.
1719        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1720
1721    def test_empty_file_raises_BadZipFile(self):
1722        f = open(TESTFN, 'w', encoding='utf-8')
1723        f.close()
1724        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1725
1726        with open(TESTFN, 'w', encoding='utf-8') as fp:
1727            fp.write("short file")
1728        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1729
1730    def test_closed_zip_raises_ValueError(self):
1731        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1732        data = io.BytesIO()
1733        with zipfile.ZipFile(data, mode="w") as zipf:
1734            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1735
1736        # This is correct; calling .read on a closed ZipFile should raise
1737        # a ValueError, and so should calling .testzip.  An earlier
1738        # version of .testzip would swallow this exception (and any other)
1739        # and report that the first file in the archive was corrupt.
1740        self.assertRaises(ValueError, zipf.read, "foo.txt")
1741        self.assertRaises(ValueError, zipf.open, "foo.txt")
1742        self.assertRaises(ValueError, zipf.testzip)
1743        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1744        with open(TESTFN, 'w', encoding='utf-8') as f:
1745            f.write('zipfile test data')
1746        self.assertRaises(ValueError, zipf.write, TESTFN)
1747
1748    def test_bad_constructor_mode(self):
1749        """Check that bad modes passed to ZipFile constructor are caught."""
1750        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1751
1752    def test_bad_open_mode(self):
1753        """Check that bad modes passed to ZipFile.open are caught."""
1754        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1755            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1756
1757        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1758            # read the data to make sure the file is there
1759            zipf.read("foo.txt")
1760            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1761            # universal newlines support is removed
1762            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1763            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1764
1765    def test_read0(self):
1766        """Check that calling read(0) on a ZipExtFile object returns an empty
1767        string and doesn't advance file pointer."""
1768        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1769            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1770            # read the data to make sure the file is there
1771            with zipf.open("foo.txt") as f:
1772                for i in range(FIXEDTEST_SIZE):
1773                    self.assertEqual(f.read(0), b'')
1774
1775                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1776
1777    def test_open_non_existent_item(self):
1778        """Check that attempting to call open() for an item that doesn't
1779        exist in the archive raises a RuntimeError."""
1780        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1781            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1782
1783    def test_bad_compression_mode(self):
1784        """Check that bad compression methods passed to ZipFile.open are
1785        caught."""
1786        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1787
1788    def test_unsupported_compression(self):
1789        # data is declared as shrunk, but actually deflated
1790        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1791                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1792                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1793                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1794                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1795                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1796        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1797            self.assertRaises(NotImplementedError, zipf.open, 'x')
1798
1799    def test_null_byte_in_filename(self):
1800        """Check that a filename containing a null byte is properly
1801        terminated."""
1802        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1803            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1804            self.assertEqual(zipf.namelist(), ['foo.txt'])
1805
1806    def test_struct_sizes(self):
1807        """Check that ZIP internal structure sizes are calculated correctly."""
1808        self.assertEqual(zipfile.sizeEndCentDir, 22)
1809        self.assertEqual(zipfile.sizeCentralDir, 46)
1810        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1811        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1812
1813    def test_comments(self):
1814        """Check that comments on the archive are handled properly."""
1815
1816        # check default comment is empty
1817        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1818            self.assertEqual(zipf.comment, b'')
1819            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1820
1821        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1822            self.assertEqual(zipfr.comment, b'')
1823
1824        # check a simple short comment
1825        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1826        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1827            zipf.comment = comment
1828            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1829        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1830            self.assertEqual(zipf.comment, comment)
1831
1832        # check a comment of max length
1833        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1834        comment2 = comment2.encode("ascii")
1835        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1836            zipf.comment = comment2
1837            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1838
1839        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1840            self.assertEqual(zipfr.comment, comment2)
1841
1842        # check a comment that is too long is truncated
1843        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1844            with self.assertWarns(UserWarning):
1845                zipf.comment = comment2 + b'oops'
1846            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1847        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1848            self.assertEqual(zipfr.comment, comment2)
1849
1850        # check that comments are correctly modified in append mode
1851        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1852            zipf.comment = b"original comment"
1853            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1854        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1855            zipf.comment = b"an updated comment"
1856        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1857            self.assertEqual(zipf.comment, b"an updated comment")
1858
1859        # check that comments are correctly shortened in append mode
1860        # and the file is indeed truncated
1861        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1862            zipf.comment = b"original comment that's longer"
1863            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1864        original_zip_size = os.path.getsize(TESTFN)
1865        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1866            zipf.comment = b"shorter comment"
1867        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
1868        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1869            self.assertEqual(zipf.comment, b"shorter comment")
1870
1871    def test_unicode_comment(self):
1872        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1873            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1874            with self.assertRaises(TypeError):
1875                zipf.comment = "this is an error"
1876
1877    def test_change_comment_in_empty_archive(self):
1878        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1879            self.assertFalse(zipf.filelist)
1880            zipf.comment = b"this is a comment"
1881        with zipfile.ZipFile(TESTFN, "r") as zipf:
1882            self.assertEqual(zipf.comment, b"this is a comment")
1883
1884    def test_change_comment_in_nonempty_archive(self):
1885        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1886            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1887        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1888            self.assertTrue(zipf.filelist)
1889            zipf.comment = b"this is a comment"
1890        with zipfile.ZipFile(TESTFN, "r") as zipf:
1891            self.assertEqual(zipf.comment, b"this is a comment")
1892
1893    def test_empty_zipfile(self):
1894        # Check that creating a file in 'w' or 'a' mode and closing without
1895        # adding any files to the archives creates a valid empty ZIP file
1896        zipf = zipfile.ZipFile(TESTFN, mode="w")
1897        zipf.close()
1898        try:
1899            zipf = zipfile.ZipFile(TESTFN, mode="r")
1900        except zipfile.BadZipFile:
1901            self.fail("Unable to create empty ZIP file in 'w' mode")
1902
1903        zipf = zipfile.ZipFile(TESTFN, mode="a")
1904        zipf.close()
1905        try:
1906            zipf = zipfile.ZipFile(TESTFN, mode="r")
1907        except:
1908            self.fail("Unable to create empty ZIP file in 'a' mode")
1909
1910    def test_open_empty_file(self):
1911        # Issue 1710703: Check that opening a file with less than 22 bytes
1912        # raises a BadZipFile exception (rather than the previously unhelpful
1913        # OSError)
1914        f = open(TESTFN, 'w', encoding='utf-8')
1915        f.close()
1916        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1917
1918    def test_create_zipinfo_before_1980(self):
1919        self.assertRaises(ValueError,
1920                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1921
1922    def test_create_empty_zipinfo_repr(self):
1923        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
1924        zi = zipfile.ZipInfo(filename="empty")
1925        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
1926
1927    def test_create_empty_zipinfo_default_attributes(self):
1928        """Ensure all required attributes are set."""
1929        zi = zipfile.ZipInfo()
1930        self.assertEqual(zi.orig_filename, "NoName")
1931        self.assertEqual(zi.filename, "NoName")
1932        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
1933        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
1934        self.assertEqual(zi.comment, b"")
1935        self.assertEqual(zi.extra, b"")
1936        self.assertIn(zi.create_system, (0, 3))
1937        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
1938        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
1939        self.assertEqual(zi.reserved, 0)
1940        self.assertEqual(zi.flag_bits, 0)
1941        self.assertEqual(zi.volume, 0)
1942        self.assertEqual(zi.internal_attr, 0)
1943        self.assertEqual(zi.external_attr, 0)
1944
1945        # Before bpo-26185, both were missing
1946        self.assertEqual(zi.file_size, 0)
1947        self.assertEqual(zi.compress_size, 0)
1948
1949    def test_zipfile_with_short_extra_field(self):
1950        """If an extra field in the header is less than 4 bytes, skip it."""
1951        zipdata = (
1952            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1953            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1954            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1955            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1956            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1957            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1958            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1959        )
1960        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1961            # testzip returns the name of the first corrupt file, or None
1962            self.assertIsNone(zipf.testzip())
1963
1964    def test_open_conflicting_handles(self):
1965        # It's only possible to open one writable file handle at a time
1966        msg1 = b"It's fun to charter an accountant!"
1967        msg2 = b"And sail the wide accountant sea"
1968        msg3 = b"To find, explore the funds offshore"
1969        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1970            with zipf.open('foo', mode='w') as w2:
1971                w2.write(msg1)
1972            with zipf.open('bar', mode='w') as w1:
1973                with self.assertRaises(ValueError):
1974                    zipf.open('handle', mode='w')
1975                with self.assertRaises(ValueError):
1976                    zipf.open('foo', mode='r')
1977                with self.assertRaises(ValueError):
1978                    zipf.writestr('str', 'abcde')
1979                with self.assertRaises(ValueError):
1980                    zipf.write(__file__, 'file')
1981                with self.assertRaises(ValueError):
1982                    zipf.close()
1983                w1.write(msg2)
1984            with zipf.open('baz', mode='w') as w2:
1985                w2.write(msg3)
1986
1987        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1988            self.assertEqual(zipf.read('foo'), msg1)
1989            self.assertEqual(zipf.read('bar'), msg2)
1990            self.assertEqual(zipf.read('baz'), msg3)
1991            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1992
1993    def test_seek_tell(self):
1994        # Test seek functionality
1995        txt = b"Where's Bruce?"
1996        bloc = txt.find(b"Bruce")
1997        # Check seek on a file
1998        with zipfile.ZipFile(TESTFN, "w") as zipf:
1999            zipf.writestr("foo.txt", txt)
2000        with zipfile.ZipFile(TESTFN, "r") as zipf:
2001            with zipf.open("foo.txt", "r") as fp:
2002                fp.seek(bloc, os.SEEK_SET)
2003                self.assertEqual(fp.tell(), bloc)
2004                fp.seek(-bloc, os.SEEK_CUR)
2005                self.assertEqual(fp.tell(), 0)
2006                fp.seek(bloc, os.SEEK_CUR)
2007                self.assertEqual(fp.tell(), bloc)
2008                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2009                fp.seek(0, os.SEEK_END)
2010                self.assertEqual(fp.tell(), len(txt))
2011                fp.seek(0, os.SEEK_SET)
2012                self.assertEqual(fp.tell(), 0)
2013        # Check seek on memory file
2014        data = io.BytesIO()
2015        with zipfile.ZipFile(data, mode="w") as zipf:
2016            zipf.writestr("foo.txt", txt)
2017        with zipfile.ZipFile(data, mode="r") as zipf:
2018            with zipf.open("foo.txt", "r") as fp:
2019                fp.seek(bloc, os.SEEK_SET)
2020                self.assertEqual(fp.tell(), bloc)
2021                fp.seek(-bloc, os.SEEK_CUR)
2022                self.assertEqual(fp.tell(), 0)
2023                fp.seek(bloc, os.SEEK_CUR)
2024                self.assertEqual(fp.tell(), bloc)
2025                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2026                fp.seek(0, os.SEEK_END)
2027                self.assertEqual(fp.tell(), len(txt))
2028                fp.seek(0, os.SEEK_SET)
2029                self.assertEqual(fp.tell(), 0)
2030
2031    @requires_bz2()
2032    def test_decompress_without_3rd_party_library(self):
2033        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2034        zip_file = io.BytesIO(data)
2035        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2036            zf.writestr('a.txt', b'a')
2037        with mock.patch('zipfile.bz2', None):
2038            with zipfile.ZipFile(zip_file) as zf:
2039                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2040
2041    def tearDown(self):
2042        unlink(TESTFN)
2043        unlink(TESTFN2)
2044
2045
2046class AbstractBadCrcTests:
2047    def test_testzip_with_bad_crc(self):
2048        """Tests that files with bad CRCs return their name from testzip."""
2049        zipdata = self.zip_with_bad_crc
2050
2051        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2052            # testzip returns the name of the first corrupt file, or None
2053            self.assertEqual('afile', zipf.testzip())
2054
2055    def test_read_with_bad_crc(self):
2056        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
2057        zipdata = self.zip_with_bad_crc
2058
2059        # Using ZipFile.read()
2060        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2061            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
2062
2063        # Using ZipExtFile.read()
2064        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2065            with zipf.open('afile', 'r') as corrupt_file:
2066                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
2067
2068        # Same with small reads (in order to exercise the buffering logic)
2069        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2070            with zipf.open('afile', 'r') as corrupt_file:
2071                corrupt_file.MIN_READ_SIZE = 2
2072                with self.assertRaises(zipfile.BadZipFile):
2073                    while corrupt_file.read(2):
2074                        pass
2075
2076
2077class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2078    compression = zipfile.ZIP_STORED
2079    zip_with_bad_crc = (
2080        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
2081        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
2082        b'ilehello,AworldP'
2083        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
2084        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
2085        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
2086        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
2087        b'\0\0/\0\0\0\0\0')
2088
2089@requires_zlib()
2090class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2091    compression = zipfile.ZIP_DEFLATED
2092    zip_with_bad_crc = (
2093        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
2094        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2095        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
2096        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
2097        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
2098        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
2099        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
2100        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2101
2102@requires_bz2()
2103class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2104    compression = zipfile.ZIP_BZIP2
2105    zip_with_bad_crc = (
2106        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
2107        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2108        b'ileBZh91AY&SY\xd4\xa8\xca'
2109        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
2110        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
2111        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
2112        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
2113        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
2114        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
2115        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
2116        b'\x00\x00\x00\x00')
2117
2118@requires_lzma()
2119class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2120    compression = zipfile.ZIP_LZMA
2121    zip_with_bad_crc = (
2122        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2123        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2124        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
2125        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
2126        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2127        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
2128        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
2129        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
2130        b'\x00>\x00\x00\x00\x00\x00')
2131
2132
2133class DecryptionTests(unittest.TestCase):
2134    """Check that ZIP decryption works. Since the library does not
2135    support encryption at the moment, we use a pre-generated encrypted
2136    ZIP file."""
2137
2138    data = (
2139        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
2140        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
2141        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
2142        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
2143        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
2144        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
2145        b'\x00\x00L\x00\x00\x00\x00\x00' )
2146    data2 = (
2147        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
2148        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
2149        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
2150        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
2151        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
2152        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
2153        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
2154        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
2155
2156    plain = b'zipfile.py encryption test'
2157    plain2 = b'\x00'*512
2158
2159    def setUp(self):
2160        with open(TESTFN, "wb") as fp:
2161            fp.write(self.data)
2162        self.zip = zipfile.ZipFile(TESTFN, "r")
2163        with open(TESTFN2, "wb") as fp:
2164            fp.write(self.data2)
2165        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2166
2167    def tearDown(self):
2168        self.zip.close()
2169        os.unlink(TESTFN)
2170        self.zip2.close()
2171        os.unlink(TESTFN2)
2172
2173    def test_no_password(self):
2174        # Reading the encrypted file without password
2175        # must generate a RunTime exception
2176        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2177        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2178
2179    def test_bad_password(self):
2180        self.zip.setpassword(b"perl")
2181        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2182        self.zip2.setpassword(b"perl")
2183        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2184
2185    @requires_zlib()
2186    def test_good_password(self):
2187        self.zip.setpassword(b"python")
2188        self.assertEqual(self.zip.read("test.txt"), self.plain)
2189        self.zip2.setpassword(b"12345")
2190        self.assertEqual(self.zip2.read("zero"), self.plain2)
2191
2192    def test_unicode_password(self):
2193        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
2194        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
2195        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
2196        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
2197
2198    def test_seek_tell(self):
2199        self.zip.setpassword(b"python")
2200        txt = self.plain
2201        test_word = b'encryption'
2202        bloc = txt.find(test_word)
2203        bloc_len = len(test_word)
2204        with self.zip.open("test.txt", "r") as fp:
2205            fp.seek(bloc, os.SEEK_SET)
2206            self.assertEqual(fp.tell(), bloc)
2207            fp.seek(-bloc, os.SEEK_CUR)
2208            self.assertEqual(fp.tell(), 0)
2209            fp.seek(bloc, os.SEEK_CUR)
2210            self.assertEqual(fp.tell(), bloc)
2211            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2212
2213            # Make sure that the second read after seeking back beyond
2214            # _readbuffer returns the same content (ie. rewind to the start of
2215            # the file to read forward to the required position).
2216            old_read_size = fp.MIN_READ_SIZE
2217            fp.MIN_READ_SIZE = 1
2218            fp._readbuffer = b''
2219            fp._offset = 0
2220            fp.seek(0, os.SEEK_SET)
2221            self.assertEqual(fp.tell(), 0)
2222            fp.seek(bloc, os.SEEK_CUR)
2223            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2224            fp.MIN_READ_SIZE = old_read_size
2225
2226            fp.seek(0, os.SEEK_END)
2227            self.assertEqual(fp.tell(), len(txt))
2228            fp.seek(0, os.SEEK_SET)
2229            self.assertEqual(fp.tell(), 0)
2230
2231            # Read the file completely to definitely call any eof integrity
2232            # checks (crc) and make sure they still pass.
2233            fp.read()
2234
2235
2236class AbstractTestsWithRandomBinaryFiles:
2237    @classmethod
2238    def setUpClass(cls):
2239        datacount = randint(16, 64)*1024 + randint(1, 1024)
2240        cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
2241                            for i in range(datacount))
2242
2243    def setUp(self):
2244        # Make a source file with some lines
2245        with open(TESTFN, "wb") as fp:
2246            fp.write(self.data)
2247
2248    def tearDown(self):
2249        unlink(TESTFN)
2250        unlink(TESTFN2)
2251
2252    def make_test_archive(self, f, compression):
2253        # Create the ZIP archive
2254        with zipfile.ZipFile(f, "w", compression) as zipfp:
2255            zipfp.write(TESTFN, "another.name")
2256            zipfp.write(TESTFN, TESTFN)
2257
2258    def zip_test(self, f, compression):
2259        self.make_test_archive(f, compression)
2260
2261        # Read the ZIP archive
2262        with zipfile.ZipFile(f, "r", compression) as zipfp:
2263            testdata = zipfp.read(TESTFN)
2264            self.assertEqual(len(testdata), len(self.data))
2265            self.assertEqual(testdata, self.data)
2266            self.assertEqual(zipfp.read("another.name"), self.data)
2267
2268    def test_read(self):
2269        for f in get_files(self):
2270            self.zip_test(f, self.compression)
2271
2272    def zip_open_test(self, f, compression):
2273        self.make_test_archive(f, compression)
2274
2275        # Read the ZIP archive
2276        with zipfile.ZipFile(f, "r", compression) as zipfp:
2277            zipdata1 = []
2278            with zipfp.open(TESTFN) as zipopen1:
2279                while True:
2280                    read_data = zipopen1.read(256)
2281                    if not read_data:
2282                        break
2283                    zipdata1.append(read_data)
2284
2285            zipdata2 = []
2286            with zipfp.open("another.name") as zipopen2:
2287                while True:
2288                    read_data = zipopen2.read(256)
2289                    if not read_data:
2290                        break
2291                    zipdata2.append(read_data)
2292
2293            testdata1 = b''.join(zipdata1)
2294            self.assertEqual(len(testdata1), len(self.data))
2295            self.assertEqual(testdata1, self.data)
2296
2297            testdata2 = b''.join(zipdata2)
2298            self.assertEqual(len(testdata2), len(self.data))
2299            self.assertEqual(testdata2, self.data)
2300
2301    def test_open(self):
2302        for f in get_files(self):
2303            self.zip_open_test(f, self.compression)
2304
2305    def zip_random_open_test(self, f, compression):
2306        self.make_test_archive(f, compression)
2307
2308        # Read the ZIP archive
2309        with zipfile.ZipFile(f, "r", compression) as zipfp:
2310            zipdata1 = []
2311            with zipfp.open(TESTFN) as zipopen1:
2312                while True:
2313                    read_data = zipopen1.read(randint(1, 1024))
2314                    if not read_data:
2315                        break
2316                    zipdata1.append(read_data)
2317
2318            testdata = b''.join(zipdata1)
2319            self.assertEqual(len(testdata), len(self.data))
2320            self.assertEqual(testdata, self.data)
2321
2322    def test_random_open(self):
2323        for f in get_files(self):
2324            self.zip_random_open_test(f, self.compression)
2325
2326
2327class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2328                                       unittest.TestCase):
2329    compression = zipfile.ZIP_STORED
2330
2331@requires_zlib()
2332class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2333                                        unittest.TestCase):
2334    compression = zipfile.ZIP_DEFLATED
2335
2336@requires_bz2()
2337class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2338                                      unittest.TestCase):
2339    compression = zipfile.ZIP_BZIP2
2340
2341@requires_lzma()
2342class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2343                                     unittest.TestCase):
2344    compression = zipfile.ZIP_LZMA
2345
2346
2347# Provide the tell() method but not seek()
2348class Tellable:
2349    def __init__(self, fp):
2350        self.fp = fp
2351        self.offset = 0
2352
2353    def write(self, data):
2354        n = self.fp.write(data)
2355        self.offset += n
2356        return n
2357
2358    def tell(self):
2359        return self.offset
2360
2361    def flush(self):
2362        self.fp.flush()
2363
2364class Unseekable:
2365    def __init__(self, fp):
2366        self.fp = fp
2367
2368    def write(self, data):
2369        return self.fp.write(data)
2370
2371    def flush(self):
2372        self.fp.flush()
2373
2374class UnseekableTests(unittest.TestCase):
2375    def test_writestr(self):
2376        for wrapper in (lambda f: f), Tellable, Unseekable:
2377            with self.subTest(wrapper=wrapper):
2378                f = io.BytesIO()
2379                f.write(b'abc')
2380                bf = io.BufferedWriter(f)
2381                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2382                    zipfp.writestr('ones', b'111')
2383                    zipfp.writestr('twos', b'222')
2384                self.assertEqual(f.getvalue()[:5], b'abcPK')
2385                with zipfile.ZipFile(f, mode='r') as zipf:
2386                    with zipf.open('ones') as zopen:
2387                        self.assertEqual(zopen.read(), b'111')
2388                    with zipf.open('twos') as zopen:
2389                        self.assertEqual(zopen.read(), b'222')
2390
2391    def test_write(self):
2392        for wrapper in (lambda f: f), Tellable, Unseekable:
2393            with self.subTest(wrapper=wrapper):
2394                f = io.BytesIO()
2395                f.write(b'abc')
2396                bf = io.BufferedWriter(f)
2397                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2398                    self.addCleanup(unlink, TESTFN)
2399                    with open(TESTFN, 'wb') as f2:
2400                        f2.write(b'111')
2401                    zipfp.write(TESTFN, 'ones')
2402                    with open(TESTFN, 'wb') as f2:
2403                        f2.write(b'222')
2404                    zipfp.write(TESTFN, 'twos')
2405                self.assertEqual(f.getvalue()[:5], b'abcPK')
2406                with zipfile.ZipFile(f, mode='r') as zipf:
2407                    with zipf.open('ones') as zopen:
2408                        self.assertEqual(zopen.read(), b'111')
2409                    with zipf.open('twos') as zopen:
2410                        self.assertEqual(zopen.read(), b'222')
2411
2412    def test_open_write(self):
2413        for wrapper in (lambda f: f), Tellable, Unseekable:
2414            with self.subTest(wrapper=wrapper):
2415                f = io.BytesIO()
2416                f.write(b'abc')
2417                bf = io.BufferedWriter(f)
2418                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
2419                    with zipf.open('ones', 'w') as zopen:
2420                        zopen.write(b'111')
2421                    with zipf.open('twos', 'w') as zopen:
2422                        zopen.write(b'222')
2423                self.assertEqual(f.getvalue()[:5], b'abcPK')
2424                with zipfile.ZipFile(f) as zipf:
2425                    self.assertEqual(zipf.read('ones'), b'111')
2426                    self.assertEqual(zipf.read('twos'), b'222')
2427
2428
2429@requires_zlib()
2430class TestsWithMultipleOpens(unittest.TestCase):
2431    @classmethod
2432    def setUpClass(cls):
2433        cls.data1 = b'111' + randbytes(10000)
2434        cls.data2 = b'222' + randbytes(10000)
2435
2436    def make_test_archive(self, f):
2437        # Create the ZIP archive
2438        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2439            zipfp.writestr('ones', self.data1)
2440            zipfp.writestr('twos', self.data2)
2441
2442    def test_same_file(self):
2443        # Verify that (when the ZipFile is in control of creating file objects)
2444        # multiple open() calls can be made without interfering with each other.
2445        for f in get_files(self):
2446            self.make_test_archive(f)
2447            with zipfile.ZipFile(f, mode="r") as zipf:
2448                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2449                    data1 = zopen1.read(500)
2450                    data2 = zopen2.read(500)
2451                    data1 += zopen1.read()
2452                    data2 += zopen2.read()
2453                self.assertEqual(data1, data2)
2454                self.assertEqual(data1, self.data1)
2455
2456    def test_different_file(self):
2457        # Verify that (when the ZipFile is in control of creating file objects)
2458        # multiple open() calls can be made without interfering with each other.
2459        for f in get_files(self):
2460            self.make_test_archive(f)
2461            with zipfile.ZipFile(f, mode="r") as zipf:
2462                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2463                    data1 = zopen1.read(500)
2464                    data2 = zopen2.read(500)
2465                    data1 += zopen1.read()
2466                    data2 += zopen2.read()
2467                self.assertEqual(data1, self.data1)
2468                self.assertEqual(data2, self.data2)
2469
2470    def test_interleaved(self):
2471        # Verify that (when the ZipFile is in control of creating file objects)
2472        # multiple open() calls can be made without interfering with each other.
2473        for f in get_files(self):
2474            self.make_test_archive(f)
2475            with zipfile.ZipFile(f, mode="r") as zipf:
2476                with zipf.open('ones') as zopen1:
2477                    data1 = zopen1.read(500)
2478                    with zipf.open('twos') as zopen2:
2479                        data2 = zopen2.read(500)
2480                        data1 += zopen1.read()
2481                        data2 += zopen2.read()
2482                self.assertEqual(data1, self.data1)
2483                self.assertEqual(data2, self.data2)
2484
2485    def test_read_after_close(self):
2486        for f in get_files(self):
2487            self.make_test_archive(f)
2488            with contextlib.ExitStack() as stack:
2489                with zipfile.ZipFile(f, 'r') as zipf:
2490                    zopen1 = stack.enter_context(zipf.open('ones'))
2491                    zopen2 = stack.enter_context(zipf.open('twos'))
2492                data1 = zopen1.read(500)
2493                data2 = zopen2.read(500)
2494                data1 += zopen1.read()
2495                data2 += zopen2.read()
2496            self.assertEqual(data1, self.data1)
2497            self.assertEqual(data2, self.data2)
2498
2499    def test_read_after_write(self):
2500        for f in get_files(self):
2501            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2502                zipf.writestr('ones', self.data1)
2503                zipf.writestr('twos', self.data2)
2504                with zipf.open('ones') as zopen1:
2505                    data1 = zopen1.read(500)
2506            self.assertEqual(data1, self.data1[:500])
2507            with zipfile.ZipFile(f, 'r') as zipf:
2508                data1 = zipf.read('ones')
2509                data2 = zipf.read('twos')
2510            self.assertEqual(data1, self.data1)
2511            self.assertEqual(data2, self.data2)
2512
2513    def test_write_after_read(self):
2514        for f in get_files(self):
2515            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2516                zipf.writestr('ones', self.data1)
2517                with zipf.open('ones') as zopen1:
2518                    zopen1.read(500)
2519                    zipf.writestr('twos', self.data2)
2520            with zipfile.ZipFile(f, 'r') as zipf:
2521                data1 = zipf.read('ones')
2522                data2 = zipf.read('twos')
2523            self.assertEqual(data1, self.data1)
2524            self.assertEqual(data2, self.data2)
2525
2526    def test_many_opens(self):
2527        # Verify that read() and open() promptly close the file descriptor,
2528        # and don't rely on the garbage collector to free resources.
2529        self.make_test_archive(TESTFN2)
2530        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
2531            for x in range(100):
2532                zipf.read('ones')
2533                with zipf.open('ones') as zopen1:
2534                    pass
2535        with open(os.devnull, "rb") as f:
2536            self.assertLess(f.fileno(), 100)
2537
2538    def test_write_while_reading(self):
2539        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2540            zipf.writestr('ones', self.data1)
2541        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2542            with zipf.open('ones', 'r') as r1:
2543                data1 = r1.read(500)
2544                with zipf.open('twos', 'w') as w1:
2545                    w1.write(self.data2)
2546                data1 += r1.read()
2547        self.assertEqual(data1, self.data1)
2548        with zipfile.ZipFile(TESTFN2) as zipf:
2549            self.assertEqual(zipf.read('twos'), self.data2)
2550
2551    def tearDown(self):
2552        unlink(TESTFN2)
2553
2554
2555class TestWithDirectory(unittest.TestCase):
2556    def setUp(self):
2557        os.mkdir(TESTFN2)
2558
2559    def test_extract_dir(self):
2560        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
2561            zipf.extractall(TESTFN2)
2562        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2563        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2564        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
2565
2566    def test_bug_6050(self):
2567        # Extraction should succeed if directories already exist
2568        os.mkdir(os.path.join(TESTFN2, "a"))
2569        self.test_extract_dir()
2570
2571    def test_write_dir(self):
2572        dirpath = os.path.join(TESTFN2, "x")
2573        os.mkdir(dirpath)
2574        mode = os.stat(dirpath).st_mode & 0xFFFF
2575        with zipfile.ZipFile(TESTFN, "w") as zipf:
2576            zipf.write(dirpath)
2577            zinfo = zipf.filelist[0]
2578            self.assertTrue(zinfo.filename.endswith("/x/"))
2579            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2580            zipf.write(dirpath, "y")
2581            zinfo = zipf.filelist[1]
2582            self.assertTrue(zinfo.filename, "y/")
2583            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2584        with zipfile.ZipFile(TESTFN, "r") as zipf:
2585            zinfo = zipf.filelist[0]
2586            self.assertTrue(zinfo.filename.endswith("/x/"))
2587            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2588            zinfo = zipf.filelist[1]
2589            self.assertTrue(zinfo.filename, "y/")
2590            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2591            target = os.path.join(TESTFN2, "target")
2592            os.mkdir(target)
2593            zipf.extractall(target)
2594            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
2595            self.assertEqual(len(os.listdir(target)), 2)
2596
2597    def test_writestr_dir(self):
2598        os.mkdir(os.path.join(TESTFN2, "x"))
2599        with zipfile.ZipFile(TESTFN, "w") as zipf:
2600            zipf.writestr("x/", b'')
2601            zinfo = zipf.filelist[0]
2602            self.assertEqual(zinfo.filename, "x/")
2603            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2604        with zipfile.ZipFile(TESTFN, "r") as zipf:
2605            zinfo = zipf.filelist[0]
2606            self.assertTrue(zinfo.filename.endswith("x/"))
2607            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2608            target = os.path.join(TESTFN2, "target")
2609            os.mkdir(target)
2610            zipf.extractall(target)
2611            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
2612            self.assertEqual(os.listdir(target), ["x"])
2613
2614    def tearDown(self):
2615        rmtree(TESTFN2)
2616        if os.path.exists(TESTFN):
2617            unlink(TESTFN)
2618
2619
2620class ZipInfoTests(unittest.TestCase):
2621    def test_from_file(self):
2622        zi = zipfile.ZipInfo.from_file(__file__)
2623        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2624        self.assertFalse(zi.is_dir())
2625        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2626
2627    def test_from_file_pathlike(self):
2628        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2629        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2630        self.assertFalse(zi.is_dir())
2631        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2632
2633    def test_from_file_bytes(self):
2634        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2635        self.assertEqual(posixpath.basename(zi.filename), 'test')
2636        self.assertFalse(zi.is_dir())
2637        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2638
2639    def test_from_file_fileno(self):
2640        with open(__file__, 'rb') as f:
2641            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2642            self.assertEqual(posixpath.basename(zi.filename), 'test')
2643            self.assertFalse(zi.is_dir())
2644            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2645
2646    def test_from_dir(self):
2647        dirpath = os.path.dirname(os.path.abspath(__file__))
2648        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2649        self.assertEqual(zi.filename, 'stdlib_tests/')
2650        self.assertTrue(zi.is_dir())
2651        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2652        self.assertEqual(zi.file_size, 0)
2653
2654
2655class CommandLineTest(unittest.TestCase):
2656
2657    def zipfilecmd(self, *args, **kwargs):
2658        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
2659                                                      **kwargs)
2660        return out.replace(os.linesep.encode(), b'\n')
2661
2662    def zipfilecmd_failure(self, *args):
2663        return script_helper.assert_python_failure('-m', 'zipfile', *args)
2664
2665    def test_bad_use(self):
2666        rc, out, err = self.zipfilecmd_failure()
2667        self.assertEqual(out, b'')
2668        self.assertIn(b'usage', err.lower())
2669        self.assertIn(b'error', err.lower())
2670        self.assertIn(b'required', err.lower())
2671        rc, out, err = self.zipfilecmd_failure('-l', '')
2672        self.assertEqual(out, b'')
2673        self.assertNotEqual(err.strip(), b'')
2674
2675    def test_test_command(self):
2676        zip_name = findfile('zipdir.zip')
2677        for opt in '-t', '--test':
2678            out = self.zipfilecmd(opt, zip_name)
2679            self.assertEqual(out.rstrip(), b'Done testing')
2680        zip_name = findfile('testtar.tar')
2681        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
2682        self.assertEqual(out, b'')
2683
2684    def test_list_command(self):
2685        zip_name = findfile('zipdir.zip')
2686        t = io.StringIO()
2687        with zipfile.ZipFile(zip_name, 'r') as tf:
2688            tf.printdir(t)
2689        expected = t.getvalue().encode('ascii', 'backslashreplace')
2690        for opt in '-l', '--list':
2691            out = self.zipfilecmd(opt, zip_name,
2692                                  PYTHONIOENCODING='ascii:backslashreplace')
2693            self.assertEqual(out, expected)
2694
2695    @requires_zlib()
2696    def test_create_command(self):
2697        self.addCleanup(unlink, TESTFN)
2698        with open(TESTFN, 'w', encoding='utf-8') as f:
2699            f.write('test 1')
2700        os.mkdir(TESTFNDIR)
2701        self.addCleanup(rmtree, TESTFNDIR)
2702        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f:
2703            f.write('test 2')
2704        files = [TESTFN, TESTFNDIR]
2705        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
2706        for opt in '-c', '--create':
2707            try:
2708                out = self.zipfilecmd(opt, TESTFN2, *files)
2709                self.assertEqual(out, b'')
2710                with zipfile.ZipFile(TESTFN2) as zf:
2711                    self.assertEqual(zf.namelist(), namelist)
2712                    self.assertEqual(zf.read(namelist[0]), b'test 1')
2713                    self.assertEqual(zf.read(namelist[2]), b'test 2')
2714            finally:
2715                unlink(TESTFN2)
2716
2717    def test_extract_command(self):
2718        zip_name = findfile('zipdir.zip')
2719        for opt in '-e', '--extract':
2720            with temp_dir() as extdir:
2721                out = self.zipfilecmd(opt, zip_name, extdir)
2722                self.assertEqual(out, b'')
2723                with zipfile.ZipFile(zip_name) as zf:
2724                    for zi in zf.infolist():
2725                        path = os.path.join(extdir,
2726                                    zi.filename.replace('/', os.sep))
2727                        if zi.is_dir():
2728                            self.assertTrue(os.path.isdir(path))
2729                        else:
2730                            self.assertTrue(os.path.isfile(path))
2731                            with open(path, 'rb') as f:
2732                                self.assertEqual(f.read(), zf.read(zi))
2733
2734
2735class TestExecutablePrependedZip(unittest.TestCase):
2736    """Test our ability to open zip files with an executable prepended."""
2737
2738    def setUp(self):
2739        self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
2740        self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')
2741
2742    def _test_zip_works(self, name):
2743        # bpo28494 sanity check: ensure is_zipfile works on these.
2744        self.assertTrue(zipfile.is_zipfile(name),
2745                        f'is_zipfile failed on {name}')
2746        # Ensure we can operate on these via ZipFile.
2747        with zipfile.ZipFile(name) as zipfp:
2748            for n in zipfp.namelist():
2749                data = zipfp.read(n)
2750                self.assertIn(b'FAVORITE_NUMBER', data)
2751
2752    def test_read_zip_with_exe_prepended(self):
2753        self._test_zip_works(self.exe_zip)
2754
2755    def test_read_zip64_with_exe_prepended(self):
2756        self._test_zip_works(self.exe_zip64)
2757
2758    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2759    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2760                         'Test relies on #!/bin/bash working.')
2761    def test_execute_zip2(self):
2762        output = subprocess.check_output([self.exe_zip, sys.executable])
2763        self.assertIn(b'number in executable: 5', output)
2764
2765    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2766    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2767                         'Test relies on #!/bin/bash working.')
2768    def test_execute_zip64(self):
2769        output = subprocess.check_output([self.exe_zip64, sys.executable])
2770        self.assertIn(b'number in executable: 5', output)
2771
2772
2773# Poor man's technique to consume a (smallish) iterable.
2774consume = tuple
2775
2776
2777# from jaraco.itertools 5.0
2778class jaraco:
2779    class itertools:
2780        class Counter:
2781            def __init__(self, i):
2782                self.count = 0
2783                self._orig_iter = iter(i)
2784
2785            def __iter__(self):
2786                return self
2787
2788            def __next__(self):
2789                result = next(self._orig_iter)
2790                self.count += 1
2791                return result
2792
2793
2794def add_dirs(zf):
2795    """
2796    Given a writable zip file zf, inject directory entries for
2797    any directories implied by the presence of children.
2798    """
2799    for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
2800        zf.writestr(name, b"")
2801    return zf
2802
2803
2804def build_alpharep_fixture():
2805    """
2806    Create a zip file with this structure:
2807
2808    .
2809    ├── a.txt
2810    ├── b
2811    │   ├── c.txt
2812    │   ├── d
2813    │   │   └── e.txt
2814    │   └── f.txt
2815    └── g
2816        └── h
2817            └── i.txt
2818
2819    This fixture has the following key characteristics:
2820
2821    - a file at the root (a)
2822    - a file two levels deep (b/d/e)
2823    - multiple files in a directory (b/c, b/f)
2824    - a directory containing only a directory (g/h)
2825
2826    "alpha" because it uses alphabet
2827    "rep" because it's a representative example
2828    """
2829    data = io.BytesIO()
2830    zf = zipfile.ZipFile(data, "w")
2831    zf.writestr("a.txt", b"content of a")
2832    zf.writestr("b/c.txt", b"content of c")
2833    zf.writestr("b/d/e.txt", b"content of e")
2834    zf.writestr("b/f.txt", b"content of f")
2835    zf.writestr("g/h/i.txt", b"content of i")
2836    zf.filename = "alpharep.zip"
2837    return zf
2838
2839
2840def pass_alpharep(meth):
2841    """
2842    Given a method, wrap it in a for loop that invokes method
2843    with each subtest.
2844    """
2845
2846    @functools.wraps(meth)
2847    def wrapper(self):
2848        for alpharep in self.zipfile_alpharep():
2849            meth(self, alpharep=alpharep)
2850
2851    return wrapper
2852
2853
2854class TestPath(unittest.TestCase):
2855    def setUp(self):
2856        self.fixtures = contextlib.ExitStack()
2857        self.addCleanup(self.fixtures.close)
2858
2859    def zipfile_alpharep(self):
2860        with self.subTest():
2861            yield build_alpharep_fixture()
2862        with self.subTest():
2863            yield add_dirs(build_alpharep_fixture())
2864
2865    def zipfile_ondisk(self, alpharep):
2866        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
2867        buffer = alpharep.fp
2868        alpharep.close()
2869        path = tmpdir / alpharep.filename
2870        with path.open("wb") as strm:
2871            strm.write(buffer.getvalue())
2872        return path
2873
2874    @pass_alpharep
2875    def test_iterdir_and_types(self, alpharep):
2876        root = zipfile.Path(alpharep)
2877        assert root.is_dir()
2878        a, b, g = root.iterdir()
2879        assert a.is_file()
2880        assert b.is_dir()
2881        assert g.is_dir()
2882        c, f, d = b.iterdir()
2883        assert c.is_file() and f.is_file()
2884        (e,) = d.iterdir()
2885        assert e.is_file()
2886        (h,) = g.iterdir()
2887        (i,) = h.iterdir()
2888        assert i.is_file()
2889
2890    @pass_alpharep
2891    def test_is_file_missing(self, alpharep):
2892        root = zipfile.Path(alpharep)
2893        assert not root.joinpath('missing.txt').is_file()
2894
2895    @pass_alpharep
2896    def test_iterdir_on_file(self, alpharep):
2897        root = zipfile.Path(alpharep)
2898        a, b, g = root.iterdir()
2899        with self.assertRaises(ValueError):
2900            a.iterdir()
2901
2902    @pass_alpharep
2903    def test_subdir_is_dir(self, alpharep):
2904        root = zipfile.Path(alpharep)
2905        assert (root / 'b').is_dir()
2906        assert (root / 'b/').is_dir()
2907        assert (root / 'g').is_dir()
2908        assert (root / 'g/').is_dir()
2909
2910    @pass_alpharep
2911    def test_open(self, alpharep):
2912        root = zipfile.Path(alpharep)
2913        a, b, g = root.iterdir()
2914        with a.open(encoding="utf-8") as strm:
2915            data = strm.read()
2916        assert data == "content of a"
2917
2918    def test_open_write(self):
2919        """
2920        If the zipfile is open for write, it should be possible to
2921        write bytes or text to it.
2922        """
2923        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
2924        with zf.joinpath('file.bin').open('wb') as strm:
2925            strm.write(b'binary contents')
2926        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
2927            strm.write('text file')
2928
2929    def test_open_extant_directory(self):
2930        """
2931        Attempting to open a directory raises IsADirectoryError.
2932        """
2933        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2934        with self.assertRaises(IsADirectoryError):
2935            zf.joinpath('b').open()
2936
2937    @pass_alpharep
2938    def test_open_binary_invalid_args(self, alpharep):
2939        root = zipfile.Path(alpharep)
2940        with self.assertRaises(ValueError):
2941            root.joinpath('a.txt').open('rb', encoding='utf-8')
2942        with self.assertRaises(ValueError):
2943            root.joinpath('a.txt').open('rb', 'utf-8')
2944
2945    def test_open_missing_directory(self):
2946        """
2947        Attempting to open a missing directory raises FileNotFoundError.
2948        """
2949        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2950        with self.assertRaises(FileNotFoundError):
2951            zf.joinpath('z').open()
2952
2953    @pass_alpharep
2954    def test_read(self, alpharep):
2955        root = zipfile.Path(alpharep)
2956        a, b, g = root.iterdir()
2957        assert a.read_text(encoding="utf-8") == "content of a"
2958        assert a.read_bytes() == b"content of a"
2959
2960    @pass_alpharep
2961    def test_joinpath(self, alpharep):
2962        root = zipfile.Path(alpharep)
2963        a = root.joinpath("a.txt")
2964        assert a.is_file()
2965        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
2966        assert e.read_text(encoding="utf-8") == "content of e"
2967
2968    @pass_alpharep
2969    def test_joinpath_multiple(self, alpharep):
2970        root = zipfile.Path(alpharep)
2971        e = root.joinpath("b", "d", "e.txt")
2972        assert e.read_text(encoding="utf-8") == "content of e"
2973
2974    @pass_alpharep
2975    def test_traverse_truediv(self, alpharep):
2976        root = zipfile.Path(alpharep)
2977        a = root / "a.txt"
2978        assert a.is_file()
2979        e = root / "b" / "d" / "e.txt"
2980        assert e.read_text(encoding="utf-8") == "content of e"
2981
2982    @pass_alpharep
2983    def test_traverse_simplediv(self, alpharep):
2984        """
2985        Disable the __future__.division when testing traversal.
2986        """
2987        code = compile(
2988            source="zipfile.Path(alpharep) / 'a'",
2989            filename="(test)",
2990            mode="eval",
2991            dont_inherit=True,
2992        )
2993        eval(code)
2994
2995    @pass_alpharep
2996    def test_pathlike_construction(self, alpharep):
2997        """
2998        zipfile.Path should be constructable from a path-like object
2999        """
3000        zipfile_ondisk = self.zipfile_ondisk(alpharep)
3001        pathlike = pathlib.Path(str(zipfile_ondisk))
3002        zipfile.Path(pathlike)
3003
3004    @pass_alpharep
3005    def test_traverse_pathlike(self, alpharep):
3006        root = zipfile.Path(alpharep)
3007        root / pathlib.Path("a")
3008
3009    @pass_alpharep
3010    def test_parent(self, alpharep):
3011        root = zipfile.Path(alpharep)
3012        assert (root / 'a').parent.at == ''
3013        assert (root / 'a' / 'b').parent.at == 'a/'
3014
3015    @pass_alpharep
3016    def test_dir_parent(self, alpharep):
3017        root = zipfile.Path(alpharep)
3018        assert (root / 'b').parent.at == ''
3019        assert (root / 'b/').parent.at == ''
3020
3021    @pass_alpharep
3022    def test_missing_dir_parent(self, alpharep):
3023        root = zipfile.Path(alpharep)
3024        assert (root / 'missing dir/').parent.at == ''
3025
3026    @pass_alpharep
3027    def test_mutability(self, alpharep):
3028        """
3029        If the underlying zipfile is changed, the Path object should
3030        reflect that change.
3031        """
3032        root = zipfile.Path(alpharep)
3033        a, b, g = root.iterdir()
3034        alpharep.writestr('foo.txt', 'foo')
3035        alpharep.writestr('bar/baz.txt', 'baz')
3036        assert any(child.name == 'foo.txt' for child in root.iterdir())
3037        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
3038        (baz,) = (root / 'bar').iterdir()
3039        assert baz.read_text(encoding="utf-8") == 'baz'
3040
3041    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
3042
3043    def huge_zipfile(self):
3044        """Create a read-only zipfile with a huge number of entries entries."""
3045        strm = io.BytesIO()
3046        zf = zipfile.ZipFile(strm, "w")
3047        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
3048            zf.writestr(entry, entry)
3049        zf.mode = 'r'
3050        return zf
3051
3052    def test_joinpath_constant_time(self):
3053        """
3054        Ensure joinpath on items in zipfile is linear time.
3055        """
3056        root = zipfile.Path(self.huge_zipfile())
3057        entries = jaraco.itertools.Counter(root.iterdir())
3058        for entry in entries:
3059            entry.joinpath('suffix')
3060        # Check the file iterated all items
3061        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
3062
3063    # @func_timeout.func_set_timeout(3)
3064    def test_implied_dirs_performance(self):
3065        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
3066        zipfile.CompleteDirs._implied_dirs(data)
3067
3068    @pass_alpharep
3069    def test_read_does_not_close(self, alpharep):
3070        alpharep = self.zipfile_ondisk(alpharep)
3071        with zipfile.ZipFile(alpharep) as file:
3072            for rep in range(2):
3073                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
3074
3075    @pass_alpharep
3076    def test_subclass(self, alpharep):
3077        class Subclass(zipfile.Path):
3078            pass
3079
3080        root = Subclass(alpharep)
3081        assert isinstance(root / 'b', Subclass)
3082
3083    @pass_alpharep
3084    def test_filename(self, alpharep):
3085        root = zipfile.Path(alpharep)
3086        assert root.filename == pathlib.Path('alpharep.zip')
3087
3088    @pass_alpharep
3089    def test_root_name(self, alpharep):
3090        """
3091        The name of the root should be the name of the zipfile
3092        """
3093        root = zipfile.Path(alpharep)
3094        assert root.name == 'alpharep.zip' == root.filename.name
3095
3096    @pass_alpharep
3097    def test_root_parent(self, alpharep):
3098        root = zipfile.Path(alpharep)
3099        assert root.parent == pathlib.Path('.')
3100        root.root.filename = 'foo/bar.zip'
3101        assert root.parent == pathlib.Path('foo')
3102
3103    @pass_alpharep
3104    def test_root_unnamed(self, alpharep):
3105        """
3106        It is an error to attempt to get the name
3107        or parent of an unnamed zipfile.
3108        """
3109        alpharep.filename = None
3110        root = zipfile.Path(alpharep)
3111        with self.assertRaises(TypeError):
3112            root.name
3113        with self.assertRaises(TypeError):
3114            root.parent
3115
3116        # .name and .parent should still work on subs
3117        sub = root / "b"
3118        assert sub.name == "b"
3119        assert sub.parent
3120
3121    @pass_alpharep
3122    def test_inheritance(self, alpharep):
3123        cls = type('PathChild', (zipfile.Path,), {})
3124        for alpharep in self.zipfile_alpharep():
3125            file = cls(alpharep).joinpath('some dir').parent
3126            assert isinstance(file, cls)
3127
3128
3129if __name__ == "__main__":
3130    unittest.main()
3131