1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import os_helper
15from test.support import script_helper
16
17# Check for our compression modules.
18try:
19    import gzip
20except ImportError:
21    gzip = None
22try:
23    import zlib
24except ImportError:
25    zlib = None
26try:
27    import bz2
28except ImportError:
29    bz2 = None
30try:
31    import lzma
32except ImportError:
33    lzma = None
34
def sha256sum(data):
    """Return the SHA-256 digest of *data* as a hexadecimal string."""
    hasher = sha256()
    hasher.update(data)
    return hasher.hexdigest()
37
38TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
39tarextdir = TEMPDIR + '-extract-test'
40tarname = support.findfile("testtar.tar")
41gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
42bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
43xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
44tmpname = os.path.join(TEMPDIR, "tmp.tar")
45dotlessname = os.path.join(TEMPDIR, "testtar")
46
47sha256_regtype = (
48    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
49)
50sha256_sparse = (
51    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
52)
53
54
class TarTest:
    """Settings for tests that run against the uncompressed archive.

    ``mode`` combines ``prefix`` (not defined here; subclasses provide
    it) with ``suffix``.
    """
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Mode string made of the access prefix followed by the
        # compression suffix.
        return "".join((self.prefix, self.suffix))
64
@support.requires_gzip()
class GzipTest:
    """Settings that point the tests at the gzip-compressed archive."""
    tarname = gzipname
    suffix = 'gz'
    # gzip is None when the module could not be imported.
    open = None if gzip is None else gzip.GzipFile
    taropen = tarfile.TarFile.gzopen
71
@support.requires_bz2()
class Bz2Test:
    """Settings that point the tests at the bzip2-compressed archive."""
    tarname = bz2name
    suffix = 'bz2'
    # bz2 is None when the module could not be imported.
    open = None if bz2 is None else bz2.BZ2File
    taropen = tarfile.TarFile.bz2open
78
@support.requires_lzma()
class LzmaTest:
    """Settings that point the tests at the xz-compressed archive."""
    tarname = xzname
    suffix = 'xz'
    # lzma is None when the module could not be imported.
    open = None if lzma is None else lzma.LZMAFile
    taropen = tarfile.TarFile.xzopen
85
86
class ReadTest(TarTest):
    """Fixture base: opens ``self.tarname`` for reading around each test."""

    prefix = "r:"

    def setUp(self):
        # Open the archive with the class's mode; names are decoded as
        # ISO-8859-1.
        open_options = {"mode": self.mode, "encoding": "iso8859-1"}
        self.tar = tarfile.open(self.tarname, **open_options)

    def tearDown(self):
        # Release the archive opened by setUp().
        self.tar.close()
97
98
class UstarReadTest(ReadTest, unittest.TestCase):
    # Exercises the file objects returned by TarFile.extractfile() for
    # members of the test archive.

    def test_fileobj_regular_file(self):
        # The data read from a regular member must match the size
        # recorded in its header and the known SHA-256 digest.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the archive member must agree with readlines()
        # on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the archive member must yield the same lines as the
        # extracted file on disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Check seek()/tell() and reads after seeking against the bytes
        # of the member as extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Read the whole member first so that the seek(0) below
            # starts from the end of the data; the result is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # The member must be readable and seekable through a
        # TextIOWrapper.
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
222
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """Run the UstarReadTest cases with the GzipTest settings."""
225
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """Run the UstarReadTest cases with the Bz2Test settings."""
228
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """Run the UstarReadTest cases with the LzmaTest settings."""
231
232
class ListTest(ReadTest, unittest.TestCase):

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _listing(self, **list_options):
        # Call self.tar.list(**list_options) while sys.stdout is swapped
        # for an ASCII text wrapper, and return the bytes written to it.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**list_options)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._listing(verbose=False)
        plain_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for expected in plain_names:
            self.assertIn(expected, out)
        # Make sure it is able to print unencodable characters
        def conv(b):
            decoded = b.decode(self.tar.encoding, 'surrogateescape')
            return decoded.encode('ascii', 'backslashreplace')
        unencodable_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for raw in unencodable_names:
            self.assertIn(conv(raw), out)
        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._listing(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        entry = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                 br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, entry * 2)
        # Make sure it prints the source of link with verbose flag
        long_dirs = b'/123' * 125
        link_lines = (
            b'ustar/symtype -> regtype',
            b'./ustar/linktest2/symtype -> ../linktest1/regtype',
            b'./ustar/linktest2/lnktype link to '
            b'./ustar/linktest1/regtype',
            b'gnu' + long_dirs + b'/longlink link to gnu' +
            long_dirs + b'/longname',
            b'pax' + long_dirs + b'/longlink link to pax' +
            long_dirs + b'/longname',
        )
        for expected in link_lines:
            self.assertIn(expected, out)

    def test_list_members(self):
        # Only the members handed to list() may show up in the output.
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo
        out = self._listing(verbose=False, members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
311
312
class GzipListTest(GzipTest, ListTest):
    """Run the ListTest cases with the GzipTest settings."""
315
316
class Bz2ListTest(Bz2Test, ListTest):
    """Run the ListTest cases with the Bz2Test settings."""
319
320
class LzmaListTest(LzmaTest, ListTest):
    """Run the ListTest cases with the LzmaTest settings."""
323
324
class CommonReadTest(ReadTest):
    # Read tests shared by the classes that derive from this one; it has
    # no TestCase base itself.

    def test_is_tarfile_erroneous(self):
        with open(tmpname, "wb"):
            pass

        # is_tarfile works on filenames
        self.assertFalse(tarfile.is_tarfile(tmpname))

        # is_tarfile works on path-like objects
        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))

        # is_tarfile works on file objects
        with open(tmpname, "rb") as fobj:
            self.assertFalse(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))

    def test_is_tarfile_valid(self):
        # is_tarfile works on filenames
        self.assertTrue(tarfile.is_tarfile(self.tarname))

        # is_tarfile works on path-like objects
        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))

        # is_tarfile works on file objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        # The archive is only closed once it is known to be open, so a
        # failed open() is reported as such instead of being replaced by
        # an error about the unbound name "tar".
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).randbytes(512)
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Truncate an archive holding one 1024-byte member at several
        # sizes and check that every way of reading it reports the
        # missing data.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member's file object even though the
                    # read is expected to raise.
                    with tar.extractfile(t) as member:
                        member.read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')):
                pass
