1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper, requires_hashdigest
15
16# Check for our compression modules.
17try:
18    import gzip
19except ImportError:
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25try:
26    import lzma
27except ImportError:
28    lzma = None
29
def sha256sum(data):
    # Return the SHA-256 digest of the bytes-like *data* as a hex string.
    digest = sha256(data)
    return digest.hexdigest()
32
# Scratch directory for this test module, derived from support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# Sibling scratch directory (not referenced in this part of the file;
# presumably used by extraction tests further down).
tarextdir = TEMPDIR + '-extract-test'
# The reference (uncompressed) archive shipped with the test data.
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR used for the compressed variants of the reference
# archive and for temporary archives written by individual tests.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 hex digest of the "ustar/regtype" member's data.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
# NOTE(review): presumably the digest of the sparse test members; it is not
# used in this part of the file -- confirm against the sparse tests.
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)
48
49
class TarTest:
    # Settings shared by all test cases for the uncompressed archive.  The
    # compression mixins override these four attributes; test classes add
    # a "prefix" attribute (e.g. "r:" or "r|") that is combined with
    # "suffix" to form the tarfile.open() mode string.
    suffix = ''
    tarname = tarname
    taropen = tarfile.TarFile.taropen
    open = io.FileIO

    @property
    def mode(self):
        # Mode string for tarfile.open(): access prefix followed by the
        # compression suffix.
        prefix, suffix = self.prefix, self.suffix
        return prefix + suffix
59
@support.requires_gzip
class GzipTest:
    # Mixin selecting the gzip variant: archive path, mode suffix, TarFile
    # opener and raw file class.  "open" is None when the gzip module
    # could not be imported.
    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    open = None if gzip is None else gzip.GzipFile
66
@support.requires_bz2
class Bz2Test:
    # Mixin selecting the bzip2 variant: archive path, mode suffix, TarFile
    # opener and raw file class.  "open" is None when the bz2 module could
    # not be imported.
    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    open = None if bz2 is None else bz2.BZ2File
73
@support.requires_lzma
class LzmaTest:
    # Mixin selecting the xz variant: archive path, mode suffix, TarFile
    # opener and raw file class.  "open" is None when the lzma module could
    # not be imported.
    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    open = None if lzma is None else lzma.LZMAFile
80
81
class ReadTest(TarTest):
    # Base fixture for read tests: every test gets self.tar, the archive
    # named by self.tarname opened with mode "r:" + suffix.

    prefix = "r:"

    def setUp(self):
        # Open the archive under test, decoding header strings as
        # iso8859-1.
        archive = tarfile.open(self.tarname, encoding="iso8859-1",
                               mode=self.mode)
        self.tar = archive

    def tearDown(self):
        archive = self.tar
        archive.close()
92
93
class UstarReadTest(ReadTest, unittest.TestCase):
    # Tests for the file objects returned by TarFile.extractfile() on the
    # "ustar/..." members of the test archive: plain reads, readlines(),
    # iteration, seek()/tell(), text wrapping and link resolution.

    def test_fileobj_regular_file(self):
        # The data read must match the size stored in the header and the
        # known digest of the member.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Compare readlines() on the extractfile() object with readlines()
        # on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the (text-wrapped) extractfile() object must yield the
        # same lines as the member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell() with all three whence values and check
        # that reads after seeking return the expected data.  The on-disk
        # copy of the member serves as the reference.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the whole member first so that the following seek(0)
            # really has to move the position back.
            text = fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            # readlines() from the same offset must be repeatable.
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # Wrap the extractfile() object in a TextIOWrapper, verify the
        # content and make sure seeking works in text mode.
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        # Both file objects must report the same name, i.e. the link was
        # resolved to the regular member.
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        # "symtype2" must resolve to "ustar/regtype" as well.
        self._test_fileobj_link("symtype2", "ustar/regtype")
