1"""Test script for the gzip module.
2"""
3
4import array
5import functools
6import io
7import os
8import pathlib
9import struct
10import sys
11import unittest
12from subprocess import PIPE, Popen
13from test.support import import_helper
14from test.support import os_helper
15from test.support import _4G, bigmemtest
16from test.support.script_helper import assert_python_ok, assert_python_failure
17
# gzip is loaded through test.support's import helper rather than a plain
# import.  NOTE(review): presumably so the whole module is skipped when gzip
# (or zlib underneath it) is unavailable -- confirm against test.support docs.
gzip = import_helper.import_module('gzip')

# First sample payload.  Its exact bytes matter: test_metadata compares the
# gzip trailer against a hard-coded CRC32 of this value, so do not edit it.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Second sample payload, used alongside data1 by the append and
# compress()/decompress() tests.
data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Scratch directory used by the command-line tests; it is created and removed
# around each test by the create_and_remove_directory() decorator.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
34
35
class UnseekableIO(io.BytesIO):
    """In-memory byte stream that rejects every positioning request.

    Reads, writes and getvalue() behave like io.BytesIO; only seekable(),
    seek() and tell() are overridden, so code under test has to cope with
    a stream it cannot rewind or query for its offset.
    """

    def seek(self, *args):
        # Any argument combination is refused.
        raise io.UnsupportedOperation()

    def tell(self):
        raise io.UnsupportedOperation()

    def seekable(self):
        # Advertise the restriction so callers can check up front.
        return False
45
46
class BaseTest(unittest.TestCase):
    # Common fixture: every test works on self.filename, which is removed
    # both before and after the test runs.
    filename = os_helper.TESTFN

    def _discard_test_file(self):
        # self.filename is looked up at call time, so a test that rebinds it
        # gets the rebound path cleaned up in tearDown().
        os_helper.unlink(self.filename)

    def setUp(self):
        self._discard_test_file()

    def tearDown(self):
        self._discard_test_file()
