1# Tests of the full ZIP64 functionality of zipfile
2# The support.requires call is the only reason for keeping this separate
3# from test_zipfile
4from test import support
5
# XXX(nnorwitz): disable this test by requiring the 'extralargefile'
# resource, which doesn't exist.  This test generally takes over 30 minutes
# to run and requires more disk space than most of the buildbots have.
support.requires(
        'extralargefile',
        'test requires loads of disk-space bytes and a long time to run'
    )
13
14import zipfile, os, unittest
15import time
16import sys
17
18from tempfile import TemporaryFile
19
20from test.support import TESTFN, requires_zlib
21
# Second scratch path derived from TESTFN; the tests below pass it to
# ZipFile as an archive name and remove it again in tearDown.
TESTFN2 = TESTFN + "2"

# How much time in seconds can pass before we print a 'Still working' message.
_PRINT_WORKING_MSG_INTERVAL = 60
26
class TestsWithSourceFile(unittest.TestCase):
    """Round-trip about 6 GiB of raw data through a single ZIP archive.

    Each test fills an archive with copies of ``self.data`` and reads every
    member back, first through an anonymous temporary file and then through
    the named path TESTFN2.
    """

    def setUp(self):
        """Build the payload bytes and store a copy of them in TESTFN."""
        # Create test data.
        line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
        self.data = '\n'.join(line_gen).encode('ascii')

        # And write it to a file.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def _print_still_working(self, next_time, action, num, filecount):
        """Print a progress line once *next_time* has been reached.

        action is the verb placed in the message ('writing' or 'reading');
        num and filecount are the current member index and the total.

        Returns the deadline to use for the next call: a fresh one if a
        message was printed, otherwise *next_time* unchanged.
        """
        if next_time <= time.monotonic():
            next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
            # flush=True replaces an explicit sys.__stdout__.flush(), which
            # would raise AttributeError when sys.__stdout__ is None.
            print('  zipTest still %s %d of %d, be patient...' %
                  (action, num, filecount),
                  file=sys.__stdout__, flush=True)
        return next_time

    def zipTest(self, f, compression):
        """Fill archive *f* with about 6 GiB of data and read it all back.

        f is handed straight to zipfile.ZipFile, so it is either a path or
        an open file object.  compression is the zipfile compression
        constant passed to ZipFile for both the write and the read pass.
        """
        # The archive will contain enough copies of self.data to reach
        # about 6 GiB of raw data to store.
        filecount = 6*1024**3 // len(self.data)

        # Create the ZIP archive.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
            for num in range(filecount):
                zipfp.writestr("testfn%d" % num, self.data)
                # Print still working message since this test can be
                # really slow.
                next_time = self._print_still_working(
                    next_time, 'writing', num, filecount)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for num in range(filecount):
                self.assertEqual(zipfp.read("testfn%d" % num), self.data)
                # Print still working message since this test can be
                # really slow.
                next_time = self._print_still_working(
                    next_time, 'reading', num, filecount)

    def testStored(self):
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, zipfile.ZIP_STORED)
            # ZipFile must leave a caller-supplied file object open.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, zipfile.ZIP_STORED)

    @requires_zlib
    def testDeflated(self):
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, zipfile.ZIP_DEFLATED)
            # ZipFile must leave a caller-supplied file object open.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        """Remove the scratch files, whether or not they were created."""
        # support.unlink ignores a missing file, so no exists() check (and
        # no check-then-remove race) is needed; OtherTests cleans up the
        # same way.
        for fname in TESTFN, TESTFN2:
            support.unlink(fname)
89
90
class OtherTests(unittest.TestCase):
    """Member-count checks on either side of the 16-bit entry limit."""

    def tearDown(self):
        # Drop whatever scratch archives a test left behind.
        for leftover in (TESTFN, TESTFN2):
            support.unlink(leftover)

    def testMoreThan64kFiles(self):
        # With ZIP64 allowed, more than 64k members can be added to an
        # archive, and ZipFile can read the resulting archive back properly.
        wanted = ((1 << 16) * 3) // 2

        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) as archive:
            archive.debug = 100
            for idx in range(wanted):
                member = "foo%08d" % idx
                archive.writestr(member, "%d" % (idx**3 % 57))
            self.assertEqual(len(archive.namelist()), wanted)

        with zipfile.ZipFile(TESTFN, mode="r") as reader:
            self.assertEqual(len(reader.namelist()), wanted)
            for idx in range(wanted):
                raw = reader.read("foo%08d" % idx)
                self.assertEqual(raw.decode('ascii'), "%d" % (idx**3 % 57))

    def testMoreThan64kFilesAppend(self):
        # Without ZIP64 the archive is expected to top out at 65535
        # members, both when first written and when reopened for append;
        # reopening with ZIP64 allowed must let it grow past that.
        limit = (1 << 16) - 1
        grown = ((1 << 16) * 3) // 2
        one_too_many = "foo%08d" % limit

        # Fill the archive right up to the limit, then try one more.
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) as archive:
            archive.debug = 100
            for idx in range(limit):
                archive.writestr("foo%08d" % idx, "%d" % (idx**3 % 57))
            self.assertEqual(len(archive.namelist()), limit)
            with self.assertRaises(zipfile.LargeZipFile):
                archive.writestr(one_too_many, b'')
            # The rejected member must not have been recorded.
            self.assertEqual(len(archive.namelist()), limit)

        # Appending without ZIP64 is refused in the same way.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) as archive:
            archive.debug = 100
            self.assertEqual(len(archive.namelist()), limit)
            with self.assertRaises(zipfile.LargeZipFile):
                archive.writestr(one_too_many, b'')
            self.assertEqual(len(archive.namelist()), limit)

        # Appending with ZIP64 allowed takes the archive past the limit.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) as archive:
            archive.debug = 100
            self.assertEqual(len(archive.namelist()), limit)
            for idx in range(limit, grown):
                archive.writestr("foo%08d" % idx, "%d" % (idx**3 % 57))
            self.assertEqual(len(archive.namelist()), grown)

        # Every member, old and newly appended, must read back intact.
        with zipfile.ZipFile(TESTFN, mode="r") as reader:
            self.assertEqual(len(reader.namelist()), grown)
            for idx in range(grown):
                raw = reader.read("foo%08d" % idx)
                self.assertEqual(raw.decode('ascii'), "%d" % (idx**3 % 57))
143
144if __name__ == "__main__":
145    unittest.main()
146