1import sys
2import os
3import io
4from hashlib import md5
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper
15
16# Check for our compression modules.
17try:
18    import gzip
19except ImportError:
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25try:
26    import lzma
27except ImportError:
28    lzma = None
29
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes-like object *data*."""
    digest = md5()
    digest.update(data)
    return digest.hexdigest()
32
# Scratch directory name for this module, derived from support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# Second directory name derived from TEMPDIR (not referenced in this part
# of the file).
tarextdir = TEMPDIR + '-extract-test'
# Reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths below TEMPDIR that the compression mixins use as their archive.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive path that individual tests create and overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
# Archive path without a file extension (not referenced in this part of
# the file).
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected MD5 hex digest of the data of the "ustar/regtype" member.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
# NOTE(review): presumably the digest shared by the sparse members; it is
# not used in this part of the file -- confirm against the sparse tests.
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
44
45
class TarTest:
    """Mixin with the archive settings for the uncompressed test archive.

    The compression mixins override these attributes; classes using the
    ``mode`` property must also define ``prefix``.
    """

    # Mode suffix, archive path, TarFile opener and raw file opener.
    suffix = ''
    tarname = tarname
    taropen = tarfile.TarFile.taropen
    open = io.FileIO

    @property
    def mode(self):
        # Mode string handed to tarfile.open(): prefix followed by suffix.
        prefix, suffix = self.prefix, self.suffix
        return prefix + suffix
55
@support.requires_gzip
class GzipTest:
    """Mixin with the gzip archive settings.

    ``open`` is None when the gzip module could not be imported.
    """

    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    open = None if not gzip else gzip.GzipFile
62
@support.requires_bz2
class Bz2Test:
    """Mixin with the bz2 archive settings.

    ``open`` is None when the bz2 module could not be imported.
    """

    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    open = None if not bz2 else bz2.BZ2File
69
@support.requires_lzma
class LzmaTest:
    """Mixin with the lzma (xz) archive settings.

    ``open`` is None when the lzma module could not be imported.
    """

    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    open = None if not lzma else lzma.LZMAFile
76
77
class ReadTest(TarTest):
    """Base class for read tests: opens the archive as ``self.tar``."""

    prefix = "r:"

    def setUp(self):
        # Open the archive selected by the mixins with iso8859-1 as the
        # encoding.
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        archive = self.tar
        archive.close()
88
89
class UstarReadTest(ReadTest, unittest.TestCase):
    """Tests for the file objects returned by TarFile.extractfile().

    All tests work on members of the archive that ReadTest.setUp()
    opened as ``self.tar``.
    """

    def test_fileobj_regular_file(self):
        # The member's data must match its recorded size and the known
        # checksum.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the member must return the same lines as
        # reading the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating over the member must yield the same lines as
        # readlines() on the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline()/readlines() on the
        # member and compare against the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager closes the file object even when one of
        # the assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Read everything first so that the first seek() starts from
            # the end of the member.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # The member must be usable through a text wrapper, including
        # seek().
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        # The file object for the link must carry the same name as the
        # file object for the regular member.
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
215
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest cases run with the GzipTest archive settings."""
218
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest cases run with the Bz2Test archive settings."""
221
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest cases run with the LzmaTest archive settings."""
224
225
class ListTest(ReadTest, unittest.TestCase):
    """Tests for the listing printed by TarFile.list()."""

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        archive = tarfile.open(self.tarname, mode=self.mode)
        self.tar = archive

    def _list_output(self, **kwargs):
        # Call self.tar.list(**kwargs) while sys.stdout is replaced by an
        # ASCII text wrapper and return the bytes written to it.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**kwargs)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._list_output(verbose=False)
        for name in (b'ustar/conttype',
                     b'ustar/regtype',
                     b'ustar/lnktype',
                     b'ustar' + (b'/12345' * 40) + b'67/longname',
                     b'./ustar/linktest2/symtype',
                     b'./ustar/linktest2/lnktype',
                     # Make sure it puts trailing slash for directory
                     b'ustar/dirtype/',
                     b'ustar/dirtype-with-size/'):
            self.assertIn(name, out)

        # Make sure it is able to print unencodable characters
        def conv(raw):
            text = raw.decode(self.tar.encoding, 'surrogateescape')
            return text.encode('ascii', 'backslashreplace')
        for raw in (b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                    b'misc/regtype-hpux-signed-chksum-'
                    b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                    b'misc/regtype-old-v7-signed-chksum-'
                    b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                    b'pax/bad-pax-\xe4\xf6\xfc',
                    b'pax/hdrcharset-\xe4\xf6\xfc'):
            self.assertIn(conv(raw), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        two_plain_lines = (br'ustar/conttype ?\r?\n'
                           br'ustar/regtype ?\r?\n')
        self.assertRegex(out, two_plain_lines)
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._list_output(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        verbose_line = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                        br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                        br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, verbose_line * 2)
        # Make sure it prints the source of link with verbose flag
        deep = b'/123' * 125
        for entry in (b'ustar/symtype -> regtype',
                      b'./ustar/linktest2/symtype -> ../linktest1/regtype',
                      b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype',
                      b'gnu' + deep + b'/longlink link to gnu' +
                      deep + b'/longname',
                      b'pax' + deep + b'/longlink link to pax' +
                      deep + b'/longname'):
            self.assertIn(entry, out)

    def test_list_members(self):
        # Only the members handed to list() may show up in the output.
        def reg_members(archive):
            for info in archive.getmembers():
                if 'reg' in info.name:
                    yield info
        out = self._list_output(verbose=False,
                                members=reg_members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
304
305
class GzipListTest(GzipTest, ListTest):
    """ListTest cases run with the GzipTest archive settings."""
308
309
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest cases run with the Bz2Test archive settings."""
312
313
class LzmaListTest(LzmaTest, ListTest):
    """ListTest cases run with the LzmaTest archive settings."""
316
317
class CommonReadTest(ReadTest):
    """Read tests shared by the classes deriving from this one.

    The tests use ``self.mode``, so each subclass runs them with its own
    prefix/suffix combination.
    """

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        # Close the archive only once it is known to be open: a
        # "finally: tar.close()" around the open() call would raise
        # UnboundLocalError and hide the failure reported above.
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                # Two blocks of filler precede the only real member.
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Reading an archive that was truncated in the middle of a
        # member must raise ReadError when iterating and when extracting.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')) as tar:
                pass
404
class MiscReadTestBase(CommonReadTest):
    """Miscellaneous read tests run in the mode given by ``self.mode``."""

    def requires_name_attribute(self):
        """Hook called first by tests that need fobj.name.

        The default does nothing; subclasses may override it to skip
        those tests.
        """
        pass

    def test_no_name_argument(self):
        # Without a name argument TarFile.name is taken from fobj.name.
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A file object without a name attribute gives TarFile.name None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a context manager so that the TarFile is always closed.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        # An empty fobj.name gives TarFile.name None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        # A bytes fobj.name is kept as bytes in TarFile.name.
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening an archive must accept a path-like name
        # and store it as an absolute str path.
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        if self.suffix == '':
            # The plain TarFile constructor is only tried when no
            # compression suffix is set.
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        # Unsupported mode strings must be rejected with ValueError.
        with open(tmpname, 'wb'):
            pass
        for mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For the compressed variants: fail with a ReadError on an
        # uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; members below "ustar/"
        # also carry the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # "misc/eof" is expected to be the last member of the archive.
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Show floats with their exact hex form for the error message.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member restores its mtime and
        # (except on win32) its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        # extractall() must accept a path-like destination.
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        # extract() must accept a path-like destination.
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance by hand so that it can still be
            # inspected after __init__() has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
655
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Replace the inherited test_fail_comp with None: that test expects
    # opening the uncompressed archive with self.mode to raise ReadError,
    # and this class reads the uncompressed archive itself.
    test_fail_comp = None
658
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the GzipTest archive settings."""
661
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the Bz2Test archive settings."""

    def requires_name_attribute(self):
        # Skip every test that calls this hook before doing its work.
        reason = "BZ2File have no name attribute"
        self.skipTest(reason)
665
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the LzmaTest archive settings."""

    def requires_name_attribute(self):
        # Skip every test that calls this hook before doing its work.
        reason = "LZMAFile have no name attribute"
        self.skipTest(reason)
669
670
class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Read tests run with the stream mode prefix "r|"."""

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as fobj:
                    # Read the member in 512-byte pieces up to its end.
                    chunk = True
                    while chunk:
                        try:
                            chunk = fobj.read(512)
                        except tarfile.StreamError:
                            self.fail("simple read-through using "
                                      "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        member = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(member) as fobj:
            payload = fobj.read()
        self.assertEqual(len(payload), member.size,
                "regular file extraction failed")
        self.assertEqual(md5sum(payload), md5_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() reads through the whole stream first; reading the
        # first member afterwards must raise StreamError.
        members = self.tar.getmembers()
        first = members[0]
        with self.tar.extractfile(first) as f: # read the first member
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk the stream archive and a normally opened copy of the
        # reference archive side by side and compare the member data.
        reference = tarfile.open(tarname, encoding="iso8859-1")
        try:
            stream = self.tar

            while True:
                ref_info = reference.next()
                stream_info = stream.next()
                if ref_info is None:
                    break
                self.assertIsNotNone(stream_info, "stream.next() failed.")

                is_link = stream_info.islnk() or stream_info.issym()
                if is_link:
                    # extractfile() on a link must raise in stream mode.
                    with self.assertRaises(tarfile.StreamError):
                        stream.extractfile(stream_info)
                    continue

                ref_file = reference.extractfile(ref_info)
                stream_file = stream.extractfile(stream_info)
                if ref_file is not None:
                    self.assertIsNotNone(stream_file,
                            "stream.extractfile() failed")
                    self.assertEqual(ref_file.read(), stream_file.read(),
                            "stream extraction failed")
        finally:
            reference.close()
731
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """StreamReadTest cases run with the GzipTest archive settings."""
734
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """StreamReadTest cases run with the Bz2Test archive settings."""
737
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """StreamReadTest cases run with the LzmaTest archive settings."""
740
741
class DetectReadTest(TarTest, unittest.TestCase):
    """Tests which mode strings can open the archive of this class."""

    def _testfunc_file(self, name, mode):
        """Fail the test if opening the archive *name* by path with
        *mode* raises ReadError."""
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        """Fail the test if opening the archive *name* through a file
        object with *mode* raises ReadError."""
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as e:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                          "ReadError: %s" % (name, mode, e))
            else:
                # Close the archive while its file object is still open.
                tar.close()

    def _test_modes(self, testfunc):
        """Run *testfunc* for every mode that must open self.tarname."""
        if self.suffix:
            # A compression mode must reject the uncompressed archive ...
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            # ... and the plain modes must reject the compressed one.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
781
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # Runs the DetectReadTest mode checks against the gzip archive; GzipTest
    # supplies the non-empty suffix ("gz") that enables the rejection checks
    # in _test_modes().
    pass
784
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # tarfile's stream detection originally looked for the literal
        # prefix "BZh91".  That is wrong: the '9' only encodes the blocksize
        # (900,000 bytes), so archives compressed with any other blocksize
        # were not detected.
        with open(tarname, "rb") as source:
            raw = source.read()

        # compresslevel=1 selects a blocksize of 100,000 bytes, which makes
        # the compressed file start with "BZh11".
        target = bz2.BZ2File(tmpname, "wb", compresslevel=1)
        with target:
            target.write(raw)

        self._testfunc_file(tmpname, "r|*")
799
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # Runs the DetectReadTest mode checks against the xz archive; LzmaTest
    # supplies the non-empty suffix ("xz") that enables the rejection checks
    # in _test_modes().
    pass
802
803
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of the test archive by name and checks
    # their header fields and, where a checksum is given, their contents.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare the member's data against chksum (if given) and then every
        # expected header field passed as a keyword argument.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as member_file:
                digest = md5sum(member_file.read())
            self.assertEqual(digest, chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        # Values that are checked for every member.
        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")

        for field, expected in kwargs.items():
            actual = getattr(tarinfo, field)
            self.assertEqual(actual, expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        member = self.tar.getmember("ustar/regtype")
        self._test_member(member, chksum=md5_regtype, size=7011)

    def test_find_conttype(self):
        member = self.tar.getmember("ustar/conttype")
        self._test_member(member, chksum=md5_regtype, size=7011)

    def test_find_dirtype(self):
        member = self.tar.getmember("ustar/dirtype")
        self._test_member(member, size=0)

    def test_find_dirtype_with_size(self):
        member = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(member, size=255)

    def test_find_lnktype(self):
        member = self.tar.getmember("ustar/lnktype")
        self._test_member(member, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        member = self.tar.getmember("ustar/symtype")
        self._test_member(member, size=0, linkname="regtype")

    def test_find_blktype(self):
        member = self.tar.getmember("ustar/blktype")
        self._test_member(member, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        member = self.tar.getmember("ustar/chrtype")
        self._test_member(member, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        member = self.tar.getmember("ustar/fifotype")
        self._test_member(member, size=0)

    def test_find_sparse(self):
        member = self.tar.getmember("ustar/sparse")
        self._test_member(member, chksum=md5_sparse, size=86016)

    def test_find_gnusparse(self):
        member = self.tar.getmember("gnu/sparse")
        self._test_member(member, chksum=md5_sparse, size=86016)

    def test_find_gnusparse_00(self):
        member = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(member, chksum=md5_sparse, size=86016)

    def test_find_gnusparse_01(self):
        member = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(member, chksum=md5_sparse, size=86016)

    def test_find_gnusparse_10(self):
        member = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(member, chksum=md5_sparse, size=86016)

    def test_find_umlauts(self):
        member_name = "ustar/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        member = self.tar.getmember(member_name)
        self._test_member(member, chksum=md5_regtype, size=7011)

    def test_find_ustar_longname(self):
        all_names = self.tar.getnames()
        expected = "".join(["ustar/", "12345/" * 39, "1234567/longname"])
        self.assertIn(expected, all_names)

    def test_find_regtype_oldv7(self):
        member = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(member, chksum=md5_regtype, size=7011)

    def test_find_pax_umlauts(self):
        # Replace the archive opened for this test with a freshly opened one
        # before looking up the pax member.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = "pax/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        member = self.tar.getmember(member_name)
        self._test_member(member, chksum=md5_regtype, size=7011)
899
900
class LongnameTest:
    # Mixin with tests for members whose names or link names are long.
    # It reads self.subdir, self.longnametype and self.tar, which the class
    # it is mixed into has to provide.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = "%s/%slongname" % (self.subdir, "123/" * 125)
        try:
            member = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        prefix = self.subdir + "/" + "123/" * 125
        longname = prefix + "longname"
        longlink = prefix + "longlink"
        try:
            member = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks starting at the member's
        # offset and make sure opening that fragment raises ReadError.
        longname = "%s/%slongname" % (self.subdir, "123/" * 125)
        member = self.tar.getmember(longname)
        raw = self.tar.fileobj
        raw.seek(member.offset)
        truncated = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = "%s/%slongname" % (self.subdir, "123/" * 125)
        start = self.tar.getmember(longname).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header_block = fobj.read(512)
            header = tarfile.TarInfo.frombuf(header_block,
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)
941
942
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to restore sparse members accurately,
    # i.e. to produce files with holes, and that is what we really want to
    # test here.  However, not every platform/filesystem supports sparse
    # files, and even where they are supported it is hard to make reliable
    # assertions about holes.  So there is one basic check that works on all
    # platforms, followed by a check that only runs on platforms/filesystems
    # that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = md5sum(fobj.read())
        self.assertEqual(digest, md5_sparse,
                "wrong md5sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # The file occupies fewer bytes on disk than its size says.
        st = os.stat(extracted)
        self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute,
        # counts st_blocks in units of 512 bytes, and the filesystem can
        # store a 4 KiB hole in a file.
        #
        # The result is False when the page size is larger than 4 KiB
        # (ppc64, for example, uses 64 KiB pages).
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        st = os.stat(probe)
        support.unlink(probe)
        return st.st_blocks * 512 < st.st_size
1000
1001
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # (member name, expected uname, expected gname); every one of these
        # members must also carry the same VENDOR.umlauts pax header.
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                member = tar.getmember(member_name)
                self.assertEqual(member.uname, uname)
                self.assertEqual(member.gname, gname)
                self.assertEqual(member.pax_headers.get("VENDOR.umlauts"),
                                 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            member = tar.getmember("pax/regtype4")
            self.assertEqual(member.size, 7011)
            self.assertEqual(member.uid, 123)
            self.assertEqual(member.gid, 123)
            self.assertEqual(member.mtime, 1041808783.0)
            self.assertEqual(type(member.mtime), float)
            headers = member.pax_headers
            self.assertEqual(float(headers["atime"]), 1041808783.0)
            self.assertEqual(float(headers["ctime"]), 1041808783.0)
1044
1045
class WriteTestBase(TarTest):
    # Home of the write tests that have to pass in every possible
    # combination of write mode and compression.

    def test_fileobj_no_close(self):
        buf = io.BytesIO()
        tar = tarfile.open(fileobj=buf, mode=self.mode)
        tar.addfile(tarfile.TarInfo("foo"))
        tar.close()
        self.assertFalse(buf.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the TarFile and collecting garbage must neither close the
        # buffer nor change what was written to it.
        snapshot = buf.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(buf.closed)
        self.assertEqual(snapshot, buf.getvalue())

    def test_eof_marker(self):
        # An end of archive marker (two zero blocks) has to be written.
        # tarfile aligns archives to a record size of 20 * 512 bytes, so the
        # member is sized to make the archive exactly 10240 bytes without
        # the marker and 20480 bytes with it.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as tar:
            member = tarfile.TarInfo("foo")
            member.size = payload_size
            tar.addfile(member, io.BytesIO(b"a" * payload_size))

        with self.open(tmpname, "rb") as fobj:
            archive_length = len(fobj.read())
        self.assertEqual(archive_length, tarfile.RECORDSIZE * 2)
1075
1076
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w:" modes.  Fixture files are created below
    # TEMPDIR; tests that create their own fixtures remove them in a
    # ``finally`` clause so that one failing test cannot break later ones.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        finally:
            tar.close()
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            # An empty file first ...
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... then the same path with three bytes of content.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # The fixture files are created inside the try block so that the
            # directory is cleaned up even if creating one of them fails.
            open(os.path.join(path, "1"), "a").close()
            open(os.path.join(path, "2"), "a").close()
            tar = tarfile.open(tmpname, self.mode)
            try:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
            finally:
                tar.close()
        finally:
            support.unlink(os.path.join(path, "1"))
            support.unlink(os.path.join(path, "2"))
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        # gettarinfo() must give the same result for a pathlib.Path and for
        # the equivalent string, and store the name as a str.
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        try:
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            # Skipping happens inside the try block so that the target file
            # created above is still removed by the finally clause.
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            tar = tarfile.open(tmpname, self.mode)
            try:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            # Repeat the check with TEMPDIR as the current directory.
            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")
        finally:
            tar.close()

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                support.create_empty_file(name)

            def member_filter(tarinfo):
                # Drop "bar"; rewrite the owner of everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=member_filter)
            finally:
                tar.close()

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, member_filter)

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                # The directory itself plus "foo" and "baz".
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # cmp_path, if given, is the name expected in the archive; with
        # dir=True the member is a directory instead of a regular file.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            support.create_empty_file(foo)
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Always remove the fixture: this helper is called many times in
            # a row and a leftover "foo" would break the following calls.
            if not dir:
                support.unlink(foo)
            else:
                support.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            # The context manager releases the file handle even if add()
            # raises.
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file)
                tar.add(target_file)
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with symlinked files")
            finally:
                tar.close()
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        # Absolute names are expected to be stored as relative ones.
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)
            finally:
                tar.close()

    def test_open_nonwritable_fileobj(self):
        # An exception raised by the file object's first write() has to
        # propagate out of tarfile.open() without the file object being
        # closed.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1389
class GzipWriteTest(GzipTest, WriteTest):
    # Runs the WriteTest cases with GzipTest's "gz" suffix, i.e. with the
    # mode "w:gz".
    pass
1392
class Bz2WriteTest(Bz2Test, WriteTest):
    # Runs the WriteTest cases with Bz2Test's "bz2" suffix, i.e. with the
    # mode "w:bz2".
    pass
1395
class LzmaWriteTest(LzmaTest, WriteTest):
    # Runs the WriteTest cases with LzmaTest's "xz" suffix, i.e. with the
    # mode "w:xz".
    pass
1398
1399
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w|" stream modes.

    prefix = "w|"
    # Subclasses may set this to a decompressor class; test_stream_padding
    # then decompresses the raw file itself and checks for trailing data.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                compressed = fobj.read()
            data = dec.decompress(compressed)
            self.assertFalse(dec.unused_data, "found trailing data")

        zero_count = data.count(b"\0")
        self.assertEqual(zero_count, tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        # With a umask of 0o022 the new archive is expected to get 0o644.
        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1437
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    # Runs the StreamWriteTest cases in mode "w|gz".  No decompressor is set
    # here, so test_stream_padding reads the archive back through self.open
    # (gzip.GzipFile, from GzipTest).
    pass
1440
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Runs the StreamWriteTest cases in mode "w|bz2".  The decompressor is
    # used by test_stream_padding to detect trailing data; it falls back to
    # None when the bz2 module could not be imported.
    decompressor = bz2.BZ2Decompressor if bz2 else None
1443
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Runs the StreamWriteTest cases in mode "w|xz".  The decompressor is
    # used by test_stream_padding to detect trailing data; it falls back to
    # None when the lzma module could not be imported.
    decompressor = lzma.LZMADecompressor if lzma else None
1446
1447
class GNUWriteTest(unittest.TestCase):
    # Checks that GNU longname and longlink extended headers are created
    # correctly (cp. bug #812325).

    def _length(self, s):
        # Size of the 512-byte blocks needed for s plus at least one more
        # byte (a string of exactly 512 chars therefore takes two blocks).
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member: the member's own
        # header, plus an extended header and the padded string for every
        # field that does not fit into the regular header.
        total = 512

        oversized = []
        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            oversized.append(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            oversized.append(link)

        for value in oversized:
            total += 512 + self._length(value)
        return total

    def _test(self, name, link=None):
        # Write a single member in GNU format, compare the resulting offset
        # with the calculated one, then read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        expected_offset = self._calc_size(name, link)
        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)
            self.assertEqual(expected_offset, tar.offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        name = "longnam/" * 127 + "longnam"
        self._test(name)

    def test_longname_1024(self):
        name = "longnam/" * 127 + "longname"
        self._test(name)

    def test_longname_1025(self):
        name = "longnam/" * 127 + "longname_"
        self._test(name)

    def test_longlink_1023(self):
        link = "longlnk/" * 127 + "longlnk"
        self._test("name", link)

    def test_longlink_1024(self):
        link = "longlnk/" * 127 + "longlink"
        self._test("name", link)

    def test_longlink_1025(self):
        link = "longlnk/" * 127 + "longlink_"
        self._test("name", link)

    def test_longnamelink_1023(self):
        name = "longnam/" * 127 + "longnam"
        link = "longlnk/" * 127 + "longlnk"
        self._test(name, link)

    def test_longnamelink_1024(self):
        name = "longnam/" * 127 + "longname"
        link = "longlnk/" * 127 + "longlink"
        self._test(name, link)

    def test_longnamelink_1025(self):
        name = "longnam/" * 127 + "longname_"
        link = "longlnk/" * 127 + "longlink_"
        self._test(name, link)
1528
1529
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for the exclusive-creation "x:" modes: creating a new archive
    # works, creating one over an existing file raises FileExistsError.

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        # One small fixture file is shared by all tests of the class.
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def setUp(self):
        # Every test starts without an existing archive.
        support.unlink(tmpname)

    def _check_names(self, names):
        # The archive holds exactly one member: the fixture file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive(self):
        # Re-open the archive on disk and check its member list.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._check_names(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        # The first archive must still be intact.
        self._check_archive()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        # The first archive must still be intact.
        self._check_archive()

    def test_create_pathlike_name(self):
        archive_path = pathlib.Path(tmpname)
        with tarfile.open(archive_path, self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_names(names)

        self._check_archive()

    def test_create_taropen_pathlike_name(self):
        archive_path = pathlib.Path(tmpname)
        with self.taropen(archive_path, "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_names(names)

        self._check_archive()
1618
1619
class GzipCreateTest(GzipTest, CreateTest):
    """Run the CreateTest cases with the gzip settings from GzipTest."""
1622
1623
class Bz2CreateTest(Bz2Test, CreateTest):
    """Run the CreateTest cases with the bz2 settings from Bz2Test."""
1626
1627
class LzmaCreateTest(LzmaTest, CreateTest):
    """Run the CreateTest cases with the xz settings from LzmaTest."""
1630
1631
class CreateWithXModeTest(CreateTest):
    """Repeat the CreateTest cases with the bare "x" mode (no ":" suffix)."""

    prefix = "x"

    # The taropen() based tests pass a literal "x" and do not depend on
    # `prefix`; they are disabled in this subclass.
    test_create_taropen = test_create_existing_taropen = None
1638
1639
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        """Create file "foo", its hardlink "bar" and an archive holding "foo".

        The test is skipped when os.link() raises PermissionError.
        """
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Register the removals before anything is created: cleanups also
        # run when setUp() skips or fails, whereas tearDown() would not, so
        # no file is left behind in TEMPDIR.
        self.addCleanup(support.unlink, self.foo)
        self.addCleanup(support.unlink, self.bar)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run in reverse order of registration, so the archive is
        # closed before the two files are removed.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a hardlink to the already archived "foo", so it is
        # expected to be reported as a LNKTYPE member.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the hardlink is expected to be reported
        # as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1681
1682
class PaxWriteTest(GNUWriteTest):
    """Write archives in PAX_FORMAT and check names, links and pax headers."""

    def _test(self, name, link=None):
        """Write one member called *name* in PAX_FORMAT and read it back.

        If *link* is given, the member is written as a hardlink to *link*
        and the link name is compared after reading; otherwise the member
        name is compared.
        """
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        with tarfile.open(tmpname) as tar:
            member = tar.getmembers()[0]
            if link:
                self.assertEqual(link, member.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, member.name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Number fields must be convertible by their converter.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
1765
1766
class UnicodeTest:
    """Mixin with non-ASCII name tests; subclasses provide ``format``."""

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        """Write a non-ASCII member name with *encoding* and read it back."""
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII text in the
        # name or uname field must make addfile() raise UnicodeError.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # All textual fields of every member must be exposed as str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Reading the same archive as ASCII is expected to yield
            # surrogate escapes for the undecodable bytes.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1846
1847
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    """UnicodeTest for USTAR_FORMAT plus field-length limit checks."""

    format = tarfile.USTAR_FORMAT

    # The name tests check whether the utf-8 encoded form of a filename
    # exceeds the 100 bytes name field limit (each '\xff' is expanded to
    # 2 bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        for name, exc in (
                (digits * 10, None),
                (digits * 10 + "0", ValueError),
                (digits * 9 + "01234567\xff", None),
                (digits * 9 + "012345678\xff", ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_name2(self):
        digits = "0123456789"
        for name, exc in (
                (digits * 9 + "012345\xff\xff", None),
                (digits * 9 + "0123456\xff\xff", ValueError)):
            self._test_ustar_name(name, exc)

    # The longname tests check whether the utf-8 encoded form of a filename
    # exceeds the 155 bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        digits = "0123456789"
        for name, exc in (
                (digits * 15 + "01234/" + digits * 10, None),
                (digits * 15 + "0123/4" + digits * 10, ValueError),
                (digits * 15 + "012\xff/" + digits * 10, None),
                (digits * 15 + "0123\xff/" + digits * 10, ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_longname2(self):
        digits = "0123456789"
        for name, exc in (
                (digits * 15 + "01\xff/2" + digits * 10, ValueError),
                (digits * 15 + "01\xff\xff/" + digits * 10, ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_longname3(self):
        digits = "0123456789"
        head = digits * 15 + "01234/" + digits * 9
        for name, exc in (
                (digits * 15 + "01\xff\xff/2" + digits * 10, ValueError),
                (head + "01234567\xff", None),
                (head + "012345678\xff", ValueError)):
            self._test_ustar_name(name, exc)

    def test_unicode_longname4(self):
        digits = "0123456789"
        head = digits * 15 + "01234/" + digits * 9
        for name, exc in (
                (head + "012345\xff\xff", None),
                (head + "0123456\xff\xff", ValueError)):
            self._test_ustar_name(name, exc)

    def _test_ustar_name(self, name, exc=None):
        """Add a member called *name*; expect *exc*, or a clean round trip."""
        member = tarfile.TarInfo(name)
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            if exc is not None:
                self.assertRaises(exc, tar.addfile, member)
                return
            tar.addfile(member)

        # Only the first member read back is compared.
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for stored in tar:
                self.assertEqual(name, stored.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        for name, exc in (
                (digits * 10, None),
                (digits * 10 + "0", ValueError),
                (digits * 9 + "01234567\xff", None),
                (digits * 9 + "012345678\xff", ValueError)):
            self._test_ustar_link(name, exc)

    def test_unicode_link2(self):
        digits = "0123456789"
        for name, exc in (
                (digits * 9 + "012345\xff\xff", None),
                (digits * 9 + "0123456\xff\xff", ValueError)):
            self._test_ustar_link(name, exc)

    def _test_ustar_link(self, name, exc=None):
        """Add member "foo" linking to *name*; expect *exc* or a round trip."""
        member = tarfile.TarInfo("foo")
        member.linkname = name
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            if exc is not None:
                self.assertRaises(exc, tar.addfile, member)
                return
            tar.addfile(member)

        # Only the first member read back is compared.
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for stored in tar:
                self.assertEqual(name, stored.linkname)
                break
1925
1926
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    """UnicodeTest for GNU_FORMAT plus a regression test for issue #8633."""

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        expected_names = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in expected_names:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # getmember() raises KeyError for an unknown member name.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1943
1944
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    """UnicodeTest for PAX_FORMAT plus a hdrcharset=BINARY header test."""

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        expected_names = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in expected_names:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # getmember() raises KeyError for an unknown member name.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1963
1964
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test works on tmpname and starts without that file.
        target = self.tarname = tmpname
        if os.path.exists(target):
            support.unlink(target)

    def _create_testtar(self, mode="w:"):
        """Create self.tarname in *mode* holding "ustar/regtype" as "foo"."""
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as tar:
                tar.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in plain "a" mode must fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1984
class AppendTest(AppendTestBase, unittest.TestCase):
    """Append-mode tests for uncompressed archives."""

    # Disable the inherited compressed-archive test in this class.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        """Append an empty member "bar" to self.tarname (or to *fileobj*)."""
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        """Assert that the archive lists exactly *names* (default ["bar"]).

        The archive is read from *fileobj* if given, else from self.tarname.
        """
        # Build the default inside the call rather than sharing one mutable
        # list object between all invocations.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        """Write *data* as the archive and expect appending to raise ReadError."""
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2048
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the gzip settings from GzipTest."""
2051
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the bz2 settings from Bz2Test."""
2054
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the xz settings from LzmaTest."""
2057
2058
class LimitsTest(unittest.TestCase):
    """Check which oversized header fields TarInfo.tobuf() accepts per format."""

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT

        # 100 char name
        info = tarfile.TarInfo("0123456789" * 10)
        info.tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 256 char name with a slash at pos 156
        info = tarfile.TarInfo("123/" * 62 + "longname")
        info.tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT

        # Long names and link names can be stored in this format.
        info = tarfile.TarInfo("123/" * 126 + "longname")
        info.tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT

        # None of the values rejected above makes tobuf() fail here.
        info = tarfile.TarInfo("123/" * 126 + "longname")
        info.tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
2116
2117
class MiscTest(unittest.TestCase):
    """Tests for tarfile's low-level field helpers and its __all__ list."""

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length.
        for text, length, expected in (
                ("foo", 8, b"foo\0\0\0\0\0"),
                ("foobar", 3, b"foo")):
            self.assertEqual(tarfile.stn(text, length, "ascii", "strict"),
                             expected)
        # nts() stops at the first NUL byte.
        for raw in (b"foo\0\0\0\0\0", b"foo\0bar\0"):
            self.assertEqual(tarfile.nts(raw, "ascii", "strict"), "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        cases = (
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        )
        for field, expected in cases:
            self.assertEqual(tarfile.nti(field), expected)

    def test_write_number_fields(self):
        cases = (
            (1, b"0000001\x00"),
            (0o7777777, b"7777777\x00"),
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
        )
        for number, expected in cases:
            self.assertEqual(tarfile.itn(number), expected)

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        gnu = tarfile.GNU_FORMAT
        for number, expected in (
                (-100.0, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                (8 ** 12 + 0.0, b"\x80\x00\x00\x10\x00\x00\x00\x00")):
            self.assertEqual(tarfile.itn(number, format=gnu), expected)
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=gnu)), 0)

    def test_number_field_limits(self):
        # Each (value, digits, format) combination is out of range.
        for number, digits, fmt in (
                (-1, 8, tarfile.USTAR_FORMAT),
                (0o10000000, 8, tarfile.USTAR_FORMAT),
                (-0x10000000001, 6, tarfile.GNU_FORMAT),
                (0x10000000000, 6, tarfile.GNU_FORMAT)):
            with self.assertRaises(ValueError):
                tarfile.itn(number, digits, fmt)

    def test__all__(self):
        # Names handed to check__all__() as the blacklist for tarfile.
        excluded = {
            'version', 'grp', 'pwd', 'symlink_exception',
            'NUL', 'BLOCKSIZE', 'RECORDSIZE',
            'GNU_MAGIC', 'POSIX_MAGIC',
            'LENGTH_NAME', 'LENGTH_LINK', 'LENGTH_PREFIX',
            'REGTYPE', 'AREGTYPE', 'LNKTYPE', 'SYMTYPE', 'CHRTYPE',
            'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
            'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
            'filemode',
            'EmptyHeaderError', 'TruncatedHeaderError',
            'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject',
            'main',
        }
        support.check__all__(self, tarfile, blacklist=excluded)