55
56
class TestGzip(BaseTest):
    # Tests for gzip.GzipFile and for the module-level compress() and
    # decompress() shortcuts.  BaseTest removes self.filename before and
    # after every test.

    def write_and_read_back(self, data, mode='b'):
        # Helper: write the bytes-like object *data* through GzipFile, check
        # that write() reports len(bytes(data)), then read the file back and
        # compare the result with bytes(data).
        b_data = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    def test_write(self):
        # Also used by many other tests to create a file holding data1 * 50.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        # A pathlib.Path is accepted as filename; f.name is still a str.
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        # Multi-dimensional view over the same kind of buffer.
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8,8,4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        # Drain the file with read1() and check tell() after every block.
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a context manager so the GzipFile is closed even if the
        # assertion fails.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for i in range(0, 200):
            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
                f.write(b'a')

        # Try reading the file
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            # Read fixed-size chunks until read() returns b'' (EOF) and join
            # them once instead of growing a bytes object chunk by chunk.
            contents = b"".join(iter(lambda: zgfile.read(8192), b""))
        self.assertEqual(contents, b'a'*201)

    def test_exclusive_write(self):
        # 'xb' creates the file; a second 'xb' open of the same path fails.
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = [line for line in r]

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                line = f.readline(line_length)
                # readline(0) legitimately returns b'', so an empty result
                # only means EOF when a non-zero size was requested.
                if not line and line_length != 0:
                    break
                self.assertLessEqual(len(line), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()
        expected = (data1 * 50).splitlines(keepends=True)

        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.readlines(), expected)

        # With a size hint the lines arrive in batches; concatenated, the
        # batches must still give every line exactly once and in order.
        with gzip.GzipFile(self.filename, 'rb') as f:
            lines = []
            while True:
                batch = f.readlines(150)
                if batch == []:
                    break
                lines.extend(batch)
        self.assertEqual(lines, expected)

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                if len(line1)>10:
                    amount = 10
                else:
                    amount = len(line1)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        # The underlying file object is always opened in binary mode.
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        os_helper.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        # GzipFile exposes a "name" attribute in both write and read mode.
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        # In read mode f.fileobj still exposes the underlying file's name.
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        # mtime given at write time is only visible on the reader after
        # data has been read; before that it is None.
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        # Check the raw header and trailer bytes written by GzipFile.
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08') # deflate

            # A file name that cannot be encoded as Latin-1 is expected to
            # be left out of the header, with the FNAME flag cleared.
            try:
                expectedname = self.filename.encode('Latin-1') + b'\x00'
                expectedflags = b'\x08' # only the FNAME flag is set
            except UnicodeEncodeError:
                expectedname = b''
                expectedflags = b'\x00'

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, expectedflags)

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02') # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            nameBytes = fRead.read(len(expectedname))
            self.assertEqual(nameBytes, expectedname)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_metadata_ascii_name(self):
        # Re-run test_metadata with an ASCII file name.  tearDown() unlinks
        # whatever self.filename is at that point, i.e. the rebound path.
        self.filename = os_helper.TESTFN_ASCII
        self.test_metadata()

    def test_compresslevel_metadata(self):
        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
        # specifically, discussion of XFL in section 2.3.1
        cases = [
            ('fast', 1, b'\x04'),
            ('best', 9, b'\x02'),
            ('tradeoff', 6, b'\x00'),
        ]
        # Offset of XFL: ID (2) + CM (1) + FLG (1) + MTIME (4), matching the
        # field order read in test_metadata.
        xflOffset = 8

        for (name, level, expectedXflByte) in cases:
            with self.subTest(name):
                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
                with fWrite:
                    fWrite.write(data1)
                with open(self.filename, 'rb') as fRead:
                    fRead.seek(xflOffset)
                    xflByte = fRead.read(1)
                    self.assertEqual(xflByte, expectedXflByte)

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # Entering an already-closed file must fail.
        with self.assertRaises(
                ValueError,
                msg="__enter__ on a closed file didn't raise an exception"):
            with f:
                pass
        # An exception raised inside the block must propagate out of it.
        with self.assertRaises(ZeroDivisionError,
                               msg="1/0 didn't raise an exception"):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        # Trailing NUL padding after the gzip member is ignored on read.
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_gzip_BadGzipFile_exception(self):
        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))

    def test_bad_gzip_file(self):
        # Reading a file that holds uncompressed data raises BadGzipFile.
        with open(self.filename, 'wb') as file:
            file.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'r') as file:
            self.assertRaises(gzip.BadGzipFile, file.readlines)

    def test_non_seekable_file(self):
        # Round-trip through a file object that cannot seek() or tell().
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        # peek() must return exactly the bytes the next read() returns.
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endless cycle of peek sizes: 5, 15, 25, 35, 45, 5, ...
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_fileobj_mode(self):
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides GzipFile.mode, whatever mode the
        # underlying file object was opened with.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without an explicit mode, it is taken from the file object.
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        # Inferring a write mode from the file object emits FutureWarning.
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                os_helper.unlink(self.filename)
            with open(self.filename, mode) as f:
                with self.assertWarns(FutureWarning):
                    g = gzip.GzipFile(fileobj=f)
                with g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # The private _buffer attribute must be inspected while the file is
        # still open, so both assertions stay inside the with block, which
        # also guarantees the GzipFile gets closed.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        # compress() output is bytes and readable by GzipFile at every
        # tested compression level (default, 1, 6, 9).
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_compress_mtime(self):
        mtime = 123456789
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                with self.subTest(data=data, args=args):
                    datac = gzip.compress(data, *args, mtime=mtime)
                    self.assertEqual(type(datac), bytes)
                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                        f.read(1) # to set mtime attribute
                        self.assertEqual(f.mtime, mtime)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_decompress_truncated_trailer(self):
        # Only the last 4 of the 8 trailer bytes are missing.
        compressed_data = gzip.compress(data1)
        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-4])

    def test_decompress_missing_trailer(self):
        # The whole 8-byte trailer is missing.
        compressed_data = gzip.compress(data1)
        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        # All the payload is still readable; only the read that runs into
        # the missing trailer fails.
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        # Reaches into private attributes; the test only checks that calling
        # prepend() with no argument does not raise.
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()

    def test_issue44439(self):
        # write() and tell() count bytes, not array items.
        q = array.array('Q', [1, 2, 3, 4, 5])
        LENGTH = len(q) * q.itemsize

        with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
            self.assertEqual(f.write(q), LENGTH)
            self.assertEqual(f.tell(), LENGTH)