217
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """Run the UstarReadTest cases with the GzipTest archive settings."""
220
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """Run the UstarReadTest cases with the Bz2Test archive settings."""
223
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """Run the UstarReadTest cases with the LzmaTest archive settings."""
226
227
class ListTest(ReadTest, unittest.TestCase):
    # Tests for TarFile.list(), which prints a table of contents to
    # sys.stdout.

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _listing(self, **list_args):
        # Call self.tar.list(**list_args) while sys.stdout is replaced by
        # an ASCII text wrapper and return the raw bytes that were written.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**list_args)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._listing(verbose=False)

        def conv(b):
            # Expected on-screen form of a raw member name containing
            # characters that cannot be encoded as ASCII.
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')

        expected = [
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
            # Make sure it is able to print unencodable characters
            conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'),
            conv(b'misc/regtype-hpux-signed-chksum-'
                 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'),
            conv(b'misc/regtype-old-v7-signed-chksum-'
                 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'),
            conv(b'pax/bad-pax-\xe4\xf6\xfc'),
            conv(b'pax/hdrcharset-\xe4\xf6\xfc'),
        ]
        for entry in expected:
            self.assertIn(entry, out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._listing(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        verbose_line = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                        br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                        br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, verbose_line * 2)
        # Make sure it prints the source of link with verbose flag
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        long_dirs = b'/123' * 125
        self.assertIn(b'gnu' + long_dirs + b'/longlink link to gnu' +
                      long_dirs + b'/longname', out)
        self.assertIn(b'pax' + long_dirs + b'/longlink link to pax' +
                      long_dirs + b'/longname', out)

    def test_list_members(self):
        # list() must restrict its output to the members passed in.
        def reg_members(tar):
            for info in tar.getmembers():
                if 'reg' in info.name:
                    yield info
        out = self._listing(verbose=False, members=reg_members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
306
307
class GzipListTest(GzipTest, ListTest):
    """Run the ListTest cases with the GzipTest archive settings."""
310
311
class Bz2ListTest(Bz2Test, ListTest):
    """Run the ListTest cases with the Bz2Test archive settings."""
314
315
class LzmaListTest(LzmaTest, ListTest):
    """Run the ListTest cases with the LzmaTest archive settings."""
318
319
class CommonReadTest(ReadTest):
    # Read tests shared by the seekable ("r:") and the stream ("r|")
    # test classes.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Open outside of the "with" below: if open() itself fails there is
        # no TarFile to close, and the failure message must not be masked
        # by an UnboundLocalError raised during cleanup.
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            # Write two garbage blocks followed by one valid member, using
            # the raw (possibly compressing) file class of this variant.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # An archive truncated inside its only member (header is 512 bytes,
        # data is 1024 bytes) must raise ReadError on iteration, extract()
        # and extractfile().read().
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though read() is
                    # expected to fail.
                    with tar.extractfile(t) as member:
                        member.read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')):
                pass
406
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests: handling of the "name" attribute of file
    # objects, path-like names, invalid modes, reading from an offset and
    # extraction of directories and links.

    def requires_name_attribute(self):
        # Hook for subclasses: override and call skipTest() when the tests
        # that depend on the file object's name cannot run for that variant.
        pass

    def test_no_name_argument(self):
        # Without a name argument, TarFile.name is taken from the file
        # object and made absolute.
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A file object without a "name" attribute results in name=None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        # An empty "name" attribute is treated like a missing one.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        # A bytes "name" attribute is kept as bytes and made absolute.
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening an archive must accept a path-like name and
        # expose it as an absolute str.
        tarname = pathlib.Path(self.tarname)
        expected = os.path.abspath(os.fspath(tarname))
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, expected)
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, expected)
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, expected)
        if self.suffix == '':
            # The plain TarFile constructor can only read the uncompressed
            # archive.
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, expected)

    def test_illegal_mode_arg(self):
        # The format-specific opener must reject invalid mode strings.
        with open(tmpname, 'wb'):
            pass
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; the ustar members also carry
        # the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also get
            # their exact hex form.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        # extract() of a single directory member restores its mtime and
        # (except on Windows) its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        # extractall() accepts a path-like destination.
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        # extract() accepts a path-like destination.
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it is
            # still reachable for inspection after __init__ raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
656
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # test_fail_comp expects a ReadError when the uncompressed archive is
    # opened with this class's mode; that only applies to the compressed
    # variants, so the inherited test is replaced by None here.
    test_fail_comp = None
