1# We can test part of the module without zlib.
2try:
3    import zlib
4except ImportError:
5    zlib = None
6
7import os
8import io
9import sys
10import time
11import struct
12import zipfile
13import unittest
14
15from StringIO import StringIO
16from tempfile import TemporaryFile
17from random import randint, random, getrandbits
18from unittest import skipUnless
19
20from test import script_helper
21from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \
22                              run_unittest, findfile, unlink, rmtree, \
23                              check_warnings, captured_stdout
try:
    # Probe whether the unicode test file name can be encoded with the
    # file system encoding; the unicode-filename test is skipped otherwise.
    TESTFN_UNICODE.encode(TESTFN_ENCODING)
except (UnicodeError, TypeError):
    # Either the file system encoding is None, or the file name
    # cannot be encoded in the file system encoding.
    TESTFN_UNICODE = None

# Scratch names derived from TESTFN.  TESTFN2 is the archive path that most
# tests below create and that TestsWithSourceFile.tearDown removes.
TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
# Number of lines generated for the source file by the setUp methods.
FIXEDTEST_SIZE = 1000

# (archive member name, member content) pairs used by the extract and
# close tests.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
39
def getrandbytes(size):
    """Return a byte string holding exactly *size* random bytes.

    The bytes are produced by formatting ``getrandbits(8 * size)`` as a
    zero-padded hexadecimal number of ``2 * size`` digits and decoding it.
    A *size* of zero yields an empty byte string; a negative *size* still
    raises ValueError from getrandbits().
    """
    if size == 0:
        # Without this guard a zero size fails: depending on the interpreter
        # getrandbits(0) raises, or the formatting yields the odd-length
        # string '0', which fromhex() rejects.
        return b''
    return bytes(bytearray.fromhex('%0*x' % (2 * size, getrandbits(8 * size))))