443
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests.  The compression variants override
    # requires_name_attribute() to skip tests that depend on the file
    # object's "name" attribute.

    def requires_name_attribute(self):
        # Hook for subclasses; does nothing here.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Open through a context manager so that the archive is closed
        # again when the test is done.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening the archive must accept a path-like name
        # and expose it as an absolute str path.
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        if self.suffix == '':
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        with open(tmpname, 'wb'):
            pass
        # Each call is expected to raise, so its result is not bound.
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; members under "ustar/"
        # additionally carry the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @os_helper.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also show
            # their exact hexadecimal form.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            # The archive is opened only after the directory exists, so
            # a failing mkdir() cannot leave it open.
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            os_helper.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member must restore its mtime
        # and, except on win32, its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            os_helper.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        # extractall() must accept a path-like destination.
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with os_helper.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        # extract() must accept a path-like destination.
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with os_helper.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance first so that it can still be
            # inspected after __init__() has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os_helper.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())

    @unittest.skipIf(zlib is None, "requires zlib")
    def test_zlib_error_does_not_leak(self):
        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
        # parsing certain types of invalid data
        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
            mock.side_effect = zlib.error
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname)
702
703
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Uncompressed variant.  The inherited test_fail_comp expects that
    # opening the uncompressed archive with self.mode raises ReadError,
    # so it is masked with None here.
    test_fail_comp = None
706
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase cases with the GzipTest settings."""
709
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase cases with the Bz2Test settings."""

    def requires_name_attribute(self):
        # Skip the calling test instead of running it.
        raise unittest.SkipTest("BZ2File have no name attribute")
713
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase cases with the LzmaTest settings."""

    def requires_name_attribute(self):
        # Skip the calling test instead of running it.
        raise unittest.SkipTest("LZMAFile have no name attribute")
717
718
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests with the archive opened in stream mode ("r|").

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as fobj:
                    try:
                        # Read in 512-byte pieces until the data ends.
                        while fobj.read(512):
                            pass
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        first = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(first) as fobj:
            payload = fobj.read()
        self.assertEqual(len(payload), first.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(payload), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # Reading the first member after all members have been loaded
        # must raise StreamError.
        members = self.tar.getmembers()
        with self.tar.extractfile(members[0]) as f: # read the first member
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Step through a second TarFile opened from tarname and through
        # the stream in parallel, comparing the data each one extracts.
        stream = self.tar
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            while True:
                expected = reference.next()
                actual = stream.next()
                if expected is None:
                    break
                self.assertIsNotNone(actual, "stream.next() failed.")

                if actual.islnk() or actual.issym():
                    with self.assertRaises(tarfile.StreamError):
                        stream.extractfile(actual)
                    continue

                expected_file = reference.extractfile(expected)
                actual_file = stream.extractfile(actual)
                if expected_file is None:
                    continue
                self.assertIsNotNone(actual_file,
                        "stream.extractfile() failed")
                self.assertEqual(expected_file.read(), actual_file.read(),
                        "stream extraction failed")
779
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """Run the StreamReadTest cases with the GzipTest archive settings."""
782
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """Run the StreamReadTest cases with the Bz2Test archive settings."""
785
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """Run the StreamReadTest cases with the LzmaTest archive settings."""
788
789
class DetectReadTest(TarTest, unittest.TestCase):
    """Check which read modes tarfile.open() accepts for self.tarname."""

    def _testfunc_file(self, name, mode):
        """Open the archive *name* by path using *mode*.

        A ReadError is turned into a test failure that names the mode.
        """
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        """Open the archive *name* through a file object using *mode*.

        A ReadError is turned into a test failure that names the mode.
        """
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as e:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                          "ReadError: %s" % (name, mode, e))
            else:
                # Close the archive while its file object is still open.
                tar.close()

    def _test_modes(self, testfunc):
        """Run *testfunc* for every read mode that must succeed.

        When the class has a compression suffix, first check that a
        mismatch between the mode and the archive raises ReadError.
        """
        if self.suffix:
            # The reference archive must not open in a compressed mode ...
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            # ... and this class's archive must not open with a bare mode.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
829
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """Run the DetectReadTest cases with the GzipTest archive settings."""
832
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """Run the DetectReadTest cases with the Bz2Test archive settings."""

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        #
        # Recompress the reference archive with blocksize 100,000 bytes
        # (compresslevel=1), so that the file starts with "BZh11".
        with open(tarname, "rb") as source, \
                bz2.BZ2File(tmpname, "wb", compresslevel=1) as target:
            target.write(source.read())

        self._testfunc_file(tmpname, "r|*")
847
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """Run the DetectReadTest cases with the LzmaTest archive settings."""
850
851
class MemberReadTest(ReadTest, unittest.TestCase):
    """Look up individual members of the test archive and check fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Check the data digest and the header fields of *tarinfo*.

        If *chksum* is given, the sha256 digest of the member's data must
        equal it. Every keyword argument names a TarInfo attribute and
        its expected value; mtime, uid, gid and (except for "old-v7"
        members) uname and gname are always checked as well.
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                digest = sha256sum(f.read())
            self.assertEqual(digest, chksum,
                    "wrong sha256sum for %s" % tarinfo.name)

        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)