609
610
class TestOpen(BaseTest):
    # Tests for the gzip.open() convenience function in binary and text
    # modes.

    def _raw_payload(self):
        # Decompress the on-disk test file with gzip.decompress(), bypassing
        # gzip.open(), so what was written can be checked independently.
        with open(self.filename, "rb") as raw:
            return gzip.decompress(raw.read())

    def _exercise_binary_modes(self, flag):
        # Shared scenario for explicit ("b") and implicit ("") binary modes:
        # write, read, append, then exclusive creation.
        payload = data1 * 50
        write_mode = "w" + flag
        read_mode = "r" + flag
        append_mode = "a" + flag
        exclusive_mode = "x" + flag

        with gzip.open(self.filename, write_mode) as gz:
            gz.write(payload)
        self.assertEqual(self._raw_payload(), payload)

        with gzip.open(self.filename, read_mode) as gz:
            self.assertEqual(gz.read(), payload)

        with gzip.open(self.filename, append_mode) as gz:
            gz.write(payload)
        self.assertEqual(self._raw_payload(), payload * 2)

        # Exclusive creation fails while the file exists and succeeds once
        # it has been removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, exclusive_mode)
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, exclusive_mode) as gz:
            gz.write(payload)
        self.assertEqual(self._raw_payload(), payload)

    def test_binary_modes(self):
        self._exercise_binary_modes("b")

    def test_pathlike_file(self):
        # gzip.open() accepts a pathlib.Path in place of a str file name.
        target = pathlib.Path(self.filename)
        with gzip.open(target, "wb") as gz:
            gz.write(data1 * 50)
        with gzip.open(target, "ab") as gz:
            gz.write(data1)
        with gzip.open(target) as gz:
            self.assertEqual(gz.read(), data1 * 51)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        self._exercise_binary_modes("")

    def test_text_modes(self):
        text = data1.decode("ascii") * 50
        # What should end up in the file once "\n" has been translated to
        # the platform line separator on write.
        text_on_disk = text.replace("\n", os.linesep)

        with gzip.open(self.filename, "wt", encoding="ascii") as gz:
            gz.write(text)
        self.assertEqual(self._raw_payload().decode("ascii"), text_on_disk)

        with gzip.open(self.filename, "rt", encoding="ascii") as gz:
            self.assertEqual(gz.read(), text)

        with gzip.open(self.filename, "at", encoding="ascii") as gz:
            gz.write(text)
        self.assertEqual(self._raw_payload().decode("ascii"),
                         text_on_disk * 2)

    def test_fileobj(self):
        # gzip.open() also accepts an already-open binary file object.
        payload = data1 * 50
        payload_text = payload.decode("ascii")
        compressed = gzip.compress(payload)
        for binary_mode in ("r", "rb"):
            with gzip.open(io.BytesIO(compressed), binary_mode) as gz:
                self.assertEqual(gz.read(), payload)
        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as gz:
            self.assertEqual(gz.read(), payload_text)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        # Binary and text flags cannot be combined.
        for conflicting_mode in ("wbt", "xbt"):
            with self.assertRaises(ValueError):
                gzip.open(self.filename, conflicting_mode)
        # Text-only options are rejected in binary mode.
        text_only_options = (
            {"encoding": "utf-8"},
            {"errors": "ignore"},
            {"newline": "\n"},
        )
        for option in text_only_options:
            with self.assertRaises(ValueError):
                gzip.open(self.filename, "rb", **option)

    def test_encoding(self):
        # Test non-default encoding.
        text = data1.decode("ascii") * 50
        text_on_disk = text.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as gz:
            gz.write(text)
        self.assertEqual(self._raw_payload().decode("utf-16"), text_on_disk)
        with gzip.open(self.filename, "rt", encoding="utf-16") as gz:
            self.assertEqual(gz.read(), text)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as gz:
            gz.write(b"foo\xffbar")
        reader = gzip.open(self.filename, "rt", encoding="ascii",
                           errors="ignore")
        with reader as gz:
            self.assertEqual(gz.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        text = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", encoding="ascii",
                       newline="\n") as gz:
            gz.write(text)
        # With "\r" as the only line terminator on read, the "\n"-separated
        # text comes back as one single line.
        with gzip.open(self.filename, "rt", encoding="ascii",
                       newline="\r") as gz:
            self.assertEqual(gz.readlines(), [text])
