1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper
15
# Check for our compression modules.
# Each optional module name is rebound to None when its import fails, so the
# code below can test the name before touching the module.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import zlib
except ImportError:
    zlib = None
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import lzma
except ImportError:
    lzma = None
33
def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    digest = sha256(data)
    return digest.hexdigest()
36
# Scratch directory used by the tests, derived from support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# NOTE(review): not referenced in this part of the file; presumably a separate
# extraction target -- confirm against its users.
tarextdir = TEMPDIR + '-extract-test'
# Reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR used by the gzip/bz2/xz test mixins as their archive.
# NOTE(review): presumably compressed copies of the reference archive; they are
# not created in this part of the file -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive path that individual tests create and overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
# NOTE(review): archive path without a file extension; its use is not shown in
# this part of the file.
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 hex digest of the data of the "ustar/regtype" member.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
# NOTE(review): presumably the expected digest of a sparse member's data; it is
# not used in this part of the file -- confirm.
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)
52
53
class TarTest:
    """Mixin describing how the uncompressed reference archive is opened.

    ``mode`` reads ``self.prefix``, which this class does not define, so a
    subclass has to supply it.  The compression mixins in this file override
    ``tarname``, ``suffix``, ``open`` and ``taropen``.
    """
    # Opener that goes through the TarFile class directly.
    taropen = tarfile.TarFile.taropen
    # Opener for the raw archive file.
    open = io.FileIO
    suffix = ''
    tarname = tarname

    @property
    def mode(self):
        """Return the tarfile mode string: ``prefix`` followed by ``suffix``."""
        prefix, suffix = self.prefix, self.suffix
        return prefix + suffix
63
@support.requires_gzip()
class GzipTest:
    """Mixin selecting the gzip archive path, mode suffix and openers."""
    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    # gzip is None when the module could not be imported.
    open = None if gzip is None else gzip.GzipFile
70
@support.requires_bz2()
class Bz2Test:
    """Mixin selecting the bzip2 archive path, mode suffix and openers."""
    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    # bz2 is None when the module could not be imported.
    open = None if bz2 is None else bz2.BZ2File
77
@support.requires_lzma()
class LzmaTest:
    """Mixin selecting the xz archive path, mode suffix and openers."""
    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    # lzma is None when the module could not be imported.
    open = None if lzma is None else lzma.LZMAFile
84
85
class ReadTest(TarTest):
    """Fixture that opens ``self.tarname`` for reading around every test."""

    prefix = "r:"

    def setUp(self):
        # Open the archive with the class's mode and an ISO-8859-1 encoding.
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        # Release the archive opened in setUp().
        archive = self.tar
        archive.close()
96
97
98class UstarReadTest(ReadTest, unittest.TestCase):
99
100    def test_fileobj_regular_file(self):
101        tarinfo = self.tar.getmember("ustar/regtype")
102        with self.tar.extractfile(tarinfo) as fobj:
103            data = fobj.read()
104            self.assertEqual(len(data), tarinfo.size,
105                    "regular file extraction failed")
106            self.assertEqual(sha256sum(data), sha256_regtype,
107                    "regular file extraction failed")
108
109    def test_fileobj_readlines(self):
110        self.tar.extract("ustar/regtype", TEMPDIR)
111        tarinfo = self.tar.getmember("ustar/regtype")
112        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
113            lines1 = fobj1.readlines()
114
115        with self.tar.extractfile(tarinfo) as fobj:
116            fobj2 = io.TextIOWrapper(fobj)
117            lines2 = fobj2.readlines()
118            self.assertEqual(lines1, lines2,
119                    "fileobj.readlines() failed")
120            self.assertEqual(len(lines2), 114,
121                    "fileobj.readlines() failed")
122            self.assertEqual(lines2[83],
123                    "I will gladly admit that Python is not the fastest "
124                    "running scripting language.\n",
125                    "fileobj.readlines() failed")
126
127    def test_fileobj_iter(self):
128        self.tar.extract("ustar/regtype", TEMPDIR)
129        tarinfo = self.tar.getmember("ustar/regtype")
130        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
131            lines1 = fobj1.readlines()
132        with self.tar.extractfile(tarinfo) as fobj2:
133            lines2 = list(io.TextIOWrapper(fobj2))
134            self.assertEqual(lines1, lines2,
135                    "fileobj.__iter__() failed")
136
137    def test_fileobj_seek(self):
138        self.tar.extract("ustar/regtype", TEMPDIR)
139        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
140            data = fobj.read()
141
142        tarinfo = self.tar.getmember("ustar/regtype")
143        with self.tar.extractfile(tarinfo) as fobj:
144            text = fobj.read()
145            fobj.seek(0)
146            self.assertEqual(0, fobj.tell(),
147                         "seek() to file's start failed")
148            fobj.seek(2048, 0)
149            self.assertEqual(2048, fobj.tell(),
150                         "seek() to absolute position failed")
151            fobj.seek(-1024, 1)
152            self.assertEqual(1024, fobj.tell(),
153                         "seek() to negative relative position failed")
154            fobj.seek(1024, 1)
155            self.assertEqual(2048, fobj.tell(),
156                         "seek() to positive relative position failed")
157            s = fobj.read(10)
158            self.assertEqual(s, data[2048:2058],
159                         "read() after seek failed")
160            fobj.seek(0, 2)
161            self.assertEqual(tarinfo.size, fobj.tell(),
162                         "seek() to file's end failed")
163            self.assertEqual(fobj.read(), b"",
164                         "read() at file's end did not return empty string")
165            fobj.seek(-tarinfo.size, 2)
166            self.assertEqual(0, fobj.tell(),
167                         "relative seek() to file's end failed")
168            fobj.seek(512)
169            s1 = fobj.readlines()
170            fobj.seek(512)
171            s2 = fobj.readlines()
172            self.assertEqual(s1, s2,
173                         "readlines() after seek failed")
174            fobj.seek(0)
175            self.assertEqual(len(fobj.readline()), fobj.tell(),
176                         "tell() after readline() failed")
177            fobj.seek(512)
178            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
179                         "tell() after seek() and readline() failed")
180            fobj.seek(0)
181            line = fobj.readline()
182            self.assertEqual(fobj.read(), data[len(line):],
183                         "read() after readline() failed")
184
185    def test_fileobj_text(self):
186        with self.tar.extractfile("ustar/regtype") as fobj:
187            fobj = io.TextIOWrapper(fobj)
188            data = fobj.read().encode("iso8859-1")
189            self.assertEqual(sha256sum(data), sha256_regtype)
190            try:
191                fobj.seek(100)
192            except AttributeError:
193                # Issue #13815: seek() complained about a missing
194                # flush() method.
195                self.fail("seeking failed in text mode")
196
197    # Test if symbolic and hard links are resolved by extractfile().  The
198    # test link members each point to a regular member whose data is
199    # supposed to be exported.
200    def _test_fileobj_link(self, lnktype, regtype):
201        with self.tar.extractfile(lnktype) as a, \
202             self.tar.extractfile(regtype) as b:
203            self.assertEqual(a.name, b.name)
204
    def test_fileobj_link1(self):
        # Hard link member against the regular member in the same directory.
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        # Hard link member stored under a "./"-prefixed name in another
        # directory than the regular member.
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        # Symbolic link member against the regular member in the same
        # directory.
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        # Symbolic link member stored under a "./"-prefixed name in another
        # directory than the regular member.
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        # Link member stored at the top level of the archive.
        self._test_fileobj_link("symtype2", "ustar/regtype")