947
948
class LongnameTest:
    """Mixin with tests for members that have very long path names.

    Classes using it provide the attributes subdir and longnametype.
    """

    def _long_path(self, leaf):
        """Return the long member path below self.subdir ending in *leaf*."""
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        try:
            member = self.tar.getmember(self._long_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, self._long_path("longname"),
                "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks found at the member's
        # offset; opening that fragment must raise ReadError.
        member = self.tar.getmember(self._long_path("longname"))
        raw = self.tar.fileobj
        raw.seek(member.offset)
        fragment = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._long_path("longname")).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = tarfile.TarInfo.frombuf(fobj.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)
989
990
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long name and sparse member tests for the "gnu" archive members."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        """Extract the member *name* into TEMPDIR and verify the result."""
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # The allocated blocks must cover less than the apparent size.
        st = os.stat(extracted)
        self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        """Probe whether files with holes can be detected via st_blocks."""
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        st = os.stat(probe)
        os_helper.unlink(probe)
        return st.st_blocks * 512 < st.st_size
1048
1049
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long name and pax header tests for the "pax" archive members."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Each entry is (member name, expected uname, expected gname); the
        # "VENDOR.umlauts" pax header must be the same for all of them.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expected = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for name, uname, gname in expected:
                # subTest reports every member on its own instead of
                # stopping at the first mismatch.
                with self.subTest(member=name):
                    tarinfo = tar.getmember(name)
                    self.assertEqual(tarinfo.uname, uname)
                    self.assertEqual(tarinfo.gname, gname)
                    self.assertEqual(
                        tarinfo.pax_headers.get("VENDOR.umlauts"), umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertIsInstance(tarinfo.mtime, float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
1092
1093
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # A file object handed in by the caller must stay open, and its
        # contents must not change once the TarFile object is gone.
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode=self.mode) as tar:
            tar.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        snapshot = buffer.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(buffer.closed)
        self.assertEqual(snapshot, buffer.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as tar:
            member = tarfile.TarInfo("foo")
            member.size = payload_size
            tar.addfile(member, io.BytesIO(b"a" * payload_size))

        with self.open(tmpname, "rb") as fobj:
            archive_size = len(fobj.read())
        self.assertEqual(archive_size, tarfile.RECORDSIZE * 2)
1122
1123
class WriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for archives opened with the mode prefix "w:"."""

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            # An empty file must be reported with size 0 ...
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... and the size must follow once data was written.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # Create the entries inside the try block so that the directory
            # is cleaned up even if creating one of them fails.
            for entry in ("1", "2"):
                open(os.path.join(path, entry), "a").close()
            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            os_helper.unlink(os.path.join(path, "1"))
            os_helper.unlink(os.path.join(path, "2"))
            os_helper.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        # gettarinfo() must give the same result for a pathlib.Path as
        # for the equivalent string path.
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            # The skip happens inside the try block so that the target
            # file is removed in that case, too.
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.unlink(target)
            os_helper.unlink(link)

    @os_helper.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            with os_helper.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                os_helper.create_empty_file(os.path.join(tempdir, name))

            def drop_bar(tarinfo):
                # Skip the member named "bar" and rewrite the owner of
                # every other member.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode, encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=drop_bar)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, drop_bar)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            os_helper.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        """Store an empty member as *path* and check the recorded name.

        The stored name must equal *cmp_path* if given, otherwise *path*
        with os.sep replaced by "/". With *dir* true the member is a
        directory instead of a regular file.
        """
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            os_helper.create_empty_file(foo)
        else:
            os.mkdir(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Always remove the fixture; a leftover would make the next
            # call fail while creating it.
            if not dir:
                os_helper.unlink(foo)
            else:
                os_helper.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @os_helper.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file, arcname="source")
                tar.add(target_file, arcname="symlink")
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, errorlevel=2) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            os_helper.unlink(temparchive)
            os_helper.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with os_helper.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # The exception raised by the first write() must propagate out of
        # tarfile.open(), and the caller's file object must stay open.
        for exctype in OSError, EOFError, RuntimeError:
            with self.subTest(exctype=exctype):
                class BadFile(io.BytesIO):
                    first = True
                    def write(self, data):
                        if self.first:
                            self.first = False
                            raise exctype

                f = BadFile()
                with self.assertRaises(exctype):
                    tarfile.open(tmpname, self.mode, fileobj=f,
                                 format=tarfile.PAX_FORMAT,
                                 pax_headers={'non': 'empty'})
                self.assertFalse(f.closed)
1433
1434
class GzipWriteTest(GzipTest, WriteTest):
    """Run the WriteTest cases with the GzipTest archive settings."""
1437
1438
class Bz2WriteTest(Bz2Test, WriteTest):
    """Run the WriteTest cases with the Bz2Test archive settings."""
1441
1442
class LzmaWriteTest(LzmaTest, WriteTest):
    """Run the WriteTest cases with the LzmaTest archive settings."""
1445
1446
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for archives opened with the mode prefix "w|"."""

    prefix = "w|"
    # Subclasses may set this to a decompressor class; it is then used to
    # decode the raw archive file in test_stream_padding.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")

        # An empty archive must consist of exactly one record of zeros.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os_helper.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1484
