1import contextlib
2import io
3import itertools
4import os
5import importlib.util
6import pathlib
7import posixpath
8import time
9import struct
10import zipfile
11import unittest
12
13
14from tempfile import TemporaryFile
15from random import randint, random, getrandbits
16
17from test.support import script_helper
18from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd,
19                          requires_zlib, requires_bz2, requires_lzma,
20                          captured_stdout)
21
22TESTFN2 = TESTFN + "2"
23TESTFNDIR = TESTFN + "d"
24FIXEDTEST_SIZE = 1000
25DATAFILES_DIR = 'zipfile_datafiles'
26
27SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
28                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
29                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
30                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
31
32def getrandbytes(size):
33    return getrandbits(8 * size).to_bytes(size, 'little')
34
35def get_files(test):
36    yield TESTFN2
37    with TemporaryFile() as f:
38        yield f
39        test.assertFalse(f.closed)
40    with io.BytesIO() as f:
41        yield f
42        test.assertFalse(f.closed)
43
44class AbstractTestsWithSourceFile:
45    @classmethod
46    def setUpClass(cls):
47        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
48                              (i, random()), "ascii")
49                        for i in range(FIXEDTEST_SIZE)]
50        cls.data = b''.join(cls.line_gen)
51
52    def setUp(self):
53        # Make a source file with some lines
54        with open(TESTFN, "wb") as fp:
55            fp.write(self.data)
56
57    def make_test_archive(self, f, compression, compresslevel=None):
58        kwargs = {'compression': compression, 'compresslevel': compresslevel}
59        # Create the ZIP archive
60        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
61            zipfp.write(TESTFN, "another.name")
62            zipfp.write(TESTFN, TESTFN)
63            zipfp.writestr("strfile", self.data)
64            with zipfp.open('written-open-w', mode='w') as f:
65                for line in self.line_gen:
66                    f.write(line)
67
68    def zip_test(self, f, compression, compresslevel=None):
69        self.make_test_archive(f, compression, compresslevel)
70
71        # Read the ZIP archive
72        with zipfile.ZipFile(f, "r", compression) as zipfp:
73            self.assertEqual(zipfp.read(TESTFN), self.data)
74            self.assertEqual(zipfp.read("another.name"), self.data)
75            self.assertEqual(zipfp.read("strfile"), self.data)
76
77            # Print the ZIP directory
78            fp = io.StringIO()
79            zipfp.printdir(file=fp)
80            directory = fp.getvalue()
81            lines = directory.splitlines()
82            self.assertEqual(len(lines), 5) # Number of files + header
83
84            self.assertIn('File Name', lines[0])
85            self.assertIn('Modified', lines[0])
86            self.assertIn('Size', lines[0])
87
88            fn, date, time_, size = lines[1].split()
89            self.assertEqual(fn, 'another.name')
90            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
91            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
92            self.assertEqual(size, str(len(self.data)))
93
94            # Check the namelist
95            names = zipfp.namelist()
96            self.assertEqual(len(names), 4)
97            self.assertIn(TESTFN, names)
98            self.assertIn("another.name", names)
99            self.assertIn("strfile", names)
100            self.assertIn("written-open-w", names)
101
102            # Check infolist
103            infos = zipfp.infolist()
104            names = [i.filename for i in infos]
105            self.assertEqual(len(names), 4)
106            self.assertIn(TESTFN, names)
107            self.assertIn("another.name", names)
108            self.assertIn("strfile", names)
109            self.assertIn("written-open-w", names)
110            for i in infos:
111                self.assertEqual(i.file_size, len(self.data))
112
113            # check getinfo
114            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
115                info = zipfp.getinfo(nm)
116                self.assertEqual(info.filename, nm)
117                self.assertEqual(info.file_size, len(self.data))
118
119            # Check that testzip doesn't raise an exception
120            zipfp.testzip()
121
122    def test_basic(self):
123        for f in get_files(self):
124            self.zip_test(f, self.compression)
125
126    def zip_open_test(self, f, compression):
127        self.make_test_archive(f, compression)
128
129        # Read the ZIP archive
130        with zipfile.ZipFile(f, "r", compression) as zipfp:
131            zipdata1 = []
132            with zipfp.open(TESTFN) as zipopen1:
133                while True:
134                    read_data = zipopen1.read(256)
135                    if not read_data:
136                        break
137                    zipdata1.append(read_data)
138
139            zipdata2 = []
140            with zipfp.open("another.name") as zipopen2:
141                while True:
142                    read_data = zipopen2.read(256)
143                    if not read_data:
144                        break
145                    zipdata2.append(read_data)
146
147            self.assertEqual(b''.join(zipdata1), self.data)
148            self.assertEqual(b''.join(zipdata2), self.data)
149
150    def test_open(self):
151        for f in get_files(self):
152            self.zip_open_test(f, self.compression)
153
154    def test_open_with_pathlike(self):
155        path = pathlib.Path(TESTFN2)
156        self.zip_open_test(path, self.compression)
157        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
158            self.assertIsInstance(zipfp.filename, str)
159
160    def zip_random_open_test(self, f, compression):
161        self.make_test_archive(f, compression)
162
163        # Read the ZIP archive
164        with zipfile.ZipFile(f, "r", compression) as zipfp:
165            zipdata1 = []
166            with zipfp.open(TESTFN) as zipopen1:
167                while True:
168                    read_data = zipopen1.read(randint(1, 1024))
169                    if not read_data:
170                        break
171                    zipdata1.append(read_data)
172
173            self.assertEqual(b''.join(zipdata1), self.data)
174
175    def test_random_open(self):
176        for f in get_files(self):
177            self.zip_random_open_test(f, self.compression)
178
179    def zip_read1_test(self, f, compression):
180        self.make_test_archive(f, compression)
181
182        # Read the ZIP archive
183        with zipfile.ZipFile(f, "r") as zipfp, \
184             zipfp.open(TESTFN) as zipopen:
185            zipdata = []
186            while True:
187                read_data = zipopen.read1(-1)
188                if not read_data:
189                    break
190                zipdata.append(read_data)
191
192        self.assertEqual(b''.join(zipdata), self.data)
193
194    def test_read1(self):
195        for f in get_files(self):
196            self.zip_read1_test(f, self.compression)
197
198    def zip_read1_10_test(self, f, compression):
199        self.make_test_archive(f, compression)
200
201        # Read the ZIP archive
202        with zipfile.ZipFile(f, "r") as zipfp, \
203             zipfp.open(TESTFN) as zipopen:
204            zipdata = []
205            while True:
206                read_data = zipopen.read1(10)
207                self.assertLessEqual(len(read_data), 10)
208                if not read_data:
209                    break
210                zipdata.append(read_data)
211
212        self.assertEqual(b''.join(zipdata), self.data)
213
214    def test_read1_10(self):
215        for f in get_files(self):
216            self.zip_read1_10_test(f, self.compression)
217
218    def zip_readline_read_test(self, f, compression):
219        self.make_test_archive(f, compression)
220
221        # Read the ZIP archive
222        with zipfile.ZipFile(f, "r") as zipfp, \
223             zipfp.open(TESTFN) as zipopen:
224            data = b''
225            while True:
226                read = zipopen.readline()
227                if not read:
228                    break
229                data += read
230
231                read = zipopen.read(100)
232                if not read:
233                    break
234                data += read
235
236        self.assertEqual(data, self.data)
237
238    def test_readline_read(self):
239        # Issue #7610: calls to readline() interleaved with calls to read().
240        for f in get_files(self):
241            self.zip_readline_read_test(f, self.compression)
242
243    def zip_readline_test(self, f, compression):
244        self.make_test_archive(f, compression)
245
246        # Read the ZIP archive
247        with zipfile.ZipFile(f, "r") as zipfp:
248            with zipfp.open(TESTFN) as zipopen:
249                for line in self.line_gen:
250                    linedata = zipopen.readline()
251                    self.assertEqual(linedata, line)
252
253    def test_readline(self):
254        for f in get_files(self):
255            self.zip_readline_test(f, self.compression)
256
257    def zip_readlines_test(self, f, compression):
258        self.make_test_archive(f, compression)
259
260        # Read the ZIP archive
261        with zipfile.ZipFile(f, "r") as zipfp:
262            with zipfp.open(TESTFN) as zipopen:
263                ziplines = zipopen.readlines()
264            for line, zipline in zip(self.line_gen, ziplines):
265                self.assertEqual(zipline, line)
266
267    def test_readlines(self):
268        for f in get_files(self):
269            self.zip_readlines_test(f, self.compression)
270
271    def zip_iterlines_test(self, f, compression):
272        self.make_test_archive(f, compression)
273
274        # Read the ZIP archive
275        with zipfile.ZipFile(f, "r") as zipfp:
276            with zipfp.open(TESTFN) as zipopen:
277                for line, zipline in zip(self.line_gen, zipopen):
278                    self.assertEqual(zipline, line)
279
280    def test_iterlines(self):
281        for f in get_files(self):
282            self.zip_iterlines_test(f, self.compression)
283
284    def test_low_compression(self):
285        """Check for cases where compressed data is larger than original."""
286        # Create the ZIP archive
287        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
288            zipfp.writestr("strfile", '12')
289
290        # Get an open object for strfile
291        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
292            with zipfp.open("strfile") as openobj:
293                self.assertEqual(openobj.read(1), b'1')
294                self.assertEqual(openobj.read(1), b'2')
295
296    def test_writestr_compression(self):
297        zipfp = zipfile.ZipFile(TESTFN2, "w")
298        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
299        info = zipfp.getinfo('b.txt')
300        self.assertEqual(info.compress_type, self.compression)
301
302    def test_writestr_compresslevel(self):
303        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
304        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
305        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
306                       compresslevel=2)
307
308        # Compression level follows the constructor.
309        a_info = zipfp.getinfo('a.txt')
310        self.assertEqual(a_info.compress_type, self.compression)
311        self.assertEqual(a_info._compresslevel, 1)
312
313        # Compression level is overridden.
314        b_info = zipfp.getinfo('b.txt')
315        self.assertEqual(b_info.compress_type, self.compression)
316        self.assertEqual(b_info._compresslevel, 2)
317
318    def test_read_return_size(self):
319        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
320        # than requested.
321        for test_size in (1, 4095, 4096, 4097, 16384):
322            file_size = test_size + 1
323            junk = getrandbytes(file_size)
324            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
325                zipf.writestr('foo', junk)
326                with zipf.open('foo', 'r') as fp:
327                    buf = fp.read(test_size)
328                    self.assertEqual(len(buf), test_size)
329
330    def test_truncated_zipfile(self):
331        fp = io.BytesIO()
332        with zipfile.ZipFile(fp, mode='w') as zipf:
333            zipf.writestr('strfile', self.data, compress_type=self.compression)
334            end_offset = fp.tell()
335        zipfiledata = fp.getvalue()
336
337        fp = io.BytesIO(zipfiledata)
338        with zipfile.ZipFile(fp) as zipf:
339            with zipf.open('strfile') as zipopen:
340                fp.truncate(end_offset - 20)
341                with self.assertRaises(EOFError):
342                    zipopen.read()
343
344        fp = io.BytesIO(zipfiledata)
345        with zipfile.ZipFile(fp) as zipf:
346            with zipf.open('strfile') as zipopen:
347                fp.truncate(end_offset - 20)
348                with self.assertRaises(EOFError):
349                    while zipopen.read(100):
350                        pass
351
352        fp = io.BytesIO(zipfiledata)
353        with zipfile.ZipFile(fp) as zipf:
354            with zipf.open('strfile') as zipopen:
355                fp.truncate(end_offset - 20)
356                with self.assertRaises(EOFError):
357                    while zipopen.read1(100):
358                        pass
359
360    def test_repr(self):
361        fname = 'file.name'
362        for f in get_files(self):
363            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
364                zipfp.write(TESTFN, fname)
365                r = repr(zipfp)
366                self.assertIn("mode='w'", r)
367
368            with zipfile.ZipFile(f, 'r') as zipfp:
369                r = repr(zipfp)
370                if isinstance(f, str):
371                    self.assertIn('filename=%r' % f, r)
372                else:
373                    self.assertIn('file=%r' % f, r)
374                self.assertIn("mode='r'", r)
375                r = repr(zipfp.getinfo(fname))
376                self.assertIn('filename=%r' % fname, r)
377                self.assertIn('filemode=', r)
378                self.assertIn('file_size=', r)
379                if self.compression != zipfile.ZIP_STORED:
380                    self.assertIn('compress_type=', r)
381                    self.assertIn('compress_size=', r)
382                with zipfp.open(fname) as zipopen:
383                    r = repr(zipopen)
384                    self.assertIn('name=%r' % fname, r)
385                    self.assertIn("mode='r'", r)
386                    if self.compression != zipfile.ZIP_STORED:
387                        self.assertIn('compress_type=', r)
388                self.assertIn('[closed]', repr(zipopen))
389            self.assertIn('[closed]', repr(zipfp))
390
391    def test_compresslevel_basic(self):
392        for f in get_files(self):
393            self.zip_test(f, self.compression, compresslevel=9)
394
395    def test_per_file_compresslevel(self):
396        """Check that files within a Zip archive can have different
397        compression levels."""
398        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
399            zipfp.write(TESTFN, 'compress_1')
400            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
401            one_info = zipfp.getinfo('compress_1')
402            nine_info = zipfp.getinfo('compress_9')
403            self.assertEqual(one_info._compresslevel, 1)
404            self.assertEqual(nine_info._compresslevel, 9)
405
406    def test_writing_errors(self):
407        class BrokenFile(io.BytesIO):
408            def write(self, data):
409                nonlocal count
410                if count is not None:
411                    if count == stop:
412                        raise OSError
413                    count += 1
414                super().write(data)
415
416        stop = 0
417        while True:
418            testfile = BrokenFile()
419            count = None
420            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
421                with zipfp.open('file1', 'w') as f:
422                    f.write(b'data1')
423                count = 0
424                try:
425                    with zipfp.open('file2', 'w') as f:
426                        f.write(b'data2')
427                except OSError:
428                    stop += 1
429                else:
430                    break
431                finally:
432                    count = None
433            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
434                self.assertEqual(zipfp.namelist(), ['file1'])
435                self.assertEqual(zipfp.read('file1'), b'data1')
436
437        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
438            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
439            self.assertEqual(zipfp.read('file1'), b'data1')
440            self.assertEqual(zipfp.read('file2'), b'data2')
441
442
443    def tearDown(self):
444        unlink(TESTFN)
445        unlink(TESTFN2)
446
447
448class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
449                                unittest.TestCase):
450    compression = zipfile.ZIP_STORED
451    test_low_compression = None
452
453    def zip_test_writestr_permissions(self, f, compression):
454        # Make sure that writestr and open(... mode='w') create files with
455        # mode 0600, when they are passed a name rather than a ZipInfo
456        # instance.
457
458        self.make_test_archive(f, compression)
459        with zipfile.ZipFile(f, "r") as zipfp:
460            zinfo = zipfp.getinfo('strfile')
461            self.assertEqual(zinfo.external_attr, 0o600 << 16)
462
463            zinfo2 = zipfp.getinfo('written-open-w')
464            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
465
466    def test_writestr_permissions(self):
467        for f in get_files(self):
468            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
469
470    def test_absolute_arcnames(self):
471        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
472            zipfp.write(TESTFN, "/absolute")
473
474        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
475            self.assertEqual(zipfp.namelist(), ["absolute"])
476
477    def test_append_to_zip_file(self):
478        """Test appending to an existing zipfile."""
479        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
480            zipfp.write(TESTFN, TESTFN)
481
482        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
483            zipfp.writestr("strfile", self.data)
484            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
485
486    def test_append_to_non_zip_file(self):
487        """Test appending to an existing file that is not a zipfile."""
488        # NOTE: this test fails if len(d) < 22 because of the first
489        # line "fpin.seek(-22, 2)" in _EndRecData
490        data = b'I am not a ZipFile!'*10
491        with open(TESTFN2, 'wb') as f:
492            f.write(data)
493
494        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
495            zipfp.write(TESTFN, TESTFN)
496
497        with open(TESTFN2, 'rb') as f:
498            f.seek(len(data))
499            with zipfile.ZipFile(f, "r") as zipfp:
500                self.assertEqual(zipfp.namelist(), [TESTFN])
501                self.assertEqual(zipfp.read(TESTFN), self.data)
502        with open(TESTFN2, 'rb') as f:
503            self.assertEqual(f.read(len(data)), data)
504            zipfiledata = f.read()
505        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
506            self.assertEqual(zipfp.namelist(), [TESTFN])
507            self.assertEqual(zipfp.read(TESTFN), self.data)
508
509    def test_read_concatenated_zip_file(self):
510        with io.BytesIO() as bio:
511            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
512                zipfp.write(TESTFN, TESTFN)
513            zipfiledata = bio.getvalue()
514        data = b'I am not a ZipFile!'*10
515        with open(TESTFN2, 'wb') as f:
516            f.write(data)
517            f.write(zipfiledata)
518
519        with zipfile.ZipFile(TESTFN2) as zipfp:
520            self.assertEqual(zipfp.namelist(), [TESTFN])
521            self.assertEqual(zipfp.read(TESTFN), self.data)
522
523    def test_append_to_concatenated_zip_file(self):
524        with io.BytesIO() as bio:
525            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
526                zipfp.write(TESTFN, TESTFN)
527            zipfiledata = bio.getvalue()
528        data = b'I am not a ZipFile!'*1000000
529        with open(TESTFN2, 'wb') as f:
530            f.write(data)
531            f.write(zipfiledata)
532
533        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
534            self.assertEqual(zipfp.namelist(), [TESTFN])
535            zipfp.writestr('strfile', self.data)
536
537        with open(TESTFN2, 'rb') as f:
538            self.assertEqual(f.read(len(data)), data)
539            zipfiledata = f.read()
540        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
541            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
542            self.assertEqual(zipfp.read(TESTFN), self.data)
543            self.assertEqual(zipfp.read('strfile'), self.data)
544
545    def test_ignores_newline_at_end(self):
546        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
547            zipfp.write(TESTFN, TESTFN)
548        with open(TESTFN2, 'a') as f:
549            f.write("\r\n\00\00\00")
550        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
551            self.assertIsInstance(zipfp, zipfile.ZipFile)
552
553    def test_ignores_stuff_appended_past_comments(self):
554        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
555            zipfp.comment = b"this is a comment"
556            zipfp.write(TESTFN, TESTFN)
557        with open(TESTFN2, 'a') as f:
558            f.write("abcdef\r\n")
559        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
560            self.assertIsInstance(zipfp, zipfile.ZipFile)
561            self.assertEqual(zipfp.comment, b"this is a comment")
562
563    def test_write_default_name(self):
564        """Check that calling ZipFile.write without arcname specified
565        produces the expected result."""
566        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
567            zipfp.write(TESTFN)
568            with open(TESTFN, "rb") as f:
569                self.assertEqual(zipfp.read(TESTFN), f.read())
570
571    def test_write_to_readonly(self):
572        """Check that trying to call write() on a readonly ZipFile object
573        raises a ValueError."""
574        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
575            zipfp.writestr("somefile.txt", "bogus")
576
577        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
578            self.assertRaises(ValueError, zipfp.write, TESTFN)
579
580        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
581            with self.assertRaises(ValueError):
582                zipfp.open(TESTFN, mode='w')
583
584    def test_add_file_before_1980(self):
585        # Set atime and mtime to 1970-01-01
586        os.utime(TESTFN, (0, 0))
587        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
588            self.assertRaises(ValueError, zipfp.write, TESTFN)
589
590
591@requires_zlib
592class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
593                                 unittest.TestCase):
594    compression = zipfile.ZIP_DEFLATED
595
596    def test_per_file_compression(self):
597        """Check that files within a Zip archive can have different
598        compression options."""
599        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
600            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
601            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
602            sinfo = zipfp.getinfo('storeme')
603            dinfo = zipfp.getinfo('deflateme')
604            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
605            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
606
607@requires_bz2
608class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
609                               unittest.TestCase):
610    compression = zipfile.ZIP_BZIP2
611
612@requires_lzma
613class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
614                              unittest.TestCase):
615    compression = zipfile.ZIP_LZMA
616
617
618class AbstractTestZip64InSmallFiles:
619    # These tests test the ZIP64 functionality without using large files,
620    # see test_zipfile64 for proper tests.
621
622    @classmethod
623    def setUpClass(cls):
624        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
625                    for i in range(0, FIXEDTEST_SIZE))
626        cls.data = b'\n'.join(line_gen)
627
628    def setUp(self):
629        self._limit = zipfile.ZIP64_LIMIT
630        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
631        zipfile.ZIP64_LIMIT = 1000
632        zipfile.ZIP_FILECOUNT_LIMIT = 9
633
634        # Make a source file with some lines
635        with open(TESTFN, "wb") as fp:
636            fp.write(self.data)
637
638    def zip_test(self, f, compression):
639        # Create the ZIP archive
640        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
641            zipfp.write(TESTFN, "another.name")
642            zipfp.write(TESTFN, TESTFN)
643            zipfp.writestr("strfile", self.data)
644
645        # Read the ZIP archive
646        with zipfile.ZipFile(f, "r", compression) as zipfp:
647            self.assertEqual(zipfp.read(TESTFN), self.data)
648            self.assertEqual(zipfp.read("another.name"), self.data)
649            self.assertEqual(zipfp.read("strfile"), self.data)
650
651            # Print the ZIP directory
652            fp = io.StringIO()
653            zipfp.printdir(fp)
654
655            directory = fp.getvalue()
656            lines = directory.splitlines()
657            self.assertEqual(len(lines), 4) # Number of files + header
658
659            self.assertIn('File Name', lines[0])
660            self.assertIn('Modified', lines[0])
661            self.assertIn('Size', lines[0])
662
663            fn, date, time_, size = lines[1].split()
664            self.assertEqual(fn, 'another.name')
665            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
666            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
667            self.assertEqual(size, str(len(self.data)))
668
669            # Check the namelist
670            names = zipfp.namelist()
671            self.assertEqual(len(names), 3)
672            self.assertIn(TESTFN, names)
673            self.assertIn("another.name", names)
674            self.assertIn("strfile", names)
675
676            # Check infolist
677            infos = zipfp.infolist()
678            names = [i.filename for i in infos]
679            self.assertEqual(len(names), 3)
680            self.assertIn(TESTFN, names)
681            self.assertIn("another.name", names)
682            self.assertIn("strfile", names)
683            for i in infos:
684                self.assertEqual(i.file_size, len(self.data))
685
686            # check getinfo
687            for nm in (TESTFN, "another.name", "strfile"):
688                info = zipfp.getinfo(nm)
689                self.assertEqual(info.filename, nm)
690                self.assertEqual(info.file_size, len(self.data))
691
692            # Check that testzip doesn't raise an exception
693            zipfp.testzip()
694
695    def test_basic(self):
696        for f in get_files(self):
697            self.zip_test(f, self.compression)
698
699    def test_too_many_files(self):
700        # This test checks that more than 64k files can be added to an archive,
701        # and that the resulting archive can be read properly by ZipFile
702        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
703                               allowZip64=True)
704        zipf.debug = 100
705        numfiles = 15
706        for i in range(numfiles):
707            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
708        self.assertEqual(len(zipf.namelist()), numfiles)
709        zipf.close()
710
711        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
712        self.assertEqual(len(zipf2.namelist()), numfiles)
713        for i in range(numfiles):
714            content = zipf2.read("foo%08d" % i).decode('ascii')
715            self.assertEqual(content, "%d" % (i**3 % 57))
716        zipf2.close()
717
718    def test_too_many_files_append(self):
719        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
720                               allowZip64=False)
721        zipf.debug = 100
722        numfiles = 9
723        for i in range(numfiles):
724            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
725        self.assertEqual(len(zipf.namelist()), numfiles)
726        with self.assertRaises(zipfile.LargeZipFile):
727            zipf.writestr("foo%08d" % numfiles, b'')
728        self.assertEqual(len(zipf.namelist()), numfiles)
729        zipf.close()
730
731        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
732                               allowZip64=False)
733        zipf.debug = 100
734        self.assertEqual(len(zipf.namelist()), numfiles)
735        with self.assertRaises(zipfile.LargeZipFile):
736            zipf.writestr("foo%08d" % numfiles, b'')
737        self.assertEqual(len(zipf.namelist()), numfiles)
738        zipf.close()
739
740        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
741                               allowZip64=True)
742        zipf.debug = 100
743        self.assertEqual(len(zipf.namelist()), numfiles)
744        numfiles2 = 15
745        for i in range(numfiles, numfiles2):
746            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
747        self.assertEqual(len(zipf.namelist()), numfiles2)
748        zipf.close()
749
750        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
751        self.assertEqual(len(zipf2.namelist()), numfiles2)
752        for i in range(numfiles2):
753            content = zipf2.read("foo%08d" % i).decode('ascii')
754            self.assertEqual(content, "%d" % (i**3 % 57))
755        zipf2.close()
756
757    def tearDown(self):
758        zipfile.ZIP64_LIMIT = self._limit
759        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
760        unlink(TESTFN)
761        unlink(TESTFN2)
762
763
764class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
765                                  unittest.TestCase):
766    compression = zipfile.ZIP_STORED
767
768    def large_file_exception_test(self, f, compression):
769        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
770            self.assertRaises(zipfile.LargeZipFile,
771                              zipfp.write, TESTFN, "another.name")
772
773    def large_file_exception_test2(self, f, compression):
774        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
775            self.assertRaises(zipfile.LargeZipFile,
776                              zipfp.writestr, "another.name", self.data)
777
778    def test_large_file_exception(self):
779        for f in get_files(self):
780            self.large_file_exception_test(f, zipfile.ZIP_STORED)
781            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
782
783    def test_absolute_arcnames(self):
784        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
785                             allowZip64=True) as zipfp:
786            zipfp.write(TESTFN, "/absolute")
787
788        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
789            self.assertEqual(zipfp.namelist(), ["absolute"])
790
791    def test_append(self):
792        # Test that appending to the Zip64 archive doesn't change
793        # extra fields of existing entries.
794        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
795            zipfp.writestr("strfile", self.data)
796        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
797            zinfo = zipfp.getinfo("strfile")
798            extra = zinfo.extra
799        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
800            zipfp.writestr("strfile2", self.data)
801        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
802            zinfo = zipfp.getinfo("strfile")
803            self.assertEqual(zinfo.extra, extra)
804
805    def make_zip64_file(
806        self, file_size_64_set=False, file_size_extra=False,
807        compress_size_64_set=False, compress_size_extra=False,
808        header_offset_64_set=False, header_offset_extra=False,
809    ):
810        """Generate bytes sequence for a zip with (incomplete) zip64 data.
811
812        The actual values (not the zip 64 0xffffffff values) stored in the file
813        are:
814        file_size: 8
815        compress_size: 8
816        header_offset: 0
817        """
818        actual_size = 8
819        actual_header_offset = 0
820        local_zip64_fields = []
821        central_zip64_fields = []
822
823        file_size = actual_size
824        if file_size_64_set:
825            file_size = 0xffffffff
826            if file_size_extra:
827                local_zip64_fields.append(actual_size)
828                central_zip64_fields.append(actual_size)
829        file_size = struct.pack("<L", file_size)
830
831        compress_size = actual_size
832        if compress_size_64_set:
833            compress_size = 0xffffffff
834            if compress_size_extra:
835                local_zip64_fields.append(actual_size)
836                central_zip64_fields.append(actual_size)
837        compress_size = struct.pack("<L", compress_size)
838
839        header_offset = actual_header_offset
840        if header_offset_64_set:
841            header_offset = 0xffffffff
842            if header_offset_extra:
843                central_zip64_fields.append(actual_header_offset)
844        header_offset = struct.pack("<L", header_offset)
845
846        local_extra = struct.pack(
847            '<HH' + 'Q'*len(local_zip64_fields),
848            0x0001,
849            8*len(local_zip64_fields),
850            *local_zip64_fields
851        )
852
853        central_extra = struct.pack(
854            '<HH' + 'Q'*len(central_zip64_fields),
855            0x0001,
856            8*len(central_zip64_fields),
857            *central_zip64_fields
858        )
859
860        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
861        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
862
863        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
864        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
865
866        filename = b"test.txt"
867        content = b"test1234"
868        filename_length = struct.pack("<H", len(filename))
869        zip64_contents = (
870            # Local file header
871            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
872            + compress_size
873            + file_size
874            + filename_length
875            + local_extra_length
876            + filename
877            + local_extra
878            + content
879            # Central directory:
880            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
881            + compress_size
882            + file_size
883            + filename_length
884            + central_extra_length
885            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
886            + header_offset
887            + filename
888            + central_extra
889            # Zip64 end of central directory
890            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
891            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
892            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
893            + central_dir_size
894            + offset_to_central_dir
895            # Zip64 end of central directory locator
896            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
897            + b"\x00\x00\x00"
898            # end of central directory
899            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
900            + b"\x00\x00\x00\x00"
901        )
902        return zip64_contents
903
904    def test_bad_zip64_extra(self):
905        """Missing zip64 extra records raises an exception.
906
907        There are 4 fields that the zip64 format handles (the disk number is
908        not used in this module and so is ignored here). According to the zip
909        spec:
910              The order of the fields in the zip64 extended
911              information record is fixed, but the fields MUST
912              only appear if the corresponding Local or Central
913              directory record field is set to 0xFFFF or 0xFFFFFFFF.
914
915        If the zip64 extra content doesn't contain enough entries for the
916        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
917        This test mismatches the length of the zip64 extra field and the number
918        of fields set to indicate the presence of zip64 data.
919        """
920        # zip64 file size present, no fields in extra, expecting one, equals
921        # missing file size.
922        missing_file_size_extra = self.make_zip64_file(
923            file_size_64_set=True,
924        )
925        with self.assertRaises(zipfile.BadZipFile) as e:
926            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
927        self.assertIn('file size', str(e.exception).lower())
928
929        # zip64 file size present, zip64 compress size present, one field in
930        # extra, expecting two, equals missing compress size.
931        missing_compress_size_extra = self.make_zip64_file(
932            file_size_64_set=True,
933            file_size_extra=True,
934            compress_size_64_set=True,
935        )
936        with self.assertRaises(zipfile.BadZipFile) as e:
937            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
938        self.assertIn('compress size', str(e.exception).lower())
939
940        # zip64 compress size present, no fields in extra, expecting one,
941        # equals missing compress size.
942        missing_compress_size_extra = self.make_zip64_file(
943            compress_size_64_set=True,
944        )
945        with self.assertRaises(zipfile.BadZipFile) as e:
946            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
947        self.assertIn('compress size', str(e.exception).lower())
948
949        # zip64 file size present, zip64 compress size present, zip64 header
950        # offset present, two fields in extra, expecting three, equals missing
951        # header offset
952        missing_header_offset_extra = self.make_zip64_file(
953            file_size_64_set=True,
954            file_size_extra=True,
955            compress_size_64_set=True,
956            compress_size_extra=True,
957            header_offset_64_set=True,
958        )
959        with self.assertRaises(zipfile.BadZipFile) as e:
960            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
961        self.assertIn('header offset', str(e.exception).lower())
962
963        # zip64 compress size present, zip64 header offset present, one field
964        # in extra, expecting two, equals missing header offset
965        missing_header_offset_extra = self.make_zip64_file(
966            file_size_64_set=False,
967            compress_size_64_set=True,
968            compress_size_extra=True,
969            header_offset_64_set=True,
970        )
971        with self.assertRaises(zipfile.BadZipFile) as e:
972            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
973        self.assertIn('header offset', str(e.exception).lower())
974
975        # zip64 file size present, zip64 header offset present, one field in
976        # extra, expecting two, equals missing header offset
977        missing_header_offset_extra = self.make_zip64_file(
978            file_size_64_set=True,
979            file_size_extra=True,
980            compress_size_64_set=False,
981            header_offset_64_set=True,
982        )
983        with self.assertRaises(zipfile.BadZipFile) as e:
984            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
985        self.assertIn('header offset', str(e.exception).lower())
986
987        # zip64 header offset present, no fields in extra, expecting one,
988        # equals missing header offset
989        missing_header_offset_extra = self.make_zip64_file(
990            file_size_64_set=False,
991            compress_size_64_set=False,
992            header_offset_64_set=True,
993        )
994        with self.assertRaises(zipfile.BadZipFile) as e:
995            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
996        self.assertIn('header offset', str(e.exception).lower())
997
998    def test_generated_valid_zip64_extra(self):
999        # These values are what is set in the make_zip64_file method.
1000        expected_file_size = 8
1001        expected_compress_size = 8
1002        expected_header_offset = 0
1003        expected_content = b"test1234"
1004
1005        # Loop through the various valid combinations of zip64 masks
1006        # present and extra fields present.
1007        params = (
1008            {"file_size_64_set": True, "file_size_extra": True},
1009            {"compress_size_64_set": True, "compress_size_extra": True},
1010            {"header_offset_64_set": True, "header_offset_extra": True},
1011        )
1012
1013        for r in range(1, len(params) + 1):
1014            for combo in itertools.combinations(params, r):
1015                kwargs = {}
1016                for c in combo:
1017                    kwargs.update(c)
1018                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1019                    zinfo = zf.infolist()[0]
1020                    self.assertEqual(zinfo.file_size, expected_file_size)
1021                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1022                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1023                    self.assertEqual(zf.read(zinfo), expected_content)
1024
1025
1026@requires_zlib
1027class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1028                                   unittest.TestCase):
1029    compression = zipfile.ZIP_DEFLATED
1030
1031@requires_bz2
1032class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1033                                 unittest.TestCase):
1034    compression = zipfile.ZIP_BZIP2
1035
1036@requires_lzma
1037class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1038                                unittest.TestCase):
1039    compression = zipfile.ZIP_LZMA
1040
1041
1042class AbstractWriterTests:
1043
1044    def tearDown(self):
1045        unlink(TESTFN2)
1046
1047    def test_close_after_close(self):
1048        data = b'content'
1049        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1050            w = zipf.open('test', 'w')
1051            w.write(data)
1052            w.close()
1053            self.assertTrue(w.closed)
1054            w.close()
1055            self.assertTrue(w.closed)
1056            self.assertEqual(zipf.read('test'), data)
1057
1058    def test_write_after_close(self):
1059        data = b'content'
1060        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1061            w = zipf.open('test', 'w')
1062            w.write(data)
1063            w.close()
1064            self.assertTrue(w.closed)
1065            self.assertRaises(ValueError, w.write, b'')
1066            self.assertEqual(zipf.read('test'), data)
1067
1068class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
1069    compression = zipfile.ZIP_STORED
1070
1071@requires_zlib
1072class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
1073    compression = zipfile.ZIP_DEFLATED
1074
1075@requires_bz2
1076class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
1077    compression = zipfile.ZIP_BZIP2
1078
1079@requires_lzma
1080class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
1081    compression = zipfile.ZIP_LZMA
1082
1083
1084class PyZipFileTests(unittest.TestCase):
1085    def assertCompiledIn(self, name, namelist):
1086        if name + 'o' not in namelist:
1087            self.assertIn(name + 'c', namelist)
1088
1089    def requiresWriteAccess(self, path):
1090        # effective_ids unavailable on windows
1091        if not os.access(path, os.W_OK,
1092                         effective_ids=os.access in os.supports_effective_ids):
1093            self.skipTest('requires write access to the installed location')
1094        filename = os.path.join(path, 'test_zipfile.try')
1095        try:
1096            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1097            os.close(fd)
1098        except Exception:
1099            self.skipTest('requires write access to the installed location')
1100        unlink(filename)
1101
1102    def test_write_pyfile(self):
1103        self.requiresWriteAccess(os.path.dirname(__file__))
1104        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1105            fn = __file__
1106            if fn.endswith('.pyc'):
1107                path_split = fn.split(os.sep)
1108                if os.altsep is not None:
1109                    path_split.extend(fn.split(os.altsep))
1110                if '__pycache__' in path_split:
1111                    fn = importlib.util.source_from_cache(fn)
1112                else:
1113                    fn = fn[:-1]
1114
1115            zipfp.writepy(fn)
1116
1117            bn = os.path.basename(fn)
1118            self.assertNotIn(bn, zipfp.namelist())
1119            self.assertCompiledIn(bn, zipfp.namelist())
1120
1121        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1122            fn = __file__
1123            if fn.endswith('.pyc'):
1124                fn = fn[:-1]
1125
1126            zipfp.writepy(fn, "testpackage")
1127
1128            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1129            self.assertNotIn(bn, zipfp.namelist())
1130            self.assertCompiledIn(bn, zipfp.namelist())
1131
1132    def test_write_python_package(self):
1133        import email
1134        packagedir = os.path.dirname(email.__file__)
1135        self.requiresWriteAccess(packagedir)
1136
1137        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1138            zipfp.writepy(packagedir)
1139
1140            # Check for a couple of modules at different levels of the
1141            # hierarchy
1142            names = zipfp.namelist()
1143            self.assertCompiledIn('email/__init__.py', names)
1144            self.assertCompiledIn('email/mime/text.py', names)
1145
1146    def test_write_filtered_python_package(self):
1147        import test
1148        packagedir = os.path.dirname(test.__file__)
1149        self.requiresWriteAccess(packagedir)
1150
1151        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1152
1153            # first make sure that the test folder gives error messages
1154            # (on the badsyntax_... files)
1155            with captured_stdout() as reportSIO:
1156                zipfp.writepy(packagedir)
1157            reportStr = reportSIO.getvalue()
1158            self.assertTrue('SyntaxError' in reportStr)
1159
1160            # then check that the filter works on the whole package
1161            with captured_stdout() as reportSIO:
1162                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1163            reportStr = reportSIO.getvalue()
1164            self.assertTrue('SyntaxError' not in reportStr)
1165
1166            # then check that the filter works on individual files
1167            def filter(path):
1168                return not os.path.basename(path).startswith("bad")
1169            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1170                zipfp.writepy(packagedir, filterfunc=filter)
1171            reportStr = reportSIO.getvalue()
1172            if reportStr:
1173                print(reportStr)
1174            self.assertTrue('SyntaxError' not in reportStr)
1175
1176    def test_write_with_optimization(self):
1177        import email
1178        packagedir = os.path.dirname(email.__file__)
1179        self.requiresWriteAccess(packagedir)
1180        optlevel = 1 if __debug__ else 0
1181        ext = '.pyc'
1182
1183        with TemporaryFile() as t, \
1184             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1185            zipfp.writepy(packagedir)
1186
1187            names = zipfp.namelist()
1188            self.assertIn('email/__init__' + ext, names)
1189            self.assertIn('email/mime/text' + ext, names)
1190
1191    def test_write_python_directory(self):
1192        os.mkdir(TESTFN2)
1193        try:
1194            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1195                fp.write("print(42)\n")
1196
1197            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1198                fp.write("print(42 * 42)\n")
1199
1200            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
1201                fp.write("bla bla bla\n")
1202
1203            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1204                zipfp.writepy(TESTFN2)
1205
1206                names = zipfp.namelist()
1207                self.assertCompiledIn('mod1.py', names)
1208                self.assertCompiledIn('mod2.py', names)
1209                self.assertNotIn('mod2.txt', names)
1210
1211        finally:
1212            rmtree(TESTFN2)
1213
1214    def test_write_python_directory_filtered(self):
1215        os.mkdir(TESTFN2)
1216        try:
1217            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1218                fp.write("print(42)\n")
1219
1220            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1221                fp.write("print(42 * 42)\n")
1222
1223            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1224                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1225                                                  not fn.endswith('mod2.py'))
1226
1227                names = zipfp.namelist()
1228                self.assertCompiledIn('mod1.py', names)
1229                self.assertNotIn('mod2.py', names)
1230
1231        finally:
1232            rmtree(TESTFN2)
1233
1234    def test_write_non_pyfile(self):
1235        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1236            with open(TESTFN, 'w') as f:
1237                f.write('most definitely not a python file')
1238            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1239            unlink(TESTFN)
1240
1241    def test_write_pyfile_bad_syntax(self):
1242        os.mkdir(TESTFN2)
1243        try:
1244            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1245                fp.write("Bad syntax in python file\n")
1246
1247            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1248                # syntax errors are printed to stdout
1249                with captured_stdout() as s:
1250                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1251
1252                self.assertIn("SyntaxError", s.getvalue())
1253
1254                # as it will not have compiled the python file, it will
1255                # include the .py file not .pyc
1256                names = zipfp.namelist()
1257                self.assertIn('mod1.py', names)
1258                self.assertNotIn('mod1.pyc', names)
1259
1260        finally:
1261            rmtree(TESTFN2)
1262
1263    def test_write_pathlike(self):
1264        os.mkdir(TESTFN2)
1265        try:
1266            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1267                fp.write("print(42)\n")
1268
1269            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1270                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1271                names = zipfp.namelist()
1272                self.assertCompiledIn('mod1.py', names)
1273        finally:
1274            rmtree(TESTFN2)
1275
1276
1277class ExtractTests(unittest.TestCase):
1278
1279    def make_test_file(self):
1280        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1281            for fpath, fdata in SMALL_TEST_DATA:
1282                zipfp.writestr(fpath, fdata)
1283
1284    def test_extract(self):
1285        with temp_cwd():
1286            self.make_test_file()
1287            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1288                for fpath, fdata in SMALL_TEST_DATA:
1289                    writtenfile = zipfp.extract(fpath)
1290
1291                    # make sure it was written to the right place
1292                    correctfile = os.path.join(os.getcwd(), fpath)
1293                    correctfile = os.path.normpath(correctfile)
1294
1295                    self.assertEqual(writtenfile, correctfile)
1296
1297                    # make sure correct data is in correct file
1298                    with open(writtenfile, "rb") as f:
1299                        self.assertEqual(fdata.encode(), f.read())
1300
1301                    unlink(writtenfile)
1302
1303    def _test_extract_with_target(self, target):
1304        self.make_test_file()
1305        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1306            for fpath, fdata in SMALL_TEST_DATA:
1307                writtenfile = zipfp.extract(fpath, target)
1308
1309                # make sure it was written to the right place
1310                correctfile = os.path.join(target, fpath)
1311                correctfile = os.path.normpath(correctfile)
1312                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1313
1314                # make sure correct data is in correct file
1315                with open(writtenfile, "rb") as f:
1316                    self.assertEqual(fdata.encode(), f.read())
1317
1318                unlink(writtenfile)
1319
1320        unlink(TESTFN2)
1321
1322    def test_extract_with_target(self):
1323        with temp_dir() as extdir:
1324            self._test_extract_with_target(extdir)
1325
1326    def test_extract_with_target_pathlike(self):
1327        with temp_dir() as extdir:
1328            self._test_extract_with_target(pathlib.Path(extdir))
1329
1330    def test_extract_all(self):
1331        with temp_cwd():
1332            self.make_test_file()
1333            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1334                zipfp.extractall()
1335                for fpath, fdata in SMALL_TEST_DATA:
1336                    outfile = os.path.join(os.getcwd(), fpath)
1337
1338                    with open(outfile, "rb") as f:
1339                        self.assertEqual(fdata.encode(), f.read())
1340
1341                    unlink(outfile)
1342
1343    def _test_extract_all_with_target(self, target):
1344        self.make_test_file()
1345        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1346            zipfp.extractall(target)
1347            for fpath, fdata in SMALL_TEST_DATA:
1348                outfile = os.path.join(target, fpath)
1349
1350                with open(outfile, "rb") as f:
1351                    self.assertEqual(fdata.encode(), f.read())
1352
1353                unlink(outfile)
1354
1355        unlink(TESTFN2)
1356
1357    def test_extract_all_with_target(self):
1358        with temp_dir() as extdir:
1359            self._test_extract_all_with_target(extdir)
1360
1361    def test_extract_all_with_target_pathlike(self):
1362        with temp_dir() as extdir:
1363            self._test_extract_all_with_target(pathlib.Path(extdir))
1364
1365    def check_file(self, filename, content):
1366        self.assertTrue(os.path.isfile(filename))
1367        with open(filename, 'rb') as f:
1368            self.assertEqual(f.read(), content)
1369
1370    def test_sanitize_windows_name(self):
1371        san = zipfile.ZipFile._sanitize_windows_name
1372        # Passing pathsep in allows this test to work regardless of platform.
1373        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1374        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1375        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1376
1377    def test_extract_hackers_arcnames_common_cases(self):
1378        common_hacknames = [
1379            ('../foo/bar', 'foo/bar'),
1380            ('foo/../bar', 'foo/bar'),
1381            ('foo/../../bar', 'foo/bar'),
1382            ('foo/bar/..', 'foo/bar'),
1383            ('./../foo/bar', 'foo/bar'),
1384            ('/foo/bar', 'foo/bar'),
1385            ('/foo/../bar', 'foo/bar'),
1386            ('/foo/../../bar', 'foo/bar'),
1387        ]
1388        self._test_extract_hackers_arcnames(common_hacknames)
1389
1390    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1391    def test_extract_hackers_arcnames_windows_only(self):
1392        """Test combination of path fixing and windows name sanitization."""
1393        windows_hacknames = [
1394            (r'..\foo\bar', 'foo/bar'),
1395            (r'..\/foo\/bar', 'foo/bar'),
1396            (r'foo/\..\/bar', 'foo/bar'),
1397            (r'foo\/../\bar', 'foo/bar'),
1398            (r'C:foo/bar', 'foo/bar'),
1399            (r'C:/foo/bar', 'foo/bar'),
1400            (r'C://foo/bar', 'foo/bar'),
1401            (r'C:\foo\bar', 'foo/bar'),
1402            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1403            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1404            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1405            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1406            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1407            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1408            (r'//?/C:/foo/bar', 'foo/bar'),
1409            (r'\\?\C:\foo\bar', 'foo/bar'),
1410            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1411            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1412            ('../../foo../../ba..r', 'foo/ba..r'),
1413        ]
1414        self._test_extract_hackers_arcnames(windows_hacknames)
1415
1416    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1417    def test_extract_hackers_arcnames_posix_only(self):
1418        posix_hacknames = [
1419            ('//foo/bar', 'foo/bar'),
1420            ('../../foo../../ba..r', 'foo../ba..r'),
1421            (r'foo/..\bar', r'foo/..\bar'),
1422        ]
1423        self._test_extract_hackers_arcnames(posix_hacknames)
1424
1425    def _test_extract_hackers_arcnames(self, hacknames):
1426        for arcname, fixedname in hacknames:
1427            content = b'foobar' + arcname.encode()
1428            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1429                zinfo = zipfile.ZipInfo()
1430                # preserve backslashes
1431                zinfo.filename = arcname
1432                zinfo.external_attr = 0o600 << 16
1433                zipfp.writestr(zinfo, content)
1434
1435            arcname = arcname.replace(os.sep, "/")
1436            targetpath = os.path.join('target', 'subdir', 'subsub')
1437            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1438
1439            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1440                writtenfile = zipfp.extract(arcname, targetpath)
1441                self.assertEqual(writtenfile, correctfile,
1442                                 msg='extract %r: %r != %r' %
1443                                 (arcname, writtenfile, correctfile))
1444            self.check_file(correctfile, content)
1445            rmtree('target')
1446
1447            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1448                zipfp.extractall(targetpath)
1449            self.check_file(correctfile, content)
1450            rmtree('target')
1451
1452            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1453
1454            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1455                writtenfile = zipfp.extract(arcname)
1456                self.assertEqual(writtenfile, correctfile,
1457                                 msg="extract %r" % arcname)
1458            self.check_file(correctfile, content)
1459            rmtree(fixedname.split('/')[0])
1460
1461            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1462                zipfp.extractall()
1463            self.check_file(correctfile, content)
1464            rmtree(fixedname.split('/')[0])
1465
1466            unlink(TESTFN2)
1467
1468
1469class OtherTests(unittest.TestCase):
1470    def test_open_via_zip_info(self):
1471        # Create the ZIP archive
1472        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1473            zipfp.writestr("name", "foo")
1474            with self.assertWarns(UserWarning):
1475                zipfp.writestr("name", "bar")
1476            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1477
1478        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1479            infos = zipfp.infolist()
1480            data = b""
1481            for info in infos:
1482                with zipfp.open(info) as zipopen:
1483                    data += zipopen.read()
1484            self.assertIn(data, {b"foobar", b"barfoo"})
1485            data = b""
1486            for info in infos:
1487                data += zipfp.read(info)
1488            self.assertIn(data, {b"foobar", b"barfoo"})
1489
1490    def test_writestr_extended_local_header_issue1202(self):
1491        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1492            for data in 'abcdefghijklmnop':
1493                zinfo = zipfile.ZipInfo(data)
1494                zinfo.flag_bits |= 0x08  # Include an extended local header.
1495                orig_zip.writestr(zinfo, data)
1496
1497    def test_close(self):
1498        """Check that the zipfile is closed after the 'with' block."""
1499        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1500            for fpath, fdata in SMALL_TEST_DATA:
1501                zipfp.writestr(fpath, fdata)
1502                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1503        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1504
1505        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1506            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1507        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1508
1509    def test_close_on_exception(self):
1510        """Check that the zipfile is closed if an exception is raised in the
1511        'with' block."""
1512        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1513            for fpath, fdata in SMALL_TEST_DATA:
1514                zipfp.writestr(fpath, fdata)
1515
1516        try:
1517            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1518                raise zipfile.BadZipFile()
1519        except zipfile.BadZipFile:
1520            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1521
1522    def test_unsupported_version(self):
1523        # File has an extract_version of 120
1524        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1525                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1526                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1527                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1528                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1529
1530        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1531                          io.BytesIO(data), 'r')
1532
1533    @requires_zlib
1534    def test_read_unicode_filenames(self):
1535        # bug #10801
1536        fname = findfile('zip_cp437_header.zip')
1537        with zipfile.ZipFile(fname) as zipfp:
1538            for name in zipfp.namelist():
1539                zipfp.open(name).close()
1540
1541    def test_write_unicode_filenames(self):
1542        with zipfile.ZipFile(TESTFN, "w") as zf:
1543            zf.writestr("foo.txt", "Test for unicode filename")
1544            zf.writestr("\xf6.txt", "Test for unicode filename")
1545            self.assertIsInstance(zf.infolist()[0].filename, str)
1546
1547        with zipfile.ZipFile(TESTFN, "r") as zf:
1548            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1549            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1550
1551    def test_exclusive_create_zip_file(self):
1552        """Test exclusive creating a new zipfile."""
1553        unlink(TESTFN2)
1554        filename = 'testfile.txt'
1555        content = b'hello, world. this is some content.'
1556        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1557            zipfp.writestr(filename, content)
1558        with self.assertRaises(FileExistsError):
1559            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1560        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1561            self.assertEqual(zipfp.namelist(), [filename])
1562            self.assertEqual(zipfp.read(filename), content)
1563
1564    def test_create_non_existent_file_for_append(self):
1565        if os.path.exists(TESTFN):
1566            os.unlink(TESTFN)
1567
1568        filename = 'testfile.txt'
1569        content = b'hello, world. this is some content.'
1570
1571        try:
1572            with zipfile.ZipFile(TESTFN, 'a') as zf:
1573                zf.writestr(filename, content)
1574        except OSError:
1575            self.fail('Could not append data to a non-existent zip file.')
1576
1577        self.assertTrue(os.path.exists(TESTFN))
1578
1579        with zipfile.ZipFile(TESTFN, 'r') as zf:
1580            self.assertEqual(zf.read(filename), content)
1581
1582    def test_close_erroneous_file(self):
1583        # This test checks that the ZipFile constructor closes the file object
1584        # it opens if there's an error in the file.  If it doesn't, the
1585        # traceback holds a reference to the ZipFile object and, indirectly,
1586        # the file object.
1587        # On Windows, this causes the os.unlink() call to fail because the
1588        # underlying file is still open.  This is SF bug #412214.
1589        #
1590        with open(TESTFN, "w") as fp:
1591            fp.write("this is not a legal zip file\n")
1592        try:
1593            zf = zipfile.ZipFile(TESTFN)
1594        except zipfile.BadZipFile:
1595            pass
1596
1597    def test_is_zip_erroneous_file(self):
1598        """Check that is_zipfile() correctly identifies non-zip files."""
1599        # - passing a filename
1600        with open(TESTFN, "w") as fp:
1601            fp.write("this is not a legal zip file\n")
1602        self.assertFalse(zipfile.is_zipfile(TESTFN))
1603        # - passing a path-like object
1604        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1605        # - passing a file object
1606        with open(TESTFN, "rb") as fp:
1607            self.assertFalse(zipfile.is_zipfile(fp))
1608        # - passing a file-like object
1609        fp = io.BytesIO()
1610        fp.write(b"this is not a legal zip file\n")
1611        self.assertFalse(zipfile.is_zipfile(fp))
1612        fp.seek(0, 0)
1613        self.assertFalse(zipfile.is_zipfile(fp))
1614
1615    def test_damaged_zipfile(self):
1616        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1617        # - Create a valid zip file
1618        fp = io.BytesIO()
1619        with zipfile.ZipFile(fp, mode="w") as zipf:
1620            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1621        zipfiledata = fp.getvalue()
1622
1623        # - Now create copies of it missing the last N bytes and make sure
1624        #   a BadZipFile exception is raised when we try to open it
1625        for N in range(len(zipfiledata)):
1626            fp = io.BytesIO(zipfiledata[:N])
1627            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1628
1629    def test_is_zip_valid_file(self):
1630        """Check that is_zipfile() correctly identifies zip files."""
1631        # - passing a filename
1632        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1633            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1634
1635        self.assertTrue(zipfile.is_zipfile(TESTFN))
1636        # - passing a file object
1637        with open(TESTFN, "rb") as fp:
1638            self.assertTrue(zipfile.is_zipfile(fp))
1639            fp.seek(0, 0)
1640            zip_contents = fp.read()
1641        # - passing a file-like object
1642        fp = io.BytesIO()
1643        fp.write(zip_contents)
1644        self.assertTrue(zipfile.is_zipfile(fp))
1645        fp.seek(0, 0)
1646        self.assertTrue(zipfile.is_zipfile(fp))
1647
1648    def test_non_existent_file_raises_OSError(self):
1649        # make sure we don't raise an AttributeError when a partially-constructed
1650        # ZipFile instance is finalized; this tests for regression on SF tracker
1651        # bug #403871.
1652
1653        # The bug we're testing for caused an AttributeError to be raised
1654        # when a ZipFile instance was created for a file that did not
1655        # exist; the .fp member was not initialized but was needed by the
1656        # __del__() method.  Since the AttributeError is in the __del__(),
1657        # it is ignored, but the user should be sufficiently annoyed by
1658        # the message on the output that regression will be noticed
1659        # quickly.
1660        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1661
1662    def test_empty_file_raises_BadZipFile(self):
1663        f = open(TESTFN, 'w')
1664        f.close()
1665        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1666
1667        with open(TESTFN, 'w') as fp:
1668            fp.write("short file")
1669        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1670
1671    def test_closed_zip_raises_ValueError(self):
1672        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1673        data = io.BytesIO()
1674        with zipfile.ZipFile(data, mode="w") as zipf:
1675            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1676
1677        # This is correct; calling .read on a closed ZipFile should raise
1678        # a ValueError, and so should calling .testzip.  An earlier
1679        # version of .testzip would swallow this exception (and any other)
1680        # and report that the first file in the archive was corrupt.
1681        self.assertRaises(ValueError, zipf.read, "foo.txt")
1682        self.assertRaises(ValueError, zipf.open, "foo.txt")
1683        self.assertRaises(ValueError, zipf.testzip)
1684        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1685        with open(TESTFN, 'w') as f:
1686            f.write('zipfile test data')
1687        self.assertRaises(ValueError, zipf.write, TESTFN)
1688
1689    def test_bad_constructor_mode(self):
1690        """Check that bad modes passed to ZipFile constructor are caught."""
1691        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1692
1693    def test_bad_open_mode(self):
1694        """Check that bad modes passed to ZipFile.open are caught."""
1695        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1696            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1697
1698        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1699            # read the data to make sure the file is there
1700            zipf.read("foo.txt")
1701            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1702            # universal newlines support is removed
1703            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1704            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1705
1706    def test_read0(self):
1707        """Check that calling read(0) on a ZipExtFile object returns an empty
1708        string and doesn't advance file pointer."""
1709        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1710            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1711            # read the data to make sure the file is there
1712            with zipf.open("foo.txt") as f:
1713                for i in range(FIXEDTEST_SIZE):
1714                    self.assertEqual(f.read(0), b'')
1715
1716                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1717
1718    def test_open_non_existent_item(self):
1719        """Check that attempting to call open() for an item that doesn't
1720        exist in the archive raises a RuntimeError."""
1721        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1722            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1723
1724    def test_bad_compression_mode(self):
1725        """Check that bad compression methods passed to ZipFile.open are
1726        caught."""
1727        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1728
1729    def test_unsupported_compression(self):
1730        # data is declared as shrunk, but actually deflated
1731        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1732                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1733                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1734                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1735                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1736                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1737        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1738            self.assertRaises(NotImplementedError, zipf.open, 'x')
1739
1740    def test_null_byte_in_filename(self):
1741        """Check that a filename containing a null byte is properly
1742        terminated."""
1743        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1744            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1745            self.assertEqual(zipf.namelist(), ['foo.txt'])
1746
1747    def test_struct_sizes(self):
1748        """Check that ZIP internal structure sizes are calculated correctly."""
1749        self.assertEqual(zipfile.sizeEndCentDir, 22)
1750        self.assertEqual(zipfile.sizeCentralDir, 46)
1751        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1752        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1753
1754    def test_comments(self):
1755        """Check that comments on the archive are handled properly."""
1756
1757        # check default comment is empty
1758        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1759            self.assertEqual(zipf.comment, b'')
1760            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1761
1762        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1763            self.assertEqual(zipfr.comment, b'')
1764
1765        # check a simple short comment
1766        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1767        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1768            zipf.comment = comment
1769            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1770        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1771            self.assertEqual(zipf.comment, comment)
1772
1773        # check a comment of max length
1774        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1775        comment2 = comment2.encode("ascii")
1776        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1777            zipf.comment = comment2
1778            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1779
1780        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1781            self.assertEqual(zipfr.comment, comment2)
1782
1783        # check a comment that is too long is truncated
1784        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1785            with self.assertWarns(UserWarning):
1786                zipf.comment = comment2 + b'oops'
1787            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1788        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1789            self.assertEqual(zipfr.comment, comment2)
1790
1791        # check that comments are correctly modified in append mode
1792        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1793            zipf.comment = b"original comment"
1794            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1795        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1796            zipf.comment = b"an updated comment"
1797        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1798            self.assertEqual(zipf.comment, b"an updated comment")
1799
1800        # check that comments are correctly shortened in append mode
1801        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1802            zipf.comment = b"original comment that's longer"
1803            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1804        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1805            zipf.comment = b"shorter comment"
1806        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1807            self.assertEqual(zipf.comment, b"shorter comment")
1808
1809    def test_unicode_comment(self):
1810        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1811            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1812            with self.assertRaises(TypeError):
1813                zipf.comment = "this is an error"
1814
1815    def test_change_comment_in_empty_archive(self):
1816        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1817            self.assertFalse(zipf.filelist)
1818            zipf.comment = b"this is a comment"
1819        with zipfile.ZipFile(TESTFN, "r") as zipf:
1820            self.assertEqual(zipf.comment, b"this is a comment")
1821
1822    def test_change_comment_in_nonempty_archive(self):
1823        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1824            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1825        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1826            self.assertTrue(zipf.filelist)
1827            zipf.comment = b"this is a comment"
1828        with zipfile.ZipFile(TESTFN, "r") as zipf:
1829            self.assertEqual(zipf.comment, b"this is a comment")
1830
1831    def test_empty_zipfile(self):
1832        # Check that creating a file in 'w' or 'a' mode and closing without
1833        # adding any files to the archives creates a valid empty ZIP file
1834        zipf = zipfile.ZipFile(TESTFN, mode="w")
1835        zipf.close()
1836        try:
1837            zipf = zipfile.ZipFile(TESTFN, mode="r")
1838        except zipfile.BadZipFile:
1839            self.fail("Unable to create empty ZIP file in 'w' mode")
1840
1841        zipf = zipfile.ZipFile(TESTFN, mode="a")
1842        zipf.close()
1843        try:
1844            zipf = zipfile.ZipFile(TESTFN, mode="r")
1845        except:
1846            self.fail("Unable to create empty ZIP file in 'a' mode")
1847
1848    def test_open_empty_file(self):
1849        # Issue 1710703: Check that opening a file with less than 22 bytes
1850        # raises a BadZipFile exception (rather than the previously unhelpful
1851        # OSError)
1852        f = open(TESTFN, 'w')
1853        f.close()
1854        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1855
1856    def test_create_zipinfo_before_1980(self):
1857        self.assertRaises(ValueError,
1858                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1859
1860    def test_zipfile_with_short_extra_field(self):
1861        """If an extra field in the header is less than 4 bytes, skip it."""
1862        zipdata = (
1863            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1864            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1865            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1866            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1867            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1868            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1869            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1870        )
1871        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1872            # testzip returns the name of the first corrupt file, or None
1873            self.assertIsNone(zipf.testzip())
1874
1875    def test_open_conflicting_handles(self):
1876        # It's only possible to open one writable file handle at a time
1877        msg1 = b"It's fun to charter an accountant!"
1878        msg2 = b"And sail the wide accountant sea"
1879        msg3 = b"To find, explore the funds offshore"
1880        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1881            with zipf.open('foo', mode='w') as w2:
1882                w2.write(msg1)
1883            with zipf.open('bar', mode='w') as w1:
1884                with self.assertRaises(ValueError):
1885                    zipf.open('handle', mode='w')
1886                with self.assertRaises(ValueError):
1887                    zipf.open('foo', mode='r')
1888                with self.assertRaises(ValueError):
1889                    zipf.writestr('str', 'abcde')
1890                with self.assertRaises(ValueError):
1891                    zipf.write(__file__, 'file')
1892                with self.assertRaises(ValueError):
1893                    zipf.close()
1894                w1.write(msg2)
1895            with zipf.open('baz', mode='w') as w2:
1896                w2.write(msg3)
1897
1898        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1899            self.assertEqual(zipf.read('foo'), msg1)
1900            self.assertEqual(zipf.read('bar'), msg2)
1901            self.assertEqual(zipf.read('baz'), msg3)
1902            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1903
1904    def test_seek_tell(self):
1905        # Test seek functionality
1906        txt = b"Where's Bruce?"
1907        bloc = txt.find(b"Bruce")
1908        # Check seek on a file
1909        with zipfile.ZipFile(TESTFN, "w") as zipf:
1910            zipf.writestr("foo.txt", txt)
1911        with zipfile.ZipFile(TESTFN, "r") as zipf:
1912            with zipf.open("foo.txt", "r") as fp:
1913                fp.seek(bloc, os.SEEK_SET)
1914                self.assertEqual(fp.tell(), bloc)
1915                fp.seek(-bloc, os.SEEK_CUR)
1916                self.assertEqual(fp.tell(), 0)
1917                fp.seek(bloc, os.SEEK_CUR)
1918                self.assertEqual(fp.tell(), bloc)
1919                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1920                fp.seek(0, os.SEEK_END)
1921                self.assertEqual(fp.tell(), len(txt))
1922                fp.seek(0, os.SEEK_SET)
1923                self.assertEqual(fp.tell(), 0)
1924        # Check seek on memory file
1925        data = io.BytesIO()
1926        with zipfile.ZipFile(data, mode="w") as zipf:
1927            zipf.writestr("foo.txt", txt)
1928        with zipfile.ZipFile(data, mode="r") as zipf:
1929            with zipf.open("foo.txt", "r") as fp:
1930                fp.seek(bloc, os.SEEK_SET)
1931                self.assertEqual(fp.tell(), bloc)
1932                fp.seek(-bloc, os.SEEK_CUR)
1933                self.assertEqual(fp.tell(), 0)
1934                fp.seek(bloc, os.SEEK_CUR)
1935                self.assertEqual(fp.tell(), bloc)
1936                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1937                fp.seek(0, os.SEEK_END)
1938                self.assertEqual(fp.tell(), len(txt))
1939                fp.seek(0, os.SEEK_SET)
1940                self.assertEqual(fp.tell(), 0)
1941
1942    def tearDown(self):
1943        unlink(TESTFN)
1944        unlink(TESTFN2)
1945
1946
1947class AbstractBadCrcTests:
1948    def test_testzip_with_bad_crc(self):
1949        """Tests that files with bad CRCs return their name from testzip."""
1950        zipdata = self.zip_with_bad_crc
1951
1952        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
1953            # testzip returns the name of the first corrupt file, or None
1954            self.assertEqual('afile', zipf.testzip())
1955
1956    def test_read_with_bad_crc(self):
1957        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
1958        zipdata = self.zip_with_bad_crc
1959
1960        # Using ZipFile.read()
1961        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
1962            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
1963
1964        # Using ZipExtFile.read()
1965        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
1966            with zipf.open('afile', 'r') as corrupt_file:
1967                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
1968
1969        # Same with small reads (in order to exercise the buffering logic)
1970        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
1971            with zipf.open('afile', 'r') as corrupt_file:
1972                corrupt_file.MIN_READ_SIZE = 2
1973                with self.assertRaises(zipfile.BadZipFile):
1974                    while corrupt_file.read(2):
1975                        pass
1976
1977
1978class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
1979    compression = zipfile.ZIP_STORED
1980    zip_with_bad_crc = (
1981        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
1982        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
1983        b'ilehello,AworldP'
1984        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
1985        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
1986        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
1987        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
1988        b'\0\0/\0\0\0\0\0')
1989
1990@requires_zlib
1991class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
1992    compression = zipfile.ZIP_DEFLATED
1993    zip_with_bad_crc = (
1994        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
1995        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
1996        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
1997        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
1998        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
1999        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
2000        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
2001        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2002
2003@requires_bz2
2004class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2005    compression = zipfile.ZIP_BZIP2
2006    zip_with_bad_crc = (
2007        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
2008        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2009        b'ileBZh91AY&SY\xd4\xa8\xca'
2010        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
2011        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
2012        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
2013        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
2014        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
2015        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
2016        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
2017        b'\x00\x00\x00\x00')
2018
2019@requires_lzma
2020class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2021    compression = zipfile.ZIP_LZMA
2022    zip_with_bad_crc = (
2023        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2024        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2025        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
2026        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
2027        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2028        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
2029        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
2030        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
2031        b'\x00>\x00\x00\x00\x00\x00')
2032
2033
2034class DecryptionTests(unittest.TestCase):
2035    """Check that ZIP decryption works. Since the library does not
2036    support encryption at the moment, we use a pre-generated encrypted
2037    ZIP file."""
2038
2039    data = (
2040        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
2041        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
2042        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
2043        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
2044        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
2045        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
2046        b'\x00\x00L\x00\x00\x00\x00\x00' )
2047    data2 = (
2048        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
2049        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
2050        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
2051        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
2052        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
2053        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
2054        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
2055        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
2056
2057    plain = b'zipfile.py encryption test'
2058    plain2 = b'\x00'*512
2059
2060    def setUp(self):
2061        with open(TESTFN, "wb") as fp:
2062            fp.write(self.data)
2063        self.zip = zipfile.ZipFile(TESTFN, "r")
2064        with open(TESTFN2, "wb") as fp:
2065            fp.write(self.data2)
2066        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2067
2068    def tearDown(self):
2069        self.zip.close()
2070        os.unlink(TESTFN)
2071        self.zip2.close()
2072        os.unlink(TESTFN2)
2073
2074    def test_no_password(self):
2075        # Reading the encrypted file without password
2076        # must generate a RunTime exception
2077        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2078        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2079
2080    def test_bad_password(self):
2081        self.zip.setpassword(b"perl")
2082        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2083        self.zip2.setpassword(b"perl")
2084        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2085
2086    @requires_zlib
2087    def test_good_password(self):
2088        self.zip.setpassword(b"python")
2089        self.assertEqual(self.zip.read("test.txt"), self.plain)
2090        self.zip2.setpassword(b"12345")
2091        self.assertEqual(self.zip2.read("zero"), self.plain2)
2092
2093    def test_unicode_password(self):
2094        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
2095        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
2096        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
2097        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
2098
2099    def test_seek_tell(self):
2100        self.zip.setpassword(b"python")
2101        txt = self.plain
2102        test_word = b'encryption'
2103        bloc = txt.find(test_word)
2104        bloc_len = len(test_word)
2105        with self.zip.open("test.txt", "r") as fp:
2106            fp.seek(bloc, os.SEEK_SET)
2107            self.assertEqual(fp.tell(), bloc)
2108            fp.seek(-bloc, os.SEEK_CUR)
2109            self.assertEqual(fp.tell(), 0)
2110            fp.seek(bloc, os.SEEK_CUR)
2111            self.assertEqual(fp.tell(), bloc)
2112            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2113
2114            # Make sure that the second read after seeking back beyond
2115            # _readbuffer returns the same content (ie. rewind to the start of
2116            # the file to read forward to the required position).
2117            old_read_size = fp.MIN_READ_SIZE
2118            fp.MIN_READ_SIZE = 1
2119            fp._readbuffer = b''
2120            fp._offset = 0
2121            fp.seek(0, os.SEEK_SET)
2122            self.assertEqual(fp.tell(), 0)
2123            fp.seek(bloc, os.SEEK_CUR)
2124            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2125            fp.MIN_READ_SIZE = old_read_size
2126
2127            fp.seek(0, os.SEEK_END)
2128            self.assertEqual(fp.tell(), len(txt))
2129            fp.seek(0, os.SEEK_SET)
2130            self.assertEqual(fp.tell(), 0)
2131
2132            # Read the file completely to definitely call any eof integrity
2133            # checks (crc) and make sure they still pass.
2134            fp.read()
2135
2136
2137class AbstractTestsWithRandomBinaryFiles:
2138    @classmethod
2139    def setUpClass(cls):
2140        datacount = randint(16, 64)*1024 + randint(1, 1024)
2141        cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
2142                            for i in range(datacount))
2143
2144    def setUp(self):
2145        # Make a source file with some lines
2146        with open(TESTFN, "wb") as fp:
2147            fp.write(self.data)
2148
2149    def tearDown(self):
2150        unlink(TESTFN)
2151        unlink(TESTFN2)
2152
2153    def make_test_archive(self, f, compression):
2154        # Create the ZIP archive
2155        with zipfile.ZipFile(f, "w", compression) as zipfp:
2156            zipfp.write(TESTFN, "another.name")
2157            zipfp.write(TESTFN, TESTFN)
2158
2159    def zip_test(self, f, compression):
2160        self.make_test_archive(f, compression)
2161
2162        # Read the ZIP archive
2163        with zipfile.ZipFile(f, "r", compression) as zipfp:
2164            testdata = zipfp.read(TESTFN)
2165            self.assertEqual(len(testdata), len(self.data))
2166            self.assertEqual(testdata, self.data)
2167            self.assertEqual(zipfp.read("another.name"), self.data)
2168
2169    def test_read(self):
2170        for f in get_files(self):
2171            self.zip_test(f, self.compression)
2172
2173    def zip_open_test(self, f, compression):
2174        self.make_test_archive(f, compression)
2175
2176        # Read the ZIP archive
2177        with zipfile.ZipFile(f, "r", compression) as zipfp:
2178            zipdata1 = []
2179            with zipfp.open(TESTFN) as zipopen1:
2180                while True:
2181                    read_data = zipopen1.read(256)
2182                    if not read_data:
2183                        break
2184                    zipdata1.append(read_data)
2185
2186            zipdata2 = []
2187            with zipfp.open("another.name") as zipopen2:
2188                while True:
2189                    read_data = zipopen2.read(256)
2190                    if not read_data:
2191                        break
2192                    zipdata2.append(read_data)
2193
2194            testdata1 = b''.join(zipdata1)
2195            self.assertEqual(len(testdata1), len(self.data))
2196            self.assertEqual(testdata1, self.data)
2197
2198            testdata2 = b''.join(zipdata2)
2199            self.assertEqual(len(testdata2), len(self.data))
2200            self.assertEqual(testdata2, self.data)
2201
2202    def test_open(self):
2203        for f in get_files(self):
2204            self.zip_open_test(f, self.compression)
2205
2206    def zip_random_open_test(self, f, compression):
2207        self.make_test_archive(f, compression)
2208
2209        # Read the ZIP archive
2210        with zipfile.ZipFile(f, "r", compression) as zipfp:
2211            zipdata1 = []
2212            with zipfp.open(TESTFN) as zipopen1:
2213                while True:
2214                    read_data = zipopen1.read(randint(1, 1024))
2215                    if not read_data:
2216                        break
2217                    zipdata1.append(read_data)
2218
2219            testdata = b''.join(zipdata1)
2220            self.assertEqual(len(testdata), len(self.data))
2221            self.assertEqual(testdata, self.data)
2222
2223    def test_random_open(self):
2224        for f in get_files(self):
2225            self.zip_random_open_test(f, self.compression)
2226
2227
2228class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2229                                       unittest.TestCase):
2230    compression = zipfile.ZIP_STORED
2231
2232@requires_zlib
2233class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2234                                        unittest.TestCase):
2235    compression = zipfile.ZIP_DEFLATED
2236
2237@requires_bz2
2238class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2239                                      unittest.TestCase):
2240    compression = zipfile.ZIP_BZIP2
2241
2242@requires_lzma
2243class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2244                                     unittest.TestCase):
2245    compression = zipfile.ZIP_LZMA
2246
2247
2248# Provide the tell() method but not seek()
2249class Tellable:
2250    def __init__(self, fp):
2251        self.fp = fp
2252        self.offset = 0
2253
2254    def write(self, data):
2255        n = self.fp.write(data)
2256        self.offset += n
2257        return n
2258
2259    def tell(self):
2260        return self.offset
2261
2262    def flush(self):
2263        self.fp.flush()
2264
2265class Unseekable:
2266    def __init__(self, fp):
2267        self.fp = fp
2268
2269    def write(self, data):
2270        return self.fp.write(data)
2271
2272    def flush(self):
2273        self.fp.flush()
2274
2275class UnseekableTests(unittest.TestCase):
2276    def test_writestr(self):
2277        for wrapper in (lambda f: f), Tellable, Unseekable:
2278            with self.subTest(wrapper=wrapper):
2279                f = io.BytesIO()
2280                f.write(b'abc')
2281                bf = io.BufferedWriter(f)
2282                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2283                    zipfp.writestr('ones', b'111')
2284                    zipfp.writestr('twos', b'222')
2285                self.assertEqual(f.getvalue()[:5], b'abcPK')
2286                with zipfile.ZipFile(f, mode='r') as zipf:
2287                    with zipf.open('ones') as zopen:
2288                        self.assertEqual(zopen.read(), b'111')
2289                    with zipf.open('twos') as zopen:
2290                        self.assertEqual(zopen.read(), b'222')
2291
2292    def test_write(self):
2293        for wrapper in (lambda f: f), Tellable, Unseekable:
2294            with self.subTest(wrapper=wrapper):
2295                f = io.BytesIO()
2296                f.write(b'abc')
2297                bf = io.BufferedWriter(f)
2298                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2299                    self.addCleanup(unlink, TESTFN)
2300                    with open(TESTFN, 'wb') as f2:
2301                        f2.write(b'111')
2302                    zipfp.write(TESTFN, 'ones')
2303                    with open(TESTFN, 'wb') as f2:
2304                        f2.write(b'222')
2305                    zipfp.write(TESTFN, 'twos')
2306                self.assertEqual(f.getvalue()[:5], b'abcPK')
2307                with zipfile.ZipFile(f, mode='r') as zipf:
2308                    with zipf.open('ones') as zopen:
2309                        self.assertEqual(zopen.read(), b'111')
2310                    with zipf.open('twos') as zopen:
2311                        self.assertEqual(zopen.read(), b'222')
2312
2313    def test_open_write(self):
2314        for wrapper in (lambda f: f), Tellable, Unseekable:
2315            with self.subTest(wrapper=wrapper):
2316                f = io.BytesIO()
2317                f.write(b'abc')
2318                bf = io.BufferedWriter(f)
2319                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
2320                    with zipf.open('ones', 'w') as zopen:
2321                        zopen.write(b'111')
2322                    with zipf.open('twos', 'w') as zopen:
2323                        zopen.write(b'222')
2324                self.assertEqual(f.getvalue()[:5], b'abcPK')
2325                with zipfile.ZipFile(f) as zipf:
2326                    self.assertEqual(zipf.read('ones'), b'111')
2327                    self.assertEqual(zipf.read('twos'), b'222')
2328
2329
2330@requires_zlib
2331class TestsWithMultipleOpens(unittest.TestCase):
2332    @classmethod
2333    def setUpClass(cls):
2334        cls.data1 = b'111' + getrandbytes(10000)
2335        cls.data2 = b'222' + getrandbytes(10000)
2336
2337    def make_test_archive(self, f):
2338        # Create the ZIP archive
2339        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2340            zipfp.writestr('ones', self.data1)
2341            zipfp.writestr('twos', self.data2)
2342
2343    def test_same_file(self):
2344        # Verify that (when the ZipFile is in control of creating file objects)
2345        # multiple open() calls can be made without interfering with each other.
2346        for f in get_files(self):
2347            self.make_test_archive(f)
2348            with zipfile.ZipFile(f, mode="r") as zipf:
2349                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2350                    data1 = zopen1.read(500)
2351                    data2 = zopen2.read(500)
2352                    data1 += zopen1.read()
2353                    data2 += zopen2.read()
2354                self.assertEqual(data1, data2)
2355                self.assertEqual(data1, self.data1)
2356
2357    def test_different_file(self):
2358        # Verify that (when the ZipFile is in control of creating file objects)
2359        # multiple open() calls can be made without interfering with each other.
2360        for f in get_files(self):
2361            self.make_test_archive(f)
2362            with zipfile.ZipFile(f, mode="r") as zipf:
2363                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2364                    data1 = zopen1.read(500)
2365                    data2 = zopen2.read(500)
2366                    data1 += zopen1.read()
2367                    data2 += zopen2.read()
2368                self.assertEqual(data1, self.data1)
2369                self.assertEqual(data2, self.data2)
2370
2371    def test_interleaved(self):
2372        # Verify that (when the ZipFile is in control of creating file objects)
2373        # multiple open() calls can be made without interfering with each other.
2374        for f in get_files(self):
2375            self.make_test_archive(f)
2376            with zipfile.ZipFile(f, mode="r") as zipf:
2377                with zipf.open('ones') as zopen1:
2378                    data1 = zopen1.read(500)
2379                    with zipf.open('twos') as zopen2:
2380                        data2 = zopen2.read(500)
2381                        data1 += zopen1.read()
2382                        data2 += zopen2.read()
2383                self.assertEqual(data1, self.data1)
2384                self.assertEqual(data2, self.data2)
2385
2386    def test_read_after_close(self):
2387        for f in get_files(self):
2388            self.make_test_archive(f)
2389            with contextlib.ExitStack() as stack:
2390                with zipfile.ZipFile(f, 'r') as zipf:
2391                    zopen1 = stack.enter_context(zipf.open('ones'))
2392                    zopen2 = stack.enter_context(zipf.open('twos'))
2393                data1 = zopen1.read(500)
2394                data2 = zopen2.read(500)
2395                data1 += zopen1.read()
2396                data2 += zopen2.read()
2397            self.assertEqual(data1, self.data1)
2398            self.assertEqual(data2, self.data2)
2399
2400    def test_read_after_write(self):
2401        for f in get_files(self):
2402            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2403                zipf.writestr('ones', self.data1)
2404                zipf.writestr('twos', self.data2)
2405                with zipf.open('ones') as zopen1:
2406                    data1 = zopen1.read(500)
2407            self.assertEqual(data1, self.data1[:500])
2408            with zipfile.ZipFile(f, 'r') as zipf:
2409                data1 = zipf.read('ones')
2410                data2 = zipf.read('twos')
2411            self.assertEqual(data1, self.data1)
2412            self.assertEqual(data2, self.data2)
2413
2414    def test_write_after_read(self):
2415        for f in get_files(self):
2416            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2417                zipf.writestr('ones', self.data1)
2418                with zipf.open('ones') as zopen1:
2419                    zopen1.read(500)
2420                    zipf.writestr('twos', self.data2)
2421            with zipfile.ZipFile(f, 'r') as zipf:
2422                data1 = zipf.read('ones')
2423                data2 = zipf.read('twos')
2424            self.assertEqual(data1, self.data1)
2425            self.assertEqual(data2, self.data2)
2426
2427    def test_many_opens(self):
2428        # Verify that read() and open() promptly close the file descriptor,
2429        # and don't rely on the garbage collector to free resources.
2430        self.make_test_archive(TESTFN2)
2431        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
2432            for x in range(100):
2433                zipf.read('ones')
2434                with zipf.open('ones') as zopen1:
2435                    pass
2436        with open(os.devnull) as f:
2437            self.assertLess(f.fileno(), 100)
2438
2439    def test_write_while_reading(self):
2440        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2441            zipf.writestr('ones', self.data1)
2442        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2443            with zipf.open('ones', 'r') as r1:
2444                data1 = r1.read(500)
2445                with zipf.open('twos', 'w') as w1:
2446                    w1.write(self.data2)
2447                data1 += r1.read()
2448        self.assertEqual(data1, self.data1)
2449        with zipfile.ZipFile(TESTFN2) as zipf:
2450            self.assertEqual(zipf.read('twos'), self.data2)
2451
2452    def tearDown(self):
2453        unlink(TESTFN2)
2454
2455
2456class TestWithDirectory(unittest.TestCase):
2457    def setUp(self):
2458        os.mkdir(TESTFN2)
2459
2460    def test_extract_dir(self):
2461        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
2462            zipf.extractall(TESTFN2)
2463        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2464        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2465        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
2466
2467    def test_bug_6050(self):
2468        # Extraction should succeed if directories already exist
2469        os.mkdir(os.path.join(TESTFN2, "a"))
2470        self.test_extract_dir()
2471
2472    def test_write_dir(self):
2473        dirpath = os.path.join(TESTFN2, "x")
2474        os.mkdir(dirpath)
2475        mode = os.stat(dirpath).st_mode & 0xFFFF
2476        with zipfile.ZipFile(TESTFN, "w") as zipf:
2477            zipf.write(dirpath)
2478            zinfo = zipf.filelist[0]
2479            self.assertTrue(zinfo.filename.endswith("/x/"))
2480            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2481            zipf.write(dirpath, "y")
2482            zinfo = zipf.filelist[1]
2483            self.assertTrue(zinfo.filename, "y/")
2484            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2485        with zipfile.ZipFile(TESTFN, "r") as zipf:
2486            zinfo = zipf.filelist[0]
2487            self.assertTrue(zinfo.filename.endswith("/x/"))
2488            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2489            zinfo = zipf.filelist[1]
2490            self.assertTrue(zinfo.filename, "y/")
2491            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2492            target = os.path.join(TESTFN2, "target")
2493            os.mkdir(target)
2494            zipf.extractall(target)
2495            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
2496            self.assertEqual(len(os.listdir(target)), 2)
2497
2498    def test_writestr_dir(self):
2499        os.mkdir(os.path.join(TESTFN2, "x"))
2500        with zipfile.ZipFile(TESTFN, "w") as zipf:
2501            zipf.writestr("x/", b'')
2502            zinfo = zipf.filelist[0]
2503            self.assertEqual(zinfo.filename, "x/")
2504            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2505        with zipfile.ZipFile(TESTFN, "r") as zipf:
2506            zinfo = zipf.filelist[0]
2507            self.assertTrue(zinfo.filename.endswith("x/"))
2508            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2509            target = os.path.join(TESTFN2, "target")
2510            os.mkdir(target)
2511            zipf.extractall(target)
2512            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
2513            self.assertEqual(os.listdir(target), ["x"])
2514
2515    def tearDown(self):
2516        rmtree(TESTFN2)
2517        if os.path.exists(TESTFN):
2518            unlink(TESTFN)
2519
2520
2521class ZipInfoTests(unittest.TestCase):
2522    def test_from_file(self):
2523        zi = zipfile.ZipInfo.from_file(__file__)
2524        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2525        self.assertFalse(zi.is_dir())
2526        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2527
2528    def test_from_file_pathlike(self):
2529        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2530        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2531        self.assertFalse(zi.is_dir())
2532        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2533
2534    def test_from_file_bytes(self):
2535        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2536        self.assertEqual(posixpath.basename(zi.filename), 'test')
2537        self.assertFalse(zi.is_dir())
2538        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2539
2540    def test_from_file_fileno(self):
2541        with open(__file__, 'rb') as f:
2542            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2543            self.assertEqual(posixpath.basename(zi.filename), 'test')
2544            self.assertFalse(zi.is_dir())
2545            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2546
2547    def test_from_dir(self):
2548        dirpath = os.path.dirname(os.path.abspath(__file__))
2549        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2550        self.assertEqual(zi.filename, 'stdlib_tests/')
2551        self.assertTrue(zi.is_dir())
2552        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2553        self.assertEqual(zi.file_size, 0)
2554
2555
2556class CommandLineTest(unittest.TestCase):
2557
2558    def zipfilecmd(self, *args, **kwargs):
2559        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
2560                                                      **kwargs)
2561        return out.replace(os.linesep.encode(), b'\n')
2562
2563    def zipfilecmd_failure(self, *args):
2564        return script_helper.assert_python_failure('-m', 'zipfile', *args)
2565
2566    def test_bad_use(self):
2567        rc, out, err = self.zipfilecmd_failure()
2568        self.assertEqual(out, b'')
2569        self.assertIn(b'usage', err.lower())
2570        self.assertIn(b'error', err.lower())
2571        self.assertIn(b'required', err.lower())
2572        rc, out, err = self.zipfilecmd_failure('-l', '')
2573        self.assertEqual(out, b'')
2574        self.assertNotEqual(err.strip(), b'')
2575
2576    def test_test_command(self):
2577        zip_name = findfile('zipdir.zip')
2578        for opt in '-t', '--test':
2579            out = self.zipfilecmd(opt, zip_name)
2580            self.assertEqual(out.rstrip(), b'Done testing')
2581        zip_name = findfile('testtar.tar')
2582        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
2583        self.assertEqual(out, b'')
2584
2585    def test_list_command(self):
2586        zip_name = findfile('zipdir.zip')
2587        t = io.StringIO()
2588        with zipfile.ZipFile(zip_name, 'r') as tf:
2589            tf.printdir(t)
2590        expected = t.getvalue().encode('ascii', 'backslashreplace')
2591        for opt in '-l', '--list':
2592            out = self.zipfilecmd(opt, zip_name,
2593                                  PYTHONIOENCODING='ascii:backslashreplace')
2594            self.assertEqual(out, expected)
2595
2596    @requires_zlib
2597    def test_create_command(self):
2598        self.addCleanup(unlink, TESTFN)
2599        with open(TESTFN, 'w') as f:
2600            f.write('test 1')
2601        os.mkdir(TESTFNDIR)
2602        self.addCleanup(rmtree, TESTFNDIR)
2603        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f:
2604            f.write('test 2')
2605        files = [TESTFN, TESTFNDIR]
2606        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
2607        for opt in '-c', '--create':
2608            try:
2609                out = self.zipfilecmd(opt, TESTFN2, *files)
2610                self.assertEqual(out, b'')
2611                with zipfile.ZipFile(TESTFN2) as zf:
2612                    self.assertEqual(zf.namelist(), namelist)
2613                    self.assertEqual(zf.read(namelist[0]), b'test 1')
2614                    self.assertEqual(zf.read(namelist[2]), b'test 2')
2615            finally:
2616                unlink(TESTFN2)
2617
2618    def test_extract_command(self):
2619        zip_name = findfile('zipdir.zip')
2620        for opt in '-e', '--extract':
2621            with temp_dir() as extdir:
2622                out = self.zipfilecmd(opt, zip_name, extdir)
2623                self.assertEqual(out, b'')
2624                with zipfile.ZipFile(zip_name) as zf:
2625                    for zi in zf.infolist():
2626                        path = os.path.join(extdir,
2627                                    zi.filename.replace('/', os.sep))
2628                        if zi.is_dir():
2629                            self.assertTrue(os.path.isdir(path))
2630                        else:
2631                            self.assertTrue(os.path.isfile(path))
2632                            with open(path, 'rb') as f:
2633                                self.assertEqual(f.read(), zf.read(zi))
2634
2635if __name__ == "__main__":
2636    unittest.main()
2637