42
43class TestsWithSourceFile(unittest.TestCase):
44    def setUp(self):
45        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
46                         for i in xrange(FIXEDTEST_SIZE)]
47        self.data = '\n'.join(self.line_gen) + '\n'
48
49        # Make a source file with some lines
50        with open(TESTFN, "wb") as fp:
51            fp.write(self.data)
52
53    def make_test_archive(self, f, compression):
54        # Create the ZIP archive
55        with zipfile.ZipFile(f, "w", compression) as zipfp:
56            zipfp.write(TESTFN, "another.name")
57            zipfp.write(TESTFN, TESTFN)
58            zipfp.writestr("strfile", self.data)
59
60    def zip_test(self, f, compression):
61        self.make_test_archive(f, compression)
62
63        # Read the ZIP archive
64        with zipfile.ZipFile(f, "r", compression) as zipfp:
65            self.assertEqual(zipfp.read(TESTFN), self.data)
66            self.assertEqual(zipfp.read("another.name"), self.data)
67            self.assertEqual(zipfp.read("strfile"), self.data)
68
69            # Print the ZIP directory
70            fp = StringIO()
71            stdout = sys.stdout
72            try:
73                sys.stdout = fp
74                zipfp.printdir()
75            finally:
76                sys.stdout = stdout
77
78            directory = fp.getvalue()
79            lines = directory.splitlines()
80            self.assertEqual(len(lines), 4) # Number of files + header
81
82            self.assertIn('File Name', lines[0])
83            self.assertIn('Modified', lines[0])
84            self.assertIn('Size', lines[0])
85
86            fn, date, time_, size = lines[1].split()
87            self.assertEqual(fn, 'another.name')
88            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
89            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
90            self.assertEqual(size, str(len(self.data)))
91
92            # Check the namelist
93            names = zipfp.namelist()
94            self.assertEqual(len(names), 3)
95            self.assertIn(TESTFN, names)
96            self.assertIn("another.name", names)
97            self.assertIn("strfile", names)
98
99            # Check infolist
100            infos = zipfp.infolist()
101            names = [i.filename for i in infos]
102            self.assertEqual(len(names), 3)
103            self.assertIn(TESTFN, names)
104            self.assertIn("another.name", names)
105            self.assertIn("strfile", names)
106            for i in infos:
107                self.assertEqual(i.file_size, len(self.data))
108
109            # check getinfo
110            for nm in (TESTFN, "another.name", "strfile"):
111                info = zipfp.getinfo(nm)
112                self.assertEqual(info.filename, nm)
113                self.assertEqual(info.file_size, len(self.data))
114
115            # Check that testzip doesn't raise an exception
116            zipfp.testzip()
117
118    def test_stored(self):
119        for f in (TESTFN2, TemporaryFile(), StringIO()):
120            self.zip_test(f, zipfile.ZIP_STORED)
121
122    def zip_open_test(self, f, compression):
123        self.make_test_archive(f, compression)
124
125        # Read the ZIP archive
126        with zipfile.ZipFile(f, "r", compression) as zipfp:
127            zipdata1 = []
128            with zipfp.open(TESTFN) as zipopen1:
129                while True:
130                    read_data = zipopen1.read(256)
131                    if not read_data:
132                        break
133                    zipdata1.append(read_data)
134
135            zipdata2 = []
136            with zipfp.open("another.name") as zipopen2:
137                while True:
138                    read_data = zipopen2.read(256)
139                    if not read_data:
140                        break
141                    zipdata2.append(read_data)
142
143            self.assertEqual(''.join(zipdata1), self.data)
144            self.assertEqual(''.join(zipdata2), self.data)
145
146    def test_open_stored(self):
147        for f in (TESTFN2, TemporaryFile(), StringIO()):
148            self.zip_open_test(f, zipfile.ZIP_STORED)
149
150    def test_open_via_zip_info(self):
151        # Create the ZIP archive
152        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
153            zipfp.writestr("name", "foo")
154            with check_warnings(('', UserWarning)):
155                zipfp.writestr("name", "bar")
156            self.assertEqual(zipfp.namelist(), ["name"] * 2)
157
158        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
159            infos = zipfp.infolist()
160            data = ""
161            for info in infos:
162                with zipfp.open(info) as f:
163                    data += f.read()
164            self.assertTrue(data == "foobar" or data == "barfoo")
165            data = ""
166            for info in infos:
167                data += zipfp.read(info)
168            self.assertTrue(data == "foobar" or data == "barfoo")
169
170    def zip_random_open_test(self, f, compression):
171        self.make_test_archive(f, compression)
172
173        # Read the ZIP archive
174        with zipfile.ZipFile(f, "r", compression) as zipfp:
175            zipdata1 = []
176            with zipfp.open(TESTFN) as zipopen1:
177                while True:
178                    read_data = zipopen1.read(randint(1, 1024))
179                    if not read_data:
180                        break
181                    zipdata1.append(read_data)
182
183            self.assertEqual(''.join(zipdata1), self.data)
184
185    def test_random_open_stored(self):
186        for f in (TESTFN2, TemporaryFile(), StringIO()):
187            self.zip_random_open_test(f, zipfile.ZIP_STORED)
188
189    def test_universal_readaheads(self):
190        f = StringIO()
191
192        data = 'a\r\n' * 16 * 1024
193        with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
194            zipfp.writestr(TESTFN, data)
195
196        data2 = ''
197        with zipfile.ZipFile(f, 'r') as zipfp:
198            with zipfp.open(TESTFN, 'rU') as zipopen:
199                for line in zipopen:
200                    data2 += line
201
202        self.assertEqual(data, data2.replace('\n', '\r\n'))
203
204    def zip_readline_read_test(self, f, compression):
205        self.make_test_archive(f, compression)
206
207        # Read the ZIP archive
208        with zipfile.ZipFile(f, "r") as zipfp:
209            with zipfp.open(TESTFN) as zipopen:
210                data = ''
211                while True:
212                    read = zipopen.readline()
213                    if not read:
214                        break
215                    data += read
216
217                    read = zipopen.read(100)
218                    if not read:
219                        break
220                    data += read
221
222        self.assertEqual(data, self.data)
223
224    def zip_readline_test(self, f, compression):
225        self.make_test_archive(f, compression)
226
227        # Read the ZIP archive
228        with zipfile.ZipFile(f, "r") as zipfp:
229            with zipfp.open(TESTFN) as zipopen:
230                for line in self.line_gen:
231                    linedata = zipopen.readline()
232                    self.assertEqual(linedata, line + '\n')
233
234    def zip_readlines_test(self, f, compression):
235        self.make_test_archive(f, compression)
236
237        # Read the ZIP archive
238        with zipfile.ZipFile(f, "r") as zipfp:
239            with zipfp.open(TESTFN) as zo:
240                ziplines = zo.readlines()
241                for line, zipline in zip(self.line_gen, ziplines):
242                    self.assertEqual(zipline, line + '\n')
243
244    def zip_iterlines_test(self, f, compression):
245        self.make_test_archive(f, compression)
246
247        # Read the ZIP archive
248        with zipfile.ZipFile(f, "r") as zipfp:
249            for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
250                self.assertEqual(zipline, line + '\n')
251
252    def test_readline_read_stored(self):
253        # Issue #7610: calls to readline() interleaved with calls to read().
254        for f in (TESTFN2, TemporaryFile(), StringIO()):
255            self.zip_readline_read_test(f, zipfile.ZIP_STORED)
256
257    def test_readline_stored(self):
258        for f in (TESTFN2, TemporaryFile(), StringIO()):
259            self.zip_readline_test(f, zipfile.ZIP_STORED)
260
261    def test_readlines_stored(self):
262        for f in (TESTFN2, TemporaryFile(), StringIO()):
263            self.zip_readlines_test(f, zipfile.ZIP_STORED)
264
265    def test_iterlines_stored(self):
266        for f in (TESTFN2, TemporaryFile(), StringIO()):
267            self.zip_iterlines_test(f, zipfile.ZIP_STORED)
268
269    @skipUnless(zlib, "requires zlib")
270    def test_deflated(self):
271        for f in (TESTFN2, TemporaryFile(), StringIO()):
272            self.zip_test(f, zipfile.ZIP_DEFLATED)
273
274    @skipUnless(zlib, "requires zlib")
275    def test_open_deflated(self):
276        for f in (TESTFN2, TemporaryFile(), StringIO()):
277            self.zip_open_test(f, zipfile.ZIP_DEFLATED)
278
279    @skipUnless(zlib, "requires zlib")
280    def test_random_open_deflated(self):
281        for f in (TESTFN2, TemporaryFile(), StringIO()):
282            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
283
284    @skipUnless(zlib, "requires zlib")
285    def test_readline_read_deflated(self):
286        # Issue #7610: calls to readline() interleaved with calls to read().
287        for f in (TESTFN2, TemporaryFile(), StringIO()):
288            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)
289
290    @skipUnless(zlib, "requires zlib")
291    def test_readline_deflated(self):
292        for f in (TESTFN2, TemporaryFile(), StringIO()):
293            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)
294
295    @skipUnless(zlib, "requires zlib")
296    def test_readlines_deflated(self):
297        for f in (TESTFN2, TemporaryFile(), StringIO()):
298            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)
299
300    @skipUnless(zlib, "requires zlib")
301    def test_iterlines_deflated(self):
302        for f in (TESTFN2, TemporaryFile(), StringIO()):
303            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)
304
305    @skipUnless(zlib, "requires zlib")
306    def test_low_compression(self):
307        """Check for cases where compressed data is larger than original."""
308        # Create the ZIP archive
309        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
310            zipfp.writestr("strfile", '12')
311
312        # Get an open object for strfile
313        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
314            with zipfp.open("strfile") as openobj:
315                self.assertEqual(openobj.read(1), '1')
316                self.assertEqual(openobj.read(1), '2')
317
318    def test_absolute_arcnames(self):
319        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
320            zipfp.write(TESTFN, "/absolute")
321
322        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
323            self.assertEqual(zipfp.namelist(), ["absolute"])
324
325    def test_append_to_zip_file(self):
326        """Test appending to an existing zipfile."""
327        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
328            zipfp.write(TESTFN, TESTFN)
329
330        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
331            zipfp.writestr("strfile", self.data)
332            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
333
334    def test_append_to_non_zip_file(self):
335        """Test appending to an existing file that is not a zipfile."""
336        # NOTE: this test fails if len(d) < 22 because of the first
337        # line "fpin.seek(-22, 2)" in _EndRecData
338        data = 'I am not a ZipFile!'*10
339        with open(TESTFN2, 'wb') as f:
340            f.write(data)
341
342        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
343            zipfp.write(TESTFN, TESTFN)
344
345        with open(TESTFN2, 'rb') as f:
346            f.seek(len(data))
347            with zipfile.ZipFile(f, "r") as zipfp:
348                self.assertEqual(zipfp.namelist(), [TESTFN])
349                self.assertEqual(zipfp.read(TESTFN), self.data)
350        with open(TESTFN2, 'rb') as f:
351            self.assertEqual(f.read(len(data)), data)
352            zipfiledata = f.read()
353        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
354            self.assertEqual(zipfp.namelist(), [TESTFN])
355            self.assertEqual(zipfp.read(TESTFN), self.data)
356
357    def test_read_concatenated_zip_file(self):
358        with io.BytesIO() as bio:
359            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
360                zipfp.write(TESTFN, TESTFN)
361            zipfiledata = bio.getvalue()
362        data = b'I am not a ZipFile!'*10
363        with open(TESTFN2, 'wb') as f:
364            f.write(data)
365            f.write(zipfiledata)
366
367        with zipfile.ZipFile(TESTFN2) as zipfp:
368            self.assertEqual(zipfp.namelist(), [TESTFN])
369            self.assertEqual(zipfp.read(TESTFN), self.data)
370
371    def test_append_to_concatenated_zip_file(self):
372        with io.BytesIO() as bio:
373            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
374                zipfp.write(TESTFN, TESTFN)
375            zipfiledata = bio.getvalue()
376        data = b'I am not a ZipFile!'*1000000
377        with open(TESTFN2, 'wb') as f:
378            f.write(data)
379            f.write(zipfiledata)
380
381        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
382            self.assertEqual(zipfp.namelist(), [TESTFN])
383            zipfp.writestr('strfile', self.data)
384
385        with open(TESTFN2, 'rb') as f:
386            self.assertEqual(f.read(len(data)), data)
387            zipfiledata = f.read()
388        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
389            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
390            self.assertEqual(zipfp.read(TESTFN), self.data)
391            self.assertEqual(zipfp.read('strfile'), self.data)
392
393    def test_ignores_newline_at_end(self):
394        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
395            zipfp.write(TESTFN, TESTFN)
396        with open(TESTFN2, 'a') as f:
397            f.write("\r\n\00\00\00")
398        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
399            self.assertIsInstance(zipfp, zipfile.ZipFile)
400
401    def test_ignores_stuff_appended_past_comments(self):
402        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
403            zipfp.comment = b"this is a comment"
404            zipfp.write(TESTFN, TESTFN)
405        with open(TESTFN2, 'a') as f:
406            f.write("abcdef\r\n")
407        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
408            self.assertIsInstance(zipfp, zipfile.ZipFile)
409            self.assertEqual(zipfp.comment, b"this is a comment")
410
411    def test_write_default_name(self):
412        """Check that calling ZipFile.write without arcname specified
413        produces the expected result."""
414        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
415            zipfp.write(TESTFN)
416            with open(TESTFN,'r') as fid:
417                self.assertEqual(zipfp.read(TESTFN), fid.read())
418
419    @skipUnless(zlib, "requires zlib")
420    def test_per_file_compression(self):
421        """Check that files within a Zip archive can have different
422        compression options."""
423        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
424            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
425            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
426            sinfo = zipfp.getinfo('storeme')
427            dinfo = zipfp.getinfo('deflateme')
428            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
429            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
430
431    def test_write_to_readonly(self):
432        """Check that trying to call write() on a readonly ZipFile object
433        raises a RuntimeError."""
434        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
435            zipfp.writestr("somefile.txt", "bogus")
436
437        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
438            self.assertRaises(RuntimeError, zipfp.write, TESTFN)
439
440    def test_extract(self):
441        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
442            for fpath, fdata in SMALL_TEST_DATA:
443                zipfp.writestr(fpath, fdata)
444
445        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
446            for fpath, fdata in SMALL_TEST_DATA:
447                writtenfile = zipfp.extract(fpath)
448
449                # make sure it was written to the right place
450                correctfile = os.path.join(os.getcwd(), fpath)
451                correctfile = os.path.normpath(correctfile)
452
453                self.assertEqual(writtenfile, correctfile)
454
455                # make sure correct data is in correct file
456                with open(writtenfile, "rb") as fid:
457                    self.assertEqual(fdata, fid.read())
458                os.remove(writtenfile)
459
460        # remove the test file subdirectories
461        rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
462
463    def test_extract_all(self):
464        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
465            for fpath, fdata in SMALL_TEST_DATA:
466                zipfp.writestr(fpath, fdata)
467
468        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
469            zipfp.extractall()
470            for fpath, fdata in SMALL_TEST_DATA:
471                outfile = os.path.join(os.getcwd(), fpath)
472
473                with open(outfile, "rb") as fid:
474                    self.assertEqual(fdata, fid.read())
475                os.remove(outfile)
476
477        # remove the test file subdirectories
478        rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
479
480    def check_file(self, filename, content):
481        self.assertTrue(os.path.isfile(filename))
482        with open(filename, 'rb') as f:
483            self.assertEqual(f.read(), content)
484
485    @skipUnless(TESTFN_UNICODE, "No Unicode filesystem semantics on this platform.")
486    def test_extract_unicode_filenames(self):
487        fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
488        content = 'Test for unicode filename'
489        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
490            for fname in fnames:
491                zipfp.writestr(fname, content)
492
493        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
494            for fname in fnames:
495                writtenfile = zipfp.extract(fname)
496
497                # make sure it was written to the right place
498                correctfile = os.path.join(os.getcwd(), fname)
499                correctfile = os.path.normpath(correctfile)
500                self.assertEqual(writtenfile, correctfile)
501
502                self.check_file(writtenfile, content)
503                os.remove(writtenfile)
504
    def test_extract_hackers_arcnames(self):
        # Check that member names containing "..", absolute paths or drive
        # and UNC prefixes are extracted to the sanitized relative path.
        # Each pair is (member name stored in the archive, expected
        # '/'-separated path relative to the extraction directory).
        hacknames = [
            ('../foo/bar', 'foo/bar'),
            ('foo/../bar', 'foo/bar'),
            ('foo/../../bar', 'foo/bar'),
            ('foo/bar/..', 'foo/bar'),
            ('./../foo/bar', 'foo/bar'),
            ('/foo/bar', 'foo/bar'),
            ('/foo/../bar', 'foo/bar'),
            ('/foo/../../bar', 'foo/bar'),
        ]
        # Extra cases for platforms whose path separator is a backslash.
        if os.path.sep == '\\':
            hacknames.extend([
                (r'..\foo\bar', 'foo/bar'),
                (r'..\/foo\/bar', 'foo/bar'),
                (r'foo/\..\/bar', 'foo/bar'),
                (r'foo\/../\bar', 'foo/bar'),
                (r'C:foo/bar', 'foo/bar'),
                (r'C:/foo/bar', 'foo/bar'),
                (r'C://foo/bar', 'foo/bar'),
                (r'C:\foo\bar', 'foo/bar'),
                (r'//conky/mountpoint/foo/bar', 'foo/bar'),
                (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
                (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//?/C:/foo/bar', 'foo/bar'),
                (r'\\?\C:\foo\bar', 'foo/bar'),
                (r'C:/../C:/foo/bar', 'C_/foo/bar'),
                (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
                ('../../foo../../ba..r', 'foo/ba..r'),
            ])
        else:  # Unix
            hacknames.extend([
                ('//foo/bar', 'foo/bar'),
                ('../../foo../../ba..r', 'foo../ba..r'),
                (r'foo/..\bar', r'foo/..\bar'),
            ])

        for arcname, fixedname in hacknames:
            # Each case gets a fresh single-member archive whose content
            # embeds the member name.
            content = b'foobar' + arcname.encode()
            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile.ZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            # Look the member up using '/' in place of the platform separator.
            arcname = arcname.replace(os.sep, "/")
            targetpath = os.path.join('target', 'subdir', 'subsub')
            correctfile = os.path.join(targetpath, *fixedname.split('/'))

            # 1) extract() into an explicit target directory.
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            rmtree('target')

            # 2) extractall() into an explicit target directory.
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            rmtree('target')

            # From here on the expected path is below the current directory.
            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

            # 3) extract() into the current directory.
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            # Remove the top-level directory the extraction created.
            rmtree(fixedname.split('/')[0])

            # 4) extractall() into the current directory.
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            rmtree(fixedname.split('/')[0])

            os.remove(TESTFN2)
585
586    def test_writestr_compression(self):
587        zipfp = zipfile.ZipFile(TESTFN2, "w")
588        zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED)
589        if zlib:
590            zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED)
591
592        info = zipfp.getinfo('a.txt')
593        self.assertEqual(info.compress_type, zipfile.ZIP_STORED)
594
595        if zlib:
596            info = zipfp.getinfo('b.txt')
597            self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)
598
599
600    def zip_test_writestr_permissions(self, f, compression):
601        # Make sure that writestr creates files with mode 0600,
602        # when it is passed a name rather than a ZipInfo instance.
603
604        self.make_test_archive(f, compression)
605        with zipfile.ZipFile(f, "r") as zipfp:
606            zinfo = zipfp.getinfo('strfile')
607            self.assertEqual(zinfo.external_attr, 0600 << 16)
608
609    def test_writestr_permissions(self):
610        for f in (TESTFN2, TemporaryFile(), StringIO()):
611            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
612
613    def test_close(self):
614        """Check that the zipfile is closed after the 'with' block."""
615        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
616            for fpath, fdata in SMALL_TEST_DATA:
617                zipfp.writestr(fpath, fdata)
618                self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
619        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
620
621        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
622            self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
623        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
624
625    def test_close_on_exception(self):
626        """Check that the zipfile is closed if an exception is raised in the
627        'with' block."""
628        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
629            for fpath, fdata in SMALL_TEST_DATA:
630                zipfp.writestr(fpath, fdata)
631
632        try:
633            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
634                raise zipfile.BadZipfile()
635        except zipfile.BadZipfile:
636            self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
637
638    def test_add_file_before_1980(self):
639        # Set atime and mtime to 1970-01-01
640        os.utime(TESTFN, (0, 0))
641        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
642            self.assertRaises(ValueError, zipfp.write, TESTFN)
643
644    def tearDown(self):
645        unlink(TESTFN)
646        unlink(TESTFN2)
647
648
649class TestZip64InSmallFiles(unittest.TestCase):
650    # These tests test the ZIP64 functionality without using large files,
651    # see test_zipfile64 for proper tests.
652
653    def setUp(self):
654        self._limit = zipfile.ZIP64_LIMIT
655        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
656        zipfile.ZIP64_LIMIT = 1000
657        zipfile.ZIP_FILECOUNT_LIMIT = 9
658
659        line_gen = ("Test of zipfile line %d." % i
660                    for i in range(0, FIXEDTEST_SIZE))
661        self.data = '\n'.join(line_gen)
662
663        # Make a source file with some lines
664        with open(TESTFN, "wb") as fp:
665            fp.write(self.data)
666
667    def large_file_exception_test(self, f, compression):
668        with zipfile.ZipFile(f, "w", compression) as zipfp:
669            self.assertRaises(zipfile.LargeZipFile,
670                              zipfp.write, TESTFN, "another.name")
671
672    def large_file_exception_test2(self, f, compression):
673        with zipfile.ZipFile(f, "w", compression) as zipfp:
674            self.assertRaises(zipfile.LargeZipFile,
675                              zipfp.writestr, "another.name", self.data)
676
677    def test_large_file_exception(self):
678        for f in (TESTFN2, TemporaryFile(), StringIO()):
679            self.large_file_exception_test(f, zipfile.ZIP_STORED)
680            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
681
682    def zip_test(self, f, compression):
683        # Create the ZIP archive
684        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
685            zipfp.write(TESTFN, "another.name")
686            zipfp.write(TESTFN, TESTFN)
687            zipfp.writestr("strfile", self.data)
688
689        # Read the ZIP archive
690        with zipfile.ZipFile(f, "r", compression) as zipfp:
691            self.assertEqual(zipfp.read(TESTFN), self.data)
692            self.assertEqual(zipfp.read("another.name"), self.data)
693            self.assertEqual(zipfp.read("strfile"), self.data)
694
695            # Print the ZIP directory
696            fp = StringIO()
697            stdout = sys.stdout
698            try:
699                sys.stdout = fp
700                zipfp.printdir()
701            finally:
702                sys.stdout = stdout
703
704            directory = fp.getvalue()
705            lines = directory.splitlines()
706            self.assertEqual(len(lines), 4) # Number of files + header
707
708            self.assertIn('File Name', lines[0])
709            self.assertIn('Modified', lines[0])
710            self.assertIn('Size', lines[0])
711
712            fn, date, time_, size = lines[1].split()
713            self.assertEqual(fn, 'another.name')
714            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
715            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
716            self.assertEqual(size, str(len(self.data)))
717
718            # Check the namelist
719            names = zipfp.namelist()
720            self.assertEqual(len(names), 3)
721            self.assertIn(TESTFN, names)
722            self.assertIn("another.name", names)
723            self.assertIn("strfile", names)
724
725            # Check infolist
726            infos = zipfp.infolist()
727            names = [i.filename for i in infos]
728            self.assertEqual(len(names), 3)
729            self.assertIn(TESTFN, names)
730            self.assertIn("another.name", names)
731            self.assertIn("strfile", names)
732            for i in infos:
733                self.assertEqual(i.file_size, len(self.data))
734
735            # check getinfo
736            for nm in (TESTFN, "another.name", "strfile"):
737                info = zipfp.getinfo(nm)
738                self.assertEqual(info.filename, nm)
739                self.assertEqual(info.file_size, len(self.data))
740
741            # Check that testzip doesn't raise an exception
742            zipfp.testzip()
743
744    def test_stored(self):
745        for f in (TESTFN2, TemporaryFile(), StringIO()):
746            self.zip_test(f, zipfile.ZIP_STORED)
747
748    @skipUnless(zlib, "requires zlib")
749    def test_deflated(self):
750        for f in (TESTFN2, TemporaryFile(), StringIO()):
751            self.zip_test(f, zipfile.ZIP_DEFLATED)
752
753    def test_absolute_arcnames(self):
754        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
755                             allowZip64=True) as zipfp:
756            zipfp.write(TESTFN, "/absolute")
757
758        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
759            self.assertEqual(zipfp.namelist(), ["absolute"])
760
761    def test_too_many_files(self):
762        # This test checks that more than 64k files can be added to an archive,
763        # and that the resulting archive can be read properly by ZipFile
764        zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=True)
765        zipf.debug = 100
766        numfiles = 15
767        for i in range(numfiles):
768            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
769        self.assertEqual(len(zipf.namelist()), numfiles)
770        zipf.close()
771
772        zipf2 = zipfile.ZipFile(TESTFN, mode="r")
773        self.assertEqual(len(zipf2.namelist()), numfiles)
774        for i in range(numfiles):
775            content = zipf2.read("foo%08d" % i)
776            self.assertEqual(content, "%d" % (i**3 % 57))
777        zipf2.close()
778
779    def test_too_many_files_append(self):
780        zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=False)
781        zipf.debug = 100
782        numfiles = 9
783        for i in range(numfiles):
784            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
785        self.assertEqual(len(zipf.namelist()), numfiles)
786        with self.assertRaises(zipfile.LargeZipFile):
787            zipf.writestr("foo%08d" % numfiles, b'')
788        self.assertEqual(len(zipf.namelist()), numfiles)
789        zipf.close()
790
791        zipf = zipfile.ZipFile(TESTFN, mode="a", allowZip64=False)
792        zipf.debug = 100
793        self.assertEqual(len(zipf.namelist()), numfiles)
794        with self.assertRaises(zipfile.LargeZipFile):
795            zipf.writestr("foo%08d" % numfiles, b'')
796        self.assertEqual(len(zipf.namelist()), numfiles)
797        zipf.close()
798
799        zipf = zipfile.ZipFile(TESTFN, mode="a", allowZip64=True)
800        zipf.debug = 100
801        self.assertEqual(len(zipf.namelist()), numfiles)
802        numfiles2 = 15
803        for i in range(numfiles, numfiles2):
804            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
805        self.assertEqual(len(zipf.namelist()), numfiles2)
806        zipf.close()
807
808        zipf2 = zipfile.ZipFile(TESTFN, mode="r")
809        self.assertEqual(len(zipf2.namelist()), numfiles2)
810        for i in range(numfiles2):
811            content = zipf2.read("foo%08d" % i)
812            self.assertEqual(content, "%d" % (i**3 % 57))
813        zipf2.close()
814
815    def test_append(self):
816        # Test that appending to the Zip64 archive doesn't change
817        # extra fields of existing entries.
818        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
819            zipfp.writestr("strfile", self.data)
820        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
821            zinfo = zipfp.getinfo("strfile")
822            extra = zinfo.extra
823        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
824            zipfp.writestr("strfile2", self.data)
825        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
826            zinfo = zipfp.getinfo("strfile")
827            self.assertEqual(zinfo.extra, extra)
828
829    def tearDown(self):
830        zipfile.ZIP64_LIMIT = self._limit
831        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
832        unlink(TESTFN)
833        unlink(TESTFN2)
834
835
class PyZipFileTests(unittest.TestCase):
    """Tests for zipfile.PyZipFile.writepy()."""

    def requiresWriteAccess(self, path):
        """Skip the running test unless a file can be created in *path*.

        os.access() is checked first; a scratch file named
        'test_zipfile.try' is then actually created (and removed again),
        and any failure while doing so also skips the test.
        """
        if not os.access(path, os.W_OK):
            self.skipTest('requires write access to the installed location')
        filename = os.path.join(path, 'test_zipfile.try')
        try:
            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
            os.close(fd)
        except Exception:
            # Deliberately broad: whatever prevented the probe from being
            # created means the test cannot run here.
            self.skipTest('requires write access to the installed location')
        unlink(filename)

    def test_write_pyfile(self):
        """writepy() on a single module stores the compiled file, not the
        source, both at the archive root and under a base name."""
        self.requiresWriteAccess(os.path.dirname(__file__))

        # Map a compiled file name (.pyc/.pyo) back to its .py source.
        fn = __file__
        if fn.endswith(('.pyc', '.pyo')):
            fn = fn[:-1]

        # The temporary file is entered first so that it is closed after
        # the archive has been finalised.
        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
            zipfp.writepy(fn)

            bn = os.path.basename(fn)
            self.assertNotIn(bn, zipfp.namelist())
            self.assertTrue(bn + 'o' in zipfp.namelist() or
                            bn + 'c' in zipfp.namelist())

        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
            zipfp.writepy(fn, "testpackage")

            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
            self.assertNotIn(bn, zipfp.namelist())
            self.assertTrue(bn + 'o' in zipfp.namelist() or
                            bn + 'c' in zipfp.namelist())

    def test_write_python_package(self):
        """writepy() on the ``email`` package directory stores compiled
        modules from several levels of the package."""
        import email
        packagedir = os.path.dirname(email.__file__)
        self.requiresWriteAccess(packagedir)

        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
            zipfp.writepy(packagedir)

            # Check for a couple of modules at different levels of the
            # hierarchy
            names = zipfp.namelist()
            self.assertTrue('email/__init__.pyo' in names or
                            'email/__init__.pyc' in names)
            self.assertTrue('email/mime/text.pyo' in names or
                            'email/mime/text.pyc' in names)

    def test_write_python_directory(self):
        """writepy() on a plain directory stores the compiled form of the
        .py files and leaves the .txt file out."""
        os.mkdir(TESTFN2)
        try:
            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
                fp.write("print(42)\n")

            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
                fp.write("print(42 * 42)\n")

            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
                fp.write("bla bla bla\n")

            # Close the archive and its backing file deterministically;
            # previously neither was ever closed.
            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
                zipfp.writepy(TESTFN2)

                names = zipfp.namelist()
                self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
                self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
                self.assertNotIn('mod2.txt', names)

        finally:
            rmtree(TESTFN2)

    def test_write_non_pyfile(self):
        """writepy() raises RuntimeError for a file that is not a Python
        source file."""
        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
            with open(TESTFN, 'w') as fid:
                fid.write('most definitely not a python file')
            try:
                self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
            finally:
                # Remove the scratch file even when the assertion fails.
                os.remove(TESTFN)