1485
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """Run the StreamWriteTest cases with the GzipTest archive settings."""

    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the raw compressed
        # file can be searched as text.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # Use a unittest assertion: a bare assert statement is removed
        # when Python runs with -O and reports no details on failure.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1495
1496
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    """Run the StreamWriteTest cases with the Bz2Test archive settings."""

    # bz2 is None when the module could not be imported.
    decompressor = None if not bz2 else bz2.BZ2Decompressor
1499
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    """Run the StreamWriteTest cases with the LzmaTest archive settings."""

    # lzma is None when the module could not be imported.
    decompressor = None if not lzma else lzma.LZMADecompressor
1502
1503
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        """Return the size of *s* rounded up past the next 512 boundary.

        A string whose length is an exact multiple of 512 still gets one
        additional block.
        """
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        """Return the expected archive offset after adding one member."""
        # Strings that do not fit into the fixed-size header field need an
        # extended header block plus the blocks holding the string itself.
        oversized = []
        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            oversized.append(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            oversized.append(link)

        # Initial tar header
        total = 512
        for text in oversized:
            total += 512 + self._length(text)
        return total

    def _test(self, name, link=None):
        """Write a member called *name* (optionally a hard link to *link*)
        in GNU format, check the archive offset and read the member back.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected_offset = self._calc_size(name, link)
            self.assertEqual(expected_offset, tar.offset,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            for field in ("name", "linkname"):
                self.assertEqual(getattr(tarinfo, field),
                        getattr(member, field),
                        "unable to read longname member")
        finally:
            tar.close()

    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        self._test("longnam/" * 127 + "longnam",
                   "longlnk/" * 127 + "longlnk")

    def test_longnamelink_1024(self):
        self._test("longnam/" * 127 + "longname",
                   "longlnk/" * 127 + "longlink")

    def test_longnamelink_1025(self):
        self._test("longnam/" * 127 + "longname_",
                   "longlnk/" * 127 + "longlink_")
1584
1585
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
    # Check how the devmajor/devminor header fields are written for a block
    # device member compared with a regular file member.

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        scratch_dir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(scratch_dir)
        try:
            archive = tarfile.open(tmpname, self.mode)
            try:
                blk_in = tarfile.TarInfo(name="my_block_device")
                reg_in = tarfile.TarInfo(name="my_regular_file")
                blk_in.type = tarfile.BLKTYPE
                reg_in.type = tarfile.REGTYPE
                for member in (blk_in, reg_in):
                    archive.addfile(member)
            finally:
                archive.close()

            # devmajor and devminor should be *interpreted* as 0 in both...
            archive = tarfile.open(tmpname, "r")
            try:
                blk_out = archive.getmember("my_block_device")
                reg_out = archive.getmember("my_regular_file")
            finally:
                archive.close()
            for member in (blk_out, reg_out):
                self.assertEqual(member.devmajor, 0)
                self.assertEqual(member.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as raw:
                image = raw.read()
            blk_header = image[blk_out.offset:blk_out.offset_data]
            reg_header = image[reg_out.offset:reg_out.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            dev_start = 329
            dev_fields = slice(dev_start, dev_start + 16)
            self.assertEqual(blk_header[dev_fields], b"0000000\0" * 2)
            self.assertEqual(reg_header[dev_fields], b"\0" * 16)
        finally:
            os_helper.rmtree(scratch_dir)
1630
1631
class CreateTest(WriteTestBase, unittest.TestCase):
    # Test exclusive creation mode ("x"): creating a new archive works and
    # creating over an existing archive raises FileExistsError while leaving
    # the existing archive intact.

    prefix = "x:"

    # Small payload file that every test adds to the archive.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Exclusive creation needs the target to be absent.
        os_helper.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def _assert_only_spameggs(self, names):
        # *names* must hold exactly one member, the payload file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _assert_archive_has_only_spameggs(self):
        # Reopen the archive for reading and check its member list.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._assert_only_spameggs(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._assert_archive_has_only_spameggs()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            # Open through a context manager so that the archive is closed
            # again if the exclusive creation unexpectedly succeeds.
            with tarfile.open(tmpname, self.mode):
                pass

        self._assert_archive_has_only_spameggs()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._assert_archive_has_only_spameggs()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._assert_archive_has_only_spameggs()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            # A path-like name is stored as an absolute str.
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._assert_only_spameggs(names)

        self._assert_archive_has_only_spameggs()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            # A path-like name is stored as an absolute str.
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._assert_only_spameggs(names)

        self._assert_archive_has_only_spameggs()
1720
1721
class GzipCreateTest(GzipTest, CreateTest):
    # CreateTest cases run with the gzip settings from GzipTest.

    def test_create_with_compresslevel(self):
        # compresslevel is passed both when creating the archive and when
        # opening it again for reading.
        level = 1
        with tarfile.open(tmpname, self.mode, compresslevel=level) as archive:
            archive.add(self.file_path)
        with tarfile.open(tmpname, 'r:gz', compresslevel=level):
            pass
1729
1730
class Bz2CreateTest(Bz2Test, CreateTest):
    # CreateTest cases run with the bzip2 settings from Bz2Test.

    def test_create_with_compresslevel(self):
        # compresslevel is passed both when creating the archive and when
        # opening it again for reading.
        level = 1
        with tarfile.open(tmpname, self.mode, compresslevel=level) as archive:
            archive.add(self.file_path)
        with tarfile.open(tmpname, 'r:bz2', compresslevel=level):
            pass
1738
1739
class LzmaCreateTest(LzmaTest, CreateTest):
    # CreateTest cases run with the xz settings from LzmaTest.

    # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel.
    # It does not allow for preset to be specified when reading.
    def test_create_with_preset(self):
        preset = 1
        with tarfile.open(tmpname, self.mode, preset=preset) as archive:
            archive.add(self.file_path)
1747
1748
class CreateWithXModeTest(CreateTest):
    # Re-run the CreateTest cases with the bare "x" mode prefix.

    prefix = "x"

    # Overriding the inherited taropen tests with None drops them from
    # this class.
    test_create_taropen = test_create_existing_taropen = None
1755
1756
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")
        # Register the file removals as cleanups rather than relying on
        # tearDown(): cleanups also run when setUp() skips or fails, whereas
        # tearDown() would not, leaving "foo" behind.
        self.addCleanup(os_helper.unlink, self.foo)
        self.addCleanup(os_helper.unlink, self.bar)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run in reverse order of registration, so the archive is
        # closed before the two files are removed.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" shares its inode with the already added "foo".
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference set, the hardlink is reported as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1798
1799
class PaxWriteTest(GNUWriteTest):
    # Re-run the GNUWriteTest long name/link cases in pax format and add
    # tests for pax global and extended headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        info = tarfile.TarInfo(name)
        if link:
            info.linkname = link
            info.type = tarfile.LNKTYPE

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            archive.addfile(info)
        finally:
            archive.close()

        archive = tarfile.open(tmpname)
        try:
            first = archive.getmembers()[0]
            if link:
                self.assertEqual(link, first.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, first.name,
                                 "PAX longname creation failed")
        finally:
            archive.close()

    def test_pax_global_header(self):
        pax_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                               pax_headers=pax_headers)
        try:
            archive.addfile(tarfile.TarInfo("test"))
        finally:
            archive.close()

        # Test if the global header was written correctly.
        archive = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(archive.pax_headers, pax_headers)
            self.assertEqual(archive.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for field, value in archive.pax_headers.items():
                self.assertIsNot(type(field), bytes)
                self.assertIsNot(type(value), bytes)
                convert = tarfile.PAX_NUMBER_FIELDS.get(field)
                if convert is None:
                    continue
                # Number fields must be convertible to their numeric type.
                try:
                    convert(value)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
        finally:
            archive.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                               encoding="iso8859-1")
        try:
            info = tarfile.TarInfo()
            info.name = "\xe4\xf6\xfc"  # non-ASCII
            info.uid = 8**8  # too large
            info.pax_headers = pax_headers
            archive.addfile(info)
        finally:
            archive.close()

        archive = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            stored = archive.getmembers()[0]
            self.assertEqual(stored.pax_headers, pax_headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
        finally:
            archive.close()
1882
1883
class UnicodeTest:
    # Mixin with non-ASCII name/uname/gname tests. Concrete subclasses
    # provide the archive format to write in as the `format` attribute.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Round-trip a non-ASCII member name through *encoding*.
        name = "\xe4\xf6\xfc"
        archive = tarfile.open(tmpname, "w", format=self.format,
                               encoding=encoding, errors="strict")
        try:
            archive.addfile(tarfile.TarInfo(name))
        finally:
            archive.close()

        archive = tarfile.open(tmpname, encoding=encoding)
        try:
            first = archive.getmembers()[0]
            self.assertEqual(first.name, name)
        finally:
            archive.close()

    def test_unicode_filename_error(self):
        archive = tarfile.open(tmpname, "w", format=self.format,
                               encoding="ascii", errors="strict")
        try:
            info = tarfile.TarInfo()

            # A non-ASCII name cannot be encoded...
            info.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(info)

            # ...and neither can a non-ASCII user name.
            info.name = "foo"
            info.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(info)
        finally:
            archive.close()

    def test_unicode_argument(self):
        archive = tarfile.open(tarname, "r",
                               encoding="iso8859-1", errors="strict")
        try:
            for member in archive:
                self.assertIs(type(member.name), str)
                self.assertIs(type(member.linkname), str)
                self.assertIs(type(member.uname), str)
                self.assertIs(type(member.gname), str)
        finally:
            archive.close()

    def test_uname_unicode(self):
        non_ascii = "\xe4\xf6\xfc"
        info = tarfile.TarInfo("foo")
        info.uname = non_ascii
        info.gname = non_ascii

        archive = tarfile.open(tmpname, mode="w", format=self.format,
                               encoding="iso8859-1")
        try:
            archive.addfile(info)
        finally:
            archive.close()

        archive = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            stored = archive.getmember("foo")
            self.assertEqual(stored.uname, non_ascii)
            self.assertEqual(stored.gname, non_ascii)

            if self.format != tarfile.PAX_FORMAT:
                # Read the archive again as ASCII: the raw bytes are then
                # expected to come back as surrogate escapes.
                archive.close()
                archive = tarfile.open(tmpname, encoding="ascii")
                stored = archive.getmember("foo")
                escaped = "\udce4\udcf6\udcfc"
                self.assertEqual(stored.uname, escaped)
                self.assertEqual(stored.gname, escaped)
        finally:
            archive.close()
1963
1964
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        ten = "0123456789"
        for name, exc in (
                (ten * 10, None),
                (ten * 10 + "0", ValueError),
                (ten * 9 + "01234567\xff", None),
                (ten * 9 + "012345678\xff", ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_name2(self):
        ten = "0123456789"
        for name, exc in (
                (ten * 9 + "012345\xff\xff", None),
                (ten * 9 + "0123456\xff\xff", ValueError)):
            self._test_ustar_name(name, exc)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        ten = "0123456789"
        for name, exc in (
                (ten * 15 + "01234/" + ten * 10, None),
                (ten * 15 + "0123/4" + ten * 10, ValueError),
                (ten * 15 + "012\xff/" + ten * 10, None),
                (ten * 15 + "0123\xff/" + ten * 10, ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_longname2(self):
        ten = "0123456789"
        for name, exc in (
                (ten * 15 + "01\xff/2" + ten * 10, ValueError),
                (ten * 15 + "01\xff\xff/" + ten * 10, ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_longname3(self):
        ten = "0123456789"
        for name, exc in (
                (ten * 15 + "01\xff\xff/2" + ten * 10, ValueError),
                (ten * 15 + "01234/" + ten * 9 + "01234567\xff", None),
                (ten * 15 + "01234/" + ten * 9 + "012345678\xff", ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_longname4(self):
        ten = "0123456789"
        for name, exc in (
                (ten * 15 + "01234/" + ten * 9 + "012345\xff\xff", None),
                (ten * 15 + "01234/" + ten * 9 + "0123456\xff\xff", ValueError)):
            self._test_ustar_name(name, exc)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called *name*. When *exc* is given, adding must raise
        # it; otherwise the first member read back must carry the same name.
        expect_success = exc is None
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            member = tarfile.TarInfo(name)
            if expect_success:
                tar.addfile(member)
            else:
                with self.assertRaises(exc):
                    tar.addfile(member)

        if not expect_success:
            return
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for stored in tar:
                self.assertEqual(name, stored.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        ten = "0123456789"
        for link, exc in (
                (ten * 10, None),
                (ten * 10 + "0", ValueError),
                (ten * 9 + "01234567\xff", None),
                (ten * 9 + "012345678\xff", ValueError)):
            self._test_ustar_link(link, exc)

    def test_unicode_link2(self):
        ten = "0123456789"
        for link, exc in (
                (ten * 9 + "012345\xff\xff", None),
                (ten * 9 + "0123456\xff\xff", ValueError)):
            self._test_ustar_link(link, exc)

    def _test_ustar_link(self, name, exc=None):
        # Same as _test_ustar_name(), but *name* is used as the linkname of
        # a member called "foo".
        expect_success = exc is None
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            member = tarfile.TarInfo("foo")
            member.linkname = name
            if expect_success:
                tar.addfile(member)
            else:
                with self.assertRaises(exc):
                    tar.addfile(member)

        if not expect_success:
            return
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for stored in tar:
                self.assertEqual(name, stored.linkname)
                break
2042
2043
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # Only the lookup itself matters here.
                try:
                    archive.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2060
2061
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # Only the lookup itself matters here.
                try:
                    archive.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2080
2081
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without a leftover archive at the target path.
        target = self.tarname = tmpname
        if os.path.exists(target):
            os_helper.unlink(target)

    def _create_testtar(self, mode="w:"):
        # Copy "ustar/regtype" from the reference archive into a new archive
        # at self.tarname, stored under the name "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as dest:
                dest.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must fail.
        compressed_mode = "w:" + self.suffix
        self._create_testtar(compressed_mode)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2101
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # The compressed-archive case only applies to the compression variants.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly *names* (default: just
        # "bar"). The default is built per call so that no list object is
        # shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2165
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs the AppendTestBase cases with the gzip settings from GzipTest;
    # adds no tests of its own.
    pass
2168
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs the AppendTestBase cases with the bzip2 settings from Bz2Test;
    # adds no tests of its own.
    pass
2171
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs the AppendTestBase cases with the xz settings from LzmaTest;
    # adds no tests of its own.
    pass
2174
2175
class LimitsTest(unittest.TestCase):
    # Check which name, linkname and uid values TarInfo.tobuf() accepts in
    # each of the three archive formats.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT

        # 100 char name
        info = tarfile.TarInfo("0123456789" * 10)
        info.tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 256 char name with a slash at pos 156
        info = tarfile.TarInfo("123/" * 62 + "longname")
        info.tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT
        long_path = "123/" * 126 + "longname"

        # 512 char name and linkname can both be written.
        info = tarfile.TarInfo(long_path)
        info.tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = long_path
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT
        long_path = "123/" * 126 + "longname"

        # None of the values rejected by the other formats raise here.
        info = tarfile.TarInfo(long_path)
        info.tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = long_path
        info.tobuf(fmt)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
2233
2234
class MiscTest(unittest.TestCase):
    # Tests for the module-level header field helpers (stn, nts, nti, itn),
    # the exported names, and the error message of tarfile.open().

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length; nts()
        # stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

        # Issue 24514: Test if empty number fields are converted to zero.
        self.assertEqual(tarfile.nti(b"\0"), 0)
        self.assertEqual(tarfile.nti(b"       \0"), 0)

    def test_write_number_fields(self):
        # Mirror image of test_read_number_fields(): the same values are
        # encoded with itn().
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000,
                                     format=tarfile.GNU_FORMAT),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)

    def test_number_field_limits(self):
        # Values outside the representable range of a field must raise.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)

    def test__all__(self):
        # Public-looking module names that are deliberately absent from
        # tarfile.__all__.
        not_exported = {
            'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE',
            'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME',
            'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES',
            'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS',
            'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums',
            'copyfileobj', 'filemode', 'EmptyHeaderError',
            'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject', 'main'}
        support.check__all__(self, tarfile, not_exported=not_exported)

    def test_useful_error_message_when_modules_missing(self):
        fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz')
        # A single exception instance (not a 1-tuple): as side_effect it is
        # raised on every call to the patched xzopen().
        error = tarfile.CompressionError('lzma module is not available')
        with self.assertRaises(tarfile.ReadError) as excinfo:
            with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error):
                tarfile.open(fname)

        # The ReadError message must name the failing method and its cause.
        self.assertIn(
            "\n- method xz: CompressionError('lzma module is not available')\n",
            str(excinfo.exception),
        )
2326
2327
class CommandLineTest(unittest.TestCase):
    """Tests for the ``python -m tarfile`` command-line interface."""

    def tarfilecmd(self, *args, **kwargs):
        """Run ``-m tarfile`` with *args*, requiring success; return stdout.

        Keyword arguments are forwarded to script_helper.assert_python_ok().
        Every os.linesep in the captured output is replaced by a single LF
        byte so that expectations do not depend on the platform.
        """
        _, out, _ = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                   **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        """Run ``-m tarfile`` with *args*, requiring failure.

        Returns whatever script_helper.assert_python_failure() returns;
        callers in this class unpack it as ``(rc, out, err)``.
        """
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def make_simple_tarfile(self, tar_name):
        """Create an uncompressed archive *tar_name* holding two text files.

        Members are stored under their base names, and the archive is
        registered for removal through addCleanup().
        """
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def test_bad_use(self):
        # No arguments at all: usage/error text on stderr, nothing on stdout.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # An empty archive name must fail with some message on stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        # subTest keeps one failing archive/option from hiding the others.
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, tar_name)
                    self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, '-t', tar_name,
                                          PYTHONIOENCODING='utf-8')
                    self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    # Keep only the first 511 bytes of the archive, so the
                    # file handed to the CLI is a truncated one.
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    os_helper.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The in-process listing is the reference for the CLI output.
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, tar_name,
                                          PYTHONIOENCODING='ascii')
                    self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, '-l', tar_name,
                                          PYTHONIOENCODING='ascii')
                    self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-c', '--create':
            with self.subTest(opt=opt):
                try:
                    out = self.tarfilecmd(opt, tmpname, *files)
                    self.assertEqual(out, b'')
                    # The created archive must be readable by tarfile.
                    with tarfile.open(tmpname) as tar:
                        tar.getmembers()
                finally:
                    os_helper.unlink(tmpname)

    def test_create_command_verbose(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-v', '--verbose':
            with self.subTest(opt=opt):
                try:
                    out = self.tarfilecmd(opt, '-c', tmpname, *files,
                                          PYTHONIOENCODING='utf-8')
                    self.assertIn(b' file created.', out)
                    with tarfile.open(tmpname) as tar:
                        tar.getmembers()
                finally:
                    os_helper.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(tar_name)

    def test_create_command_compressed(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # ``open`` is falsy when the compression module is unavailable.
            if not filetype.open:
                continue
            # Bound before the try block so the finally clause can rely on it.
            tar_name = tmpname + '.' + filetype.suffix
            with self.subTest(suffix=filetype.suffix):
                try:
                    self.tarfilecmd('-c', tar_name, *files)
                    # Reopen with the format-specific opener.
                    with filetype.taropen(tar_name) as tar:
                        tar.getmembers()
                finally:
                    os_helper.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            with self.subTest(opt=opt):
                try:
                    with os_helper.temp_cwd(tarextdir):
                        out = self.tarfilecmd(opt, tmpname)
                    self.assertEqual(out, b'')
                finally:
                    os_helper.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            with self.subTest(opt=opt):
                try:
                    with os_helper.temp_cwd(tarextdir):
                        out = self.tarfilecmd(opt, '-e', tmpname,
                                              PYTHONIOENCODING='utf-8')
                    self.assertIn(b' file is extracted.', out)
                finally:
                    os_helper.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with os_helper.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with os_helper.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2519
2520
class ContextManagerTest(unittest.TestCase):
    """Tests for using TarFile objects in ``with`` statements."""

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        # assertRaises replaces a bare ``except: pass`` so that
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        with self.assertRaises(Exception):
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            # Only the deliberately raised Exception is expected here.
            with self.assertRaises(Exception):
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2577
2578
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):
    # Regression test for issue #8741.  Where symbolic or hard links are not
    # supported, tarfile tries to extract such members as the regular files
    # they point to.

    def _test_link_extraction(self, name):
        # Extract member *name* into TEMPDIR and compare the digest of the
        # resulting file with the known regular-file digest.
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as stream:
            digest = sha256sum(stream.read())
        self.assertEqual(digest, sha256_regtype)

    # History of the skips below: issues #1578269, #8879 and #17689.
    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link",
    )
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link",
    )
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists",
    )
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists",
    )
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2611
2612
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: _BZ2Proxy.read() used to loop forever when given an empty
    # or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed tar header with *mode*
        # and fail if the file object is read again after reporting EOF.
        class MyBytesIO(io.BytesIO):
            # True once a read() has started at end-of-data; cleared by seek().
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        payload = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for cut in range(len(payload) + 1):
            truncated = MyBytesIO(payload[:cut])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                # ReadErrors are expected for truncated input and ignored.
                pass

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2642
2643
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 are both named 'root' on this system.

    Returns False when the pwd/grp modules cannot be imported or when the
    account databases have no entry for id 0.
    """
    try:
        import grp
        import pwd
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # No passwd/group entry for id 0.  This helper is evaluated inside a
        # skipUnless() decorator, i.e. while the module is being imported,
        # so it must answer False instead of raising.
        return False
    return user_name == 'root' and group_name == 'root'
2654
2655
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # What gets mocked, and why:
    #  os.chown:   lets the tests inspect the calls that were made
    #  os.chmod:   keeps modes unchanged; otherwise the extracted
    #              files/directories could not be deleted afterwards
    #  os.geteuid: lets the tests pretend to run as root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        """Write the archive used by these tests and return its path.

        The archive at ``tmpname`` gets a file, a directory and a file inside
        that directory, each with its own uid/gid and with uname/gname set
        to 'root'.
        """
        # file object handed to addfile() for the two regular members
        content = io.BytesIO(b"content")

        # (name, uid, gid, type, file object) for every member
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, content),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, content),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for name, uid, gid, kind, fileobj in members:
                info = tarfile.TarInfo(name)
                info.type = kind
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                archive.addfile(info, fileobj)

        # full pathname of the tar file
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        """Yield ``(tarfile, filename_1, dirname_1, filename_2)`` for a test.

        Makes the mocked os.geteuid() report 0, builds the test archive and
        keeps it open for reading for the duration of the ``with`` block.
        """
        mock_geteuid.return_value = 0  # pretend to be root

        # names as they are stored in the archive
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        archive_path = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        # hand out the opened archive together with the stored names
        with tarfile.open(archive_path) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1, _dirname, filename_2 = setup
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=True)
            tarfl.extract(filename_2, TEMPDIR, numeric_owner=True)

        # expected chown() calls, expressed with filesystem paths
        expected = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1, dirname_1, filename_2 = setup
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # expected chown() calls, expressed with filesystem paths
        expected = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected, any_order=True)

    # This test needs uid=0 and gid=0 to really be named 'root': the uname
    #  and gname stored in the test archive are 'root', and extract() looks
    #  them up through pwd and grp to obtain the uid and gid, which are
    #  checked here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1 = setup[0], setup[1]
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        # filesystem path of the extracted member
        extracted = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1 = setup[0], setup[1]
            # numeric_owner must not be accepted positionally
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2774
2775
def setUpModule():
    """Create TEMPDIR and the compressed copies of the test archive.

    Also binds the module-global ``testtarnames`` to a list holding the plain
    archive path plus the path of every compressed copy that was written.
    """
    global testtarnames

    os_helper.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as plain:
        raw = plain.read()

    # Create compressed tarfiles, one per format whose opener is available.
    available = [fmt for fmt in (GzipTest, Bz2Test, LzmaTest) if fmt.open]
    for fmt in available:
        os_helper.unlink(fmt.tarname)
        testtarnames.append(fmt.tarname)
        with fmt.open(fmt.tarname, "wb") as compressed:
            compressed.write(raw)
2792
def tearDownModule():
    """Remove TEMPDIR, if it exists, together with its contents."""
    if not os.path.exists(TEMPDIR):
        return
    os_helper.rmtree(TEMPDIR)
2796
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2799