659
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase cases with the GzipTest archive settings."""
662
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the Bz2Test archive settings.

    def requires_name_attribute(self):
        # Skip the tests that depend on the file object's name.
        reason = "BZ2File have no name attribute"
        self.skipTest(reason)
666
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the LzmaTest archive settings.

    def requires_name_attribute(self):
        # Skip the tests that depend on the file object's name.
        reason = "LZMAFile have no name attribute"
        self.skipTest(reason)
670
671
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests for stream mode ("r|"), where the archive can only be
    # processed front to back.

    prefix="r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for tarinfo in self.tar:
            if not tarinfo.isreg():
                continue
            with self.tar.extractfile(tarinfo) as fobj:
                while True:
                    try:
                        buf = fobj.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")
                    if not buf:
                        break

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
        self.assertEqual(len(data), tarinfo.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(data), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() reads the whole stream, so going back to the first
        # member's data afterwards must raise StreamError.
        tarinfos = self.tar.getmembers()
        with self.tar.extractfile(tarinfos[0]) as f: # read the first member
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk the reference archive (seekable) and the stream archive in
        # lockstep and compare the data of every member.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    # Links cannot be resolved in stream mode.
                    with self.assertRaises(tarfile.StreamError):
                        tar2.extractfile(t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertIsNotNone(v2, "stream.extractfile() failed")
                # Close both member file objects once they are compared.
                with v1, v2:
                    self.assertEqual(v1.read(), v2.read(),
                            "stream extraction failed")
        finally:
            tar1.close()
732
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """Run the StreamReadTest cases with the GzipTest archive settings."""
735
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """Run the StreamReadTest cases with the Bz2Test archive settings."""
738
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """Run the StreamReadTest cases with the LzmaTest archive settings."""
741
742
class DetectReadTest(TarTest, unittest.TestCase):
    # Tests for the detection of the compression format by tarfile.open()
    # with explicit modes and with the transparent modes "r", "r:*" and
    # "r|*".

    def _testfunc_file(self, name, mode):
        # Opening *name* by path with *mode* must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, exc))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Opening *name* through an already open binary file object with
        # *mode* must not raise ReadError.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as exc:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                          "ReadError: %s" % (name, mode, exc))
            else:
                # Close the TarFile while its file object is still open.
                tar.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # A mismatch between the requested compression and the actual
            # file content must be reported, in both directions.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
782
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """Re-run the inherited DetectReadTest cases with the GzipTest settings."""
785
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """DetectReadTest with the Bz2Test settings, plus a blocksize check."""

    def test_detect_stream_bz2(self):
        # tarfile's stream detection used to look for the literal prefix
        # "BZh91".  The '9' there encodes the blocksize (900,000 bytes), so
        # archives compressed with any other blocksize were not detected.
        with open(tarname, "rb") as plain:
            payload = plain.read()

        # compresslevel=1 selects a 100,000 byte blocksize, which makes the
        # compressed file start with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(payload)

        self._testfunc_file(tmpname, "r|*")
800
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """Re-run the inherited DetectReadTest cases with the LzmaTest settings."""
803
804
class MemberReadTest(ReadTest, unittest.TestCase):
    """Look up individual archive members and verify their header fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Check the content checksum and header fields of *tarinfo*.

        *chksum*, when given, is the expected sha256 hex digest of the
        member's data.  Each keyword argument names a TarInfo attribute and
        its expected value; mtime, uid, gid (and uname/gname unless the
        member name contains "old-v7") are always checked as well.
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                digest = sha256sum(f.read())
            self.assertEqual(digest, chksum,
                    "wrong sha256sum for %s" % tarinfo.name)

        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")

        for field, value in expected.items():
            self.assertEqual(getattr(tarinfo, field), value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        member = self.tar.getmember("ustar/regtype")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        member = self.tar.getmember("ustar/conttype")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        member = self.tar.getmember("ustar/dirtype")
        self._test_member(member, size=0)

    def test_find_dirtype_with_size(self):
        member = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(member, size=255)

    def test_find_lnktype(self):
        member = self.tar.getmember("ustar/lnktype")
        self._test_member(member, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        member = self.tar.getmember("ustar/symtype")
        self._test_member(member, size=0, linkname="regtype")

    def test_find_blktype(self):
        member = self.tar.getmember("ustar/blktype")
        self._test_member(member, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        member = self.tar.getmember("ustar/chrtype")
        self._test_member(member, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        member = self.tar.getmember("ustar/fifotype")
        self._test_member(member, size=0)

    def test_find_sparse(self):
        member = self.tar.getmember("ustar/sparse")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        member = self.tar.getmember("gnu/sparse")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        member = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        member = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        member = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member_name = "ustar/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        member = self.tar.getmember(member_name)
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected_name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected_name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        member = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = "pax/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        member = self.tar.getmember(member_name)
        self._test_member(member, size=7011, chksum=sha256_regtype)
900
901
class LongnameTest:
    """Mixin with tests for members whose names exceed the header field.

    The concrete test class provides ``self.tar``, ``subdir`` and
    ``longnametype``.
    """

    def _long_member_name(self, leaf):
        # Build "<subdir>/123/123/.../<leaf>" with 125 nested "123" levels.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_member_name("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        expected_target = self._long_member_name("longname")
        try:
            member = self.tar.getmember(self._long_member_name("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, expected_target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three blocks starting at the member's offset
        # and expect opening that fragment to raise ReadError.
        member = self.tar.getmember(self._long_member_name("longname"))
        raw = self.tar.fileobj
        raw.seek(member.offset)
        fragment = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._long_member_name("longname")).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = fobj.read(512)
        parsed = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
        self.assertEqual(parsed.type, self.longnametype)
942
943
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long-name and sparse-member read tests for the "gnu" subdirectory."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to restore sparse members accurately and
    # produce files with holes, which is what we really want to test here.
    # Not every platform/filesystem supports sparse files though, and even
    # where they are supported it is hard to make reliable assertions about
    # holes.  So every test first performs a basic content check that works
    # on all platforms, and only checks for holes where the filesystem has
    # proven to support them.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if not self._fs_supports_holes():
            return
        st = os.stat(extracted)
        self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute,
        # uses st_blocks units of 512 bytes, and the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The result is False if the page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        st = os.stat(probe)
        support.unlink(probe)
        return st.st_blocks * 512 < st.st_size
1001
1002
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long-name and pax header read tests for the "pax" subdirectory."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        # (member name, expected uname, expected gname)
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                tarinfo = tar.getmember(member_name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        timestamp = 1041808783.0
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, timestamp)
            self.assertEqual(type(tarinfo.mtime), float)
            for key in ("atime", "ctime"):
                self.assertEqual(float(tarinfo.pax_headers[key]), timestamp)
1045
1046
class WriteTestBase(TarTest):
    """Write tests that are meant to run in every write mode combination."""

    def test_fileobj_no_close(self):
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(buf.closed, "external fileobjs must never closed")

        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the last reference and collecting garbage must neither
        # close the buffer nor append anything to it.
        snapshot = buf.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(buf.closed)
        self.assertEqual(snapshot, buf.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        member = tarfile.TarInfo("foo")
        member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as archive:
            archive.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as fobj:
            written = len(fobj.read())
        self.assertEqual(written, tarfile.RECORDSIZE * 2)
1075
1076
class WriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for the non-stream "w:" mode.

    Compression subclasses mix in a different ``suffix``; ``self.mode`` is
    ``prefix + suffix``.
    """

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb"):
                pass
            self.assertEqual(tar.gettarinfo(path).size, 0)

            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            self.assertEqual(tar.gettarinfo(path).size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        children = [os.path.join(path, leaf) for leaf in ("1", "2")]
        os.mkdir(path)
        try:
            # Create the children inside the try block so the directory is
            # removed even if creating one of them fails.
            for child in children:
                open(child, "a").close()
            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            for child in children:
                support.unlink(child)
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        # One try/finally covers everything so that the target file is also
        # removed when the test is skipped because os.link() is not permitted.
        try:
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            def member_filter(tarinfo):
                # Drop "bar" and rewrite the owner of everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=member_filter)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, member_filter)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        """Store an empty member under *path* and check the stored name.

        The member is a directory if *dir* is true, otherwise a regular
        file.  The stored name is compared with *cmp_path* if given, else
        with *path* using "/" as separator.
        """
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            support.create_empty_file(foo)

        # Always remove the scratch entry, even when adding or reading
        # fails; a leftover "foo" would break the next call.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            if dir:
                support.rmdir(foo)
            else:
                support.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file, arcname="source")
                tar.add(target_file, arcname="symlink")
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, errorlevel=2) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                # Raise exctype on the first write only.
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1386
1387
class GzipWriteTest(GzipTest, WriteTest):
    """Re-run the inherited WriteTest cases with the GzipTest settings."""
1390
1391
class Bz2WriteTest(Bz2Test, WriteTest):
    """Re-run the inherited WriteTest cases with the Bz2Test settings."""
1394
1395
class LzmaWriteTest(LzmaTest, WriteTest):
    """Re-run the inherited WriteTest cases with the LzmaTest settings."""
1398
1399
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for the stream "w|" mode.

    Subclasses may set ``decompressor`` to a decompressor factory; it is
    used by test_stream_padding to unpack the written archive.
    """

    prefix = "w|"
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                packed = fobj.read()
            data = dec.decompress(packed)
            self.assertFalse(dec.unused_data, "found trailing data")

        # An empty archive must consist of exactly one record of NUL bytes.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1437
1438
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """StreamWriteTest with the GzipTest settings, plus a header check."""

    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # Use a unittest assertion instead of a bare assert: assert
        # statements are removed under "python -O", which would turn this
        # test into a no-op.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1448
1449
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    """Re-run the inherited StreamWriteTest cases with the Bz2Test settings."""

    # Falls back to None when the bz2 module could not be imported.
    decompressor = getattr(bz2, "BZ2Decompressor", None)
1452
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    """Re-run the inherited StreamWriteTest cases with the LzmaTest settings."""

    # Falls back to None when the lzma module could not be imported.
    decompressor = getattr(lzma, "LZMADecompressor", None)
1455
1456
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    # Common prefixes: 127 path components of 8 characters each.
    _NAME_STEM = "longnam/" * 127
    _LINK_STEM = "longlnk/" * 127

    def _length(self, s):
        # Size of *s* rounded up to whole 512 byte blocks, always leaving
        # room for at least one more byte.
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding a single member.
        # Initial tar header
        total = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write one member called *name* (a hard link to *link* if given)
        # in GNU format, check the resulting offset and read it back.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(written)

            expected_offset = self._calc_size(name, link)
            actual_offset = tar.offset
            self.assertEqual(expected_offset, actual_offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            for field in ("name", "linkname"):
                self.assertEqual(getattr(written, field),
                                 getattr(member, field),
                                 "unable to read longname member")

    def test_longname_1023(self):
        self._test(self._NAME_STEM + "longnam")

    def test_longname_1024(self):
        self._test(self._NAME_STEM + "longname")

    def test_longname_1025(self):
        self._test(self._NAME_STEM + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._LINK_STEM + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._LINK_STEM + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._LINK_STEM + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._NAME_STEM + "longnam",
                   self._LINK_STEM + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._NAME_STEM + "longname",
                   self._LINK_STEM + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._NAME_STEM + "longname_",
                   self._LINK_STEM + "longlink_")
1537
1538
class CreateTest(WriteTestBase, unittest.TestCase):
    """Tests for the exclusive-creation "x:" mode."""

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        # One small payload file is shared by all tests of the class.
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def setUp(self):
        # Exclusive creation needs the archive path to be free.
        support.unlink(tmpname)

    def _check_member_names(self, names):
        # Exactly one member, and it is the payload file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_written_archive(self):
        # Reopen the archive with the class's taropen and check its members.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._check_member_names(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_written_archive()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive creation must be refused ...
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        # ... and must leave the existing archive intact.
        self._check_written_archive()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_written_archive()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_written_archive()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_member_names(names)

        self._check_written_archive()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_member_names(names)

        self._check_written_archive()
1627
1628
class GzipCreateTest(GzipTest, CreateTest):
    # Run the CreateTest cases on gzip-compressed archives; GzipTest supplies
    # the "gz" mode suffix and the matching taropen().
    pass
1631
1632
class Bz2CreateTest(Bz2Test, CreateTest):
    # Run the CreateTest cases on bzip2-compressed archives; Bz2Test supplies
    # the "bz2" mode suffix and the matching taropen().
    pass
1635
1636
class LzmaCreateTest(LzmaTest, CreateTest):
    # Run the CreateTest cases on xz-compressed archives; LzmaTest supplies
    # the "xz" mode suffix and the matching taropen().
    pass
1639
1640
class CreateWithXModeTest(CreateTest):
    # Repeat the CreateTest cases with the bare "x" mode string instead of
    # "x:".

    prefix = "x"

    # Setting an inherited test to None removes it from this class.  These two
    # call taropen() with a literal "x" and never look at prefix, so running
    # them again here would only duplicate CreateTest.
    test_create_taropen = None
    test_create_existing_taropen = None
1647
1648
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a file "foo", hardlink it as "bar" and add "foo" to a fresh
        # archive, so the tests can inspect how either name would be stored.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # tearDown() is not called when setUp() skips or fails, so register
        # the file removal as cleanups before anything is created.  Otherwise
        # "foo" would be left behind when os.link() is not permitted.
        self.addCleanup(support.unlink, self.bar)
        self.addCleanup(support.unlink, self.foo)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run in LIFO order, so the archive is closed before the
        # two files are removed.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" refers to the same file as the already added "foo", so it
        # must be reported as a hardlink.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is reported as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1690
1691
class PaxWriteTest(GNUWriteTest):
    # Write-side tests for PAX_FORMAT archives.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        member = tarfile.TarInfo(name)
        if link:
            member.linkname = link
            member.type = tarfile.LNKTYPE

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            writer.addfile(member)
        finally:
            writer.close()

        # Read the archive back and compare the field under test.
        with tarfile.open(tmpname) as reader:
            if link:
                stored = reader.getmembers()[0].linkname
                self.assertEqual(link, stored, "PAX longlink creation failed")
            else:
                stored = reader.getmembers()[0].name
                self.assertEqual(name, stored, "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              pax_headers=pax_headers)
        try:
            writer.addfile(tarfile.TarInfo("test"))
        finally:
            writer.close()

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            self.assertEqual(reader.pax_headers, pax_headers)
            self.assertEqual(reader.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in reader.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must be convertible by their registered type.
                try:
                    tarfile.PAX_NUMBER_FIELDS[key](val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              encoding="iso8859-1")
        try:
            member = tarfile.TarInfo()
            member.name = "\xe4\xf6\xfc" # non-ASCII
            member.uid = 8**8 # too large
            member.pax_headers = pax_headers
            writer.addfile(member)
        finally:
            writer.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            member = reader.getmembers()[0]
            self.assertEqual(member.pax_headers, pax_headers)
            self.assertEqual(member.name, "foo")
            self.assertEqual(member.uid, 123)
1774
1775
class UnicodeTest:
    # Mixin with encoding tests; concrete subclasses select the archive
    # format to write through their `format` attribute.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using `encoding` and check
        # that reading with the same encoding gives the name back.
        name = "\xe4\xf6\xfc"
        writer = tarfile.open(tmpname, "w", format=self.format,
                              encoding=encoding, errors="strict")
        try:
            writer.addfile(tarfile.TarInfo(name))
        finally:
            writer.close()

        with tarfile.open(tmpname, encoding=encoding) as reader:
            self.assertEqual(reader.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII text in either
        # the name or the uname field must be rejected.
        writer = tarfile.open(tmpname, "w", format=self.format,
                              encoding="ascii", errors="strict")
        try:
            info = tarfile.TarInfo()

            info.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                writer.addfile(info)

            info.name = "foo"
            info.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                writer.addfile(info)
        finally:
            writer.close()

    def test_unicode_argument(self):
        # Every text field of every member of the test archive must be str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as reader:
            for member in reader:
                for attr in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(member, attr)), str)

    def test_uname_unicode(self):
        info = tarfile.TarInfo("foo")
        info.uname = "\xe4\xf6\xfc"
        info.gname = "\xe4\xf6\xfc"

        writer = tarfile.open(tmpname, mode="w", format=self.format,
                              encoding="iso8859-1")
        try:
            writer.addfile(info)
        finally:
            writer.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            member = reader.getmember("foo")
            self.assertEqual(member.uname, "\xe4\xf6\xfc")
            self.assertEqual(member.gname, "\xe4\xf6\xfc")

        if self.format == tarfile.PAX_FORMAT:
            return

        # For the non-pax formats, reading the same archive as ASCII must
        # give the undecodable bytes back as lone surrogates.
        with tarfile.open(tmpname, encoding="ascii") as reader:
            member = reader.getmember("foo")
            self.assertEqual(member.uname, "\udce4\udcf6\udcfc")
            self.assertEqual(member.gname, "\udce4\udcf6\udcfc")
1855
1856
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest cases for USTAR_FORMAT, plus checks of the name and
    # linkname size limits with non-ASCII characters.

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 10)
        self._test_ustar_name(digits * 10 + "0", ValueError)
        self._test_ustar_name(digits * 9 + "01234567\xff")
        self._test_ustar_name(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 9 + "012345\xff\xff")
        self._test_ustar_name(digits * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        digits = "0123456789"
        head, tail = digits * 15, digits * 10
        self._test_ustar_name(head + "01234/" + tail)
        self._test_ustar_name(head + "0123/4" + tail, ValueError)
        self._test_ustar_name(head + "012\xff/" + tail)
        self._test_ustar_name(head + "0123\xff/" + tail, ValueError)

    def test_unicode_longname2(self):
        digits = "0123456789"
        head, tail = digits * 15, digits * 10
        self._test_ustar_name(head + "01\xff/2" + tail, ValueError)
        self._test_ustar_name(head + "01\xff\xff/" + tail, ValueError)

    def test_unicode_longname3(self):
        digits = "0123456789"
        head = digits * 15
        self._test_ustar_name(head + "01\xff\xff/2" + digits * 10, ValueError)
        self._test_ustar_name(head + "01234/" + digits * 9 + "01234567\xff")
        self._test_ustar_name(head + "01234/" + digits * 9 + "012345678\xff",
                              ValueError)

    def test_unicode_longname4(self):
        digits = "0123456789"
        head = digits * 15
        self._test_ustar_name(head + "01234/" + digits * 9 + "012345\xff\xff")
        self._test_ustar_name(head + "01234/" + digits * 9 + "0123456\xff\xff",
                              ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Try to store a member called `name`.  With `exc` given, addfile()
        # must raise it; otherwise the name must survive a round trip.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            info = tarfile.TarInfo(name)
            if exc is not None:
                self.assertRaises(exc, tar.addfile, info)
            else:
                tar.addfile(info)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Only the first member is of interest.
            for member in tar:
                self.assertEqual(name, member.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 10)
        self._test_ustar_link(digits * 10 + "0", ValueError)
        self._test_ustar_link(digits * 9 + "01234567\xff")
        self._test_ustar_link(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 9 + "012345\xff\xff")
        self._test_ustar_link(digits * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Like _test_ustar_name(), but `name` is stored as the linkname of a
        # member called "foo".
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            info = tarfile.TarInfo("foo")
            info.linkname = name
            if exc is not None:
                self.assertRaises(exc, tar.addfile, info)
            else:
                tar.addfile(info)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Only the first member is of interest.
            for member in tar:
                self.assertEqual(name, member.linkname)
                break
1934
1935
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest cases for GNU_FORMAT.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # Only the lookup matters; the member itself is not used.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1952
1953
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest cases for PAX_FORMAT.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # Only the lookup matters; the member itself is not used.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1972
1973
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Each test works on tmpname and starts without an archive there.
        self.tarname = tmpname
        if not os.path.exists(self.tarname):
            return
        support.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Build an archive at self.tarname holding a single member "foo",
        # copied from "ustar/regtype" of the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as dst:
                dst.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")

1993
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests on uncompressed archives.

    # The inherited test reads self.suffix, which this class does not define.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to the archive (or to fileobj).
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive (or fileobj) holds exactly `names`, in order.
        # A None sentinel replaces the former mutable list default; leaving
        # names out still means ["bar"], the member _add_testfile() writes.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # 1024 NUL bytes: an in-memory archive without any members.
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        # Append to an in-memory copy of an archive that already has "foo".
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)

2057
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Run test_append_compressed with the "gz" suffix from GzipTest.
    pass
2060
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Run test_append_compressed with the "bz2" suffix from Bz2Test.
    pass
2063
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Run test_append_compressed with the "xz" suffix from LzmaTest.
    pass
2066
2067
class LimitsTest(unittest.TestCase):
    # Check which names, link names and uids TarInfo.tobuf() accepts for
    # each archive format.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # 512 char name and 512 char linkname can both be stored.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(gnu)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # The same long name, long linkname and large uid as in
        # test_gnu_limits; none of them may raise here.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(pax)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(pax)
2125
2126
class MiscTest(unittest.TestCase):
    # Tests for the module-level field conversion helpers (stn, nts, nti,
    # itn) and for tarfile.__all__.

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length.
        for text, length, expected in (
                ("foo", 8, b"foo\0\0\0\0\0"),
                ("foobar", 3, b"foo")):
            self.assertEqual(tarfile.stn(text, length, "ascii", "strict"),
                             expected)
        # nts() stops at the first NUL.
        for raw in (b"foo\0\0\0\0\0", b"foo\0bar\0"):
            self.assertEqual(tarfile.nts(raw, "ascii", "strict"), "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        cases = [
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        ]
        for raw, expected in cases:
            self.assertEqual(tarfile.nti(raw), expected)

    def test_write_number_fields(self):
        gnu = tarfile.GNU_FORMAT

        # Values that fit into the octal representation.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")

        cases = [
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
            # Issue 32713: Test if itn() supports float values outside the
            # non-GNU format range
            (-100.0, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (8 ** 12 + 0.0, b"\x80\x00\x00\x10\x00\x00\x00\x00"),
        ]
        for number, expected in cases:
            self.assertEqual(tarfile.itn(number, format=gnu), expected)

        encoded = tarfile.itn(-0.1, format=gnu)
        self.assertEqual(tarfile.nti(encoded), 0)

    def test_number_field_limits(self):
        # Each of these is out of range for the given width and format.
        out_of_range = [
            (-1, 8, tarfile.USTAR_FORMAT),
            (0o10000000, 8, tarfile.USTAR_FORMAT),
            (-0x10000000001, 6, tarfile.GNU_FORMAT),
            (0x10000000000, 6, tarfile.GNU_FORMAT),
        ]
        for number, digits, fmt in out_of_range:
            with self.assertRaises(ValueError):
                tarfile.itn(number, digits, fmt)

    def test__all__(self):
        # Names that are passed to check__all__() as its blacklist.
        not_exported = {
            'version', 'grp', 'pwd', 'symlink_exception',
            'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
            'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
            'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
            'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
            'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
            'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
            'filemode',
            'EmptyHeaderError', 'TruncatedHeaderError',
            'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject',
            'main',
        }
        support.check__all__(self, tarfile, blacklist=not_exported)
2209
2210
2211class CommandLineTest(unittest.TestCase):
2212
2213    def tarfilecmd(self, *args, **kwargs):
2214        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
2215                                                      **kwargs)
2216        return out.replace(os.linesep.encode(), b'\n')
2217
2218    def tarfilecmd_failure(self, *args):
2219        return script_helper.assert_python_failure('-m', 'tarfile', *args)
2220
2221    def make_simple_tarfile(self, tar_name):
2222        files = [support.findfile('tokenize_tests.txt'),
2223                 support.findfile('tokenize_tests-no-coding-cookie-'
2224                                  'and-utf8-bom-sig-only.txt')]
2225        self.addCleanup(support.unlink, tar_name)
2226        with tarfile.open(tar_name, 'w') as tf:
2227            for tardata in files:
2228                tf.add(tardata, arcname=os.path.basename(tardata))
2229
2230    def test_bad_use(self):
2231        rc, out, err = self.tarfilecmd_failure()
2232        self.assertEqual(out, b'')
2233        self.assertIn(b'usage', err.lower())
2234        self.assertIn(b'error', err.lower())
2235        self.assertIn(b'required', err.lower())
2236        rc, out, err = self.tarfilecmd_failure('-l', '')
2237        self.assertEqual(out, b'')
2238        self.assertNotEqual(err.strip(), b'')
2239
2240    def test_test_command(self):
2241        for tar_name in testtarnames:
2242            for opt in '-t', '--test':
2243                out = self.tarfilecmd(opt, tar_name)
2244                self.assertEqual(out, b'')
2245
2246    def test_test_command_verbose(self):
2247        for tar_name in testtarnames:
2248            for opt in '-v', '--verbose':
2249                out = self.tarfilecmd(opt, '-t', tar_name)
2250                self.assertIn(b'is a tar archive.\n', out)
2251
2252    def test_test_command_invalid_file(self):
2253        zipname = support.findfile('zipdir.zip')
2254        rc, out, err = self.tarfilecmd_failure('-t', zipname)
2255        self.assertIn(b' is not a tar archive.', err)
2256        self.assertEqual(out, b'')
2257        self.assertEqual(rc, 1)
2258
2259        for tar_name in testtarnames:
2260            with self.subTest(tar_name=tar_name):
2261                with open(tar_name, 'rb') as f:
2262                    data = f.read()
2263                try:
2264                    with open(tmpname, 'wb') as f:
2265                        f.write(data[:511])
2266                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
2267                    self.assertEqual(out, b'')
2268                    self.assertEqual(rc, 1)
2269                finally:
2270                    support.unlink(tmpname)
2271
2272    def test_list_command(self):
2273        for tar_name in testtarnames:
2274            with support.captured_stdout() as t:
2275                with tarfile.open(tar_name, 'r') as tf:
2276                    tf.list(verbose=False)
2277            expected = t.getvalue().encode('ascii', 'backslashreplace')
2278            for opt in '-l', '--list':
2279                out = self.tarfilecmd(opt, tar_name,
2280                                      PYTHONIOENCODING='ascii')
2281                self.assertEqual(out, expected)
2282
2283    def test_list_command_verbose(self):
2284        for tar_name in testtarnames:
2285            with support.captured_stdout() as t:
2286                with tarfile.open(tar_name, 'r') as tf:
2287                    tf.list(verbose=True)
2288            expected = t.getvalue().encode('ascii', 'backslashreplace')
2289            for opt in '-v', '--verbose':
2290                out = self.tarfilecmd(opt, '-l', tar_name,
2291                                      PYTHONIOENCODING='ascii')
2292                self.assertEqual(out, expected)
2293
2294    def test_list_command_invalid_file(self):
2295        zipname = support.findfile('zipdir.zip')
2296        rc, out, err = self.tarfilecmd_failure('-l', zipname)
2297        self.assertIn(b' is not a tar archive.', err)
2298        self.assertEqual(out, b'')
2299        self.assertEqual(rc, 1)
2300
2301    def test_create_command(self):
2302        files = [support.findfile('tokenize_tests.txt'),
2303                 support.findfile('tokenize_tests-no-coding-cookie-'
2304                                  'and-utf8-bom-sig-only.txt')]
2305        for opt in '-c', '--create':
2306            try:
2307                out = self.tarfilecmd(opt, tmpname, *files)
2308                self.assertEqual(out, b'')
2309                with tarfile.open(tmpname) as tar:
2310                    tar.getmembers()
2311            finally:
2312                support.unlink(tmpname)
2313
2314    def test_create_command_verbose(self):
2315        files = [support.findfile('tokenize_tests.txt'),
2316                 support.findfile('tokenize_tests-no-coding-cookie-'
2317                                  'and-utf8-bom-sig-only.txt')]
2318        for opt in '-v', '--verbose':
2319            try:
2320                out = self.tarfilecmd(opt, '-c', tmpname, *files)
2321                self.assertIn(b' file created.', out)
2322                with tarfile.open(tmpname) as tar:
2323                    tar.getmembers()
2324            finally:
2325                support.unlink(tmpname)
2326
2327    def test_create_command_dotless_filename(self):
2328        files = [support.findfile('tokenize_tests.txt')]
2329        try:
2330            out = self.tarfilecmd('-c', dotlessname, *files)
2331            self.assertEqual(out, b'')
2332            with tarfile.open(dotlessname) as tar:
2333                tar.getmembers()
2334        finally:
2335            support.unlink(dotlessname)
2336
2337    def test_create_command_dot_started_filename(self):
2338        tar_name = os.path.join(TEMPDIR, ".testtar")
2339        files = [support.findfile('tokenize_tests.txt')]
2340        try:
2341            out = self.tarfilecmd('-c', tar_name, *files)
2342            self.assertEqual(out, b'')
2343            with tarfile.open(tar_name) as tar:
2344                tar.getmembers()
2345        finally:
2346            support.unlink(tar_name)
2347
2348    def test_create_command_compressed(self):
2349        files = [support.findfile('tokenize_tests.txt'),
2350                 support.findfile('tokenize_tests-no-coding-cookie-'
2351                                  'and-utf8-bom-sig-only.txt')]
2352        for filetype in (GzipTest, Bz2Test, LzmaTest):
2353            if not filetype.open:
2354                continue
2355            try:
2356                tar_name = tmpname + '.' + filetype.suffix
2357                out = self.tarfilecmd('-c', tar_name, *files)
2358                with filetype.taropen(tar_name) as tar:
2359                    tar.getmembers()
2360            finally:
2361                support.unlink(tar_name)
2362
2363    def test_extract_command(self):
2364        self.make_simple_tarfile(tmpname)
2365        for opt in '-e', '--extract':
2366            try:
2367                with support.temp_cwd(tarextdir):
2368                    out = self.tarfilecmd(opt, tmpname)
2369                self.assertEqual(out, b'')
2370            finally:
2371                support.rmtree(tarextdir)
2372
2373    def test_extract_command_verbose(self):
2374        self.make_simple_tarfile(tmpname)
2375        for opt in '-v', '--verbose':
2376            try:
2377                with support.temp_cwd(tarextdir):
2378                    out = self.tarfilecmd(opt, '-e', tmpname)
2379                self.assertIn(b' file is extracted.', out)
2380            finally:
2381                support.rmtree(tarextdir)
2382
2383    def test_extract_command_different_directory(self):
2384        self.make_simple_tarfile(tmpname)
2385        try:
2386            with support.temp_cwd(tarextdir):
2387                out = self.tarfilecmd('-e', tmpname, 'spamdir')
2388            self.assertEqual(out, b'')
2389        finally:
2390            support.rmtree(tarextdir)
2391
2392    def test_extract_command_invalid_file(self):
2393        zipname = support.findfile('zipdir.zip')
2394        with support.temp_cwd(tarextdir):
2395            rc, out, err = self.tarfilecmd_failure('-e', zipname)
2396        self.assertIn(b' is not a tar archive.', err)
2397        self.assertEqual(out, b'')
2398        self.assertEqual(rc, 1)
2399
2400
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in "with" statements.

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected; do not
            # swallow KeyboardInterrupt or SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected; do not
                # swallow KeyboardInterrupt or SystemExit.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2457
2458
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741. Where symbolic or hard links are
    # not supported, tarfile tries to extract such members as the regular
    # files they point to, so the extracted data must match the checksum
    # of the regular file member.
    def _test_link_extraction(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = sha256sum(extracted.read())
        self.assertEqual(digest, sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link",
    )
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link",
    )
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists",
    )
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists",
    )
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2491
2492
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Feed tarfile.open() every prefix of a bzip2-compressed tar header
        # and fail if it keeps reading after the end of the data was hit.
        class MyBytesIO(io.BytesIO):
            # Set by read() when it was called with the position already
            # at the end of the data; cleared again by seek().
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                # Close the archive again when opening succeeds so that
                # no open TarFile object is left behind.
                with tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode):
                    pass
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2522
2523
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 are both named 'root'.

    Returns False when the pwd or grp module cannot be imported, when
    id 0 has no entry in the user or group database, or when either
    name differs from 'root'.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # No database entry for id 0. This function is evaluated inside
        # a decorator at class-definition time, so it must not raise.
        return False
    return user_name == 'root' and group_name == 'root'
2534
2535
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below replace these functions with mocks:
    #  os.chown: to inspect the arguments it is called with
    #  os.chmod: so that modes are not really changed; otherwise the
    #             extracted files/directories could not be deleted
    #  os.geteuid: to pretend that we are running as root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write an archive to tmpname holding a file, a directory and a
        # second file, each with its own uid/gid but all with the
        # user and group name 'root'. Returns the archive's path.
        payload = io.BytesIO(b"content")
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, fileobj in members:
                info = tarfile.TarInfo(member_name)
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager: builds the test archive, opens it for reading
        # and yields the TarFile together with the three member names.
        mock_geteuid.return_value = 0  # pretend to be root

        # names as stored in the archive
        dirname_1 = 'dir'
        filename_1 = 'numeric-owner-testfile'
        filename_2 = os.path.join(dirname_1, filename_1)

        archive_path = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        with tarfile.open(archive_path) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have received the numeric ids stored in the archive,
        # applied to the extracted filesystem paths
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # every member, the directory included, gets its stored ids
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing a fourth positional argument to extract() must fail.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2654
2655
def setUpModule():
    """Create TEMPDIR and the compressed copies of the test archive.

    Sets the global ``testtarnames`` to the list of archive paths: the
    plain test tar file plus one copy per available compression format.
    """
    # Start from a clean slate. A previous, interrupted run may have left
    # TEMPDIR behind as a directory, which unlink() cannot remove.
    if os.path.isdir(TEMPDIR) and not os.path.islink(TEMPDIR):
        support.rmtree(TEMPDIR)
    else:
        support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    global testtarnames
    testtarnames = [tarname]
    with open(tarname, "rb") as fobj:
        data = fobj.read()

    # Create compressed tarfiles.
    for c in GzipTest, Bz2Test, LzmaTest:
        # c.open is None when the compression module is missing.
        if c.open:
            support.unlink(c.tarname)
            testtarnames.append(c.tarname)
            with c.open(c.tarname, "wb") as tar:
                tar.write(data)
2672
def tearDownModule():
    """Remove the TEMPDIR tree if it still exists."""
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2676
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2679