919
920
class OtherTests(unittest.TestCase):
    """Assorted ZipFile tests: file names, is_zipfile(), error handling,
    archive comments, CRC checking and malformed archives."""

    # Pre-built archives holding one member, 'afile', used by the bad-CRC
    # checks below; keyed by compression method.
    zips_with_bad_crc = {
        zipfile.ZIP_STORED: (
            b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
            b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
            b'ilehello,AworldP'
            b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
            b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
            b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
            b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
            b'\0\0/\0\0\0\0\0'),
        zipfile.ZIP_DEFLATED: (
            b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
            b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
            b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
            b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
            b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
            b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
            b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
    }

    def test_unicode_filenames(self):
        """Unicode member names survive a write/read round trip."""
        with zipfile.ZipFile(TESTFN, "w") as zf:
            zf.writestr(u"foo.txt", "Test for unicode filename")
            zf.writestr(u"\xf6.txt", "Test for unicode filename")
            self.assertIsInstance(zf.infolist()[0].filename, unicode)

        with zipfile.ZipFile(TESTFN, "r") as zf:
            self.assertEqual(zf.filelist[0].filename, "foo.txt")
            self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")

    def test_create_non_existent_file_for_append(self):
        """Opening a missing file in 'a' mode creates a usable archive."""
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        filename = 'testfile.txt'
        content = 'hello, world. this is some content.'

        try:
            with zipfile.ZipFile(TESTFN, 'a') as zf:
                zf.writestr(filename, content)
        except IOError:
            self.fail('Could not append data to a non-existent zip file.')

        self.assertTrue(os.path.exists(TESTFN))

        with zipfile.ZipFile(TESTFN, 'r') as zf:
            self.assertEqual(zf.read(filename), content)

    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            pass

    def test_is_zip_erroneous_file(self):
        """Check that is_zipfile() correctly identifies non-zip files."""
        # - passing a filename
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertFalse(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertFalse(chk)
        # - passing a file-like object
        fp = StringIO()
        fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)

    def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipfile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it truncated to the first N bytes and make
        #   sure a BadZipfile exception is raised when we try to open them
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp)

    def test_is_zip_valid_file(self):
        """Check that is_zipfile() correctly identifies zip files."""
        # - passing a filename
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertTrue(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertTrue(chk)
            fp.seek(0, 0)
            zip_contents = fp.read()
        # - passing a file-like object
        fp = StringIO()
        fp.write(zip_contents)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)

    def test_non_existent_file_raises_IOError(self):
        # make sure we don't raise an AttributeError when a partially-constructed
        # ZipFile instance is finalized; this tests for regression on SF tracker
        # bug #403871.

        # The bug we're testing for caused an AttributeError to be raised
        # when a ZipFile instance was created for a file that did not
        # exist; the .fp member was not initialized but was needed by the
        # __del__() method.  Since the AttributeError is in the __del__(),
        # it is ignored, but the user should be sufficiently annoyed by
        # the message on the output that regression will be noticed
        # quickly.
        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)

    def test_empty_file_raises_BadZipFile(self):
        """An empty file and a too-short file are both rejected with
        BadZipfile."""
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

    def test_closed_zip_raises_RuntimeError(self):
        """Verify that testzip() doesn't swallow inappropriate exceptions."""
        data = StringIO()
        with zipfile.ZipFile(data, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        # This is correct; calling .read on a closed ZipFile should raise
        # a RuntimeError, and so should calling .testzip.  An earlier
        # version of .testzip would swallow this exception (and any other)
        # and report that the first file in the archive was corrupt.
        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
        self.assertRaises(RuntimeError, zipf.testzip)
        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
        with open(TESTFN, 'w') as fid:
            fid.write('zipfile test data')
            self.assertRaises(RuntimeError, zipf.write, TESTFN)

    def test_bad_constructor_mode(self):
        """Check that bad modes passed to ZipFile constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")

    def test_bad_open_mode(self):
        """Check that bad modes passed to ZipFile.open are caught."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            # read the data to make sure the file is there
            zipf.read("foo.txt")
            self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")

    def test_read0(self):
        """Check that calling read(0) on a ZipExtFile object returns an empty
        string and doesn't advance file pointer."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
            with zipf.open("foo.txt") as f:
                for i in xrange(FIXEDTEST_SIZE):
                    self.assertEqual(f.read(0), '')

                # Everything is still available after the zero-byte reads.
                self.assertEqual(f.read(), "O, for a Muse of Fire!")

    def test_open_non_existent_item(self):
        """Check that attempting to call open() for an item that doesn't
        exist in the archive raises a KeyError."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")

    def test_bad_compression_mode(self):
        """Check that bad compression methods passed to the ZipFile
        constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)

    def test_unsupported_compression(self):
        # data is declared as shrunk, but actually deflated
        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
        b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
        b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
        b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
        b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
            self.assertRaises(NotImplementedError, zipf.open, 'x')

    def test_null_byte_in_filename(self):
        """Check that a filename containing a null byte is properly
        terminated."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
            self.assertEqual(zipf.namelist(), ['foo.txt'])

    def test_struct_sizes(self):
        """Check that ZIP internal structure sizes are calculated correctly."""
        self.assertEqual(zipfile.sizeEndCentDir, 22)
        self.assertEqual(zipfile.sizeCentralDir, 46)
        self.assertEqual(zipfile.sizeEndCentDir64, 56)
        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)

    def test_comments(self):
        """Check that comments on the archive are handled properly."""

        # check default comment is empty
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertEqual(zipf.comment, '')
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, '')

        # check a simple short comment
        comment = 'Bravely taking to his feet, he beat a very brave retreat.'
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment)

        # check a comment of max length
        comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment2
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

        # check a comment that is too long is truncated
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            with check_warnings(('', UserWarning)):
                zipf.comment = comment2 + 'oops'
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

    def test_change_comment_in_empty_archive(self):
        """A comment set on an empty archive opened in 'a' mode is saved."""
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertFalse(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def test_change_comment_in_nonempty_archive(self):
        """A comment set on an existing archive opened in 'a' mode is
        saved."""
        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertTrue(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def check_testzip_with_bad_crc(self, compression):
        """Tests that files with bad CRCs return their name from testzip."""
        zipdata = self.zips_with_bad_crc[compression]

        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', zipf.testzip())

    def test_testzip_with_bad_crc_stored(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_testzip_with_bad_crc_deflated(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass

    def test_read_with_bad_crc_stored(self):
        self.check_read_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_with_bad_crc_deflated(self):
        self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_return_size(self, compression):
        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
        # than requested.
        for test_size in (1, 4095, 4096, 4097, 16384):
            file_size = test_size + 1
            junk = getrandbytes(file_size)
            with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf:
                zipf.writestr('foo', junk)
                with zipf.open('foo', 'r') as fp:
                    buf = fp.read(test_size)
                    self.assertEqual(len(buf), test_size)

    def test_read_return_size_stored(self):
        self.check_read_return_size(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_return_size_deflated(self):
        self.check_read_return_size(zipfile.ZIP_DEFLATED)

    def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            pass
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
            zipf.close()
        except zipfile.BadZipfile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        with zipfile.ZipFile(TESTFN, mode="a") as zipf:
            pass
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
            zipf.close()
        except zipfile.BadZipfile:
            # Catch only the "not a valid archive" error, as for 'w' mode
            # above; a bare ``except:`` would also swallow
            # KeyboardInterrupt and hide unrelated errors.
            self.fail("Unable to create empty ZIP file in 'a' mode")

    def test_open_empty_file(self):
        # Issue 1710703: Check that opening a file with less than 22 bytes
        # raises a BadZipfile exception (rather than the previously unhelpful
        # IOError)
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r')

    def test_create_zipinfo_before_1980(self):
        """ZipInfo rejects a date_time earlier than 1980 with ValueError."""
        self.assertRaises(ValueError,
                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))

    def test_zipfile_with_short_extra_field(self):
        """If an extra field in the header is less than 4 bytes, skip it."""
        zipdata = (
            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
        )
        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertIsNone(zipf.testzip())

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)
1308
1309
class DecryptionTests(unittest.TestCase):
    """Exercise reading of password-protected archives.

    The library cannot create encrypted archives at the moment, so two
    pre-generated encrypted ZIP files are embedded below."""

    # Archive with the member 'test.txt'; decrypts to ``plain``.
    data = (
    'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
    '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
    '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
    'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
    '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
    '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
    '\x00\x00L\x00\x00\x00\x00\x00' )
    # Archive with the member 'zero'; decrypts to ``plain2``.
    data2 = (
    'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
    '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
    '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
    'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
    '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
    '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
    'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
    '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    plain = 'zipfile.py encryption test'
    plain2 = '\x00'*512

    def setUp(self):
        # Write each embedded archive to disk, then open it for reading.
        with open(TESTFN, "wb") as out:
            out.write(self.data)
        self.zip = zipfile.ZipFile(TESTFN, "r")

        with open(TESTFN2, "wb") as out:
            out.write(self.data2)
        self.zip2 = zipfile.ZipFile(TESTFN2, "r")

    def tearDown(self):
        # Close each archive before deleting the file behind it.
        self.zip.close()
        os.unlink(TESTFN)

        self.zip2.close()
        os.unlink(TESTFN2)

    def test_no_password(self):
        # Reading an encrypted member when no password has been set
        # must raise RuntimeError.
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    def test_bad_password(self):
        # A wrong password must raise RuntimeError as well.
        self.zip.setpassword("perl")
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        self.zip2.setpassword("perl")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    @skipUnless(zlib, "requires zlib")
    def test_good_password(self):
        # With the right password each member decrypts to its plaintext.
        self.zip.setpassword("python")
        decrypted = self.zip.read("test.txt")
        self.assertEqual(decrypted, self.plain)

        self.zip2.setpassword("12345")
        decrypted2 = self.zip2.read("zero")
        self.assertEqual(decrypted2, self.plain2)
1368
1369
class TestsWithRandomBinaryFiles(unittest.TestCase):
    """Round-trip a file of random packed floats through archives backed
    by a file name, a real temporary file and an in-memory file object."""

    def setUp(self):
        datacount = randint(16, 64)*1024 + randint(1, 1024)
        self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000))
                            for i in xrange(datacount))

        # Make a source file holding the random binary data
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)

    def make_test_archive(self, f, compression):
        """Write TESTFN into the archive *f* twice: as 'another.name' and
        under its own name."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)

    def zip_test(self, f, compression):
        """Check that ZipFile.read() returns the original data for both
        members."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            testdata = zipfp.read(TESTFN)
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_stored(self):
        # The temporary file is scoped with ``with`` so that it is closed
        # even when the check fails (same pattern in the tests below).
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, StringIO()):
                self.zip_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, io.BytesIO()):
                self.zip_test(f, zipfile.ZIP_DEFLATED)

    def zip_open_test(self, f, compression):
        """Check that both members read back correctly through
        ZipFile.open() in fixed 256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while True:
                    read_data = zipopen1.read(256)
                    if not read_data:
                        break
                    zipdata1.append(read_data)

            zipdata2 = []
            with zipfp.open("another.name") as zipopen2:
                while True:
                    read_data = zipopen2.read(256)
                    if not read_data:
                        break
                    zipdata2.append(read_data)

            testdata1 = ''.join(zipdata1)
            self.assertEqual(len(testdata1), len(self.data))
            self.assertEqual(testdata1, self.data)

            testdata2 = ''.join(zipdata2)
            self.assertEqual(len(testdata2), len(self.data))
            self.assertEqual(testdata2, self.data)

    def test_open_stored(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, StringIO()):
                self.zip_open_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, io.BytesIO()):
                self.zip_open_test(f, zipfile.ZIP_DEFLATED)

    def zip_random_open_test(self, f, compression):
        """Check that a member reads back correctly through
        ZipFile.open() in chunks of random size (1 to 1024 bytes)."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while True:
                    read_data = zipopen1.read(randint(1, 1024))
                    if not read_data:
                        break
                    zipdata1.append(read_data)

            testdata = ''.join(zipdata1)
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)

    def test_random_open_stored(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, StringIO()):
                self.zip_random_open_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, io.BytesIO()):
                self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
