1# Tests of the full ZIP64 functionality of zipfile
2# The support.requires call is the only reason for keeping this separate
3# from test_zipfile
4from test import support
5
# XXX(nnorwitz): disable this test by looking for the extralargefile resource,
# which doesn't exist.  This test generally takes over 30 minutes to run
# and requires more disk space than most of the buildbots have.
# Module-level resource gate: declare the 'extralargefile' requirement before
# the rest of this file (imports, test data, test classes) is set up.
support.requires(
    'extralargefile',
    'test requires loads of disk-space bytes and a long time to run',
)
13
14import zipfile, os, unittest
15import time
16import sys
17
18from tempfile import TemporaryFile
19
20from test.support import os_helper
21from test.support import TESTFN, requires_zlib
22
# Second scratch path, derived from TESTFN; the tests below pass it to
# zipfile.ZipFile as the on-disk location of an archive.
TESTFN2 = TESTFN + "2"

# How much time in seconds can pass before we print a 'Still working' message.
_PRINT_WORKING_MSG_INTERVAL = 60
27
class TestsWithSourceFile(unittest.TestCase):
    """Round-trip about 6 GiB of raw data through a ZIP archive.

    Each test writes many copies of ``self.data`` into an archive and then
    reads every member back, comparing it with the original bytes.  The
    archive is backed first by a temporary file object and then by the
    ``TESTFN2`` path.
    """

    def setUp(self):
        """Build the payload in ``self.data`` and also store it in TESTFN."""
        # Create test data: one million numbered lines joined by newlines.
        line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
        self.data = '\n'.join(line_gen).encode('ascii')

        # And write it to a file.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    @staticmethod
    def _still_working(next_time, action, num, filecount):
        """Print a progress line once *next_time* has been reached.

        next_time -- time.monotonic() deadline for the next message.
        action    -- verb placed in the message ('writing' or 'reading').
        num       -- index of the member that was just processed.
        filecount -- total number of members.

        Returns the deadline to use for the following call: *next_time*
        unchanged when nothing was printed, otherwise a new deadline
        _PRINT_WORKING_MSG_INTERVAL seconds from now.
        """
        if next_time > time.monotonic():
            return next_time
        next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
        # Write to sys.__stdout__ rather than sys.stdout, as the original
        # inline code did, and flush so the message shows up immediately.
        print('  zipTest still %s %d of %d, be patient...' %
              (action, num, filecount), file=sys.__stdout__)
        sys.__stdout__.flush()
        return next_time

    def zipTest(self, f, compression):
        """Fill an archive at *f* with about 6 GiB and verify it reads back.

        f           -- file object or path, passed straight to
                       zipfile.ZipFile.
        compression -- compression constant passed to zipfile.ZipFile for
                       both the write pass and the read pass.
        """
        # The archive will contain enough copies of self.data to reach about
        # 6 GiB of raw data to store.
        filecount = 6*1024**3 // len(self.data)

        # One deadline is shared by both passes, so the interval between
        # progress messages carries over from writing to reading.
        next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL

        # Create the ZIP archive.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for num in range(filecount):
                zipfp.writestr("testfn%d" % num, self.data)
                # Print still working message since this test can be really
                # slow.
                next_time = self._still_working(
                    next_time, 'writing', num, filecount)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for num in range(filecount):
                self.assertEqual(zipfp.read("testfn%d" % num), self.data)
                # Print still working message since this test can be really
                # slow.
                next_time = self._still_working(
                    next_time, 'reading', num, filecount)

    def testStored(self):
        """Run zipTest with ZIP_STORED on a temp file, then on TESTFN2."""
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, zipfile.ZIP_STORED)
            # The caller-supplied file object must still be open afterwards.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, zipfile.ZIP_STORED)

    @requires_zlib()
    def testDeflated(self):
        """Run zipTest with ZIP_DEFLATED on a temp file, then on TESTFN2."""
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, zipfile.ZIP_DEFLATED)
            # The caller-supplied file object must still be open afterwards.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        """Remove both scratch files, whether or not a test created them."""
        # Use the same helper as OtherTests.tearDown instead of a separate
        # exists-then-remove check.
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)
90
91
class OtherTests(unittest.TestCase):
    """Tests for archives that hold more than 64k members."""

    @staticmethod
    def _member(index):
        """Return the (name, text) pair stored for member number *index*."""
        return "foo%08d" % index, "%d" % (index**3 % 57)

    def testMoreThan64kFiles(self):
        # This test checks that more than 64k files can be added to an archive,
        # and that the resulting archive can be read properly by ZipFile
        total = 3 * (1 << 16) // 2

        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) as writer:
            writer.debug = 100
            for index in range(total):
                name, text = self._member(index)
                writer.writestr(name, text)
            self.assertEqual(len(writer.namelist()), total)

        with zipfile.ZipFile(TESTFN, mode="r") as reader:
            self.assertEqual(len(reader.namelist()), total)
            for index in range(total):
                name, text = self._member(index)
                self.assertEqual(reader.read(name).decode('ascii'), text)

    def testMoreThan64kFilesAppend(self):
        # Largest member count written with allowZip64=False, the name of the
        # one extra member that is expected to be refused, and the final
        # member count after appending with allowZip64=True.
        limit = (1 << 16) - 1
        overflow_name = "foo%08d" % limit
        grown = 3 * (1 << 16) // 2

        # Fill a fresh archive up to the limit; one more must be rejected.
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) as archive:
            archive.debug = 100
            for index in range(limit):
                name, text = self._member(index)
                archive.writestr(name, text)
            self.assertEqual(len(archive.namelist()), limit)
            with self.assertRaises(zipfile.LargeZipFile):
                archive.writestr(overflow_name, b'')
            self.assertEqual(len(archive.namelist()), limit)

        # Reopening for append without ZIP64 must reject the extra member too.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) as archive:
            archive.debug = 100
            self.assertEqual(len(archive.namelist()), limit)
            with self.assertRaises(zipfile.LargeZipFile):
                archive.writestr(overflow_name, b'')
            self.assertEqual(len(archive.namelist()), limit)

        # With ZIP64 allowed, appending past the limit succeeds.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) as archive:
            archive.debug = 100
            self.assertEqual(len(archive.namelist()), limit)
            for index in range(limit, grown):
                name, text = self._member(index)
                archive.writestr(name, text)
            self.assertEqual(len(archive.namelist()), grown)

        # Every member, old and appended, must read back correctly.
        with zipfile.ZipFile(TESTFN, mode="r") as reader:
            self.assertEqual(len(reader.namelist()), grown)
            for index in range(grown):
                name, text = self._member(index)
                self.assertEqual(reader.read(name).decode('ascii'), text)

    def tearDown(self):
        # Remove both scratch paths after each test.
        for path in (TESTFN, TESTFN2):
            os_helper.unlink(path)
144
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
147