221
class GzipUstarReadTest(GzipTest, UstarReadTest):
    # Runs the UstarReadTest cases with the gzip archive settings.
    pass

class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    # Runs the UstarReadTest cases with the bzip2 archive settings.
    pass

class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    # Runs the UstarReadTest cases with the xz archive settings.
    pass
230
231
232class ListTest(ReadTest, unittest.TestCase):
233
234    # Override setUp to use default encoding (UTF-8)
235    def setUp(self):
236        self.tar = tarfile.open(self.tarname, mode=self.mode)
237
238    def test_list(self):
239        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
240        with support.swap_attr(sys, 'stdout', tio):
241            self.tar.list(verbose=False)
242        out = tio.detach().getvalue()
243        self.assertIn(b'ustar/conttype', out)
244        self.assertIn(b'ustar/regtype', out)
245        self.assertIn(b'ustar/lnktype', out)
246        self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
247        self.assertIn(b'./ustar/linktest2/symtype', out)
248        self.assertIn(b'./ustar/linktest2/lnktype', out)
249        # Make sure it puts trailing slash for directory
250        self.assertIn(b'ustar/dirtype/', out)
251        self.assertIn(b'ustar/dirtype-with-size/', out)
252        # Make sure it is able to print unencodable characters
253        def conv(b):
254            s = b.decode(self.tar.encoding, 'surrogateescape')
255            return s.encode('ascii', 'backslashreplace')
256        self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
257        self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
258                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
259        self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
260                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
261        self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
262        self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
263        # Make sure it prints files separated by one newline without any
264        # 'ls -l'-like accessories if verbose flag is not being used
265        # ...
266        # ustar/conttype
267        # ustar/regtype
268        # ...
269        self.assertRegex(out, br'ustar/conttype ?\r?\n'
270                              br'ustar/regtype ?\r?\n')
271        # Make sure it does not print the source of link without verbose flag
272        self.assertNotIn(b'link to', out)
273        self.assertNotIn(b'->', out)
274
275    def test_list_verbose(self):
276        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
277        with support.swap_attr(sys, 'stdout', tio):
278            self.tar.list(verbose=True)
279        out = tio.detach().getvalue()
280        # Make sure it prints files separated by one newline with 'ls -l'-like
281        # accessories if verbose flag is being used
282        # ...
283        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
284        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
285        # ...
286        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
287                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
288                               br'ustar/\w+type ?\r?\n') * 2)
289        # Make sure it prints the source of link with verbose flag
290        self.assertIn(b'ustar/symtype -> regtype', out)
291        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
292        self.assertIn(b'./ustar/linktest2/lnktype link to '
293                      b'./ustar/linktest1/regtype', out)
294        self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
295                      (b'/123' * 125) + b'/longname', out)
296        self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
297                      (b'/123' * 125) + b'/longname', out)
298
299    def test_list_members(self):
300        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
301        def members(tar):
302            for tarinfo in tar.getmembers():
303                if 'reg' in tarinfo.name:
304                    yield tarinfo
305        with support.swap_attr(sys, 'stdout', tio):
306            self.tar.list(verbose=False, members=members(self.tar))
307        out = tio.detach().getvalue()
308        self.assertIn(b'ustar/regtype', out)
309        self.assertNotIn(b'ustar/conttype', out)
310
311
class GzipListTest(GzipTest, ListTest):
    # Runs the ListTest cases with the gzip archive settings.
    pass


class Bz2ListTest(Bz2Test, ListTest):
    # Runs the ListTest cases with the bzip2 archive settings.
    pass


class LzmaListTest(LzmaTest, ListTest):
    # Runs the ListTest cases with the xz archive settings.
    pass
322
323
324class CommonReadTest(ReadTest):
325
326    def test_is_tarfile_erroneous(self):
327        with open(tmpname, "wb"):
328            pass
329
330        # is_tarfile works on filenames
331        self.assertFalse(tarfile.is_tarfile(tmpname))
332
333        # is_tarfile works on path-like objects
334        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))
335
336        # is_tarfile works on file objects
337        with open(tmpname, "rb") as fobj:
338            self.assertFalse(tarfile.is_tarfile(fobj))
339
340        # is_tarfile works on file-like objects
341        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))
342
343    def test_is_tarfile_valid(self):
344        # is_tarfile works on filenames
345        self.assertTrue(tarfile.is_tarfile(self.tarname))
346
347        # is_tarfile works on path-like objects
348        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))
349
350        # is_tarfile works on file objects
351        with open(self.tarname, "rb") as fobj:
352            self.assertTrue(tarfile.is_tarfile(fobj))
353
354        # is_tarfile works on file-like objects
355        with open(self.tarname, "rb") as fobj:
356            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))
357
358    def test_empty_tarfile(self):
359        # Test for issue6123: Allow opening empty archives.
360        # This test checks if tarfile.open() is able to open an empty tar
361        # archive successfully. Note that an empty tar archive is not the
362        # same as an empty file!
363        with tarfile.open(tmpname, self.mode.replace("r", "w")):
364            pass
365        try:
366            tar = tarfile.open(tmpname, self.mode)
367            tar.getnames()
368        except tarfile.ReadError:
369            self.fail("tarfile.open() failed on empty archive")
370        else:
371            self.assertListEqual(tar.getmembers(), [])
372        finally:
373            tar.close()
374
375    def test_non_existent_tarfile(self):
376        # Test for issue11513: prevent non-existent gzipped tarfiles raising
377        # multiple exceptions.
378        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
379            tarfile.open("xxx", self.mode)
380
381    def test_null_tarfile(self):
382        # Test for issue6123: Allow opening empty archives.
383        # This test guarantees that tarfile.open() does not treat an empty
384        # file as an empty tar archive.
385        with open(tmpname, "wb"):
386            pass
387        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
388        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
389
390    def test_ignore_zeros(self):
391        # Test TarFile's ignore_zeros option.
392        # generate 512 pseudorandom bytes
393        data = Random(0).randbytes(512)
394        for char in (b'\0', b'a'):
395            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
396            # are ignored correctly.
397            with self.open(tmpname, "w") as fobj:
398                fobj.write(char * 1024)
399                tarinfo = tarfile.TarInfo("foo")
400                tarinfo.size = len(data)
401                fobj.write(tarinfo.tobuf())
402                fobj.write(data)
403
404            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
405            try:
406                self.assertListEqual(tar.getnames(), ["foo"],
407                    "ignore_zeros=True should have skipped the %r-blocks" %
408                    char)
409            finally:
410                tar.close()
411
412    def test_premature_end_of_archive(self):
413        for size in (512, 600, 1024, 1200):
414            with tarfile.open(tmpname, "w:") as tar:
415                t = tarfile.TarInfo("foo")
416                t.size = 1024
417                tar.addfile(t, io.BytesIO(b"a" * 1024))
418
419            with open(tmpname, "r+b") as fobj:
420                fobj.truncate(size)
421
422            with tarfile.open(tmpname) as tar:
423                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
424                    for t in tar:
425                        pass
426
427            with tarfile.open(tmpname) as tar:
428                t = tar.next()
429
430                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
431                    tar.extract(t, TEMPDIR)
432
433                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
434                    tar.extractfile(t).read()
435
436    def test_length_zero_header(self):
437        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
438        # with an exception
439        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
440            with tarfile.open(support.findfile('recursion.tar')) as tar:
441                pass
442
443class MiscReadTestBase(CommonReadTest):
    def requires_name_attribute(self):
        # Hook called first by the tests that compare against fobj.name.
        # It does nothing here; subclasses in this file override it to call
        # skipTest().
        pass