1472
1473
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
    # Tests that hold several member handles from one ZipFile at the same
    # time, and that mix reading and writing on a single archive.

    @classmethod
    def setUpClass(cls):
        # Two random payloads with distinct 3-byte prefixes, generated once
        # and shared by every test in the class.
        cls.data1 = b'111' + getrandbytes(10000)
        cls.data2 = b'222' + getrandbytes(10000)

    def make_test_archive(self, f):
        # Write a deflated archive with the members 'ones' and 'twos'.
        members = (('ones', self.data1), ('twos', self.data2))
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
            for name, payload in members:
                archive.writestr(name, payload)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file
        # objects) two handles on the same member do not interfere with
        # each other.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open('ones') as first:
                with archive.open('ones') as second:
                    first_head = first.read(500)
                    second_head = second.read(500)
                    first_data = first_head + first.read()
                    second_data = second_head + second.read()
            self.assertEqual(first_data, second_data)
            self.assertEqual(first_data, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file
        # objects) handles on two different members do not interfere with
        # each other.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open('ones') as ones:
                with archive.open('twos') as twos:
                    ones_head = ones.read(500)
                    twos_head = twos.read(500)
                    ones_data = ones_head + ones.read()
                    twos_data = twos_head + twos.read()
            self.assertEqual(ones_data, self.data1)
            self.assertEqual(twos_data, self.data2)

    def test_interleaved(self):
        # Same check as above, but the second member is opened only after
        # the first one has already been partially read.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open('ones') as ones:
                ones_head = ones.read(500)
                with archive.open('twos') as twos:
                    twos_head = twos.read(500)
                    ones_data = ones_head + ones.read()
                    twos_data = twos_head + twos.read()
            self.assertEqual(ones_data, self.data1)
            self.assertEqual(twos_data, self.data2)

    def test_read_after_close(self):
        # Member handles obtained while the ZipFile was open must remain
        # readable after the ZipFile's "with" block has exited.
        self.make_test_archive(TESTFN2)
        ones = twos = None
        try:
            with zipfile.ZipFile(TESTFN2, 'r') as archive:
                ones = archive.open('ones')
                twos = archive.open('twos')
            ones_head = ones.read(500)
            twos_head = twos.read(500)
            ones_data = ones_head + ones.read()
            twos_data = twos_head + twos.read()
        finally:
            # Close whichever handles were actually obtained.
            for handle in (ones, twos):
                if handle:
                    handle.close()
        self.assertEqual(ones_data, self.data1)
        self.assertEqual(twos_data, self.data2)

    def test_read_after_write(self):
        # A member can be opened and read from an archive that is still
        # open for writing, and the finished archive stays intact.
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
            archive.writestr('twos', self.data2)
            with archive.open('ones') as member:
                head = member.read(500)
        self.assertEqual(head, self.data1[:500])
        with zipfile.ZipFile(TESTFN2, 'r') as archive:
            ones_data = archive.read('ones')
            twos_data = archive.read('twos')
        self.assertEqual(ones_data, self.data1)
        self.assertEqual(twos_data, self.data2)

    def test_write_after_read(self):
        # Writing a new member while another one is open and partially
        # read must leave both members intact in the finished archive.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
            with archive.open('ones') as member:
                member.read(500)
                archive.writestr('twos', self.data2)
        with zipfile.ZipFile(TESTFN2, 'r') as archive:
            ones_data = archive.read('ones')
            twos_data = archive.read('twos')
        self.assertEqual(ones_data, self.data1)
        self.assertEqual(twos_data, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            for _ in range(100):
                archive.read('ones')
                with archive.open('ones'):
                    pass
        # A freshly opened file is expected to get a descriptor below 100.
        with open(os.devnull) as probe:
            self.assertLess(probe.fileno(), 100)

    def tearDown(self):
        unlink(TESTFN2)
1585
1586
class TestWithDirectory(unittest.TestCase):
    # Tests for directory entries: extracting them from an existing archive
    # and creating them with ZipFile.write() and ZipFile.writestr().
    # TESTFN2 is created as a scratch directory for every test and TESTFN
    # is used as the archive file name.

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        # Extract zipdir.zip into the scratch directory and check that the
        # a/b/c tree exists afterwards.
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        # Add the same directory twice, once under its own path and once
        # under the archive name "y", then verify the entries both while
        # the archive is still open for writing and after reopening it.
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        # Value the test expects in external_attr for both entries.
        expected_attr = (mode << 16) | 0x10
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: assertTrue would treat "y/" as
            # the failure message and accept any non-empty file name.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        # Create a directory entry with writestr("x/", b'') and verify its
        # name and attributes, then extract it and check the result on disk.
        os.mkdir(os.path.join(TESTFN2, "x"))
        # Value the test expects in external_attr for the "x/" entry.
        expected_attr = (0o40775 << 16) | 0x10
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        # Remove the scratch directory and, if a test created it, the
        # archive file.
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
1650
1651
class UniversalNewlineTests(unittest.TestCase):
    # Tests for archive members opened in "rU" mode.  The same lines are
    # written to three source files, one per line separator ('\r', '\r\n'
    # and '\n'), archived, and then read back through read(), readline(),
    # readlines() and iteration.

    def setUp(self):
        self.line_gen = ["Test of zipfile line %d." % i
                         for i in xrange(FIXEDTEST_SIZE)]
        self.seps = ('\r', '\r\n', '\n')
        # Both dicts are keyed by separator: arcdata holds the text written
        # with that separator, arcfiles the name of the file containing it.
        self.arcdata, self.arcfiles = {}, {}
        for n, s in enumerate(self.seps):
            self.arcdata[s] = s.join(self.line_gen) + s
            self.arcfiles[s] = '%s-%d' % (TESTFN, n)
            with open(self.arcfiles[s], "wb") as fid:
                fid.write(self.arcdata[s])

    def make_test_archive(self, f, compression):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for fn in self.arcfiles.values():
                zipfp.write(fn, fn)

    def read_test(self, f, compression):
        # read() on each member must return the data exactly as it was
        # written for that member's separator.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fp:
                    zipdata = fp.read()
                self.assertEqual(self.arcdata[sep], zipdata)

    def readline_read_test(self, f, compression):
        # Alternate readline() and read(5) calls on each member; the
        # concatenated result must equal the '\n'-separated data.
        self.make_test_archive(f, compression)

        # Read the ZIP archive.  The context manager closes it even when
        # an assertion fails part-way through.
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as zipopen:
                    data = ''
                    while True:
                        read = zipopen.readline()
                        if not read:
                            break
                        data += read

                        read = zipopen.read(5)
                        if not read:
                            break
                        data += read

                self.assertEqual(data, self.arcdata['\n'])

    def readline_test(self, f, compression):
        # Every readline() call must return the next source line terminated
        # by '\n', whichever separator the member was written with.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as zipopen:
                    for line in self.line_gen:
                        linedata = zipopen.readline()
                        self.assertEqual(linedata, line + '\n')

    def readlines_test(self, f, compression):
        # Each line returned by readlines() must be the matching source
        # line terminated by '\n'.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fp:
                    ziplines = fp.readlines()
                for line, zipline in zip(self.line_gen, ziplines):
                    self.assertEqual(zipline, line + '\n')

    def iterlines_test(self, f, compression):
        # Iterating over the member must yield the matching source lines,
        # each terminated by '\n'.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fid:
                    for line, zipline in zip(self.line_gen, fid):
                        self.assertEqual(zipline, line + '\n')

    # Each test below runs one helper against a file name, a temporary
    # file and an in-memory StringIO buffer.

    def test_read_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_STORED)

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_read_deflated(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Remove the per-separator source files and both scratch files.
        for sep, fn in self.arcfiles.items():
            os.remove(fn)
        unlink(TESTFN)
        unlink(TESTFN2)
1787
1788
class CommandLineTest(unittest.TestCase):
    # Tests for the "python -m zipfile" command line, run in a child
    # interpreter through script_helper.

    def zipfilecmd(self, *args, **kwargs):
        # Run the command expecting success and return its stdout with the
        # platform line separator replaced by b'\n'.
        _, stdout, _ = script_helper.assert_python_ok(
            '-m', 'zipfile', *args, **kwargs)
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        # Run the command expecting failure; the helper's result is
        # returned unchanged.
        outcome = script_helper.assert_python_failure('-m', 'zipfile', *args)
        return outcome

    def test_test_command(self):
        # "-t" reports success for a valid archive and fails, printing
        # nothing on stdout, for a file that is not a zip archive.
        good_archive = findfile('zipdir.zip')
        stdout = self.zipfilecmd('-t', good_archive)
        self.assertEqual(stdout.rstrip(), b'Done testing')
        not_a_zip = findfile('testtar.tar')
        _, stdout, _ = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(stdout, b'')

    def test_list_command(self):
        # "-l" must print the same listing that ZipFile.printdir() writes.
        archive_path = findfile('zipdir.zip')
        with captured_stdout() as captured:
            with zipfile.ZipFile(archive_path, 'r') as zf:
                zf.printdir()
        listing = captured.getvalue().encode('ascii', 'backslashreplace')
        stdout = self.zipfilecmd('-l', archive_path,
                                 PYTHONIOENCODING='ascii:backslashreplace')
        self.assertEqual(stdout, listing)

    @skipUnless(zlib, "requires zlib")
    def test_create_command(self):
        # "-c" must archive a plain file and a directory with its content.
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as plain:
            plain.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested_path = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested_path, 'w') as nested:
            nested.write('test 2')
        sources = [TESTFN, TESTFNDIR]
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        try:
            stdout = self.zipfilecmd('-c', TESTFN2, *sources)
            self.assertEqual(stdout, b'')
            with zipfile.ZipFile(TESTFN2) as zf:
                self.assertEqual(zf.namelist(), expected_names)
                self.assertEqual(zf.read(expected_names[0]), b'test 1')
                self.assertEqual(zf.read(expected_names[2]), b'test 2')
        finally:
            unlink(TESTFN2)

    def test_extract_command(self):
        # "-e" must recreate every archive entry below the target
        # directory: directories as directories, files with equal content.
        archive_path = findfile('zipdir.zip')
        dest = TESTFNDIR
        os.mkdir(dest)
        try:
            stdout = self.zipfilecmd('-e', archive_path, dest)
            self.assertEqual(stdout, b'')
            with zipfile.ZipFile(archive_path) as zf:
                for info in zf.infolist():
                    member = info.filename
                    extracted = os.path.join(dest,
                                             member.replace('/', os.sep))
                    if not member.endswith('/'):
                        self.assertTrue(os.path.isfile(extracted))
                        with open(extracted, 'rb') as fp:
                            self.assertEqual(fp.read(), zf.read(info))
                    else:
                        self.assertTrue(os.path.isdir(extracted))
        finally:
            rmtree(dest)
1856
def test_main():
    # Hand every test case class of this module to run_unittest, in the
    # order listed.
    test_classes = (
        TestsWithSourceFile,
        TestZip64InSmallFiles,
        OtherTests,
        PyZipFileTests,
        DecryptionTests,
        TestsWithMultipleOpens,
        TestWithDirectory,
        UniversalNewlineTests,
        TestsWithRandomBinaryFiles,
        CommandLineTest,
    )
    run_unittest(*test_classes)
1862
1863
if __name__ == "__main__":
    # Run the whole suite when the module is executed as a script.
    test_main()
1866