1import sys 2import os 3import io 4from hashlib import md5 5from contextlib import contextmanager 6from random import Random 7import pathlib 8 9import unittest 10import unittest.mock 11import tarfile 12 13from test import support 14from test.support import script_helper 15 16# Check for our compression modules. 17try: 18 import gzip 19except ImportError: 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25try: 26 import lzma 27except ImportError: 28 lzma = None 29 30def md5sum(data): 31 return md5(data).hexdigest() 32 33TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 34tarextdir = TEMPDIR + '-extract-test' 35tarname = support.findfile("testtar.tar") 36gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 37bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 38xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 39tmpname = os.path.join(TEMPDIR, "tmp.tar") 40dotlessname = os.path.join(TEMPDIR, "testtar") 41 42md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 43md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 44 45 46class TarTest: 47 tarname = tarname 48 suffix = '' 49 open = io.FileIO 50 taropen = tarfile.TarFile.taropen 51 52 @property 53 def mode(self): 54 return self.prefix + self.suffix 55 56@support.requires_gzip 57class GzipTest: 58 tarname = gzipname 59 suffix = 'gz' 60 open = gzip.GzipFile if gzip else None 61 taropen = tarfile.TarFile.gzopen 62 63@support.requires_bz2 64class Bz2Test: 65 tarname = bz2name 66 suffix = 'bz2' 67 open = bz2.BZ2File if bz2 else None 68 taropen = tarfile.TarFile.bz2open 69 70@support.requires_lzma 71class LzmaTest: 72 tarname = xzname 73 suffix = 'xz' 74 open = lzma.LZMAFile if lzma else None 75 taropen = tarfile.TarFile.xzopen 76 77 78class ReadTest(TarTest): 79 80 prefix = "r:" 81 82 def setUp(self): 83 self.tar = tarfile.open(self.tarname, mode=self.mode, 84 encoding="iso8859-1") 85 86 def tearDown(self): 87 self.tar.close() 88 89 90class UstarReadTest(ReadTest, unittest.TestCase): 91 92 def test_fileobj_regular_file(self): 93 tarinfo = self.tar.getmember("ustar/regtype") 94 with self.tar.extractfile(tarinfo) as fobj: 95 data = fobj.read() 96 self.assertEqual(len(data), tarinfo.size, 97 "regular file extraction failed") 98 self.assertEqual(md5sum(data), md5_regtype, 99 "regular file extraction failed") 100 101 def test_fileobj_readlines(self): 102 self.tar.extract("ustar/regtype", TEMPDIR) 103 tarinfo = self.tar.getmember("ustar/regtype") 104 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 105 lines1 = fobj1.readlines() 106 107 with self.tar.extractfile(tarinfo) as fobj: 108 fobj2 = io.TextIOWrapper(fobj) 109 lines2 = fobj2.readlines() 110 self.assertEqual(lines1, lines2, 111 "fileobj.readlines() failed") 112 self.assertEqual(len(lines2), 114, 113 "fileobj.readlines() failed") 114 self.assertEqual(lines2[83], 115 "I will gladly admit that Python is not the fastest " 116 "running scripting language.\n", 117 "fileobj.readlines() failed") 118 119 def test_fileobj_iter(self): 120 self.tar.extract("ustar/regtype", TEMPDIR) 121 tarinfo = self.tar.getmember("ustar/regtype") 122 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 123 lines1 = fobj1.readlines() 124 with self.tar.extractfile(tarinfo) as fobj2: 125 lines2 = list(io.TextIOWrapper(fobj2)) 126 self.assertEqual(lines1, lines2, 127 "fileobj.__iter__() failed") 128 129 def test_fileobj_seek(self): 130 self.tar.extract("ustar/regtype", TEMPDIR) 131 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 132 data = fobj.read() 133 134 tarinfo = self.tar.getmember("ustar/regtype") 135 fobj = self.tar.extractfile(tarinfo) 136 137 text = fobj.read() 138 fobj.seek(0) 139 self.assertEqual(0, fobj.tell(), 140 "seek() to file's start failed") 141 fobj.seek(2048, 0) 142 self.assertEqual(2048, fobj.tell(), 143 "seek() to absolute position failed") 144 fobj.seek(-1024, 1) 145 self.assertEqual(1024, fobj.tell(), 146 "seek() to negative relative position failed") 147 fobj.seek(1024, 1) 148 self.assertEqual(2048, fobj.tell(), 149 "seek() to positive relative position failed") 150 s = fobj.read(10) 151 self.assertEqual(s, data[2048:2058], 152 "read() after seek failed") 153 fobj.seek(0, 2) 154 self.assertEqual(tarinfo.size, fobj.tell(), 155 "seek() to file's end failed") 156 self.assertEqual(fobj.read(), b"", 157 "read() at file's end did not return empty string") 158 fobj.seek(-tarinfo.size, 2) 159 self.assertEqual(0, fobj.tell(), 160 "relative seek() to file's end failed") 161 fobj.seek(512) 162 s1 = fobj.readlines() 163 fobj.seek(512) 164 s2 = fobj.readlines() 165 self.assertEqual(s1, s2, 166 "readlines() after seek failed") 167 fobj.seek(0) 168 self.assertEqual(len(fobj.readline()), fobj.tell(), 169 "tell() after readline() failed") 170 fobj.seek(512) 171 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 172 "tell() after seek() and readline() failed") 173 fobj.seek(0) 174 line = fobj.readline() 175 self.assertEqual(fobj.read(), data[len(line):], 176 "read() after readline() failed") 177 fobj.close() 178 179 def test_fileobj_text(self): 180 with self.tar.extractfile("ustar/regtype") as fobj: 181 fobj = io.TextIOWrapper(fobj) 182 data = fobj.read().encode("iso8859-1") 183 self.assertEqual(md5sum(data), md5_regtype) 184 try: 185 fobj.seek(100) 186 except AttributeError: 187 # Issue #13815: seek() complained about a missing 188 # flush() method. 189 self.fail("seeking failed in text mode") 190 191 # Test if symbolic and hard links are resolved by extractfile(). The 192 # test link members each point to a regular member whose data is 193 # supposed to be exported. 194 def _test_fileobj_link(self, lnktype, regtype): 195 with self.tar.extractfile(lnktype) as a, \ 196 self.tar.extractfile(regtype) as b: 197 self.assertEqual(a.name, b.name) 198 199 def test_fileobj_link1(self): 200 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 201 202 def test_fileobj_link2(self): 203 self._test_fileobj_link("./ustar/linktest2/lnktype", 204 "ustar/linktest1/regtype") 205 206 def test_fileobj_symlink1(self): 207 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 208 209 def test_fileobj_symlink2(self): 210 self._test_fileobj_link("./ustar/linktest2/symtype", 211 "ustar/linktest1/regtype") 212 213 def test_issue14160(self): 214 self._test_fileobj_link("symtype2", "ustar/regtype") 215 216class GzipUstarReadTest(GzipTest, UstarReadTest): 217 pass 218 219class Bz2UstarReadTest(Bz2Test, UstarReadTest): 220 pass 221 222class LzmaUstarReadTest(LzmaTest, UstarReadTest): 223 pass 224 225 226class ListTest(ReadTest, unittest.TestCase): 227 228 # Override setUp to use default encoding (UTF-8) 229 def setUp(self): 230 self.tar = tarfile.open(self.tarname, mode=self.mode) 231 232 def test_list(self): 233 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 234 with support.swap_attr(sys, 'stdout', tio): 235 self.tar.list(verbose=False) 236 out = tio.detach().getvalue() 237 self.assertIn(b'ustar/conttype', out) 238 self.assertIn(b'ustar/regtype', out) 239 self.assertIn(b'ustar/lnktype', out) 240 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 241 self.assertIn(b'./ustar/linktest2/symtype', out) 242 self.assertIn(b'./ustar/linktest2/lnktype', out) 243 # Make sure it puts trailing slash for directory 244 self.assertIn(b'ustar/dirtype/', out) 245 self.assertIn(b'ustar/dirtype-with-size/', out) 246 # Make sure it is able to print unencodable characters 247 def conv(b): 248 s = b.decode(self.tar.encoding, 'surrogateescape') 249 return s.encode('ascii', 'backslashreplace') 250 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 251 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 252 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 254 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 255 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 256 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 257 # Make sure it prints files separated by one newline without any 258 # 'ls -l'-like accessories if verbose flag is not being used 259 # ... 260 # ustar/conttype 261 # ustar/regtype 262 # ... 263 self.assertRegex(out, br'ustar/conttype ?\r?\n' 264 br'ustar/regtype ?\r?\n') 265 # Make sure it does not print the source of link without verbose flag 266 self.assertNotIn(b'link to', out) 267 self.assertNotIn(b'->', out) 268 269 def test_list_verbose(self): 270 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 271 with support.swap_attr(sys, 'stdout', tio): 272 self.tar.list(verbose=True) 273 out = tio.detach().getvalue() 274 # Make sure it prints files separated by one newline with 'ls -l'-like 275 # accessories if verbose flag is being used 276 # ... 277 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 278 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 279 # ... 280 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 281 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 282 br'ustar/\w+type ?\r?\n') * 2) 283 # Make sure it prints the source of link with verbose flag 284 self.assertIn(b'ustar/symtype -> regtype', out) 285 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 286 self.assertIn(b'./ustar/linktest2/lnktype link to ' 287 b'./ustar/linktest1/regtype', out) 288 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 289 (b'/123' * 125) + b'/longname', out) 290 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 291 (b'/123' * 125) + b'/longname', out) 292 293 def test_list_members(self): 294 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 295 def members(tar): 296 for tarinfo in tar.getmembers(): 297 if 'reg' in tarinfo.name: 298 yield tarinfo 299 with support.swap_attr(sys, 'stdout', tio): 300 self.tar.list(verbose=False, members=members(self.tar)) 301 out = tio.detach().getvalue() 302 self.assertIn(b'ustar/regtype', out) 303 self.assertNotIn(b'ustar/conttype', out) 304 305 306class GzipListTest(GzipTest, ListTest): 307 pass 308 309 310class Bz2ListTest(Bz2Test, ListTest): 311 pass 312 313 314class LzmaListTest(LzmaTest, ListTest): 315 pass 316 317 318class CommonReadTest(ReadTest): 319 320 def test_empty_tarfile(self): 321 # Test for issue6123: Allow opening empty archives. 322 # This test checks if tarfile.open() is able to open an empty tar 323 # archive successfully. Note that an empty tar archive is not the 324 # same as an empty file! 325 with tarfile.open(tmpname, self.mode.replace("r", "w")): 326 pass 327 try: 328 tar = tarfile.open(tmpname, self.mode) 329 tar.getnames() 330 except tarfile.ReadError: 331 self.fail("tarfile.open() failed on empty archive") 332 else: 333 self.assertListEqual(tar.getmembers(), []) 334 finally: 335 tar.close() 336 337 def test_non_existent_tarfile(self): 338 # Test for issue11513: prevent non-existent gzipped tarfiles raising 339 # multiple exceptions. 340 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 341 tarfile.open("xxx", self.mode) 342 343 def test_null_tarfile(self): 344 # Test for issue6123: Allow opening empty archives. 345 # This test guarantees that tarfile.open() does not treat an empty 346 # file as an empty tar archive. 347 with open(tmpname, "wb"): 348 pass 349 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 350 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 351 352 def test_ignore_zeros(self): 353 # Test TarFile's ignore_zeros option. 354 # generate 512 pseudorandom bytes 355 data = Random(0).getrandbits(512*8).to_bytes(512, 'big') 356 for char in (b'\0', b'a'): 357 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 358 # are ignored correctly. 359 with self.open(tmpname, "w") as fobj: 360 fobj.write(char * 1024) 361 tarinfo = tarfile.TarInfo("foo") 362 tarinfo.size = len(data) 363 fobj.write(tarinfo.tobuf()) 364 fobj.write(data) 365 366 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 367 try: 368 self.assertListEqual(tar.getnames(), ["foo"], 369 "ignore_zeros=True should have skipped the %r-blocks" % 370 char) 371 finally: 372 tar.close() 373 374 def test_premature_end_of_archive(self): 375 for size in (512, 600, 1024, 1200): 376 with tarfile.open(tmpname, "w:") as tar: 377 t = tarfile.TarInfo("foo") 378 t.size = 1024 379 tar.addfile(t, io.BytesIO(b"a" * 1024)) 380 381 with open(tmpname, "r+b") as fobj: 382 fobj.truncate(size) 383 384 with tarfile.open(tmpname) as tar: 385 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 386 for t in tar: 387 pass 388 389 with tarfile.open(tmpname) as tar: 390 t = tar.next() 391 392 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 393 tar.extract(t, TEMPDIR) 394 395 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 396 tar.extractfile(t).read() 397 398 def test_length_zero_header(self): 399 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 400 # with an exception 401 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 402 with tarfile.open(support.findfile('recursion.tar')) as tar: 403 pass 404 405class MiscReadTestBase(CommonReadTest): 406 def requires_name_attribute(self): 407 pass 408 409 def test_no_name_argument(self): 410 self.requires_name_attribute() 411 with open(self.tarname, "rb") as fobj: 412 self.assertIsInstance(fobj.name, str) 413 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 414 self.assertIsInstance(tar.name, str) 415 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 416 417 def test_no_name_attribute(self): 418 with open(self.tarname, "rb") as fobj: 419 data = fobj.read() 420 fobj = io.BytesIO(data) 421 self.assertRaises(AttributeError, getattr, fobj, "name") 422 tar = tarfile.open(fileobj=fobj, mode=self.mode) 423 self.assertIsNone(tar.name) 424 425 def test_empty_name_attribute(self): 426 with open(self.tarname, "rb") as fobj: 427 data = fobj.read() 428 fobj = io.BytesIO(data) 429 fobj.name = "" 430 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 431 self.assertIsNone(tar.name) 432 433 def test_int_name_attribute(self): 434 # Issue 21044: tarfile.open() should handle fileobj with an integer 435 # 'name' attribute. 436 fd = os.open(self.tarname, os.O_RDONLY) 437 with open(fd, 'rb') as fobj: 438 self.assertIsInstance(fobj.name, int) 439 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 440 self.assertIsNone(tar.name) 441 442 def test_bytes_name_attribute(self): 443 self.requires_name_attribute() 444 tarname = os.fsencode(self.tarname) 445 with open(tarname, 'rb') as fobj: 446 self.assertIsInstance(fobj.name, bytes) 447 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 448 self.assertIsInstance(tar.name, bytes) 449 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 450 451 def test_pathlike_name(self): 452 tarname = pathlib.Path(self.tarname) 453 with tarfile.open(tarname, mode=self.mode) as tar: 454 self.assertIsInstance(tar.name, str) 455 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 456 with self.taropen(tarname) as tar: 457 self.assertIsInstance(tar.name, str) 458 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 459 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 460 self.assertIsInstance(tar.name, str) 461 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 462 if self.suffix == '': 463 with tarfile.TarFile(tarname, mode='r') as tar: 464 self.assertIsInstance(tar.name, str) 465 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 466 467 def test_illegal_mode_arg(self): 468 with open(tmpname, 'wb'): 469 pass 470 with self.assertRaisesRegex(ValueError, 'mode must be '): 471 tar = self.taropen(tmpname, 'q') 472 with self.assertRaisesRegex(ValueError, 'mode must be '): 473 tar = self.taropen(tmpname, 'rw') 474 with self.assertRaisesRegex(ValueError, 'mode must be '): 475 tar = self.taropen(tmpname, '') 476 477 def test_fileobj_with_offset(self): 478 # Skip the first member and store values from the second member 479 # of the testtar. 480 tar = tarfile.open(self.tarname, mode=self.mode) 481 try: 482 tar.next() 483 t = tar.next() 484 name = t.name 485 offset = t.offset 486 with tar.extractfile(t) as f: 487 data = f.read() 488 finally: 489 tar.close() 490 491 # Open the testtar and seek to the offset of the second member. 492 with self.open(self.tarname) as fobj: 493 fobj.seek(offset) 494 495 # Test if the tarfile starts with the second member. 496 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 497 t = tar.next() 498 self.assertEqual(t.name, name) 499 # Read to the end of fileobj and test if seeking back to the 500 # beginning works. 501 tar.getmembers() 502 self.assertEqual(tar.extractfile(t).read(), data, 503 "seek back did not work") 504 tar.close() 505 506 def test_fail_comp(self): 507 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 508 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 509 with open(tarname, "rb") as fobj: 510 self.assertRaises(tarfile.ReadError, tarfile.open, 511 fileobj=fobj, mode=self.mode) 512 513 def test_v7_dirtype(self): 514 # Test old style dirtype member (bug #1336623): 515 # Old V7 tars create directory members using an AREGTYPE 516 # header with a "/" appended to the filename field. 517 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 518 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 519 "v7 dirtype failed") 520 521 def test_xstar_type(self): 522 # The xstar format stores extra atime and ctime fields inside the 523 # space reserved for the prefix field. The prefix field must be 524 # ignored in this case, otherwise it will mess up the name. 525 try: 526 self.tar.getmember("misc/regtype-xstar") 527 except KeyError: 528 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 529 530 def test_check_members(self): 531 for tarinfo in self.tar: 532 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 533 "wrong mtime for %s" % tarinfo.name) 534 if not tarinfo.name.startswith("ustar/"): 535 continue 536 self.assertEqual(tarinfo.uname, "tarfile", 537 "wrong uname for %s" % tarinfo.name) 538 539 def test_find_members(self): 540 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 541 "could not find all members") 542 543 @unittest.skipUnless(hasattr(os, "link"), 544 "Missing hardlink implementation") 545 @support.skip_unless_symlink 546 def test_extract_hardlink(self): 547 # Test hardlink extraction (e.g. bug #857297). 548 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 549 tar.extract("ustar/regtype", TEMPDIR) 550 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 551 552 tar.extract("ustar/lnktype", TEMPDIR) 553 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 554 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 555 data = f.read() 556 self.assertEqual(md5sum(data), md5_regtype) 557 558 tar.extract("ustar/symtype", TEMPDIR) 559 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 560 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 561 data = f.read() 562 self.assertEqual(md5sum(data), md5_regtype) 563 564 def test_extractall(self): 565 # Test if extractall() correctly restores directory permissions 566 # and times (see issue1735). 567 tar = tarfile.open(tarname, encoding="iso8859-1") 568 DIR = os.path.join(TEMPDIR, "extractall") 569 os.mkdir(DIR) 570 try: 571 directories = [t for t in tar if t.isdir()] 572 tar.extractall(DIR, directories) 573 for tarinfo in directories: 574 path = os.path.join(DIR, tarinfo.name) 575 if sys.platform != "win32": 576 # Win32 has no support for fine grained permissions. 577 self.assertEqual(tarinfo.mode & 0o777, 578 os.stat(path).st_mode & 0o777) 579 def format_mtime(mtime): 580 if isinstance(mtime, float): 581 return "{} ({})".format(mtime, mtime.hex()) 582 else: 583 return "{!r} (int)".format(mtime) 584 file_mtime = os.path.getmtime(path) 585 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 586 format_mtime(tarinfo.mtime), 587 format_mtime(file_mtime), 588 path) 589 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 590 finally: 591 tar.close() 592 support.rmtree(DIR) 593 594 def test_extract_directory(self): 595 dirtype = "ustar/dirtype" 596 DIR = os.path.join(TEMPDIR, "extractdir") 597 os.mkdir(DIR) 598 try: 599 with tarfile.open(tarname, encoding="iso8859-1") as tar: 600 tarinfo = tar.getmember(dirtype) 601 tar.extract(tarinfo, path=DIR) 602 extracted = os.path.join(DIR, dirtype) 603 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 604 if sys.platform != "win32": 605 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 606 finally: 607 support.rmtree(DIR) 608 609 def test_extractall_pathlike_name(self): 610 DIR = pathlib.Path(TEMPDIR) / "extractall" 611 with support.temp_dir(DIR), \ 612 tarfile.open(tarname, encoding="iso8859-1") as tar: 613 directories = [t for t in tar if t.isdir()] 614 tar.extractall(DIR, directories) 615 for tarinfo in directories: 616 path = DIR / tarinfo.name 617 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 618 619 def test_extract_pathlike_name(self): 620 dirtype = "ustar/dirtype" 621 DIR = pathlib.Path(TEMPDIR) / "extractall" 622 with support.temp_dir(DIR), \ 623 tarfile.open(tarname, encoding="iso8859-1") as tar: 624 tarinfo = tar.getmember(dirtype) 625 tar.extract(tarinfo, path=DIR) 626 extracted = DIR / dirtype 627 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 628 629 def test_init_close_fobj(self): 630 # Issue #7341: Close the internal file object in the TarFile 631 # constructor in case of an error. For the test we rely on 632 # the fact that opening an empty file raises a ReadError. 633 empty = os.path.join(TEMPDIR, "empty") 634 with open(empty, "wb") as fobj: 635 fobj.write(b"") 636 637 try: 638 tar = object.__new__(tarfile.TarFile) 639 try: 640 tar.__init__(empty) 641 except tarfile.ReadError: 642 self.assertTrue(tar.fileobj.closed) 643 else: 644 self.fail("ReadError not raised") 645 finally: 646 support.unlink(empty) 647 648 def test_parallel_iteration(self): 649 # Issue #16601: Restarting iteration over tarfile continued 650 # from where it left off. 651 with tarfile.open(self.tarname) as tar: 652 for m1, m2 in zip(tar, tar): 653 self.assertEqual(m1.offset, m2.offset) 654 self.assertEqual(m1.get_info(), m2.get_info()) 655 656class MiscReadTest(MiscReadTestBase, unittest.TestCase): 657 test_fail_comp = None 658 659class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 660 pass 661 662class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 663 def requires_name_attribute(self): 664 self.skipTest("BZ2File have no name attribute") 665 666class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 667 def requires_name_attribute(self): 668 self.skipTest("LZMAFile have no name attribute") 669 670 671class StreamReadTest(CommonReadTest, unittest.TestCase): 672 673 prefix="r|" 674 675 def test_read_through(self): 676 # Issue #11224: A poorly designed _FileInFile.read() method 677 # caused seeking errors with stream tar files. 678 for tarinfo in self.tar: 679 if not tarinfo.isreg(): 680 continue 681 with self.tar.extractfile(tarinfo) as fobj: 682 while True: 683 try: 684 buf = fobj.read(512) 685 except tarfile.StreamError: 686 self.fail("simple read-through using " 687 "TarFile.extractfile() failed") 688 if not buf: 689 break 690 691 def test_fileobj_regular_file(self): 692 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 693 with self.tar.extractfile(tarinfo) as fobj: 694 data = fobj.read() 695 self.assertEqual(len(data), tarinfo.size, 696 "regular file extraction failed") 697 self.assertEqual(md5sum(data), md5_regtype, 698 "regular file extraction failed") 699 700 def test_provoke_stream_error(self): 701 tarinfos = self.tar.getmembers() 702 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 703 self.assertRaises(tarfile.StreamError, f.read) 704 705 def test_compare_members(self): 706 tar1 = tarfile.open(tarname, encoding="iso8859-1") 707 try: 708 tar2 = self.tar 709 710 while True: 711 t1 = tar1.next() 712 t2 = tar2.next() 713 if t1 is None: 714 break 715 self.assertIsNotNone(t2, "stream.next() failed.") 716 717 if t2.islnk() or t2.issym(): 718 with self.assertRaises(tarfile.StreamError): 719 tar2.extractfile(t2) 720 continue 721 722 v1 = tar1.extractfile(t1) 723 v2 = tar2.extractfile(t2) 724 if v1 is None: 725 continue 726 self.assertIsNotNone(v2, "stream.extractfile() failed") 727 self.assertEqual(v1.read(), v2.read(), 728 "stream extraction failed") 729 finally: 730 tar1.close() 731 732class GzipStreamReadTest(GzipTest, StreamReadTest): 733 pass 734 735class Bz2StreamReadTest(Bz2Test, StreamReadTest): 736 pass 737 738class LzmaStreamReadTest(LzmaTest, StreamReadTest): 739 pass 740 741 742class DetectReadTest(TarTest, unittest.TestCase): 743 def _testfunc_file(self, name, mode): 744 try: 745 tar = tarfile.open(name, mode) 746 except tarfile.ReadError as e: 747 self.fail() 748 else: 749 tar.close() 750 751 def _testfunc_fileobj(self, name, mode): 752 try: 753 with open(name, "rb") as f: 754 tar = tarfile.open(name, mode, fileobj=f) 755 except tarfile.ReadError as e: 756 self.fail() 757 else: 758 tar.close() 759 760 def _test_modes(self, testfunc): 761 if self.suffix: 762 with self.assertRaises(tarfile.ReadError): 763 tarfile.open(tarname, mode="r:" + self.suffix) 764 with self.assertRaises(tarfile.ReadError): 765 tarfile.open(tarname, mode="r|" + self.suffix) 766 with self.assertRaises(tarfile.ReadError): 767 tarfile.open(self.tarname, mode="r:") 768 with self.assertRaises(tarfile.ReadError): 769 tarfile.open(self.tarname, mode="r|") 770 testfunc(self.tarname, "r") 771 testfunc(self.tarname, "r:" + self.suffix) 772 testfunc(self.tarname, "r:*") 773 testfunc(self.tarname, "r|" + self.suffix) 774 testfunc(self.tarname, "r|*") 775 776 def test_detect_file(self): 777 self._test_modes(self._testfunc_file) 778 779 def test_detect_fileobj(self): 780 self._test_modes(self._testfunc_fileobj) 781 782class GzipDetectReadTest(GzipTest, DetectReadTest): 783 pass 784 785class Bz2DetectReadTest(Bz2Test, DetectReadTest): 786 def test_detect_stream_bz2(self): 787 # Originally, tarfile's stream detection looked for the string 788 # "BZh91" at the start of the file. This is incorrect because 789 # the '9' represents the blocksize (900,000 bytes). If the file was 790 # compressed using another blocksize autodetection fails. 791 with open(tarname, "rb") as fobj: 792 data = fobj.read() 793 794 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 795 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 796 fobj.write(data) 797 798 self._testfunc_file(tmpname, "r|*") 799 800class LzmaDetectReadTest(LzmaTest, DetectReadTest): 801 pass 802 803 804class MemberReadTest(ReadTest, unittest.TestCase): 805 806 def _test_member(self, tarinfo, chksum=None, **kwargs): 807 if chksum is not None: 808 with self.tar.extractfile(tarinfo) as f: 809 self.assertEqual(md5sum(f.read()), chksum, 810 "wrong md5sum for %s" % tarinfo.name) 811 812 kwargs["mtime"] = 0o7606136617 813 kwargs["uid"] = 1000 814 kwargs["gid"] = 100 815 if "old-v7" not in tarinfo.name: 816 # V7 tar can't handle alphabetic owners. 817 kwargs["uname"] = "tarfile" 818 kwargs["gname"] = "tarfile" 819 for k, v in kwargs.items(): 820 self.assertEqual(getattr(tarinfo, k), v, 821 "wrong value in %s field of %s" % (k, tarinfo.name)) 822 823 def test_find_regtype(self): 824 tarinfo = self.tar.getmember("ustar/regtype") 825 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 826 827 def test_find_conttype(self): 828 tarinfo = self.tar.getmember("ustar/conttype") 829 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 830 831 def test_find_dirtype(self): 832 tarinfo = self.tar.getmember("ustar/dirtype") 833 self._test_member(tarinfo, size=0) 834 835 def test_find_dirtype_with_size(self): 836 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 837 self._test_member(tarinfo, size=255) 838 839 def test_find_lnktype(self): 840 tarinfo = self.tar.getmember("ustar/lnktype") 841 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 842 843 def test_find_symtype(self): 844 tarinfo = self.tar.getmember("ustar/symtype") 845 self._test_member(tarinfo, size=0, linkname="regtype") 846 847 def test_find_blktype(self): 848 tarinfo = self.tar.getmember("ustar/blktype") 849 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 850 851 def test_find_chrtype(self): 852 tarinfo = self.tar.getmember("ustar/chrtype") 853 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 854 855 def test_find_fifotype(self): 856 tarinfo = self.tar.getmember("ustar/fifotype") 857 self._test_member(tarinfo, size=0) 858 859 def test_find_sparse(self): 860 tarinfo = self.tar.getmember("ustar/sparse") 861 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 862 863 def test_find_gnusparse(self): 864 tarinfo = self.tar.getmember("gnu/sparse") 865 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 866 867 def test_find_gnusparse_00(self): 868 tarinfo = self.tar.getmember("gnu/sparse-0.0") 869 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 870 871 def test_find_gnusparse_01(self): 872 tarinfo = self.tar.getmember("gnu/sparse-0.1") 873 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 874 875 def test_find_gnusparse_10(self): 876 tarinfo = self.tar.getmember("gnu/sparse-1.0") 877 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 878 879 def test_find_umlauts(self): 880 tarinfo = self.tar.getmember("ustar/umlauts-" 881 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 882 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 883 884 def test_find_ustar_longname(self): 885 name = "ustar/" + "12345/" * 39 + "1234567/longname" 886 self.assertIn(name, self.tar.getnames()) 887 888 def test_find_regtype_oldv7(self): 889 tarinfo = self.tar.getmember("misc/regtype-old-v7") 890 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 891 892 def test_find_pax_umlauts(self): 893 self.tar.close() 894 self.tar = tarfile.open(self.tarname, mode=self.mode, 895 encoding="iso8859-1") 896 tarinfo = self.tar.getmember("pax/umlauts-" 897 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 898 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 899 900 901class LongnameTest: 902 903 def test_read_longname(self): 904 # Test reading of longname (bug #1471427). 905 longname = self.subdir + "/" + "123/" * 125 + "longname" 906 try: 907 tarinfo = self.tar.getmember(longname) 908 except KeyError: 909 self.fail("longname not found") 910 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 911 "read longname as dirtype") 912 913 def test_read_longlink(self): 914 longname = self.subdir + "/" + "123/" * 125 + "longname" 915 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 916 try: 917 tarinfo = self.tar.getmember(longlink) 918 except KeyError: 919 self.fail("longlink not found") 920 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 921 922 def test_truncated_longname(self): 923 longname = self.subdir + "/" + "123/" * 125 + "longname" 924 tarinfo = self.tar.getmember(longname) 925 offset = tarinfo.offset 926 self.tar.fileobj.seek(offset) 927 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 928 with self.assertRaises(tarfile.ReadError): 929 tarfile.open(name="foo.tar", fileobj=fobj) 930 931 def test_header_offset(self): 932 # Test if the start offset of the TarInfo object includes 933 # the preceding extended header. 934 longname = self.subdir + "/" + "123/" * 125 + "longname" 935 offset = self.tar.getmember(longname).offset 936 with open(tarname, "rb") as fobj: 937 fobj.seek(offset) 938 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 939 "iso8859-1", "strict") 940 self.assertEqual(tarinfo.type, self.longnametype) 941 942 943class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 944 945 subdir = "gnu" 946 longnametype = tarfile.GNUTYPE_LONGNAME 947 948 # Since 3.2 tarfile is supposed to accurately restore sparse members and 949 # produce files with holes. This is what we actually want to test here. 950 # Unfortunately, not all platforms/filesystems support sparse files, and 951 # even on platforms that do it is non-trivial to make reliable assertions 952 # about holes in files. Therefore, we first do one basic test which works 953 # an all platforms, and after that a test that will work only on 954 # platforms/filesystems that prove to support sparse files. 955 def _test_sparse_file(self, name): 956 self.tar.extract(name, TEMPDIR) 957 filename = os.path.join(TEMPDIR, name) 958 with open(filename, "rb") as fobj: 959 data = fobj.read() 960 self.assertEqual(md5sum(data), md5_sparse, 961 "wrong md5sum for %s" % name) 962 963 if self._fs_supports_holes(): 964 s = os.stat(filename) 965 self.assertLess(s.st_blocks * 512, s.st_size) 966 967 def test_sparse_file_old(self): 968 self._test_sparse_file("gnu/sparse") 969 970 def test_sparse_file_00(self): 971 self._test_sparse_file("gnu/sparse-0.0") 972 973 def test_sparse_file_01(self): 974 self._test_sparse_file("gnu/sparse-0.1") 975 976 def test_sparse_file_10(self): 977 self._test_sparse_file("gnu/sparse-1.0") 978 979 @staticmethod 980 def _fs_supports_holes(): 981 # Return True if the platform knows the st_blocks stat attribute and 982 # uses st_blocks units of 512 bytes, and if the filesystem is able to 983 # store holes of 4 KiB in files. 984 # 985 # The function returns False if page size is larger than 4 KiB. 986 # For example, ppc64 uses pages of 64 KiB. 987 if sys.platform.startswith("linux"): 988 # Linux evidentially has 512 byte st_blocks units. 989 name = os.path.join(TEMPDIR, "sparse-test") 990 with open(name, "wb") as fobj: 991 # Seek to "punch a hole" of 4 KiB 992 fobj.seek(4096) 993 fobj.write(b'x' * 4096) 994 fobj.truncate() 995 s = os.stat(name) 996 support.unlink(name) 997 return (s.st_blocks * 512 < s.st_size) 998 else: 999 return False 1000 1001 1002class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 1003 1004 subdir = "pax" 1005 longnametype = tarfile.XHDTYPE 1006 1007 def test_pax_global_headers(self): 1008 tar = tarfile.open(tarname, encoding="iso8859-1") 1009 try: 1010 tarinfo = tar.getmember("pax/regtype1") 1011 self.assertEqual(tarinfo.uname, "foo") 1012 self.assertEqual(tarinfo.gname, "bar") 1013 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1014 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1015 1016 tarinfo = tar.getmember("pax/regtype2") 1017 self.assertEqual(tarinfo.uname, "") 1018 self.assertEqual(tarinfo.gname, "bar") 1019 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1020 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1021 1022 tarinfo = tar.getmember("pax/regtype3") 1023 self.assertEqual(tarinfo.uname, "tarfile") 1024 self.assertEqual(tarinfo.gname, "tarfile") 1025 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1026 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1027 finally: 1028 tar.close() 1029 1030 def test_pax_number_fields(self): 1031 # All following number fields are read from the pax header. 1032 tar = tarfile.open(tarname, encoding="iso8859-1") 1033 try: 1034 tarinfo = tar.getmember("pax/regtype4") 1035 self.assertEqual(tarinfo.size, 7011) 1036 self.assertEqual(tarinfo.uid, 123) 1037 self.assertEqual(tarinfo.gid, 123) 1038 self.assertEqual(tarinfo.mtime, 1041808783.0) 1039 self.assertEqual(type(tarinfo.mtime), float) 1040 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1041 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1042 finally: 1043 tar.close() 1044 1045 1046class WriteTestBase(TarTest): 1047 # Put all write tests in here that are supposed to be tested 1048 # in all possible mode combinations. 1049 1050 def test_fileobj_no_close(self): 1051 fobj = io.BytesIO() 1052 tar = tarfile.open(fileobj=fobj, mode=self.mode) 1053 tar.addfile(tarfile.TarInfo("foo")) 1054 tar.close() 1055 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1056 # Issue #20238: Incomplete gzip output with mode="w:gz" 1057 data = fobj.getvalue() 1058 del tar 1059 support.gc_collect() 1060 self.assertFalse(fobj.closed) 1061 self.assertEqual(data, fobj.getvalue()) 1062 1063 def test_eof_marker(self): 1064 # Make sure an end of archive marker is written (two zero blocks). 1065 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1066 # So, we create an archive that has exactly 10240 bytes without the 1067 # marker, and has 20480 bytes once the marker is written. 1068 with tarfile.open(tmpname, self.mode) as tar: 1069 t = tarfile.TarInfo("foo") 1070 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1071 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1072 1073 with self.open(tmpname, "rb") as fobj: 1074 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1075 1076 1077class WriteTest(WriteTestBase, unittest.TestCase): 1078 1079 prefix = "w:" 1080 1081 def test_100_char_name(self): 1082 # The name field in a tar header stores strings of at most 100 chars. 1083 # If a string is shorter than 100 chars it has to be padded with '\0', 1084 # which implies that a string of exactly 100 chars is stored without 1085 # a trailing '\0'. 1086 name = "0123456789" * 10 1087 tar = tarfile.open(tmpname, self.mode) 1088 try: 1089 t = tarfile.TarInfo(name) 1090 tar.addfile(t) 1091 finally: 1092 tar.close() 1093 1094 tar = tarfile.open(tmpname) 1095 try: 1096 self.assertEqual(tar.getnames()[0], name, 1097 "failed to store 100 char filename") 1098 finally: 1099 tar.close() 1100 1101 def test_tar_size(self): 1102 # Test for bug #1013882. 1103 tar = tarfile.open(tmpname, self.mode) 1104 try: 1105 path = os.path.join(TEMPDIR, "file") 1106 with open(path, "wb") as fobj: 1107 fobj.write(b"aaa") 1108 tar.add(path) 1109 finally: 1110 tar.close() 1111 self.assertGreater(os.path.getsize(tmpname), 0, 1112 "tarfile is empty") 1113 1114 # The test_*_size tests test for bug #1167128. 1115 def test_file_size(self): 1116 tar = tarfile.open(tmpname, self.mode) 1117 try: 1118 path = os.path.join(TEMPDIR, "file") 1119 with open(path, "wb"): 1120 pass 1121 tarinfo = tar.gettarinfo(path) 1122 self.assertEqual(tarinfo.size, 0) 1123 1124 with open(path, "wb") as fobj: 1125 fobj.write(b"aaa") 1126 tarinfo = tar.gettarinfo(path) 1127 self.assertEqual(tarinfo.size, 3) 1128 finally: 1129 tar.close() 1130 1131 def test_directory_size(self): 1132 path = os.path.join(TEMPDIR, "directory") 1133 os.mkdir(path) 1134 try: 1135 tar = tarfile.open(tmpname, self.mode) 1136 try: 1137 tarinfo = tar.gettarinfo(path) 1138 self.assertEqual(tarinfo.size, 0) 1139 finally: 1140 tar.close() 1141 finally: 1142 support.rmdir(path) 1143 1144 # mock the following: 1145 # os.listdir: so we know that files are in the wrong order 1146 def test_ordered_recursion(self): 1147 path = os.path.join(TEMPDIR, "directory") 1148 os.mkdir(path) 1149 open(os.path.join(path, "1"), "a").close() 1150 open(os.path.join(path, "2"), "a").close() 1151 try: 1152 tar = tarfile.open(tmpname, self.mode) 1153 try: 1154 with unittest.mock.patch('os.listdir') as mock_listdir: 1155 mock_listdir.return_value = ["2", "1"] 1156 tar.add(path) 1157 paths = [] 1158 for m in tar.getmembers(): 1159 paths.append(os.path.split(m.name)[-1]) 1160 self.assertEqual(paths, ["directory", "1", "2"]); 1161 finally: 1162 tar.close() 1163 finally: 1164 support.unlink(os.path.join(path, "1")) 1165 support.unlink(os.path.join(path, "2")) 1166 support.rmdir(path) 1167 1168 def test_gettarinfo_pathlike_name(self): 1169 with tarfile.open(tmpname, self.mode) as tar: 1170 path = pathlib.Path(TEMPDIR) / "file" 1171 with open(path, "wb") as fobj: 1172 fobj.write(b"aaa") 1173 tarinfo = tar.gettarinfo(path) 1174 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1175 self.assertIsInstance(tarinfo.name, str) 1176 self.assertEqual(tarinfo.name, tarinfo2.name) 1177 self.assertEqual(tarinfo.size, 3) 1178 1179 @unittest.skipUnless(hasattr(os, "link"), 1180 "Missing hardlink implementation") 1181 def test_link_size(self): 1182 link = os.path.join(TEMPDIR, "link") 1183 target = os.path.join(TEMPDIR, "link_target") 1184 with open(target, "wb") as fobj: 1185 fobj.write(b"aaa") 1186 try: 1187 os.link(target, link) 1188 except PermissionError as e: 1189 self.skipTest('os.link(): %s' % e) 1190 try: 1191 tar = tarfile.open(tmpname, self.mode) 1192 try: 1193 # Record the link target in the inodes list. 1194 tar.gettarinfo(target) 1195 tarinfo = tar.gettarinfo(link) 1196 self.assertEqual(tarinfo.size, 0) 1197 finally: 1198 tar.close() 1199 finally: 1200 support.unlink(target) 1201 support.unlink(link) 1202 1203 @support.skip_unless_symlink 1204 def test_symlink_size(self): 1205 path = os.path.join(TEMPDIR, "symlink") 1206 os.symlink("link_target", path) 1207 try: 1208 tar = tarfile.open(tmpname, self.mode) 1209 try: 1210 tarinfo = tar.gettarinfo(path) 1211 self.assertEqual(tarinfo.size, 0) 1212 finally: 1213 tar.close() 1214 finally: 1215 support.unlink(path) 1216 1217 def test_add_self(self): 1218 # Test for #1257255. 1219 dstname = os.path.abspath(tmpname) 1220 tar = tarfile.open(tmpname, self.mode) 1221 try: 1222 self.assertEqual(tar.name, dstname, 1223 "archive name must be absolute") 1224 tar.add(dstname) 1225 self.assertEqual(tar.getnames(), [], 1226 "added the archive to itself") 1227 1228 with support.change_cwd(TEMPDIR): 1229 tar.add(dstname) 1230 self.assertEqual(tar.getnames(), [], 1231 "added the archive to itself") 1232 finally: 1233 tar.close() 1234 1235 def test_filter(self): 1236 tempdir = os.path.join(TEMPDIR, "filter") 1237 os.mkdir(tempdir) 1238 try: 1239 for name in ("foo", "bar", "baz"): 1240 name = os.path.join(tempdir, name) 1241 support.create_empty_file(name) 1242 1243 def filter(tarinfo): 1244 if os.path.basename(tarinfo.name) == "bar": 1245 return 1246 tarinfo.uid = 123 1247 tarinfo.uname = "foo" 1248 return tarinfo 1249 1250 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1251 try: 1252 tar.add(tempdir, arcname="empty_dir", filter=filter) 1253 finally: 1254 tar.close() 1255 1256 # Verify that filter is a keyword-only argument 1257 with self.assertRaises(TypeError): 1258 tar.add(tempdir, "empty_dir", True, None, filter) 1259 1260 tar = tarfile.open(tmpname, "r") 1261 try: 1262 for tarinfo in tar: 1263 self.assertEqual(tarinfo.uid, 123) 1264 self.assertEqual(tarinfo.uname, "foo") 1265 self.assertEqual(len(tar.getmembers()), 3) 1266 finally: 1267 tar.close() 1268 finally: 1269 support.rmtree(tempdir) 1270 1271 # Guarantee that stored pathnames are not modified. Don't 1272 # remove ./ or ../ or double slashes. Still make absolute 1273 # pathnames relative. 1274 # For details see bug #6054. 1275 def _test_pathname(self, path, cmp_path=None, dir=False): 1276 # Create a tarfile with an empty member named path 1277 # and compare the stored name with the original. 1278 foo = os.path.join(TEMPDIR, "foo") 1279 if not dir: 1280 support.create_empty_file(foo) 1281 else: 1282 os.mkdir(foo) 1283 1284 tar = tarfile.open(tmpname, self.mode) 1285 try: 1286 tar.add(foo, arcname=path) 1287 finally: 1288 tar.close() 1289 1290 tar = tarfile.open(tmpname, "r") 1291 try: 1292 t = tar.next() 1293 finally: 1294 tar.close() 1295 1296 if not dir: 1297 support.unlink(foo) 1298 else: 1299 support.rmdir(foo) 1300 1301 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1302 1303 1304 @support.skip_unless_symlink 1305 def test_extractall_symlinks(self): 1306 # Test if extractall works properly when tarfile contains symlinks 1307 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1308 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1309 os.mkdir(tempdir) 1310 try: 1311 source_file = os.path.join(tempdir,'source') 1312 target_file = os.path.join(tempdir,'symlink') 1313 with open(source_file,'w') as f: 1314 f.write('something\n') 1315 os.symlink(source_file, target_file) 1316 tar = tarfile.open(temparchive,'w') 1317 tar.add(source_file) 1318 tar.add(target_file) 1319 tar.close() 1320 # Let's extract it to the location which contains the symlink 1321 tar = tarfile.open(temparchive,'r') 1322 # this should not raise OSError: [Errno 17] File exists 1323 try: 1324 tar.extractall(path=tempdir) 1325 except OSError: 1326 self.fail("extractall failed with symlinked files") 1327 finally: 1328 tar.close() 1329 finally: 1330 support.unlink(temparchive) 1331 support.rmtree(tempdir) 1332 1333 def test_pathnames(self): 1334 self._test_pathname("foo") 1335 self._test_pathname(os.path.join("foo", ".", "bar")) 1336 self._test_pathname(os.path.join("foo", "..", "bar")) 1337 self._test_pathname(os.path.join(".", "foo")) 1338 self._test_pathname(os.path.join(".", "foo", ".")) 1339 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1340 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1341 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1342 self._test_pathname(os.path.join("..", "foo")) 1343 self._test_pathname(os.path.join("..", "foo", "..")) 1344 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1345 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1346 1347 self._test_pathname("foo" + os.sep + os.sep + "bar") 1348 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1349 1350 def test_abs_pathnames(self): 1351 if sys.platform == "win32": 1352 self._test_pathname("C:\\foo", "foo") 1353 else: 1354 self._test_pathname("/foo", "foo") 1355 self._test_pathname("///foo", "foo") 1356 1357 def test_cwd(self): 1358 # Test adding the current working directory. 1359 with support.change_cwd(TEMPDIR): 1360 tar = tarfile.open(tmpname, self.mode) 1361 try: 1362 tar.add(".") 1363 finally: 1364 tar.close() 1365 1366 tar = tarfile.open(tmpname, "r") 1367 try: 1368 for t in tar: 1369 if t.name != ".": 1370 self.assertTrue(t.name.startswith("./"), t.name) 1371 finally: 1372 tar.close() 1373 1374 def test_open_nonwritable_fileobj(self): 1375 for exctype in OSError, EOFError, RuntimeError: 1376 class BadFile(io.BytesIO): 1377 first = True 1378 def write(self, data): 1379 if self.first: 1380 self.first = False 1381 raise exctype 1382 1383 f = BadFile() 1384 with self.assertRaises(exctype): 1385 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1386 format=tarfile.PAX_FORMAT, 1387 pax_headers={'non': 'empty'}) 1388 self.assertFalse(f.closed) 1389 1390class GzipWriteTest(GzipTest, WriteTest): 1391 pass 1392 1393class Bz2WriteTest(Bz2Test, WriteTest): 1394 pass 1395 1396class LzmaWriteTest(LzmaTest, WriteTest): 1397 pass 1398 1399 1400class StreamWriteTest(WriteTestBase, unittest.TestCase): 1401 1402 prefix = "w|" 1403 decompressor = None 1404 1405 def test_stream_padding(self): 1406 # Test for bug #1543303. 1407 tar = tarfile.open(tmpname, self.mode) 1408 tar.close() 1409 if self.decompressor: 1410 dec = self.decompressor() 1411 with open(tmpname, "rb") as fobj: 1412 data = fobj.read() 1413 data = dec.decompress(data) 1414 self.assertFalse(dec.unused_data, "found trailing data") 1415 else: 1416 with self.open(tmpname) as fobj: 1417 data = fobj.read() 1418 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1419 "incorrect zero padding") 1420 1421 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1422 "Missing umask implementation") 1423 def test_file_mode(self): 1424 # Test for issue #8464: Create files with correct 1425 # permissions. 1426 if os.path.exists(tmpname): 1427 support.unlink(tmpname) 1428 1429 original_umask = os.umask(0o022) 1430 try: 1431 tar = tarfile.open(tmpname, self.mode) 1432 tar.close() 1433 mode = os.stat(tmpname).st_mode & 0o777 1434 self.assertEqual(mode, 0o644, "wrong file permissions") 1435 finally: 1436 os.umask(original_umask) 1437 1438class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1439 pass 1440 1441class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1442 decompressor = bz2.BZ2Decompressor if bz2 else None 1443 1444class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1445 decompressor = lzma.LZMADecompressor if lzma else None 1446 1447 1448class GNUWriteTest(unittest.TestCase): 1449 # This testcase checks for correct creation of GNU Longname 1450 # and Longlink extended headers (cp. bug #812325). 1451 1452 def _length(self, s): 1453 blocks = len(s) // 512 + 1 1454 return blocks * 512 1455 1456 def _calc_size(self, name, link=None): 1457 # Initial tar header 1458 count = 512 1459 1460 if len(name) > tarfile.LENGTH_NAME: 1461 # GNU longname extended header + longname 1462 count += 512 1463 count += self._length(name) 1464 if link is not None and len(link) > tarfile.LENGTH_LINK: 1465 # GNU longlink extended header + longlink 1466 count += 512 1467 count += self._length(link) 1468 return count 1469 1470 def _test(self, name, link=None): 1471 tarinfo = tarfile.TarInfo(name) 1472 if link: 1473 tarinfo.linkname = link 1474 tarinfo.type = tarfile.LNKTYPE 1475 1476 tar = tarfile.open(tmpname, "w") 1477 try: 1478 tar.format = tarfile.GNU_FORMAT 1479 tar.addfile(tarinfo) 1480 1481 v1 = self._calc_size(name, link) 1482 v2 = tar.offset 1483 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1484 finally: 1485 tar.close() 1486 1487 tar = tarfile.open(tmpname) 1488 try: 1489 member = tar.next() 1490 self.assertIsNotNone(member, 1491 "unable to read longname member") 1492 self.assertEqual(tarinfo.name, member.name, 1493 "unable to read longname member") 1494 self.assertEqual(tarinfo.linkname, member.linkname, 1495 "unable to read longname member") 1496 finally: 1497 tar.close() 1498 1499 def test_longname_1023(self): 1500 self._test(("longnam/" * 127) + "longnam") 1501 1502 def test_longname_1024(self): 1503 self._test(("longnam/" * 127) + "longname") 1504 1505 def test_longname_1025(self): 1506 self._test(("longnam/" * 127) + "longname_") 1507 1508 def test_longlink_1023(self): 1509 self._test("name", ("longlnk/" * 127) + "longlnk") 1510 1511 def test_longlink_1024(self): 1512 self._test("name", ("longlnk/" * 127) + "longlink") 1513 1514 def test_longlink_1025(self): 1515 self._test("name", ("longlnk/" * 127) + "longlink_") 1516 1517 def test_longnamelink_1023(self): 1518 self._test(("longnam/" * 127) + "longnam", 1519 ("longlnk/" * 127) + "longlnk") 1520 1521 def test_longnamelink_1024(self): 1522 self._test(("longnam/" * 127) + "longname", 1523 ("longlnk/" * 127) + "longlink") 1524 1525 def test_longnamelink_1025(self): 1526 self._test(("longnam/" * 127) + "longname_", 1527 ("longlnk/" * 127) + "longlink_") 1528 1529 1530class CreateTest(WriteTestBase, unittest.TestCase): 1531 1532 prefix = "x:" 1533 1534 file_path = os.path.join(TEMPDIR, "spameggs42") 1535 1536 def setUp(self): 1537 support.unlink(tmpname) 1538 1539 @classmethod 1540 def setUpClass(cls): 1541 with open(cls.file_path, "wb") as fobj: 1542 fobj.write(b"aaa") 1543 1544 @classmethod 1545 def tearDownClass(cls): 1546 support.unlink(cls.file_path) 1547 1548 def test_create(self): 1549 with tarfile.open(tmpname, self.mode) as tobj: 1550 tobj.add(self.file_path) 1551 1552 with self.taropen(tmpname) as tobj: 1553 names = tobj.getnames() 1554 self.assertEqual(len(names), 1) 1555 self.assertIn('spameggs42', names[0]) 1556 1557 def test_create_existing(self): 1558 with tarfile.open(tmpname, self.mode) as tobj: 1559 tobj.add(self.file_path) 1560 1561 with self.assertRaises(FileExistsError): 1562 tobj = tarfile.open(tmpname, self.mode) 1563 1564 with self.taropen(tmpname) as tobj: 1565 names = tobj.getnames() 1566 self.assertEqual(len(names), 1) 1567 self.assertIn('spameggs42', names[0]) 1568 1569 def test_create_taropen(self): 1570 with self.taropen(tmpname, "x") as tobj: 1571 tobj.add(self.file_path) 1572 1573 with self.taropen(tmpname) as tobj: 1574 names = tobj.getnames() 1575 self.assertEqual(len(names), 1) 1576 self.assertIn('spameggs42', names[0]) 1577 1578 def test_create_existing_taropen(self): 1579 with self.taropen(tmpname, "x") as tobj: 1580 tobj.add(self.file_path) 1581 1582 with self.assertRaises(FileExistsError): 1583 with self.taropen(tmpname, "x"): 1584 pass 1585 1586 with self.taropen(tmpname) as tobj: 1587 names = tobj.getnames() 1588 self.assertEqual(len(names), 1) 1589 self.assertIn("spameggs42", names[0]) 1590 1591 def test_create_pathlike_name(self): 1592 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1593 self.assertIsInstance(tobj.name, str) 1594 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1595 tobj.add(pathlib.Path(self.file_path)) 1596 names = tobj.getnames() 1597 self.assertEqual(len(names), 1) 1598 self.assertIn('spameggs42', names[0]) 1599 1600 with self.taropen(tmpname) as tobj: 1601 names = tobj.getnames() 1602 self.assertEqual(len(names), 1) 1603 self.assertIn('spameggs42', names[0]) 1604 1605 def test_create_taropen_pathlike_name(self): 1606 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1607 self.assertIsInstance(tobj.name, str) 1608 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1609 tobj.add(pathlib.Path(self.file_path)) 1610 names = tobj.getnames() 1611 self.assertEqual(len(names), 1) 1612 self.assertIn('spameggs42', names[0]) 1613 1614 with self.taropen(tmpname) as tobj: 1615 names = tobj.getnames() 1616 self.assertEqual(len(names), 1) 1617 self.assertIn('spameggs42', names[0]) 1618 1619 1620class GzipCreateTest(GzipTest, CreateTest): 1621 pass 1622 1623 1624class Bz2CreateTest(Bz2Test, CreateTest): 1625 pass 1626 1627 1628class LzmaCreateTest(LzmaTest, CreateTest): 1629 pass 1630 1631 1632class CreateWithXModeTest(CreateTest): 1633 1634 prefix = "x" 1635 1636 test_create_taropen = None 1637 test_create_existing_taropen = None 1638 1639 1640@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1641class HardlinkTest(unittest.TestCase): 1642 # Test the creation of LNKTYPE (hardlink) members in an archive. 1643 1644 def setUp(self): 1645 self.foo = os.path.join(TEMPDIR, "foo") 1646 self.bar = os.path.join(TEMPDIR, "bar") 1647 1648 with open(self.foo, "wb") as fobj: 1649 fobj.write(b"foo") 1650 1651 try: 1652 os.link(self.foo, self.bar) 1653 except PermissionError as e: 1654 self.skipTest('os.link(): %s' % e) 1655 1656 self.tar = tarfile.open(tmpname, "w") 1657 self.tar.add(self.foo) 1658 1659 def tearDown(self): 1660 self.tar.close() 1661 support.unlink(self.foo) 1662 support.unlink(self.bar) 1663 1664 def test_add_twice(self): 1665 # The same name will be added as a REGTYPE every 1666 # time regardless of st_nlink. 1667 tarinfo = self.tar.gettarinfo(self.foo) 1668 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1669 "add file as regular failed") 1670 1671 def test_add_hardlink(self): 1672 tarinfo = self.tar.gettarinfo(self.bar) 1673 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1674 "add file as hardlink failed") 1675 1676 def test_dereference_hardlink(self): 1677 self.tar.dereference = True 1678 tarinfo = self.tar.gettarinfo(self.bar) 1679 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1680 "dereferencing hardlink failed") 1681 1682 1683class PaxWriteTest(GNUWriteTest): 1684 1685 def _test(self, name, link=None): 1686 # See GNUWriteTest. 1687 tarinfo = tarfile.TarInfo(name) 1688 if link: 1689 tarinfo.linkname = link 1690 tarinfo.type = tarfile.LNKTYPE 1691 1692 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1693 try: 1694 tar.addfile(tarinfo) 1695 finally: 1696 tar.close() 1697 1698 tar = tarfile.open(tmpname) 1699 try: 1700 if link: 1701 l = tar.getmembers()[0].linkname 1702 self.assertEqual(link, l, "PAX longlink creation failed") 1703 else: 1704 n = tar.getmembers()[0].name 1705 self.assertEqual(name, n, "PAX longname creation failed") 1706 finally: 1707 tar.close() 1708 1709 def test_pax_global_header(self): 1710 pax_headers = { 1711 "foo": "bar", 1712 "uid": "0", 1713 "mtime": "1.23", 1714 "test": "\xe4\xf6\xfc", 1715 "\xe4\xf6\xfc": "test"} 1716 1717 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1718 pax_headers=pax_headers) 1719 try: 1720 tar.addfile(tarfile.TarInfo("test")) 1721 finally: 1722 tar.close() 1723 1724 # Test if the global header was written correctly. 1725 tar = tarfile.open(tmpname, encoding="iso8859-1") 1726 try: 1727 self.assertEqual(tar.pax_headers, pax_headers) 1728 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1729 # Test if all the fields are strings. 1730 for key, val in tar.pax_headers.items(): 1731 self.assertIsNot(type(key), bytes) 1732 self.assertIsNot(type(val), bytes) 1733 if key in tarfile.PAX_NUMBER_FIELDS: 1734 try: 1735 tarfile.PAX_NUMBER_FIELDS[key](val) 1736 except (TypeError, ValueError): 1737 self.fail("unable to convert pax header field") 1738 finally: 1739 tar.close() 1740 1741 def test_pax_extended_header(self): 1742 # The fields from the pax header have priority over the 1743 # TarInfo. 1744 pax_headers = {"path": "foo", "uid": "123"} 1745 1746 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1747 encoding="iso8859-1") 1748 try: 1749 t = tarfile.TarInfo() 1750 t.name = "\xe4\xf6\xfc" # non-ASCII 1751 t.uid = 8**8 # too large 1752 t.pax_headers = pax_headers 1753 tar.addfile(t) 1754 finally: 1755 tar.close() 1756 1757 tar = tarfile.open(tmpname, encoding="iso8859-1") 1758 try: 1759 t = tar.getmembers()[0] 1760 self.assertEqual(t.pax_headers, pax_headers) 1761 self.assertEqual(t.name, "foo") 1762 self.assertEqual(t.uid, 123) 1763 finally: 1764 tar.close() 1765 1766 1767class UnicodeTest: 1768 1769 def test_iso8859_1_filename(self): 1770 self._test_unicode_filename("iso8859-1") 1771 1772 def test_utf7_filename(self): 1773 self._test_unicode_filename("utf7") 1774 1775 def test_utf8_filename(self): 1776 self._test_unicode_filename("utf-8") 1777 1778 def _test_unicode_filename(self, encoding): 1779 tar = tarfile.open(tmpname, "w", format=self.format, 1780 encoding=encoding, errors="strict") 1781 try: 1782 name = "\xe4\xf6\xfc" 1783 tar.addfile(tarfile.TarInfo(name)) 1784 finally: 1785 tar.close() 1786 1787 tar = tarfile.open(tmpname, encoding=encoding) 1788 try: 1789 self.assertEqual(tar.getmembers()[0].name, name) 1790 finally: 1791 tar.close() 1792 1793 def test_unicode_filename_error(self): 1794 tar = tarfile.open(tmpname, "w", format=self.format, 1795 encoding="ascii", errors="strict") 1796 try: 1797 tarinfo = tarfile.TarInfo() 1798 1799 tarinfo.name = "\xe4\xf6\xfc" 1800 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1801 1802 tarinfo.name = "foo" 1803 tarinfo.uname = "\xe4\xf6\xfc" 1804 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1805 finally: 1806 tar.close() 1807 1808 def test_unicode_argument(self): 1809 tar = tarfile.open(tarname, "r", 1810 encoding="iso8859-1", errors="strict") 1811 try: 1812 for t in tar: 1813 self.assertIs(type(t.name), str) 1814 self.assertIs(type(t.linkname), str) 1815 self.assertIs(type(t.uname), str) 1816 self.assertIs(type(t.gname), str) 1817 finally: 1818 tar.close() 1819 1820 def test_uname_unicode(self): 1821 t = tarfile.TarInfo("foo") 1822 t.uname = "\xe4\xf6\xfc" 1823 t.gname = "\xe4\xf6\xfc" 1824 1825 tar = tarfile.open(tmpname, mode="w", format=self.format, 1826 encoding="iso8859-1") 1827 try: 1828 tar.addfile(t) 1829 finally: 1830 tar.close() 1831 1832 tar = tarfile.open(tmpname, encoding="iso8859-1") 1833 try: 1834 t = tar.getmember("foo") 1835 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1836 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1837 1838 if self.format != tarfile.PAX_FORMAT: 1839 tar.close() 1840 tar = tarfile.open(tmpname, encoding="ascii") 1841 t = tar.getmember("foo") 1842 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1843 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1844 finally: 1845 tar.close() 1846 1847 1848class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1849 1850 format = tarfile.USTAR_FORMAT 1851 1852 # Test whether the utf-8 encoded version of a filename exceeds the 100 1853 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1854 # bytes). 1855 def test_unicode_name1(self): 1856 self._test_ustar_name("0123456789" * 10) 1857 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1858 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1859 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1860 1861 def test_unicode_name2(self): 1862 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1863 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1864 1865 # Test whether the utf-8 encoded version of a filename exceeds the 155 1866 # bytes prefix + '/' + 100 bytes name limit. 1867 def test_unicode_longname1(self): 1868 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1869 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1870 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1871 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1872 1873 def test_unicode_longname2(self): 1874 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1875 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1876 1877 def test_unicode_longname3(self): 1878 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1879 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1880 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1881 1882 def test_unicode_longname4(self): 1883 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 1884 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 1885 1886 def _test_ustar_name(self, name, exc=None): 1887 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1888 t = tarfile.TarInfo(name) 1889 if exc is None: 1890 tar.addfile(t) 1891 else: 1892 self.assertRaises(exc, tar.addfile, t) 1893 1894 if exc is None: 1895 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1896 for t in tar: 1897 self.assertEqual(name, t.name) 1898 break 1899 1900 # Test the same as above for the 100 bytes link field. 1901 def test_unicode_link1(self): 1902 self._test_ustar_link("0123456789" * 10) 1903 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 1904 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 1905 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 1906 1907 def test_unicode_link2(self): 1908 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 1909 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 1910 1911 def _test_ustar_link(self, name, exc=None): 1912 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1913 t = tarfile.TarInfo("foo") 1914 t.linkname = name 1915 if exc is None: 1916 tar.addfile(t) 1917 else: 1918 self.assertRaises(exc, tar.addfile, t) 1919 1920 if exc is None: 1921 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1922 for t in tar: 1923 self.assertEqual(name, t.linkname) 1924 break 1925 1926 1927class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 1928 1929 format = tarfile.GNU_FORMAT 1930 1931 def test_bad_pax_header(self): 1932 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1933 # without a hdrcharset=BINARY header. 1934 for encoding, name in ( 1935 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1936 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1937 with tarfile.open(tarname, encoding=encoding, 1938 errors="surrogateescape") as tar: 1939 try: 1940 t = tar.getmember(name) 1941 except KeyError: 1942 self.fail("unable to read bad GNU tar pax header") 1943 1944 1945class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 1946 1947 format = tarfile.PAX_FORMAT 1948 1949 # PAX_FORMAT ignores encoding in write mode. 1950 test_unicode_filename_error = None 1951 1952 def test_binary_header(self): 1953 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1954 for encoding, name in ( 1955 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1956 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1957 with tarfile.open(tarname, encoding=encoding, 1958 errors="surrogateescape") as tar: 1959 try: 1960 t = tar.getmember(name) 1961 except KeyError: 1962 self.fail("unable to read POSIX.1-2008 binary header") 1963 1964 1965class AppendTestBase: 1966 # Test append mode (cp. patch #1652681). 1967 1968 def setUp(self): 1969 self.tarname = tmpname 1970 if os.path.exists(self.tarname): 1971 support.unlink(self.tarname) 1972 1973 def _create_testtar(self, mode="w:"): 1974 with tarfile.open(tarname, encoding="iso8859-1") as src: 1975 t = src.getmember("ustar/regtype") 1976 t.name = "foo" 1977 with src.extractfile(t) as f: 1978 with tarfile.open(self.tarname, mode) as tar: 1979 tar.addfile(t, f) 1980 1981 def test_append_compressed(self): 1982 self._create_testtar("w:" + self.suffix) 1983 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1984 1985class AppendTest(AppendTestBase, unittest.TestCase): 1986 test_append_compressed = None 1987 1988 def _add_testfile(self, fileobj=None): 1989 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1990 tar.addfile(tarfile.TarInfo("bar")) 1991 1992 def _test(self, names=["bar"], fileobj=None): 1993 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1994 self.assertEqual(tar.getnames(), names) 1995 1996 def test_non_existing(self): 1997 self._add_testfile() 1998 self._test() 1999 2000 def test_empty(self): 2001 tarfile.open(self.tarname, "w:").close() 2002 self._add_testfile() 2003 self._test() 2004 2005 def test_empty_fileobj(self): 2006 fobj = io.BytesIO(b"\0" * 1024) 2007 self._add_testfile(fobj) 2008 fobj.seek(0) 2009 self._test(fileobj=fobj) 2010 2011 def test_fileobj(self): 2012 self._create_testtar() 2013 with open(self.tarname, "rb") as fobj: 2014 data = fobj.read() 2015 fobj = io.BytesIO(data) 2016 self._add_testfile(fobj) 2017 fobj.seek(0) 2018 self._test(names=["foo", "bar"], fileobj=fobj) 2019 2020 def test_existing(self): 2021 self._create_testtar() 2022 self._add_testfile() 2023 self._test(names=["foo", "bar"]) 2024 2025 # Append mode is supposed to fail if the tarfile to append to 2026 # does not end with a zero block. 2027 def _test_error(self, data): 2028 with open(self.tarname, "wb") as fobj: 2029 fobj.write(data) 2030 self.assertRaises(tarfile.ReadError, self._add_testfile) 2031 2032 def test_null(self): 2033 self._test_error(b"") 2034 2035 def test_incomplete(self): 2036 self._test_error(b"\0" * 13) 2037 2038 def test_premature_eof(self): 2039 data = tarfile.TarInfo("foo").tobuf() 2040 self._test_error(data) 2041 2042 def test_trailing_garbage(self): 2043 data = tarfile.TarInfo("foo").tobuf() 2044 self._test_error(data + b"\0" * 13) 2045 2046 def test_invalid(self): 2047 self._test_error(b"a" * 512) 2048 2049class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2050 pass 2051 2052class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2053 pass 2054 2055class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2056 pass 2057 2058 2059class LimitsTest(unittest.TestCase): 2060 2061 def test_ustar_limits(self): 2062 # 100 char name 2063 tarinfo = tarfile.TarInfo("0123456789" * 10) 2064 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2065 2066 # 101 char name that cannot be stored 2067 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2068 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2069 2070 # 256 char name with a slash at pos 156 2071 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2072 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2073 2074 # 256 char name that cannot be stored 2075 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2076 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2077 2078 # 512 char name 2079 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2080 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2081 2082 # 512 char linkname 2083 tarinfo = tarfile.TarInfo("longlink") 2084 tarinfo.linkname = "123/" * 126 + "longname" 2085 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2086 2087 # uid > 8 digits 2088 tarinfo = tarfile.TarInfo("name") 2089 tarinfo.uid = 0o10000000 2090 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2091 2092 def test_gnu_limits(self): 2093 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2094 tarinfo.tobuf(tarfile.GNU_FORMAT) 2095 2096 tarinfo = tarfile.TarInfo("longlink") 2097 tarinfo.linkname = "123/" * 126 + "longname" 2098 tarinfo.tobuf(tarfile.GNU_FORMAT) 2099 2100 # uid >= 256 ** 7 2101 tarinfo = tarfile.TarInfo("name") 2102 tarinfo.uid = 0o4000000000000000000 2103 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2104 2105 def test_pax_limits(self): 2106 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2107 tarinfo.tobuf(tarfile.PAX_FORMAT) 2108 2109 tarinfo = tarfile.TarInfo("longlink") 2110 tarinfo.linkname = "123/" * 126 + "longname" 2111 tarinfo.tobuf(tarfile.PAX_FORMAT) 2112 2113 tarinfo = tarfile.TarInfo("name") 2114 tarinfo.uid = 0o4000000000000000000 2115 tarinfo.tobuf(tarfile.PAX_FORMAT) 2116 2117 2118class MiscTest(unittest.TestCase): 2119 2120 def test_char_fields(self): 2121 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2122 b"foo\0\0\0\0\0") 2123 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2124 b"foo") 2125 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2126 "foo") 2127 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2128 "foo") 2129 2130 def test_read_number_fields(self): 2131 # Issue 13158: Test if GNU tar specific base-256 number fields 2132 # are decoded correctly. 2133 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2134 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2135 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2136 0o10000000) 2137 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2138 0xffffffff) 2139 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2140 -1) 2141 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2142 -100) 2143 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2144 -0x100000000000000) 2145 2146 # Issue 24514: Test if empty number fields are converted to zero. 2147 self.assertEqual(tarfile.nti(b"\0"), 0) 2148 self.assertEqual(tarfile.nti(b" \0"), 0) 2149 2150 def test_write_number_fields(self): 2151 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2152 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2153 self.assertEqual(tarfile.itn(0o10000000), 2154 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2155 self.assertEqual(tarfile.itn(0xffffffff), 2156 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2157 self.assertEqual(tarfile.itn(-1), 2158 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2159 self.assertEqual(tarfile.itn(-100), 2160 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2161 self.assertEqual(tarfile.itn(-0x100000000000000), 2162 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2163 2164 # Issue 32713: Test if itn() supports float values outside the 2165 # non-GNU format range 2166 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2167 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2168 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2169 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2170 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2171 2172 def test_number_field_limits(self): 2173 with self.assertRaises(ValueError): 2174 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 2175 with self.assertRaises(ValueError): 2176 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2177 with self.assertRaises(ValueError): 2178 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2179 with self.assertRaises(ValueError): 2180 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2181 2182 def test__all__(self): 2183 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2184 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2185 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2186 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2187 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2188 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2189 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2190 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2191 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2192 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2193 'filemode', 2194 'EmptyHeaderError', 'TruncatedHeaderError', 2195 'EOFHeaderError', 'InvalidHeaderError', 2196 'SubsequentHeaderError', 'ExFileObject', 2197 'main'} 2198 support.check__all__(self, tarfile, blacklist=blacklist) 2199 2200 2201class CommandLineTest(unittest.TestCase): 2202 2203 def tarfilecmd(self, *args, **kwargs): 2204 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2205 **kwargs) 2206 return out.replace(os.linesep.encode(), b'\n') 2207 2208 def tarfilecmd_failure(self, *args): 2209 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2210 2211 def make_simple_tarfile(self, tar_name): 2212 files = [support.findfile('tokenize_tests.txt'), 2213 support.findfile('tokenize_tests-no-coding-cookie-' 2214 'and-utf8-bom-sig-only.txt')] 2215 self.addCleanup(support.unlink, tar_name) 2216 with tarfile.open(tar_name, 'w') as tf: 2217 for tardata in files: 2218 tf.add(tardata, arcname=os.path.basename(tardata)) 2219 2220 def test_bad_use(self): 2221 rc, out, err = self.tarfilecmd_failure() 2222 self.assertEqual(out, b'') 2223 self.assertIn(b'usage', err.lower()) 2224 self.assertIn(b'error', err.lower()) 2225 self.assertIn(b'required', err.lower()) 2226 rc, out, err = self.tarfilecmd_failure('-l', '') 2227 self.assertEqual(out, b'') 2228 self.assertNotEqual(err.strip(), b'') 2229 2230 def test_test_command(self): 2231 for tar_name in testtarnames: 2232 for opt in '-t', '--test': 2233 out = self.tarfilecmd(opt, tar_name) 2234 self.assertEqual(out, b'') 2235 2236 def test_test_command_verbose(self): 2237 for tar_name in testtarnames: 2238 for opt in '-v', '--verbose': 2239 out = self.tarfilecmd(opt, '-t', tar_name) 2240 self.assertIn(b'is a tar archive.\n', out) 2241 2242 def test_test_command_invalid_file(self): 2243 zipname = support.findfile('zipdir.zip') 2244 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2245 self.assertIn(b' is not a tar archive.', err) 2246 self.assertEqual(out, b'') 2247 self.assertEqual(rc, 1) 2248 2249 for tar_name in testtarnames: 2250 with self.subTest(tar_name=tar_name): 2251 with open(tar_name, 'rb') as f: 2252 data = f.read() 2253 try: 2254 with open(tmpname, 'wb') as f: 2255 f.write(data[:511]) 2256 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2257 self.assertEqual(out, b'') 2258 self.assertEqual(rc, 1) 2259 finally: 2260 support.unlink(tmpname) 2261 2262 def test_list_command(self): 2263 for tar_name in testtarnames: 2264 with support.captured_stdout() as t: 2265 with tarfile.open(tar_name, 'r') as tf: 2266 tf.list(verbose=False) 2267 expected = t.getvalue().encode('ascii', 'backslashreplace') 2268 for opt in '-l', '--list': 2269 out = self.tarfilecmd(opt, tar_name, 2270 PYTHONIOENCODING='ascii') 2271 self.assertEqual(out, expected) 2272 2273 def test_list_command_verbose(self): 2274 for tar_name in testtarnames: 2275 with support.captured_stdout() as t: 2276 with tarfile.open(tar_name, 'r') as tf: 2277 tf.list(verbose=True) 2278 expected = t.getvalue().encode('ascii', 'backslashreplace') 2279 for opt in '-v', '--verbose': 2280 out = self.tarfilecmd(opt, '-l', tar_name, 2281 PYTHONIOENCODING='ascii') 2282 self.assertEqual(out, expected) 2283 2284 def test_list_command_invalid_file(self): 2285 zipname = support.findfile('zipdir.zip') 2286 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2287 self.assertIn(b' is not a tar archive.', err) 2288 self.assertEqual(out, b'') 2289 self.assertEqual(rc, 1) 2290 2291 def test_create_command(self): 2292 files = [support.findfile('tokenize_tests.txt'), 2293 support.findfile('tokenize_tests-no-coding-cookie-' 2294 'and-utf8-bom-sig-only.txt')] 2295 for opt in '-c', '--create': 2296 try: 2297 out = self.tarfilecmd(opt, tmpname, *files) 2298 self.assertEqual(out, b'') 2299 with tarfile.open(tmpname) as tar: 2300 tar.getmembers() 2301 finally: 2302 support.unlink(tmpname) 2303 2304 def test_create_command_verbose(self): 2305 files = [support.findfile('tokenize_tests.txt'), 2306 support.findfile('tokenize_tests-no-coding-cookie-' 2307 'and-utf8-bom-sig-only.txt')] 2308 for opt in '-v', '--verbose': 2309 try: 2310 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2311 self.assertIn(b' file created.', out) 2312 with tarfile.open(tmpname) as tar: 2313 tar.getmembers() 2314 finally: 2315 support.unlink(tmpname) 2316 2317 def test_create_command_dotless_filename(self): 2318 files = [support.findfile('tokenize_tests.txt')] 2319 try: 2320 out = self.tarfilecmd('-c', dotlessname, *files) 2321 self.assertEqual(out, b'') 2322 with tarfile.open(dotlessname) as tar: 2323 tar.getmembers() 2324 finally: 2325 support.unlink(dotlessname) 2326 2327 def test_create_command_dot_started_filename(self): 2328 tar_name = os.path.join(TEMPDIR, ".testtar") 2329 files = [support.findfile('tokenize_tests.txt')] 2330 try: 2331 out = self.tarfilecmd('-c', tar_name, *files) 2332 self.assertEqual(out, b'') 2333 with tarfile.open(tar_name) as tar: 2334 tar.getmembers() 2335 finally: 2336 support.unlink(tar_name) 2337 2338 def test_create_command_compressed(self): 2339 files = [support.findfile('tokenize_tests.txt'), 2340 support.findfile('tokenize_tests-no-coding-cookie-' 2341 'and-utf8-bom-sig-only.txt')] 2342 for filetype in (GzipTest, Bz2Test, LzmaTest): 2343 if not filetype.open: 2344 continue 2345 try: 2346 tar_name = tmpname + '.' + filetype.suffix 2347 out = self.tarfilecmd('-c', tar_name, *files) 2348 with filetype.taropen(tar_name) as tar: 2349 tar.getmembers() 2350 finally: 2351 support.unlink(tar_name) 2352 2353 def test_extract_command(self): 2354 self.make_simple_tarfile(tmpname) 2355 for opt in '-e', '--extract': 2356 try: 2357 with support.temp_cwd(tarextdir): 2358 out = self.tarfilecmd(opt, tmpname) 2359 self.assertEqual(out, b'') 2360 finally: 2361 support.rmtree(tarextdir) 2362 2363 def test_extract_command_verbose(self): 2364 self.make_simple_tarfile(tmpname) 2365 for opt in '-v', '--verbose': 2366 try: 2367 with support.temp_cwd(tarextdir): 2368 out = self.tarfilecmd(opt, '-e', tmpname) 2369 self.assertIn(b' file is extracted.', out) 2370 finally: 2371 support.rmtree(tarextdir) 2372 2373 def test_extract_command_different_directory(self): 2374 self.make_simple_tarfile(tmpname) 2375 try: 2376 with support.temp_cwd(tarextdir): 2377 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2378 self.assertEqual(out, b'') 2379 finally: 2380 support.rmtree(tarextdir) 2381 2382 def test_extract_command_invalid_file(self): 2383 zipname = support.findfile('zipdir.zip') 2384 with support.temp_cwd(tarextdir): 2385 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2386 self.assertIn(b' is not a tar archive.', err) 2387 self.assertEqual(out, b'') 2388 self.assertEqual(rc, 1) 2389 2390 2391class ContextManagerTest(unittest.TestCase): 2392 2393 def test_basic(self): 2394 with tarfile.open(tarname) as tar: 2395 self.assertFalse(tar.closed, "closed inside runtime context") 2396 self.assertTrue(tar.closed, "context manager failed") 2397 2398 def test_closed(self): 2399 # The __enter__() method is supposed to raise OSError 2400 # if the TarFile object is already closed. 2401 tar = tarfile.open(tarname) 2402 tar.close() 2403 with self.assertRaises(OSError): 2404 with tar: 2405 pass 2406 2407 def test_exception(self): 2408 # Test if the OSError exception is passed through properly. 2409 with self.assertRaises(Exception) as exc: 2410 with tarfile.open(tarname) as tar: 2411 raise OSError 2412 self.assertIsInstance(exc.exception, OSError, 2413 "wrong exception raised in context manager") 2414 self.assertTrue(tar.closed, "context manager failed") 2415 2416 def test_no_eof(self): 2417 # __exit__() must not write end-of-archive blocks if an 2418 # exception was raised. 2419 try: 2420 with tarfile.open(tmpname, "w") as tar: 2421 raise Exception 2422 except: 2423 pass 2424 self.assertEqual(os.path.getsize(tmpname), 0, 2425 "context manager wrote an end-of-archive block") 2426 self.assertTrue(tar.closed, "context manager failed") 2427 2428 def test_eof(self): 2429 # __exit__() must write end-of-archive blocks, i.e. call 2430 # TarFile.close() if there was no error. 2431 with tarfile.open(tmpname, "w"): 2432 pass 2433 self.assertNotEqual(os.path.getsize(tmpname), 0, 2434 "context manager wrote no end-of-archive block") 2435 2436 def test_fileobj(self): 2437 # Test that __exit__() did not close the external file 2438 # object. 2439 with open(tmpname, "wb") as fobj: 2440 try: 2441 with tarfile.open(fileobj=fobj, mode="w") as tar: 2442 raise Exception 2443 except: 2444 pass 2445 self.assertFalse(fobj.closed, "external file object was closed") 2446 self.assertTrue(tar.closed, "context manager failed") 2447 2448 2449@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2450class LinkEmulationTest(ReadTest, unittest.TestCase): 2451 2452 # Test for issue #8741 regression. On platforms that do not support 2453 # symbolic or hard links tarfile tries to extract these types of members 2454 # as the regular files they point to. 2455 def _test_link_extraction(self, name): 2456 self.tar.extract(name, TEMPDIR) 2457 with open(os.path.join(TEMPDIR, name), "rb") as f: 2458 data = f.read() 2459 self.assertEqual(md5sum(data), md5_regtype) 2460 2461 # See issues #1578269, #8879, and #17689 for some history on these skips 2462 @unittest.skipIf(hasattr(os.path, "islink"), 2463 "Skip emulation - has os.path.islink but not os.link") 2464 def test_hardlink_extraction1(self): 2465 self._test_link_extraction("ustar/lnktype") 2466 2467 @unittest.skipIf(hasattr(os.path, "islink"), 2468 "Skip emulation - has os.path.islink but not os.link") 2469 def test_hardlink_extraction2(self): 2470 self._test_link_extraction("./ustar/linktest2/lnktype") 2471 2472 @unittest.skipIf(hasattr(os, "symlink"), 2473 "Skip emulation if symlink exists") 2474 def test_symlink_extraction1(self): 2475 self._test_link_extraction("ustar/symtype") 2476 2477 @unittest.skipIf(hasattr(os, "symlink"), 2478 "Skip emulation if symlink exists") 2479 def test_symlink_extraction2(self): 2480 self._test_link_extraction("./ustar/linktest2/symtype") 2481 2482 2483class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2484 # Issue5068: The _BZ2Proxy.read() method loops forever 2485 # on an empty or partial bzipped file. 2486 2487 def _test_partial_input(self, mode): 2488 class MyBytesIO(io.BytesIO): 2489 hit_eof = False 2490 def read(self, n): 2491 if self.hit_eof: 2492 raise AssertionError("infinite loop detected in " 2493 "tarfile.open()") 2494 self.hit_eof = self.tell() == len(self.getvalue()) 2495 return super(MyBytesIO, self).read(n) 2496 def seek(self, *args): 2497 self.hit_eof = False 2498 return super(MyBytesIO, self).seek(*args) 2499 2500 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2501 for x in range(len(data) + 1): 2502 try: 2503 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2504 except tarfile.ReadError: 2505 pass # we have no interest in ReadErrors 2506 2507 def test_partial_input(self): 2508 self._test_partial_input("r") 2509 2510 def test_partial_input_bz2(self): 2511 self._test_partial_input("r:bz2") 2512 2513 2514def root_is_uid_gid_0(): 2515 try: 2516 import pwd, grp 2517 except ImportError: 2518 return False 2519 if pwd.getpwuid(0)[0] != 'root': 2520 return False 2521 if grp.getgrgid(0)[0] != 'root': 2522 return False 2523 return True 2524 2525 2526@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2527@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2528class NumericOwnerTest(unittest.TestCase): 2529 # mock the following: 2530 # os.chown: so we can test what's being called 2531 # os.chmod: so the modes are not actually changed. if they are, we can't 2532 # delete the files/directories 2533 # os.geteuid: so we can lie and say we're root (uid = 0) 2534 2535 @staticmethod 2536 def _make_test_archive(filename_1, dirname_1, filename_2): 2537 # the file contents to write 2538 fobj = io.BytesIO(b"content") 2539 2540 # create a tar file with a file, a directory, and a file within that 2541 # directory. Assign various .uid/.gid values to them 2542 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2543 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2544 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2545 ] 2546 with tarfile.open(tmpname, 'w') as tarfl: 2547 for name, uid, gid, typ, contents in items: 2548 t = tarfile.TarInfo(name) 2549 t.uid = uid 2550 t.gid = gid 2551 t.uname = 'root' 2552 t.gname = 'root' 2553 t.type = typ 2554 tarfl.addfile(t, contents) 2555 2556 # return the full pathname to the tar file 2557 return tmpname 2558 2559 @staticmethod 2560 @contextmanager 2561 def _setup_test(mock_geteuid): 2562 mock_geteuid.return_value = 0 # lie and say we're root 2563 fname = 'numeric-owner-testfile' 2564 dirname = 'dir' 2565 2566 # the names we want stored in the tarfile 2567 filename_1 = fname 2568 dirname_1 = dirname 2569 filename_2 = os.path.join(dirname, fname) 2570 2571 # create the tarfile with the contents we're after 2572 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2573 dirname_1, 2574 filename_2) 2575 2576 # open the tarfile for reading. yield it and the names of the items 2577 # we stored into the file 2578 with tarfile.open(tar_filename) as tarfl: 2579 yield tarfl, filename_1, dirname_1, filename_2 2580 2581 @unittest.mock.patch('os.chown') 2582 @unittest.mock.patch('os.chmod') 2583 @unittest.mock.patch('os.geteuid') 2584 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2585 mock_chown): 2586 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2587 filename_2): 2588 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2589 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2590 2591 # convert to filesystem paths 2592 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2593 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2594 2595 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2596 unittest.mock.call(f_filename_2, 88, 87), 2597 ], 2598 any_order=True) 2599 2600 @unittest.mock.patch('os.chown') 2601 @unittest.mock.patch('os.chmod') 2602 @unittest.mock.patch('os.geteuid') 2603 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2604 mock_chown): 2605 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2606 filename_2): 2607 tarfl.extractall(TEMPDIR, numeric_owner=True) 2608 2609 # convert to filesystem paths 2610 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2611 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2612 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2613 2614 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2615 unittest.mock.call(f_dirname_1, 77, 76), 2616 unittest.mock.call(f_filename_2, 88, 87), 2617 ], 2618 any_order=True) 2619 2620 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2621 # because the uname and gname in the test file are 'root', and extract() 2622 # will look them up using pwd and grp to find their uid and gid, which we 2623 # test here to be 0. 2624 @unittest.skipUnless(root_is_uid_gid_0(), 2625 'uid=0,gid=0 must be named "root"') 2626 @unittest.mock.patch('os.chown') 2627 @unittest.mock.patch('os.chmod') 2628 @unittest.mock.patch('os.geteuid') 2629 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2630 mock_chown): 2631 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2632 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2633 2634 # convert to filesystem paths 2635 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2636 2637 mock_chown.assert_called_with(f_filename_1, 0, 0) 2638 2639 @unittest.mock.patch('os.geteuid') 2640 def test_keyword_only(self, mock_geteuid): 2641 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2642 self.assertRaises(TypeError, 2643 tarfl.extract, filename_1, TEMPDIR, False, True) 2644 2645 2646def setUpModule(): 2647 support.unlink(TEMPDIR) 2648 os.makedirs(TEMPDIR) 2649 2650 global testtarnames 2651 testtarnames = [tarname] 2652 with open(tarname, "rb") as fobj: 2653 data = fobj.read() 2654 2655 # Create compressed tarfiles. 2656 for c in GzipTest, Bz2Test, LzmaTest: 2657 if c.open: 2658 support.unlink(c.tarname) 2659 testtarnames.append(c.tarname) 2660 with c.open(c.tarname, "wb") as tar: 2661 tar.write(data) 2662 2663def tearDownModule(): 2664 if os.path.exists(TEMPDIR): 2665 support.rmtree(TEMPDIR) 2666 2667if __name__ == "__main__": 2668 unittest.main() 2669