446
447    def test_no_name_argument(self):
448        self.requires_name_attribute()
449        with open(self.tarname, "rb") as fobj:
450            self.assertIsInstance(fobj.name, str)
451            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
452                self.assertIsInstance(tar.name, str)
453                self.assertEqual(tar.name, os.path.abspath(fobj.name))
454
455    def test_no_name_attribute(self):
456        with open(self.tarname, "rb") as fobj:
457            data = fobj.read()
458        fobj = io.BytesIO(data)
459        self.assertRaises(AttributeError, getattr, fobj, "name")
460        tar = tarfile.open(fileobj=fobj, mode=self.mode)
461        self.assertIsNone(tar.name)
462
463    def test_empty_name_attribute(self):
464        with open(self.tarname, "rb") as fobj:
465            data = fobj.read()
466        fobj = io.BytesIO(data)
467        fobj.name = ""
468        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
469            self.assertIsNone(tar.name)
470
471    def test_int_name_attribute(self):
472        # Issue 21044: tarfile.open() should handle fileobj with an integer
473        # 'name' attribute.
474        fd = os.open(self.tarname, os.O_RDONLY)
475        with open(fd, 'rb') as fobj:
476            self.assertIsInstance(fobj.name, int)
477            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
478                self.assertIsNone(tar.name)
479
480    def test_bytes_name_attribute(self):
481        self.requires_name_attribute()
482        tarname = os.fsencode(self.tarname)
483        with open(tarname, 'rb') as fobj:
484            self.assertIsInstance(fobj.name, bytes)
485            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
486                self.assertIsInstance(tar.name, bytes)
487                self.assertEqual(tar.name, os.path.abspath(fobj.name))
488
489    def test_pathlike_name(self):
490        tarname = pathlib.Path(self.tarname)
491        with tarfile.open(tarname, mode=self.mode) as tar:
492            self.assertIsInstance(tar.name, str)
493            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
494        with self.taropen(tarname) as tar:
495            self.assertIsInstance(tar.name, str)
496            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
497        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
498            self.assertIsInstance(tar.name, str)
499            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
500        if self.suffix == '':
501            with tarfile.TarFile(tarname, mode='r') as tar:
502                self.assertIsInstance(tar.name, str)
503                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
504
505    def test_illegal_mode_arg(self):
506        with open(tmpname, 'wb'):
507            pass
508        with self.assertRaisesRegex(ValueError, 'mode must be '):
509            tar = self.taropen(tmpname, 'q')
510        with self.assertRaisesRegex(ValueError, 'mode must be '):
511            tar = self.taropen(tmpname, 'rw')
512        with self.assertRaisesRegex(ValueError, 'mode must be '):
513            tar = self.taropen(tmpname, '')
514
515    def test_fileobj_with_offset(self):
516        # Skip the first member and store values from the second member
517        # of the testtar.
518        tar = tarfile.open(self.tarname, mode=self.mode)
519        try:
520            tar.next()
521            t = tar.next()
522            name = t.name
523            offset = t.offset
524            with tar.extractfile(t) as f:
525                data = f.read()
526        finally:
527            tar.close()
528
529        # Open the testtar and seek to the offset of the second member.
530        with self.open(self.tarname) as fobj:
531            fobj.seek(offset)
532
533            # Test if the tarfile starts with the second member.
534            with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar:
535                t = tar.next()
536                self.assertEqual(t.name, name)
537                # Read to the end of fileobj and test if seeking back to the
538                # beginning works.
539                tar.getmembers()
540                self.assertEqual(tar.extractfile(t).read(), data,
541                        "seek back did not work")
542
543    def test_fail_comp(self):
544        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
545        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
546        with open(tarname, "rb") as fobj:
547            self.assertRaises(tarfile.ReadError, tarfile.open,
548                              fileobj=fobj, mode=self.mode)
549
550    def test_v7_dirtype(self):
551        # Test old style dirtype member (bug #1336623):
552        # Old V7 tars create directory members using an AREGTYPE
553        # header with a "/" appended to the filename field.
554        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
555        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
556                "v7 dirtype failed")
557
558    def test_xstar_type(self):
559        # The xstar format stores extra atime and ctime fields inside the
560        # space reserved for the prefix field. The prefix field must be
561        # ignored in this case, otherwise it will mess up the name.
562        try:
563            self.tar.getmember("misc/regtype-xstar")
564        except KeyError:
565            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
566
567    def test_check_members(self):
568        for tarinfo in self.tar:
569            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
570                    "wrong mtime for %s" % tarinfo.name)
571            if not tarinfo.name.startswith("ustar/"):
572                continue
573            self.assertEqual(tarinfo.uname, "tarfile",
574                    "wrong uname for %s" % tarinfo.name)
575
576    def test_find_members(self):
577        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
578                "could not find all members")
579
580    @unittest.skipUnless(hasattr(os, "link"),
581                         "Missing hardlink implementation")
582    @support.skip_unless_symlink
583    def test_extract_hardlink(self):
584        # Test hardlink extraction (e.g. bug #857297).
585        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
586            tar.extract("ustar/regtype", TEMPDIR)
587            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
588
589            tar.extract("ustar/lnktype", TEMPDIR)
590            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
591            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
592                data = f.read()
593            self.assertEqual(sha256sum(data), sha256_regtype)
594
595            tar.extract("ustar/symtype", TEMPDIR)
596            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
597            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
598                data = f.read()
599            self.assertEqual(sha256sum(data), sha256_regtype)
600
601    def test_extractall(self):
602        # Test if extractall() correctly restores directory permissions
603        # and times (see issue1735).
604        tar = tarfile.open(tarname, encoding="iso8859-1")
605        DIR = os.path.join(TEMPDIR, "extractall")
606        os.mkdir(DIR)
607        try:
608            directories = [t for t in tar if t.isdir()]
609            tar.extractall(DIR, directories)
610            for tarinfo in directories:
611                path = os.path.join(DIR, tarinfo.name)
612                if sys.platform != "win32":
613                    # Win32 has no support for fine grained permissions.
614                    self.assertEqual(tarinfo.mode & 0o777,
615                                     os.stat(path).st_mode & 0o777)
616                def format_mtime(mtime):
617                    if isinstance(mtime, float):
618                        return "{} ({})".format(mtime, mtime.hex())
619                    else:
620                        return "{!r} (int)".format(mtime)
621                file_mtime = os.path.getmtime(path)
622                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
623                    format_mtime(tarinfo.mtime),
624                    format_mtime(file_mtime),
625                    path)
626                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
627        finally:
628            tar.close()
629            support.rmtree(DIR)
630
631    def test_extract_directory(self):
632        dirtype = "ustar/dirtype"
633        DIR = os.path.join(TEMPDIR, "extractdir")
634        os.mkdir(DIR)
635        try:
636            with tarfile.open(tarname, encoding="iso8859-1") as tar:
637                tarinfo = tar.getmember(dirtype)
638                tar.extract(tarinfo, path=DIR)
639                extracted = os.path.join(DIR, dirtype)
640                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
641                if sys.platform != "win32":
642                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
643        finally:
644            support.rmtree(DIR)
645
646    def test_extractall_pathlike_name(self):
647        DIR = pathlib.Path(TEMPDIR) / "extractall"
648        with support.temp_dir(DIR), \
649             tarfile.open(tarname, encoding="iso8859-1") as tar:
650            directories = [t for t in tar if t.isdir()]
651            tar.extractall(DIR, directories)
652            for tarinfo in directories:
653                path = DIR / tarinfo.name
654                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)
655
656    def test_extract_pathlike_name(self):
657        dirtype = "ustar/dirtype"
658        DIR = pathlib.Path(TEMPDIR) / "extractall"
659        with support.temp_dir(DIR), \
660             tarfile.open(tarname, encoding="iso8859-1") as tar:
661            tarinfo = tar.getmember(dirtype)
662            tar.extract(tarinfo, path=DIR)
663            extracted = DIR / dirtype
664            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
665
666    def test_init_close_fobj(self):
667        # Issue #7341: Close the internal file object in the TarFile
668        # constructor in case of an error. For the test we rely on
669        # the fact that opening an empty file raises a ReadError.
670        empty = os.path.join(TEMPDIR, "empty")
671        with open(empty, "wb") as fobj:
672            fobj.write(b"")
673
674        try:
675            tar = object.__new__(tarfile.TarFile)
676            try:
677                tar.__init__(empty)
678            except tarfile.ReadError:
679                self.assertTrue(tar.fileobj.closed)
680            else:
681                self.fail("ReadError not raised")
682        finally:
683            support.unlink(empty)
684
685    def test_parallel_iteration(self):
686        # Issue #16601: Restarting iteration over tarfile continued
687        # from where it left off.
688        with tarfile.open(self.tarname) as tar:
689            for m1, m2 in zip(tar, tar):
690                self.assertEqual(m1.offset, m2.offset)
691                self.assertEqual(m1.get_info(), m2.get_info())
692
693    @unittest.skipIf(zlib is None, "requires zlib")
694    def test_zlib_error_does_not_leak(self):
695        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
696        # parsing certain types of invalid data
697        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
698            mock.side_effect = zlib.error
699            with self.assertRaises(tarfile.ReadError):
700                tarfile.open(self.tarname)
701
702
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Replaces the inherited test_fail_comp with None for the uncompressed
    # archive.
    test_fail_comp = None

