1# Tests of the full ZIP64 functionality of zipfile
2# The test_support.requires call is the only reason for keeping this separate
3# from test_zipfile
4from test import test_support
5
# XXX(nnorwitz): disable this test by requiring the 'extralargefile'
# resource, which doesn't exist.  This test takes over 30 minutes to run in
# general and requires more disk space than most of the buildbots have.
9test_support.requires(
10        'extralargefile',
11        'test requires loads of disk-space bytes and a long time to run'
12    )
13
14# We can test part of the module without zlib.
15try:
16    import zlib
17except ImportError:
18    zlib = None
19
20import zipfile, os, unittest
21import time
22import sys
23
24from tempfile import TemporaryFile
25
26from test.test_support import TESTFN, run_unittest
27
28TESTFN2 = TESTFN + "2"
29
30# How much time in seconds can pass before we print a 'Still working' message.
31_PRINT_WORKING_MSG_INTERVAL = 5 * 60
32
class TestsWithSourceFile(unittest.TestCase):
    """Round-trip about 6GB of raw data through a ZIP64 archive.

    Each test writes many copies of one large text payload into an archive
    (first backed by an anonymous temporary file, then by TESTFN2) and reads
    every member back, comparing it with the payload.
    """

    def setUp(self):
        # Create test data.
        # xrange() is important here -- don't want to create immortal space
        # for a million ints.
        line_gen = ("Test of zipfile line %d." % i for i in xrange(1000000))
        self.data = '\n'.join(line_gen)

        # And write it to a file.  The with-statement closes the handle even
        # if the write fails, so tearDown is able to remove the file.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def _maybe_report(self, action, num, filecount, next_time):
        """Print a 'still working' message once next_time has passed.

        action is the verb shown in the message ("writing" or "reading").
        The message goes to sys.__stdout__ (the interpreter's original
        stdout) rather than sys.stdout.

        Returns the time at which the next message is due.
        """
        if next_time <= time.time():
            next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL
            print >>sys.__stdout__, (
               '  zipTest still %s %d of %d, be patient...' %
               (action, num, filecount))
            sys.__stdout__.flush()
        return next_time

    def zipTest(self, f, compression):
        """Fill archive f with copies of self.data, then verify every member.

        f is either a filename or an open file object that will hold the
        archive; compression is the zipfile compression constant to use.
        """
        # It will contain enough copies of self.data to reach about 6GB of
        # raw data to store.
        filecount = 6*1024**3 // len(self.data)

        next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL

        # Create the ZIP archive.  The with-statement closes it even when a
        # write fails, so a multi-gigabyte file is not left open.
        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
            for num in range(filecount):
                zipfp.writestr("testfn%d" % num, self.data)
                # Print still working message since this test can be really
                # slow
                next_time = self._maybe_report("writing", num, filecount,
                                               next_time)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for num in range(filecount):
                self.assertEqual(zipfp.read("testfn%d" % num), self.data)
                # Print still working message since this test can be really
                # slow
                next_time = self._maybe_report("reading", num, filecount,
                                               next_time)

    def _zip_test_both_targets(self, compression):
        """Run zipTest against a temporary file object and then TESTFN2."""
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, compression)
            # ZipFile must not close a file object it was handed.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, compression)

    def testStored(self):
        self._zip_test_both_targets(zipfile.ZIP_STORED)

    @unittest.skipUnless(zlib, "requires zlib")
    def testDeflated(self):
        self._zip_test_both_targets(zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Same cleanup helper as OtherTests.tearDown; a missing file is not
        # an error.
        for fname in TESTFN, TESTFN2:
            test_support.unlink(fname)
100
101
class OtherTests(unittest.TestCase):
    """Tests for archives holding more members than fit in 16 bits."""

    def testMoreThan64kFiles(self):
        # This test checks that more than 64k files can be added to an archive,
        # and that the resulting archive can be read properly by ZipFile
        #
        # Floor division keeps numfiles an int (xrange() requires one) even
        # when true division is in effect, and matches numfiles2 in
        # testMoreThan64kFilesAppend.
        numfiles = (1 << 16) * 3//2
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) as zipf:
            zipf.debug = 100
            for i in xrange(numfiles):
                zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
            self.assertEqual(len(zipf.namelist()), numfiles)

        with zipfile.ZipFile(TESTFN, mode="r") as zipf2:
            self.assertEqual(len(zipf2.namelist()), numfiles)
            for i in xrange(numfiles):
                self.assertEqual(zipf2.read("foo%08d" % i),
                                 "%d" % (i**3 % 57))

    def testMoreThan64kFilesAppend(self):
        # Without ZIP64 the archive may be filled up to 65535 members, but
        # adding one more must raise LargeZipFile and leave it unchanged.
        numfiles = (1 << 16) - 1
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) as zipf:
            zipf.debug = 100
            for i in range(numfiles):
                zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        # The same limit applies when the full archive is reopened for
        # appending without ZIP64.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        # Appending with ZIP64 allowed lets the archive grow past the limit.
        numfiles2 = (1 << 16) * 3//2
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            for i in range(numfiles, numfiles2):
                zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
            self.assertEqual(len(zipf.namelist()), numfiles2)

        # Every member, old and appended, must read back correctly.
        with zipfile.ZipFile(TESTFN, mode="r") as zipf2:
            self.assertEqual(len(zipf2.namelist()), numfiles2)
            for i in range(numfiles2):
                self.assertEqual(zipf2.read("foo%08d" % i),
                                 "%d" % (i**3 % 57))

    def tearDown(self):
        test_support.unlink(TESTFN)
        test_support.unlink(TESTFN2)
158
def test_main():
    """Run the TestsWithSourceFile and OtherTests cases via run_unittest."""
    test_classes = (TestsWithSourceFile, OtherTests)
    run_unittest(*test_classes)
161
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
164