745
746
747def create_and_remove_directory(directory):
748    def decorator(function):
749        @functools.wraps(function)
750        def wrapper(*args, **kwargs):
751            os.makedirs(directory)
752            try:
753                return function(*args, **kwargs)
754            finally:
755                os_helper.rmtree(directory)
756        return wrapper
757    return decorator
758
759
class TestCommandLine(unittest.TestCase):
    # Tests for the "python -m gzip" command-line interface.
    data = b'This is a simple test with gzip'

    def _pipe_through_cli(self, payload, *options):
        # Run "python -m gzip <options>" feeding *payload* on stdin and
        # return the (stdout, stderr) pair from communicate().
        command = (sys.executable, '-m', 'gzip') + options
        with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            return proc.communicate(payload)

    def test_decompress_stdin_stdout(self):
        with io.BytesIO() as sink:
            with gzip.GzipFile(fileobj=sink, mode='wb') as gz:
                gz.write(self.data)
            stdout, stderr = self._pipe_through_cli(sink.getvalue(), '-d')

        self.assertEqual(stderr, b'')
        self.assertEqual(stdout, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        archive = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(archive))

        with gzip.open(archive, mode='wb') as gz:
            gz.write(self.data)
        status, stdout, stderr = assert_python_ok('-m', 'gzip', '-d', archive)

        # The decompressed output is written next to the archive, without
        # the ".gz" suffix.
        extracted = os.path.join(TEMPDIR, "testgzip")
        with open(extracted, "rb") as plain:
            self.assertEqual(plain.read(), self.data)

        # The archive itself is left in place.
        self.assertTrue(os.path.exists(archive))
        self.assertEqual(status, 0)
        self.assertEqual(stdout, b'')
        self.assertEqual(stderr, b'')

    def test_decompress_infile_outfile_error(self):
        status, stdout, stderr = assert_python_failure(
            '-m', 'gzip', '-d', 'thisisatest.out')
        self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'",
                         stderr.strip())
        self.assertEqual(status, 1)
        self.assertEqual(stdout, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        stdout, stderr = self._pipe_through_cli(self.data)

        self.assertEqual(stderr, b'')
        # Compressed output starts with the gzip magic number.
        self.assertEqual(stdout[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        plain_path = os.path.join(TEMPDIR, 'testgzip')
        archive = plain_path + '.gz'
        self.assertFalse(os.path.exists(archive))

        with open(plain_path, 'wb') as plain:
            plain.write(self.data)

        status, stdout, stderr = assert_python_ok('-m', 'gzip', plain_path)

        self.assertTrue(os.path.exists(archive))
        self.assertEqual(stdout, b'')
        self.assertEqual(stderr, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        plain_path = os.path.join(TEMPDIR, 'testgzip')
        archive = plain_path + '.gz'
        for level_flag in ('--fast', '--best'):
            with self.subTest(compress_level=level_flag):
                self.assertFalse(os.path.exists(archive))

                with open(plain_path, 'wb') as plain:
                    plain.write(self.data)

                status, stdout, stderr = assert_python_ok(
                    '-m', 'gzip', level_flag, plain_path)

                self.assertTrue(os.path.exists(archive))
                self.assertEqual(stdout, b'')
                self.assertEqual(stderr, b'')
                # Remove the archive so the next flag starts from scratch.
                os.remove(archive)
                self.assertFalse(os.path.exists(archive))

    def test_compress_fast_best_are_exclusive(self):
        status, stdout, stderr = assert_python_failure(
            '-m', 'gzip', '--fast', '--best')
        self.assertIn(b"error: argument --best: not allowed with argument --fast", stderr)
        self.assertEqual(stdout, b'')

    def test_decompress_cannot_have_flags_compression(self):
        status, stdout, stderr = assert_python_failure(
            '-m', 'gzip', '--fast', '-d')
        self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', stderr)
        self.assertEqual(stdout, b'')
850
851
# Run the whole suite through unittest's command-line runner when this file
# is executed directly instead of being imported by a test runner.
if __name__ == "__main__":
    unittest.main()
854