class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the gzip archive settings.
    pass

class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the bzip2 archive settings.
    def requires_name_attribute(self):
        # Skips the calling test for this archive type.
        self.skipTest("BZ2File have no name attribute")

class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the xz archive settings.
    def requires_name_attribute(self):
        # Skips the calling test for this archive type.
        self.skipTest("LZMAFile have no name attribute")
716
717
class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Read tests that open the archive with the "r|" (stream) mode prefix."""

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for tarinfo in self.tar:
            if not tarinfo.isreg():
                continue
            with self.tar.extractfile(tarinfo) as fobj:
                while True:
                    try:
                        buf = fobj.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")
                    if not buf:
                        break

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
        self.assertEqual(len(data), tarinfo.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(data), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() consumes the whole stream, so going back to read the
        # first member afterwards is expected to raise StreamError.
        tarinfos = self.tar.getmembers()
        with self.tar.extractfile(tarinfos[0]) as f: # read the first member
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a default-mode open of the plain archive and the stream under
        # test in lockstep, comparing the data of every member.
        with tarfile.open(tarname, encoding="iso8859-1") as tar1:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    # extractfile() on a link is expected to fail on the
                    # stream, so there is nothing to compare.
                    with self.assertRaises(tarfile.StreamError):
                        tar2.extractfile(t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                # Close both extracted file objects once compared, as the
                # other tests in this class do.
                with v1:
                    self.assertIsNotNone(v2, "stream.extractfile() failed")
                    with v2:
                        self.assertEqual(v1.read(), v2.read(),
                                "stream extraction failed")
778
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """Run the StreamReadTest cases with the GzipTest archive settings."""
781
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """Run the StreamReadTest cases with the Bz2Test archive settings."""
784
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """Run the StreamReadTest cases with the LzmaTest archive settings."""
787
788
class DetectReadTest(TarTest, unittest.TestCase):
    """Check which read modes tarfile.open() accepts for an archive."""

    def _testfunc_file(self, name, mode):
        # Opening *name* by path with *mode* must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) failed: %s" % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check as _testfunc_file(), but the archive is handed over as
        # an already opened binary file object.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as e:
                self.fail("tarfile.open(%r, %r, fileobj=...) failed: %s"
                          % (name, mode, e))
            else:
                # Close the TarFile while its underlying file is still open.
                tar.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # The plain archive must be rejected when a compression is
            # requested explicitly ...
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            # ... and the compressed archive must be rejected when "no
            # compression" is requested explicitly.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        # Every matching or auto-detecting mode must succeed.
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
828
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """Run the DetectReadTest cases with the GzipTest archive settings."""
831
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """Run the DetectReadTest cases with the Bz2Test archive settings."""

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        #
        # Recompress the plain archive with blocksize 100,000 bytes, so the
        # resulting file starts with "BZh11".
        with open(tarname, "rb") as plain, \
                bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(plain.read())

        self._testfunc_file(tmpname, "r|*")
846
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """Run the DetectReadTest cases with the LzmaTest archive settings."""
849
850
class MemberReadTest(ReadTest, unittest.TestCase):
    """Look up individual archive members and check their header fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare the member's data against *chksum* (when given) and every
        # keyword argument against the TarInfo attribute of the same name.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as member_file:
                self.assertEqual(sha256sum(member_file.read()), chksum,
                        "wrong sha256sum for %s" % tarinfo.name)

        # Fields that are checked for every member.
        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)