2199
2200
class CommandLineTest(unittest.TestCase):
    """Tests for the ``python -m tarfile`` command-line interface."""

    def tarfilecmd(self, *args, **kwargs):
        """Run ``-m tarfile`` with *args*, expecting success.

        Returns the captured stdout with os.linesep replaced by a newline.
        """
        _rc, stdout, _err = script_helper.assert_python_ok(
            '-m', 'tarfile', *args, **kwargs)
        return stdout.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        """Run ``-m tarfile`` with *args*, expecting failure.

        Returns whatever assert_python_failure() returns; callers unpack it
        as (rc, out, err).
        """
        return script_helper.assert_python_failure(
            '-m', 'tarfile', *args)

    def _sample_files(self):
        """Return the paths of the two data files used as archive content."""
        return [support.findfile('tokenize_tests.txt'),
                support.findfile('tokenize_tests-no-coding-cookie-'
                                 'and-utf8-bom-sig-only.txt')]

    def _expected_listing(self, tar_name, verbose):
        """Return what TarFile.list() prints for *tar_name*, ASCII-encoded."""
        with support.captured_stdout() as captured, \
                tarfile.open(tar_name, 'r') as tf:
            tf.list(verbose=verbose)
        return captured.getvalue().encode('ascii', 'backslashreplace')

    def make_simple_tarfile(self, tar_name):
        """Create *tar_name* from the sample files; removed on test cleanup."""
        sources = self._sample_files()
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as archive:
            for path in sources:
                archive.add(path, arcname=os.path.basename(path))

    def test_bad_use(self):
        # No arguments at all: an error message on stderr, nothing on stdout.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        for word in (b'usage', b'error', b'required'):
            self.assertIn(word, lowered)
        # An empty archive name must fail with some diagnostic on stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in ('-t', '--test'):
                self.assertEqual(self.tarfilecmd(opt, tar_name), b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in ('-v', '--verbose'):
                output = self.tarfilecmd(opt, '-t', tar_name)
                self.assertIn(b'is a tar archive.\n', output)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # The first 511 bytes of each test archive must be rejected as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as src:
                    data = src.read()
                try:
                    with open(tmpname, 'wb') as dst:
                        dst.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            expected = self._expected_listing(tar_name, False)
            for opt in ('-l', '--list'):
                output = self.tarfilecmd(opt, tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            expected = self._expected_listing(tar_name, True)
            for opt in ('-v', '--verbose'):
                output = self.tarfilecmd(opt, '-l', tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        sources = self._sample_files()
        for opt in ('-c', '--create'):
            try:
                output = self.tarfilecmd(opt, tmpname, *sources)
                self.assertEqual(output, b'')
                # The created archive must be readable.
                with tarfile.open(tmpname) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_verbose(self):
        sources = self._sample_files()
        for opt in ('-v', '--verbose'):
            try:
                output = self.tarfilecmd(opt, '-c', tmpname, *sources)
                self.assertIn(b' file created.', output)
                # The created archive must be readable.
                with tarfile.open(tmpname) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        sources = [support.findfile('tokenize_tests.txt')]
        try:
            output = self.tarfilecmd('-c', dotlessname, *sources)
            self.assertEqual(output, b'')
            with tarfile.open(dotlessname) as archive:
                archive.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        sources = [support.findfile('tokenize_tests.txt')]
        try:
            output = self.tarfilecmd('-c', tar_name, *sources)
            self.assertEqual(output, b'')
            with tarfile.open(tar_name) as archive:
                archive.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        sources = self._sample_files()
        # Only exercise the compression types whose `open` attribute is set.
        usable = [ft for ft in (GzipTest, Bz2Test, LzmaTest) if ft.open]
        for filetype in usable:
            tar_name = '.'.join((tmpname, filetype.suffix))
            try:
                self.tarfilecmd('-c', tar_name, *sources)
                with filetype.taropen(tar_name) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in ('-e', '--extract'):
            try:
                with support.temp_cwd(tarextdir):
                    output = self.tarfilecmd(opt, tmpname)
                self.assertEqual(output, b'')
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in ('-v', '--verbose'):
            try:
                with support.temp_cwd(tarextdir):
                    output = self.tarfilecmd(opt, '-e', tmpname)
                self.assertIn(b' file is extracted.', output)
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                output = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(output, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2389
2390
class ContextManagerTest(unittest.TestCase):
    # Checks of TarFile objects used as context managers.

    def test_basic(self):
        # The archive is open inside the with block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised on purpose above is swallowed;
            # KeyboardInterrupt and SystemExit still propagate.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised on purpose above is swallowed;
                # KeyboardInterrupt and SystemExit still propagate.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2447
2448
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741.  When the platform supports neither
    # symbolic nor hard links, tarfile tries to extract such members as the
    # regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the member and compare the digest of what landed on disk
        # with the digest of the regular file.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = md5sum(extracted.read())
        self.assertEqual(digest, md5_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2481
2482
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Feed tarfile.open() every prefix of a small bz2-compressed
        # archive header and make sure it never keeps reading past the
        # end of the data.
        class MyBytesIO(io.BytesIO):
            # Set once a read() has been issued while already positioned
            # at the end of the data; any further read() then fails loudly.
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)
            def seek(self, *args):
                # Repositioning makes a further read legitimate again.
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
            else:
                # The archive opened fine; close it instead of leaking it.
                tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2512
2513
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are named 'root'.

    Return False when the pwd or grp module cannot be imported, when
    either database has no entry for id 0, or when an entry carries a
    different name.
    """
    try:
        import pwd, grp
    except ImportError:
        return False
    try:
        return (pwd.getpwuid(0)[0] == 'root' and
                grp.getgrgid(0)[0] == 'root')
    except KeyError:
        # getpwuid()/getgrgid() raise KeyError when no entry is found.
        # This function is called from a decorator while the module is
        # being imported, so it must answer False rather than raise.
        return False
2524
2525
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below patch:
    #  os.chown:   to record the calls made instead of changing owners
    #  os.chmod:   so the modes are left untouched; if they were changed we
    #              could not delete the files/directories
    #  os.geteuid: so we can pretend to be root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write tmpname as a tar file holding a file, a directory and a
        # file inside that directory.  Each member gets its own uid/gid
        # pair, while uname and gname are 'root' for all of them.
        payload = io.BytesIO(b"content")
        entries = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, fileobj in entries:
                info = tarfile.TarInfo(member_name)
                info.uid = uid
                info.gid = gid
                info.uname = 'root'
                info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        # hand back the full pathname of the tar file
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # pretend that we are root
        mock_geteuid.return_value = 0

        # the member names to store in the tar file
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        # build the tar file holding those members
        archive_path = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        # open it for reading; yield it together with the member names
        with tarfile.open(archive_path) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have been called with the numeric ids from the
        # archive, on the filesystem paths of the extracted members
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # chown must have been called with the numeric ids from the
        # archive, on the filesystem paths of all three members
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        # filesystem path of the extracted member
        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing a fourth positional argument to extract() must be
        # rejected with TypeError.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2644
2645
def setUpModule():
    # Prepare the temporary directory and the compressed copies of the
    # reference tar file used by the tests in this module.
    global testtarnames

    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as plain_tar:
        raw_tar = plain_tar.read()

    # Write one compressed copy per available compression module.
    for comp in (GzipTest, Bz2Test, LzmaTest):
        if not comp.open:
            # The compression module could not be imported.
            continue
        support.unlink(comp.tarname)
        testtarnames.append(comp.tarname)
        with comp.open(comp.tarname, "wb") as compressed:
            compressed.write(raw_tar)
2662
def tearDownModule():
    # Remove the temporary directory tree if it still exists.
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2666
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2669