946
947
class LongnameTest:
    """Mixin with long name/link tests; users provide subdir, longnametype
    and an open archive in self.tar."""

    def _longpath(self, leaf):
        # Name of the deeply nested member ending in *leaf* below subdir.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._longpath("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        target = self._longpath("longname")
        try:
            member = self.tar.getmember(self._longpath("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three blocks starting at the member's offset
        # into a buffer; opening that truncated archive must fail.
        member = self.tar.getmember(self._longpath("longname"))
        archive = self.tar.fileobj
        archive.seek(member.offset)
        truncated = io.BytesIO(archive.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._longpath("longname")).offset
        with open(tarname, "rb") as raw:
            raw.seek(start)
            header = tarfile.TarInfo.frombuf(raw.read(512),
                                             "iso8859-1", "strict")
        self.assertEqual(header.type, self.longnametype)
988
989
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long name and sparse member tests for the "gnu" archive members."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        st = os.stat(probe)
        support.unlink(probe)
        return st.st_blocks * 512 < st.st_size

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            content = fobj.read()
        self.assertEqual(sha256sum(content), sha256_sparse,
                "wrong sha256sum for %s" % name)

        if self._fs_supports_holes():
            # Fewer allocated bytes than the apparent size means the
            # extracted file contains holes.
            st = os.stat(extracted)
            self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")
1047
1048
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long name and pax header tests for the "pax" archive members."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        # (member name, expected uname, expected gname)
        expected_owners = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member, uname, gname in expected_owners:
                tarinfo = tar.getmember(member)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            headers = tarinfo.pax_headers
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(headers["atime"]), 1041808783.0)
            self.assertEqual(float(headers["ctime"]), 1041808783.0)
1091
1092
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode=self.mode) as tar:
            tar.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the TarFile and collecting garbage must neither close the
        # buffer nor change what has been written to it.
        written = buffer.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(buffer.closed)
        self.assertEqual(written, buffer.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as tar:
            info = tarfile.TarInfo("foo")
            info.size = payload_size
            tar.addfile(info, io.BytesIO(b"a" * payload_size))

        with self.open(tmpname, "rb") as fobj:
            archive_size = len(fobj.read())
        self.assertEqual(archive_size, tarfile.RECORDSIZE * 2)
1121
1122
class WriteTest(WriteTestBase, unittest.TestCase):
    """Write tests that create the archive with the "w:" mode prefix."""

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        open(os.path.join(path, "1"), "a").close()
        open(os.path.join(path, "2"), "a").close()
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1]
                         for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            support.unlink(os.path.join(path, "1"))
            support.unlink(os.path.join(path, "2"))
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            os.link(target, link)
        except PermissionError as e:
            # Don't leave the target behind when the test is skipped.
            support.unlink(target)
            self.skipTest('os.link(): %s' % e)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                support.create_empty_file(name)

            def tarinfo_filter(tarinfo):
                # Drop "bar" and rewrite the owner of everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=tarinfo_filter)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, tarinfo_filter)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            support.create_empty_file(foo)
        else:
            os.mkdir(foo)

        # Remove the scratch entry even if archiving fails, so that a
        # failure here cannot break the next call that recreates it.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            if not dir:
                support.unlink(foo)
            else:
                support.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file, arcname="source")
                tar.add(target_file, arcname="symlink")
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, errorlevel=2) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # A file object whose first write() fails must make tarfile.open()
        # propagate that exception without closing the file object.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1432
1433
class GzipWriteTest(GzipTest, WriteTest):
    """Run the WriteTest cases with the GzipTest archive settings."""
1436
1437
class Bz2WriteTest(Bz2Test, WriteTest):
    """Run the WriteTest cases with the Bz2Test archive settings."""
1440
1441
class LzmaWriteTest(LzmaTest, WriteTest):
    """Run the WriteTest cases with the LzmaTest archive settings."""
1444
1445
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    """Write tests that create the archive with the "w|" mode prefix."""

    prefix = "w|"
    # Factory for a decompressor object; subclasses for compressed formats
    # may override it, None means the file is read through self.open().
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()
        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            unpacker = self.decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = unpacker.decompress(raw)
            self.assertFalse(unpacker.unused_data, "found trailing data")
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        previous_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            # Always restore the process-wide umask.
            os.umask(previous_umask)
1483
1484
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the raw archive bytes
        # can be searched as text.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # Use a unittest assertion rather than a bare assert: it is not
        # stripped under -O and reports both operands on failure.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1494
1495
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    """Run the StreamWriteTest cases with the Bz2Test archive settings."""

    # Decompressor factory; None when the bz2 module is unavailable.
    decompressor = None if bz2 is None else bz2.BZ2Decompressor
1498
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    """Run the StreamWriteTest cases with the LzmaTest archive settings."""

    # Decompressor factory; None when the lzma module is unavailable.
    decompressor = None if lzma is None else lzma.LZMADecompressor
1501
1502
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    # Common prefixes (127 path components) of the names and links below.
    _name_prefix = "longnam/" * 127
    _link_prefix = "longlnk/" * 127

    def _length(self, s):
        # Bytes occupied by *s* in whole 512-byte blocks; there is always
        # at least one byte to spare (presumably for a terminating NUL).
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member.
        # Initial tar header
        total = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write a single member (a hard link when *link* is given) in GNU
        # format, check the resulting offset, then read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected_offset = self._calc_size(name, link)
            actual_offset = tar.offset
            self.assertEqual(expected_offset, actual_offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        self._test(self._name_prefix + "longnam")

    def test_longname_1024(self):
        self._test(self._name_prefix + "longname")

    def test_longname_1025(self):
        self._test(self._name_prefix + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._link_prefix + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._link_prefix + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._link_prefix + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._name_prefix + "longnam",
                   self._link_prefix + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._name_prefix + "longname",
                   self._link_prefix + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._name_prefix + "longname_",
                   self._link_prefix + "longlink_")
1583
1584
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
    # Check that the devmajor/devminor header fields are only filled in for
    # device members.

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        scratch_dir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(scratch_dir)
        try:
            archive = tarfile.open(tmpname, self.mode)
            try:
                blk_in = tarfile.TarInfo(name="my_block_device")
                blk_in.type = tarfile.BLKTYPE
                reg_in = tarfile.TarInfo(name="my_regular_file")
                reg_in.type = tarfile.REGTYPE
                archive.addfile(blk_in)
                archive.addfile(reg_in)
            finally:
                archive.close()

            # devmajor and devminor should be *interpreted* as 0 in both...
            archive = tarfile.open(tmpname, "r")
            try:
                blk_out = archive.getmember("my_block_device")
                reg_out = archive.getmember("my_regular_file")
            finally:
                archive.close()
            for member in (blk_out, reg_out):
                self.assertEqual(member.devmajor, 0)
                self.assertEqual(member.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as infile:
                raw = infile.read()
            blk_header = raw[blk_out.offset:blk_out.offset_data]
            reg_header = raw[reg_out.offset:reg_out.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            dev_start, dev_length = 329, 16
            device_fields = slice(dev_start, dev_start + dev_length)
            self.assertEqual(blk_header[device_fields], b"0000000\0" * 2)
            self.assertEqual(reg_header[device_fields], b"\0" * 16)
        finally:
            support.rmtree(scratch_dir)
1629
1630
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for exclusive creation ("x" modes): a new archive can be created,
    # while opening an already existing file raises FileExistsError.

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Every test starts without an archive at tmpname.
        support.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        # One small payload file shared by all tests of the class.
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def _assert_single_member(self, names):
        # `names` must hold exactly one entry, the payload file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _assert_archive_has_single_member(self):
        # Re-open tmpname for reading and check its list of member names.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._assert_single_member(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._assert_archive_has_single_member()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            # Open inside a with statement so that the archive is closed
            # again if the open unexpectedly succeeds.
            with tarfile.open(tmpname, self.mode):
                pass

        # The failed attempt must leave the existing archive untouched.
        self._assert_archive_has_single_member()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._assert_archive_has_single_member()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        # The failed attempt must leave the existing archive untouched.
        self._assert_archive_has_single_member()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._assert_single_member(names)

        self._assert_archive_has_single_member()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._assert_single_member(names)

        self._assert_archive_has_single_member()
1719
1720
class GzipCreateTest(GzipTest, CreateTest):
    # Runs the CreateTest cases with the gzip settings from GzipTest.

    def test_create_with_compresslevel(self):
        # compresslevel has to be accepted when the archive is created...
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        # ...and when it is opened for reading.
        with tarfile.open(tmpname, 'r:gz', compresslevel=1):
            pass
1728
1729
class Bz2CreateTest(Bz2Test, CreateTest):
    # Runs the CreateTest cases with the bz2 settings from Bz2Test.

    def test_create_with_compresslevel(self):
        # compresslevel has to be accepted when the archive is created...
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        # ...and when it is opened for reading.
        with tarfile.open(tmpname, 'r:bz2', compresslevel=1):
            pass
1737
1738
class LzmaCreateTest(LzmaTest, CreateTest):
    # Runs the CreateTest cases with the xz settings from LzmaTest.

    # xz takes a `preset` keyword where gz and bz2 take `compresslevel`.
    # The preset cannot be given when reading, so only creation is tested.
    def test_create_with_preset(self):
        with tarfile.open(tmpname, self.mode, preset=1) as archive:
            archive.add(self.file_path)
1746
1747
class CreateWithXModeTest(CreateTest):
    # Runs the CreateTest cases with the plain "x" prefix instead of "x:".

    prefix = "x"

    # Assigning None removes these inherited tests from this class; they
    # open the archive with a literal "x" and never look at self.mode.
    test_create_taropen = test_create_existing_taropen = None
1754
1755
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Register the removals before anything can fail: tearDown() is not
        # run when setUp() raises or skips, whereas cleanups are.  Cleanups
        # run last-in-first-out, so foo is removed before bar.
        self.addCleanup(support.unlink, self.bar)
        self.addCleanup(support.unlink, self.foo)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Registered last, hence executed first: the archive is closed
        # before the files are removed, even if add() below fails.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # foo is already in the archive (see setUp), so bar, which is a
        # link to foo, is expected to be reported as a LNKTYPE member.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is reported as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1797
1798
class PaxWriteTest(GNUWriteTest):
    # Runs the long name/link cases inherited from GNUWriteTest against pax
    # archives and adds tests for pax global and extended headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        info = tarfile.TarInfo(name)
        if link:
            info.type = tarfile.LNKTYPE
            info.linkname = link

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            archive.addfile(info)
        finally:
            archive.close()

        archive = tarfile.open(tmpname)
        try:
            first = archive.getmembers()[0]
            if link:
                self.assertEqual(link, first.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, first.name,
                                 "PAX longname creation failed")
        finally:
            archive.close()

    def test_pax_global_header(self):
        global_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                               pax_headers=global_headers)
        try:
            archive.addfile(tarfile.TarInfo("test"))
        finally:
            archive.close()

        # Test if the global header was written correctly.
        archive = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(archive.pax_headers, global_headers)
            first = archive.getmembers()[0]
            self.assertEqual(first.pax_headers, global_headers)
            # Test if all the fields are strings.
            for key, val in archive.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must be convertible by their converter.
                convert = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
        finally:
            archive.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        extended_headers = {"path": "foo", "uid": "123"}

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                               encoding="iso8859-1")
        try:
            info = tarfile.TarInfo()
            info.name = "\xe4\xf6\xfc" # non-ASCII
            info.uid = 8**8 # too large
            info.pax_headers = extended_headers
            archive.addfile(info)
        finally:
            archive.close()

        archive = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            member = archive.getmembers()[0]
            self.assertEqual(member.pax_headers, extended_headers)
            self.assertEqual(member.name, "foo")
            self.assertEqual(member.uid, 123)
        finally:
            archive.close()
1881
1882
class UnicodeTest:
    # Mixin with tests for non-ASCII names; the concrete test classes
    # supply the archive format through a `format` attribute.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using `encoding` and read it
        # back with the same encoding.
        name = "\xe4\xf6\xfc"
        archive = tarfile.open(tmpname, "w", format=self.format,
                               encoding=encoding, errors="strict")
        try:
            archive.addfile(tarfile.TarInfo(name))
        finally:
            archive.close()

        archive = tarfile.open(tmpname, encoding=encoding)
        try:
            first = archive.getmembers()[0]
            self.assertEqual(first.name, name)
        finally:
            archive.close()

    def test_unicode_filename_error(self):
        archive = tarfile.open(tmpname, "w", format=self.format,
                               encoding="ascii", errors="strict")
        try:
            info = tarfile.TarInfo()

            # A non-ASCII member name cannot be encoded...
            info.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(info)

            # ...and neither can a non-ASCII user name.
            info.name = "foo"
            info.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(info)
        finally:
            archive.close()

    def test_unicode_argument(self):
        archive = tarfile.open(tarname, "r",
                               encoding="iso8859-1", errors="strict")
        try:
            for member in archive:
                for attr in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(member, attr)), str)
        finally:
            archive.close()

    def test_uname_unicode(self):
        non_ascii = "\xe4\xf6\xfc"
        info = tarfile.TarInfo("foo")
        info.uname = non_ascii
        info.gname = non_ascii

        archive = tarfile.open(tmpname, mode="w", format=self.format,
                               encoding="iso8859-1")
        try:
            archive.addfile(info)
        finally:
            archive.close()

        archive = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            member = archive.getmember("foo")
            self.assertEqual(member.uname, non_ascii)
            self.assertEqual(member.gname, non_ascii)

            if self.format != tarfile.PAX_FORMAT:
                # Re-read with an encoding that cannot decode the names; the
                # finally clause below closes whichever archive is current.
                archive.close()
                archive = tarfile.open(tmpname, encoding="ascii")
                member = archive.getmember("foo")
                self.assertEqual(member.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(member.gname, "\udce4\udcf6\udcfc")
        finally:
            archive.close()
1962
1963
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        d = "0123456789"
        cases = (
            (d * 10, None),
            (d * 10 + "0", ValueError),
            (d * 9 + "01234567\xff", None),
            (d * 9 + "012345678\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_name2(self):
        d = "0123456789"
        cases = (
            (d * 9 + "012345\xff\xff", None),
            (d * 9 + "0123456\xff\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01234/" + d * 10, None),
            (d * 15 + "0123/4" + d * 10, ValueError),
            (d * 15 + "012\xff/" + d * 10, None),
            (d * 15 + "0123\xff/" + d * 10, ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname2(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01\xff/2" + d * 10, ValueError),
            (d * 15 + "01\xff\xff/" + d * 10, ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname3(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01\xff\xff/2" + d * 10, ValueError),
            (d * 15 + "01234/" + d * 9 + "01234567\xff", None),
            (d * 15 + "01234/" + d * 9 + "012345678\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname4(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01234/" + d * 9 + "012345\xff\xff", None),
            (d * 15 + "01234/" + d * 9 + "0123456\xff\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called `name`.  With `exc` given, adding it has to
        # raise that exception; otherwise the name is read back and compared.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            info = tarfile.TarInfo(name)
            if exc is not None:
                with self.assertRaises(exc):
                    archive.addfile(info)
            else:
                archive.addfile(info)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            # Only the first member is looked at.
            for member in archive:
                self.assertEqual(name, member.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        d = "0123456789"
        cases = (
            (d * 10, None),
            (d * 10 + "0", ValueError),
            (d * 9 + "01234567\xff", None),
            (d * 9 + "012345678\xff", ValueError),
        )
        for linkname, exc in cases:
            self._test_ustar_link(linkname, exc)

    def test_unicode_link2(self):
        d = "0123456789"
        cases = (
            (d * 9 + "012345\xff\xff", None),
            (d * 9 + "0123456\xff\xff", ValueError),
        )
        for linkname, exc in cases:
            self._test_ustar_link(linkname, exc)

    def _test_ustar_link(self, name, exc=None):
        # Like _test_ustar_name(), but `name` is used as the linkname of a
        # member called "foo".
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            info = tarfile.TarInfo("foo")
            info.linkname = name
            if exc is not None:
                with self.assertRaises(exc):
                    archive.addfile(info)
            else:
                archive.addfile(info)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            # Only the first member is looked at.
            for member in archive:
                self.assertEqual(name, member.linkname)
                break
2041
2042
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # Report a missing member as a test failure, not an error.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2059
2060
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # Report a missing member as a test failure, not an error.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2079
2080
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Start every test without a leftover archive.
        archive_path = tmpname
        self.tarname = archive_path
        if os.path.exists(archive_path):
            support.unlink(archive_path)

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in `mode` with one member "foo", whose data is
        # copied from "ustar/regtype" of the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as data, \
                    tarfile.open(self.tarname, mode) as dst:
                dst.addfile(member, data)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode has to fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2100
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append mode ("a") on uncompressed archives.

    # The inherited test needs a `suffix` attribute, which this class does
    # not define; assigning None removes the test here.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to self.tarname (or to `fileobj`).
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive holds exactly the members `names`, which
        # defaults to ["bar"].  None is used as the default instead of a
        # list literal so that no mutable object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        # Continue with an in-memory copy of the archive.
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write `data` as the archive file and expect appending to fail.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2164
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs the tests inherited from AppendTestBase with the gzip settings
    # (suffix 'gz') supplied by GzipTest; nothing is added here.
    pass
2167
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs the tests inherited from AppendTestBase with the bz2 settings
    # (suffix 'bz2') supplied by Bz2Test; nothing is added here.
    pass
2170
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs the tests inherited from AppendTestBase with the xz settings
    # (suffix 'xz') supplied by LzmaTest; nothing is added here.
    pass
2173
2174
class LimitsTest(unittest.TestCase):
    # Checks which names, link names and uids TarInfo.tobuf() accepts for
    # the individual archive formats.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # 512 char name
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(gnu)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # 512 char name
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(pax)

        # The uid that is rejected in test_gnu_limits is accepted here.
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(pax)
2232
2233
class MiscTest(unittest.TestCase):
    # Tests for the module-level helper functions and for tarfile.__all__.

    def test_char_fields(self):
        # stn() encodes a string into a field of the given length...
        stn_cases = (
            (("foo", 8), b"foo\0\0\0\0\0"),
            (("foobar", 3), b"foo"),
        )
        for (text, length), field in stn_cases:
            self.assertEqual(tarfile.stn(text, length, "ascii", "strict"),
                             field)
        # ...and nts() decodes a field up to the first NUL byte.
        for field in (b"foo\0\0\0\0\0", b"foo\0bar\0"):
            self.assertEqual(tarfile.nts(field, "ascii", "strict"), "foo")

    def test_read_number_fields(self):
        cases = (
            # Issue 13158: Test if GNU tar specific base-256 number fields
            # are decoded correctly.
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        )
        for field, number in cases:
            self.assertEqual(tarfile.nti(field), number)

    def test_write_number_fields(self):
        gnu = tarfile.GNU_FORMAT

        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")

        gnu_cases = (
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
            # Issue 32713: Test if itn() supports float values outside the
            # non-GNU format range
            (-100.0, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (8 ** 12 + 0.0, b"\x80\x00\x00\x10\x00\x00\x00\x00"),
        )
        for number, field in gnu_cases:
            self.assertEqual(tarfile.itn(number, format=gnu), field)
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=gnu)), 0)

    def test_number_field_limits(self):
        # Each (number, digits, format) combination has to be rejected.
        rejected = (
            (-1, 8, tarfile.USTAR_FORMAT),
            (0o10000000, 8, tarfile.USTAR_FORMAT),
            (-0x10000000001, 6, tarfile.GNU_FORMAT),
            (0x10000000000, 6, tarfile.GNU_FORMAT),
        )
        for args in rejected:
            with self.assertRaises(ValueError):
                tarfile.itn(*args)

    def test__all__(self):
        # Names handed to support.check__all__() as its blacklist.
        blacklist = {
            'version', 'grp', 'pwd', 'symlink_exception',
            'NUL', 'BLOCKSIZE', 'RECORDSIZE',
            'GNU_MAGIC', 'POSIX_MAGIC',
            'LENGTH_NAME', 'LENGTH_LINK', 'LENGTH_PREFIX',
            'REGTYPE', 'AREGTYPE', 'LNKTYPE', 'SYMTYPE', 'CHRTYPE',
            'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
            'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
            'filemode',
            'EmptyHeaderError', 'TruncatedHeaderError', 'EOFHeaderError',
            'InvalidHeaderError', 'SubsequentHeaderError',
            'ExFileObject',
            'main',
        }
        support.check__all__(self, tarfile, blacklist=blacklist)
2316
2317
class CommandLineTest(unittest.TestCase):
    """Tests for the ``python -m tarfile`` command-line interface.

    Every command is run in a child interpreter through script_helper.
    Loops over archives and option spellings are wrapped in subTest() so
    that a failure names the failing combination and the remaining
    combinations are still exercised.
    """

    def tarfilecmd(self, *args, **kwargs):
        """Run ``python -m tarfile *args`` and return its stdout.

        The call goes through script_helper.assert_python_ok(), which
        also receives **kwargs (the tests use this to pass
        PYTHONIOENCODING).  Occurrences of os.linesep in the output are
        replaced by a single LF byte so expectations are portable.
        """
        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                      **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        """Run ``python -m tarfile *args`` through assert_python_failure().

        Returns whatever script_helper.assert_python_failure() returns;
        callers unpack it as ``(rc, out, err)``.
        """
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def make_simple_tarfile(self, tar_name):
        """Create an uncompressed archive *tar_name* holding two text files.

        The members are stored under their base names.  Removal of
        *tar_name* is registered as a test cleanup.
        """
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def test_bad_use(self):
        # No arguments at all: usage/error text on stderr, nothing on stdout.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # An empty archive name must fail with some message on stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, tar_name)
                    self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, '-t', tar_name,
                                          PYTHONIOENCODING='utf-8')
                    self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        # A zip file is not a tar archive.
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # Neither is the first 511 bytes of any of the test archives.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The expected listing is what TarFile.list() prints in-process.
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, tar_name,
                                          PYTHONIOENCODING='ascii')
                    self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            # The expected listing is what TarFile.list() prints in-process.
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, '-l', tar_name,
                                          PYTHONIOENCODING='ascii')
                    self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-c', '--create':
            with self.subTest(opt=opt):
                try:
                    out = self.tarfilecmd(opt, tmpname, *files)
                    self.assertEqual(out, b'')
                    # The result must be readable as a tar archive.
                    with tarfile.open(tmpname) as tar:
                        tar.getmembers()
                finally:
                    support.unlink(tmpname)

    def test_create_command_verbose(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-v', '--verbose':
            with self.subTest(opt=opt):
                try:
                    out = self.tarfilecmd(opt, '-c', tmpname, *files,
                                          PYTHONIOENCODING='utf-8')
                    self.assertIn(b' file created.', out)
                    with tarfile.open(tmpname) as tar:
                        tar.getmembers()
                finally:
                    support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        # The archive name has no extension at all.
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        # The archive name starts with a dot and has no other extension.
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            if not filetype.open:
                # The matching compression module is not available.
                continue
            # Bound before the try block so the cleanup can always use it.
            tar_name = tmpname + '.' + filetype.suffix
            with self.subTest(suffix=filetype.suffix):
                try:
                    out = self.tarfilecmd('-c', tar_name, *files)
                    # Non-verbose creation prints nothing, as in
                    # test_create_command.
                    self.assertEqual(out, b'')
                    # Opening with the format-specific opener checks that
                    # the compression was chosen from the file extension.
                    with filetype.taropen(tar_name) as tar:
                        tar.getmembers()
                finally:
                    support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            with self.subTest(opt=opt):
                try:
                    with support.temp_cwd(tarextdir):
                        out = self.tarfilecmd(opt, tmpname)
                    self.assertEqual(out, b'')
                finally:
                    support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            with self.subTest(opt=opt):
                try:
                    with support.temp_cwd(tarextdir):
                        out = self.tarfilecmd(opt, '-e', tmpname,
                                              PYTHONIOENCODING='utf-8')
                    self.assertIn(b' file is extracted.', out)
                finally:
                    support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        # An explicit output directory is given after the archive name.
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2509
2510
class ContextManagerTest(unittest.TestCase):
    """Tests for using TarFile objects as context managers."""

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        self.addCleanup(support.unlink, tmpname)
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here;
            # KeyboardInterrupt and SystemExit must not be swallowed.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        self.addCleanup(support.unlink, tmpname)
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        self.addCleanup(support.unlink, tmpname)
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected here;
                # KeyboardInterrupt and SystemExit must not be swallowed.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2567
2568
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):
    # Regression test for issue #8741.  Where the platform supports
    # neither symbolic nor hard links, tarfile tries to extract such
    # members as the regular files they point to.

    # See issues #1578269, #8879, and #17689 for some history on these skips
    _skip_if_islink = unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    _skip_if_symlink = unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")

    def _test_link_extraction(self, name):
        # Extract member *name* into TEMPDIR and check that the file
        # written there hashes to the regular-file digest.
        # NOTE(review): self.tar is presumably opened by ReadTest.setUp(),
        # which is outside this section - confirm.
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as stream:
            payload = stream.read()
        self.assertEqual(sha256sum(payload), sha256_regtype)

    @_skip_if_islink
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @_skip_if_islink
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @_skip_if_symlink
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @_skip_if_symlink
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2601
2602
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Feed every prefix of a small bz2-compressed header block to
        # tarfile.open() in *mode*; the only failure is the guard below
        # detecting a repeated read at end-of-data.
        class _EOFGuardIO(io.BytesIO):
            # Set once a read() has been issued with the position already
            # at the end of the buffer; cleared again by any seek().
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                # Evaluated before reading: was the stream already at EOF?
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for cut in range(len(compressed) + 1):
            truncated = _EOFGuardIO(compressed[:cut])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                # we have no interest in ReadErrors
                pass

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2632
2633
def root_is_uid_gid_0():
    """Return True if both uid 0 and gid 0 are named 'root'.

    Returns False when the pwd or grp module cannot be imported, when
    either database has no entry for id 0, or when either entry has a
    name other than 'root'.  This function is evaluated inside a skip
    decorator while the module is being imported, so it must not raise.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        # The user database is consulted first; the group database is
        # only looked at when the user name already matches.
        return (pwd.getpwuid(0)[0] == 'root' and
                grp.getgrgid(0)[0] == 'root')
    except KeyError:
        # getpwuid()/getgrgid() raise KeyError when id 0 has no entry.
        return False
2644
2645
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # Tests for the numeric_owner argument of extract()/extractall().
    #
    # mock the following:
    #  os.chown: so we can test what's being called
    #  os.chmod: so the modes are not actually changed. if they are, we can't
    #             delete the files/directories
    #  os.geteuid: so we can lie and say we're root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        """Write the test archive to tmpname and return that path.

        The archive holds a regular file, a directory and a second
        regular file, each with its own uid/gid and with uname and
        gname set to 'root'.
        """
        # the file contents to write
        content = b"content"

        # create a tar file with a file, a directory, and a file within that
        #  directory. Assign various .uid/.gid values to them
        items = [(filename_1, 99, 98, tarfile.REGTYPE, content),
                 (dirname_1,  77, 76, tarfile.DIRTYPE, None),
                 (filename_2, 88, 87, tarfile.REGTYPE, content),
                 ]
        with tarfile.open(tmpname, 'w') as tarfl:
            for name, uid, gid, typ, data in items:
                t = tarfile.TarInfo(name)
                t.uid = uid
                t.gid = gid
                t.uname = 'root'
                t.gname = 'root'
                t.type = typ
                if data is None:
                    tarfl.addfile(t)
                else:
                    # addfile() reads exactly t.size bytes from the file
                    # object, so the size has to be set for the payload to
                    # be stored, and every member needs its own stream
                    # positioned at the start.
                    t.size = len(data)
                    tarfl.addfile(t, io.BytesIO(data))

        # return the full pathname to the tar file
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        """Context manager yielding the opened test archive and its names.

        Makes *mock_geteuid* report uid 0, builds the archive, and yields
        ``(tarfile, filename_1, dirname_1, filename_2)``.  The archive is
        closed when the context is left.
        """
        mock_geteuid.return_value = 0  # lie and say we're root
        fname = 'numeric-owner-testfile'
        dirname = 'dir'

        # the names we want stored in the tarfile
        filename_1 = fname
        dirname_1 = dirname
        filename_2 = os.path.join(dirname, fname)

        # create the tarfile with the contents we're after
        tar_filename = NumericOwnerTest._make_test_archive(filename_1,
                                                           dirname_1,
                                                           filename_2)

        # open the tarfile for reading. yield it and the names of the items
        #  we stored into the file
        with tarfile.open(tar_filename) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=True)
            tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True)

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)
        f_filename_2 = os.path.join(TEMPDIR, filename_2)

        # chown must have been given the uid/gid stored in the archive
        mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
                                     unittest.mock.call(f_filename_2, 88, 87),
                                     ],
                                    any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)
        f_dirname_1  = os.path.join(TEMPDIR, dirname_1)
        f_filename_2 = os.path.join(TEMPDIR, filename_2)

        # chown must have been given the uid/gid stored in the archive
        mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
                                     unittest.mock.call(f_dirname_1, 77, 76),
                                     unittest.mock.call(f_filename_2, 88, 87),
                                     ],
                                    any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)

        mock_chown.assert_called_with(f_filename_1, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing a fourth positional argument to extract() must fail.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            self.assertRaises(TypeError,
                              tarfl.extract, filename_1, TEMPDIR, False, True)
2764
2765
def setUpModule():
    """Create TEMPDIR and the compressed copies of the test archive.

    Sets the module global ``testtarnames`` to the uncompressed archive
    followed by one compressed copy for each of GzipTest, Bz2Test and
    LzmaTest whose ``open`` attribute is set.
    """
    global testtarnames

    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        payload = source.read()

    # Create compressed tarfiles.
    for codec in (GzipTest, Bz2Test, LzmaTest):
        if not codec.open:
            continue
        support.unlink(codec.tarname)
        testtarnames.append(codec.tarname)
        with codec.open(codec.tarname, "wb") as archive:
            archive.write(payload)
2782
def tearDownModule():
    """Remove TEMPDIR with support.rmtree() if the path still exists."""
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2786
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2789