1import sys 2import os 3import io 4from hashlib import sha256 5from contextlib import contextmanager 6from random import Random 7import pathlib 8 9import unittest 10import unittest.mock 11import tarfile 12 13from test import support 14from test.support import script_helper, requires_hashdigest 15 16# Check for our compression modules. 17try: 18 import gzip 19except ImportError: 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25try: 26 import lzma 27except ImportError: 28 lzma = None 29 30def sha256sum(data): 31 return sha256(data).hexdigest() 32 33TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 34tarextdir = TEMPDIR + '-extract-test' 35tarname = support.findfile("testtar.tar") 36gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 37bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 38xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 39tmpname = os.path.join(TEMPDIR, "tmp.tar") 40dotlessname = os.path.join(TEMPDIR, "testtar") 41 42sha256_regtype = ( 43 "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce" 44) 45sha256_sparse = ( 46 "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b" 47) 48 49 50class TarTest: 51 tarname = tarname 52 suffix = '' 53 open = io.FileIO 54 taropen = tarfile.TarFile.taropen 55 56 @property 57 def mode(self): 58 return self.prefix + self.suffix 59 60@support.requires_gzip 61class GzipTest: 62 tarname = gzipname 63 suffix = 'gz' 64 open = gzip.GzipFile if gzip else None 65 taropen = tarfile.TarFile.gzopen 66 67@support.requires_bz2 68class Bz2Test: 69 tarname = bz2name 70 suffix = 'bz2' 71 open = bz2.BZ2File if bz2 else None 72 taropen = tarfile.TarFile.bz2open 73 74@support.requires_lzma 75class LzmaTest: 76 tarname = xzname 77 suffix = 'xz' 78 open = lzma.LZMAFile if lzma else None 79 taropen = tarfile.TarFile.xzopen 80 81 82class ReadTest(TarTest): 83 84 prefix = "r:" 85 86 def setUp(self): 87 self.tar = tarfile.open(self.tarname, mode=self.mode, 88 encoding="iso8859-1") 89 90 def tearDown(self): 91 self.tar.close() 92 93 94class UstarReadTest(ReadTest, unittest.TestCase): 95 96 def test_fileobj_regular_file(self): 97 tarinfo = self.tar.getmember("ustar/regtype") 98 with self.tar.extractfile(tarinfo) as fobj: 99 data = fobj.read() 100 self.assertEqual(len(data), tarinfo.size, 101 "regular file extraction failed") 102 self.assertEqual(sha256sum(data), sha256_regtype, 103 "regular file extraction failed") 104 105 def test_fileobj_readlines(self): 106 self.tar.extract("ustar/regtype", TEMPDIR) 107 tarinfo = self.tar.getmember("ustar/regtype") 108 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 109 lines1 = fobj1.readlines() 110 111 with self.tar.extractfile(tarinfo) as fobj: 112 fobj2 = io.TextIOWrapper(fobj) 113 lines2 = fobj2.readlines() 114 self.assertEqual(lines1, lines2, 115 "fileobj.readlines() failed") 116 self.assertEqual(len(lines2), 114, 117 "fileobj.readlines() failed") 118 self.assertEqual(lines2[83], 119 "I will gladly admit that Python is not the fastest " 120 "running scripting language.\n", 121 "fileobj.readlines() failed") 122 123 def test_fileobj_iter(self): 124 self.tar.extract("ustar/regtype", TEMPDIR) 125 tarinfo = self.tar.getmember("ustar/regtype") 126 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 127 lines1 = fobj1.readlines() 128 with self.tar.extractfile(tarinfo) as fobj2: 129 lines2 = list(io.TextIOWrapper(fobj2)) 130 self.assertEqual(lines1, lines2, 131 "fileobj.__iter__() failed") 132 133 def test_fileobj_seek(self): 134 self.tar.extract("ustar/regtype", TEMPDIR) 135 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 136 data = fobj.read() 137 138 tarinfo = self.tar.getmember("ustar/regtype") 139 with self.tar.extractfile(tarinfo) as fobj: 140 text = fobj.read() 141 fobj.seek(0) 142 self.assertEqual(0, fobj.tell(), 143 "seek() to file's start failed") 144 fobj.seek(2048, 0) 145 self.assertEqual(2048, fobj.tell(), 146 "seek() to absolute position failed") 147 fobj.seek(-1024, 1) 148 self.assertEqual(1024, fobj.tell(), 149 "seek() to negative relative position failed") 150 fobj.seek(1024, 1) 151 self.assertEqual(2048, fobj.tell(), 152 "seek() to positive relative position failed") 153 s = fobj.read(10) 154 self.assertEqual(s, data[2048:2058], 155 "read() after seek failed") 156 fobj.seek(0, 2) 157 self.assertEqual(tarinfo.size, fobj.tell(), 158 "seek() to file's end failed") 159 self.assertEqual(fobj.read(), b"", 160 "read() at file's end did not return empty string") 161 fobj.seek(-tarinfo.size, 2) 162 self.assertEqual(0, fobj.tell(), 163 "relative seek() to file's end failed") 164 fobj.seek(512) 165 s1 = fobj.readlines() 166 fobj.seek(512) 167 s2 = fobj.readlines() 168 self.assertEqual(s1, s2, 169 "readlines() after seek failed") 170 fobj.seek(0) 171 self.assertEqual(len(fobj.readline()), fobj.tell(), 172 "tell() after readline() failed") 173 fobj.seek(512) 174 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 175 "tell() after seek() and readline() failed") 176 fobj.seek(0) 177 line = fobj.readline() 178 self.assertEqual(fobj.read(), data[len(line):], 179 "read() after readline() failed") 180 181 def test_fileobj_text(self): 182 with self.tar.extractfile("ustar/regtype") as fobj: 183 fobj = io.TextIOWrapper(fobj) 184 data = fobj.read().encode("iso8859-1") 185 self.assertEqual(sha256sum(data), sha256_regtype) 186 try: 187 fobj.seek(100) 188 except AttributeError: 189 # Issue #13815: seek() complained about a missing 190 # flush() method. 191 self.fail("seeking failed in text mode") 192 193 # Test if symbolic and hard links are resolved by extractfile(). The 194 # test link members each point to a regular member whose data is 195 # supposed to be exported. 196 def _test_fileobj_link(self, lnktype, regtype): 197 with self.tar.extractfile(lnktype) as a, \ 198 self.tar.extractfile(regtype) as b: 199 self.assertEqual(a.name, b.name) 200 201 def test_fileobj_link1(self): 202 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 203 204 def test_fileobj_link2(self): 205 self._test_fileobj_link("./ustar/linktest2/lnktype", 206 "ustar/linktest1/regtype") 207 208 def test_fileobj_symlink1(self): 209 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 210 211 def test_fileobj_symlink2(self): 212 self._test_fileobj_link("./ustar/linktest2/symtype", 213 "ustar/linktest1/regtype") 214 215 def test_issue14160(self): 216 self._test_fileobj_link("symtype2", "ustar/regtype") 217 218class GzipUstarReadTest(GzipTest, UstarReadTest): 219 pass 220 221class Bz2UstarReadTest(Bz2Test, UstarReadTest): 222 pass 223 224class LzmaUstarReadTest(LzmaTest, UstarReadTest): 225 pass 226 227 228class ListTest(ReadTest, unittest.TestCase): 229 230 # Override setUp to use default encoding (UTF-8) 231 def setUp(self): 232 self.tar = tarfile.open(self.tarname, mode=self.mode) 233 234 def test_list(self): 235 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 236 with support.swap_attr(sys, 'stdout', tio): 237 self.tar.list(verbose=False) 238 out = tio.detach().getvalue() 239 self.assertIn(b'ustar/conttype', out) 240 self.assertIn(b'ustar/regtype', out) 241 self.assertIn(b'ustar/lnktype', out) 242 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 243 self.assertIn(b'./ustar/linktest2/symtype', out) 244 self.assertIn(b'./ustar/linktest2/lnktype', out) 245 # Make sure it puts trailing slash for directory 246 self.assertIn(b'ustar/dirtype/', out) 247 self.assertIn(b'ustar/dirtype-with-size/', out) 248 # Make sure it is able to print unencodable characters 249 def conv(b): 250 s = b.decode(self.tar.encoding, 'surrogateescape') 251 return s.encode('ascii', 'backslashreplace') 252 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 254 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 255 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 256 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 257 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 258 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 259 # Make sure it prints files separated by one newline without any 260 # 'ls -l'-like accessories if verbose flag is not being used 261 # ... 262 # ustar/conttype 263 # ustar/regtype 264 # ... 265 self.assertRegex(out, br'ustar/conttype ?\r?\n' 266 br'ustar/regtype ?\r?\n') 267 # Make sure it does not print the source of link without verbose flag 268 self.assertNotIn(b'link to', out) 269 self.assertNotIn(b'->', out) 270 271 def test_list_verbose(self): 272 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 273 with support.swap_attr(sys, 'stdout', tio): 274 self.tar.list(verbose=True) 275 out = tio.detach().getvalue() 276 # Make sure it prints files separated by one newline with 'ls -l'-like 277 # accessories if verbose flag is being used 278 # ... 279 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 280 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 281 # ... 282 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 283 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 284 br'ustar/\w+type ?\r?\n') * 2) 285 # Make sure it prints the source of link with verbose flag 286 self.assertIn(b'ustar/symtype -> regtype', out) 287 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 288 self.assertIn(b'./ustar/linktest2/lnktype link to ' 289 b'./ustar/linktest1/regtype', out) 290 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 291 (b'/123' * 125) + b'/longname', out) 292 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 293 (b'/123' * 125) + b'/longname', out) 294 295 def test_list_members(self): 296 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 297 def members(tar): 298 for tarinfo in tar.getmembers(): 299 if 'reg' in tarinfo.name: 300 yield tarinfo 301 with support.swap_attr(sys, 'stdout', tio): 302 self.tar.list(verbose=False, members=members(self.tar)) 303 out = tio.detach().getvalue() 304 self.assertIn(b'ustar/regtype', out) 305 self.assertNotIn(b'ustar/conttype', out) 306 307 308class GzipListTest(GzipTest, ListTest): 309 pass 310 311 312class Bz2ListTest(Bz2Test, ListTest): 313 pass 314 315 316class LzmaListTest(LzmaTest, ListTest): 317 pass 318 319 320class CommonReadTest(ReadTest): 321 322 def test_empty_tarfile(self): 323 # Test for issue6123: Allow opening empty archives. 324 # This test checks if tarfile.open() is able to open an empty tar 325 # archive successfully. Note that an empty tar archive is not the 326 # same as an empty file! 327 with tarfile.open(tmpname, self.mode.replace("r", "w")): 328 pass 329 try: 330 tar = tarfile.open(tmpname, self.mode) 331 tar.getnames() 332 except tarfile.ReadError: 333 self.fail("tarfile.open() failed on empty archive") 334 else: 335 self.assertListEqual(tar.getmembers(), []) 336 finally: 337 tar.close() 338 339 def test_non_existent_tarfile(self): 340 # Test for issue11513: prevent non-existent gzipped tarfiles raising 341 # multiple exceptions. 342 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 343 tarfile.open("xxx", self.mode) 344 345 def test_null_tarfile(self): 346 # Test for issue6123: Allow opening empty archives. 347 # This test guarantees that tarfile.open() does not treat an empty 348 # file as an empty tar archive. 349 with open(tmpname, "wb"): 350 pass 351 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 352 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 353 354 def test_ignore_zeros(self): 355 # Test TarFile's ignore_zeros option. 356 # generate 512 pseudorandom bytes 357 data = Random(0).getrandbits(512*8).to_bytes(512, 'big') 358 for char in (b'\0', b'a'): 359 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 360 # are ignored correctly. 361 with self.open(tmpname, "w") as fobj: 362 fobj.write(char * 1024) 363 tarinfo = tarfile.TarInfo("foo") 364 tarinfo.size = len(data) 365 fobj.write(tarinfo.tobuf()) 366 fobj.write(data) 367 368 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 369 try: 370 self.assertListEqual(tar.getnames(), ["foo"], 371 "ignore_zeros=True should have skipped the %r-blocks" % 372 char) 373 finally: 374 tar.close() 375 376 def test_premature_end_of_archive(self): 377 for size in (512, 600, 1024, 1200): 378 with tarfile.open(tmpname, "w:") as tar: 379 t = tarfile.TarInfo("foo") 380 t.size = 1024 381 tar.addfile(t, io.BytesIO(b"a" * 1024)) 382 383 with open(tmpname, "r+b") as fobj: 384 fobj.truncate(size) 385 386 with tarfile.open(tmpname) as tar: 387 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 388 for t in tar: 389 pass 390 391 with tarfile.open(tmpname) as tar: 392 t = tar.next() 393 394 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 395 tar.extract(t, TEMPDIR) 396 397 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 398 tar.extractfile(t).read() 399 400 def test_length_zero_header(self): 401 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 402 # with an exception 403 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 404 with tarfile.open(support.findfile('recursion.tar')) as tar: 405 pass 406 407class MiscReadTestBase(CommonReadTest): 408 def requires_name_attribute(self): 409 pass 410 411 def test_no_name_argument(self): 412 self.requires_name_attribute() 413 with open(self.tarname, "rb") as fobj: 414 self.assertIsInstance(fobj.name, str) 415 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 416 self.assertIsInstance(tar.name, str) 417 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 418 419 def test_no_name_attribute(self): 420 with open(self.tarname, "rb") as fobj: 421 data = fobj.read() 422 fobj = io.BytesIO(data) 423 self.assertRaises(AttributeError, getattr, fobj, "name") 424 tar = tarfile.open(fileobj=fobj, mode=self.mode) 425 self.assertIsNone(tar.name) 426 427 def test_empty_name_attribute(self): 428 with open(self.tarname, "rb") as fobj: 429 data = fobj.read() 430 fobj = io.BytesIO(data) 431 fobj.name = "" 432 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 433 self.assertIsNone(tar.name) 434 435 def test_int_name_attribute(self): 436 # Issue 21044: tarfile.open() should handle fileobj with an integer 437 # 'name' attribute. 438 fd = os.open(self.tarname, os.O_RDONLY) 439 with open(fd, 'rb') as fobj: 440 self.assertIsInstance(fobj.name, int) 441 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 442 self.assertIsNone(tar.name) 443 444 def test_bytes_name_attribute(self): 445 self.requires_name_attribute() 446 tarname = os.fsencode(self.tarname) 447 with open(tarname, 'rb') as fobj: 448 self.assertIsInstance(fobj.name, bytes) 449 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 450 self.assertIsInstance(tar.name, bytes) 451 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 452 453 def test_pathlike_name(self): 454 tarname = pathlib.Path(self.tarname) 455 with tarfile.open(tarname, mode=self.mode) as tar: 456 self.assertIsInstance(tar.name, str) 457 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 458 with self.taropen(tarname) as tar: 459 self.assertIsInstance(tar.name, str) 460 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 461 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 462 self.assertIsInstance(tar.name, str) 463 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 464 if self.suffix == '': 465 with tarfile.TarFile(tarname, mode='r') as tar: 466 self.assertIsInstance(tar.name, str) 467 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 468 469 def test_illegal_mode_arg(self): 470 with open(tmpname, 'wb'): 471 pass 472 with self.assertRaisesRegex(ValueError, 'mode must be '): 473 tar = self.taropen(tmpname, 'q') 474 with self.assertRaisesRegex(ValueError, 'mode must be '): 475 tar = self.taropen(tmpname, 'rw') 476 with self.assertRaisesRegex(ValueError, 'mode must be '): 477 tar = self.taropen(tmpname, '') 478 479 def test_fileobj_with_offset(self): 480 # Skip the first member and store values from the second member 481 # of the testtar. 482 tar = tarfile.open(self.tarname, mode=self.mode) 483 try: 484 tar.next() 485 t = tar.next() 486 name = t.name 487 offset = t.offset 488 with tar.extractfile(t) as f: 489 data = f.read() 490 finally: 491 tar.close() 492 493 # Open the testtar and seek to the offset of the second member. 494 with self.open(self.tarname) as fobj: 495 fobj.seek(offset) 496 497 # Test if the tarfile starts with the second member. 498 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 499 t = tar.next() 500 self.assertEqual(t.name, name) 501 # Read to the end of fileobj and test if seeking back to the 502 # beginning works. 503 tar.getmembers() 504 self.assertEqual(tar.extractfile(t).read(), data, 505 "seek back did not work") 506 507 def test_fail_comp(self): 508 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 509 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 510 with open(tarname, "rb") as fobj: 511 self.assertRaises(tarfile.ReadError, tarfile.open, 512 fileobj=fobj, mode=self.mode) 513 514 def test_v7_dirtype(self): 515 # Test old style dirtype member (bug #1336623): 516 # Old V7 tars create directory members using an AREGTYPE 517 # header with a "/" appended to the filename field. 518 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 519 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 520 "v7 dirtype failed") 521 522 def test_xstar_type(self): 523 # The xstar format stores extra atime and ctime fields inside the 524 # space reserved for the prefix field. The prefix field must be 525 # ignored in this case, otherwise it will mess up the name. 526 try: 527 self.tar.getmember("misc/regtype-xstar") 528 except KeyError: 529 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 530 531 def test_check_members(self): 532 for tarinfo in self.tar: 533 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 534 "wrong mtime for %s" % tarinfo.name) 535 if not tarinfo.name.startswith("ustar/"): 536 continue 537 self.assertEqual(tarinfo.uname, "tarfile", 538 "wrong uname for %s" % tarinfo.name) 539 540 def test_find_members(self): 541 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 542 "could not find all members") 543 544 @unittest.skipUnless(hasattr(os, "link"), 545 "Missing hardlink implementation") 546 @support.skip_unless_symlink 547 def test_extract_hardlink(self): 548 # Test hardlink extraction (e.g. bug #857297). 549 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 550 tar.extract("ustar/regtype", TEMPDIR) 551 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 552 553 tar.extract("ustar/lnktype", TEMPDIR) 554 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 555 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 556 data = f.read() 557 self.assertEqual(sha256sum(data), sha256_regtype) 558 559 tar.extract("ustar/symtype", TEMPDIR) 560 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 561 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 562 data = f.read() 563 self.assertEqual(sha256sum(data), sha256_regtype) 564 565 def test_extractall(self): 566 # Test if extractall() correctly restores directory permissions 567 # and times (see issue1735). 568 tar = tarfile.open(tarname, encoding="iso8859-1") 569 DIR = os.path.join(TEMPDIR, "extractall") 570 os.mkdir(DIR) 571 try: 572 directories = [t for t in tar if t.isdir()] 573 tar.extractall(DIR, directories) 574 for tarinfo in directories: 575 path = os.path.join(DIR, tarinfo.name) 576 if sys.platform != "win32": 577 # Win32 has no support for fine grained permissions. 578 self.assertEqual(tarinfo.mode & 0o777, 579 os.stat(path).st_mode & 0o777) 580 def format_mtime(mtime): 581 if isinstance(mtime, float): 582 return "{} ({})".format(mtime, mtime.hex()) 583 else: 584 return "{!r} (int)".format(mtime) 585 file_mtime = os.path.getmtime(path) 586 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 587 format_mtime(tarinfo.mtime), 588 format_mtime(file_mtime), 589 path) 590 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 591 finally: 592 tar.close() 593 support.rmtree(DIR) 594 595 def test_extract_directory(self): 596 dirtype = "ustar/dirtype" 597 DIR = os.path.join(TEMPDIR, "extractdir") 598 os.mkdir(DIR) 599 try: 600 with tarfile.open(tarname, encoding="iso8859-1") as tar: 601 tarinfo = tar.getmember(dirtype) 602 tar.extract(tarinfo, path=DIR) 603 extracted = os.path.join(DIR, dirtype) 604 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 605 if sys.platform != "win32": 606 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 607 finally: 608 support.rmtree(DIR) 609 610 def test_extractall_pathlike_name(self): 611 DIR = pathlib.Path(TEMPDIR) / "extractall" 612 with support.temp_dir(DIR), \ 613 tarfile.open(tarname, encoding="iso8859-1") as tar: 614 directories = [t for t in tar if t.isdir()] 615 tar.extractall(DIR, directories) 616 for tarinfo in directories: 617 path = DIR / tarinfo.name 618 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 619 620 def test_extract_pathlike_name(self): 621 dirtype = "ustar/dirtype" 622 DIR = pathlib.Path(TEMPDIR) / "extractall" 623 with support.temp_dir(DIR), \ 624 tarfile.open(tarname, encoding="iso8859-1") as tar: 625 tarinfo = tar.getmember(dirtype) 626 tar.extract(tarinfo, path=DIR) 627 extracted = DIR / dirtype 628 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 629 630 def test_init_close_fobj(self): 631 # Issue #7341: Close the internal file object in the TarFile 632 # constructor in case of an error. For the test we rely on 633 # the fact that opening an empty file raises a ReadError. 634 empty = os.path.join(TEMPDIR, "empty") 635 with open(empty, "wb") as fobj: 636 fobj.write(b"") 637 638 try: 639 tar = object.__new__(tarfile.TarFile) 640 try: 641 tar.__init__(empty) 642 except tarfile.ReadError: 643 self.assertTrue(tar.fileobj.closed) 644 else: 645 self.fail("ReadError not raised") 646 finally: 647 support.unlink(empty) 648 649 def test_parallel_iteration(self): 650 # Issue #16601: Restarting iteration over tarfile continued 651 # from where it left off. 652 with tarfile.open(self.tarname) as tar: 653 for m1, m2 in zip(tar, tar): 654 self.assertEqual(m1.offset, m2.offset) 655 self.assertEqual(m1.get_info(), m2.get_info()) 656 657class MiscReadTest(MiscReadTestBase, unittest.TestCase): 658 test_fail_comp = None 659 660class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 661 pass 662 663class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 664 def requires_name_attribute(self): 665 self.skipTest("BZ2File have no name attribute") 666 667class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 668 def requires_name_attribute(self): 669 self.skipTest("LZMAFile have no name attribute") 670 671 672class StreamReadTest(CommonReadTest, unittest.TestCase): 673 674 prefix="r|" 675 676 def test_read_through(self): 677 # Issue #11224: A poorly designed _FileInFile.read() method 678 # caused seeking errors with stream tar files. 679 for tarinfo in self.tar: 680 if not tarinfo.isreg(): 681 continue 682 with self.tar.extractfile(tarinfo) as fobj: 683 while True: 684 try: 685 buf = fobj.read(512) 686 except tarfile.StreamError: 687 self.fail("simple read-through using " 688 "TarFile.extractfile() failed") 689 if not buf: 690 break 691 692 def test_fileobj_regular_file(self): 693 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 694 with self.tar.extractfile(tarinfo) as fobj: 695 data = fobj.read() 696 self.assertEqual(len(data), tarinfo.size, 697 "regular file extraction failed") 698 self.assertEqual(sha256sum(data), sha256_regtype, 699 "regular file extraction failed") 700 701 def test_provoke_stream_error(self): 702 tarinfos = self.tar.getmembers() 703 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 704 self.assertRaises(tarfile.StreamError, f.read) 705 706 def test_compare_members(self): 707 tar1 = tarfile.open(tarname, encoding="iso8859-1") 708 try: 709 tar2 = self.tar 710 711 while True: 712 t1 = tar1.next() 713 t2 = tar2.next() 714 if t1 is None: 715 break 716 self.assertIsNotNone(t2, "stream.next() failed.") 717 718 if t2.islnk() or t2.issym(): 719 with self.assertRaises(tarfile.StreamError): 720 tar2.extractfile(t2) 721 continue 722 723 v1 = tar1.extractfile(t1) 724 v2 = tar2.extractfile(t2) 725 if v1 is None: 726 continue 727 self.assertIsNotNone(v2, "stream.extractfile() failed") 728 self.assertEqual(v1.read(), v2.read(), 729 "stream extraction failed") 730 finally: 731 tar1.close() 732 733class GzipStreamReadTest(GzipTest, StreamReadTest): 734 pass 735 736class Bz2StreamReadTest(Bz2Test, StreamReadTest): 737 pass 738 739class LzmaStreamReadTest(LzmaTest, StreamReadTest): 740 pass 741 742 743class DetectReadTest(TarTest, unittest.TestCase): 744 def _testfunc_file(self, name, mode): 745 try: 746 tar = tarfile.open(name, mode) 747 except tarfile.ReadError as e: 748 self.fail() 749 else: 750 tar.close() 751 752 def _testfunc_fileobj(self, name, mode): 753 try: 754 with open(name, "rb") as f: 755 tar = tarfile.open(name, mode, fileobj=f) 756 except tarfile.ReadError as e: 757 self.fail() 758 else: 759 tar.close() 760 761 def _test_modes(self, testfunc): 762 if self.suffix: 763 with self.assertRaises(tarfile.ReadError): 764 tarfile.open(tarname, mode="r:" + self.suffix) 765 with self.assertRaises(tarfile.ReadError): 766 tarfile.open(tarname, mode="r|" + self.suffix) 767 with self.assertRaises(tarfile.ReadError): 768 tarfile.open(self.tarname, mode="r:") 769 with self.assertRaises(tarfile.ReadError): 770 tarfile.open(self.tarname, mode="r|") 771 testfunc(self.tarname, "r") 772 testfunc(self.tarname, "r:" + self.suffix) 773 testfunc(self.tarname, "r:*") 774 testfunc(self.tarname, "r|" + self.suffix) 775 testfunc(self.tarname, "r|*") 776 777 def test_detect_file(self): 778 self._test_modes(self._testfunc_file) 779 780 def test_detect_fileobj(self): 781 self._test_modes(self._testfunc_fileobj) 782 783class GzipDetectReadTest(GzipTest, DetectReadTest): 784 pass 785 786class Bz2DetectReadTest(Bz2Test, DetectReadTest): 787 def test_detect_stream_bz2(self): 788 # Originally, tarfile's stream detection looked for the string 789 # "BZh91" at the start of the file. This is incorrect because 790 # the '9' represents the blocksize (900,000 bytes). If the file was 791 # compressed using another blocksize autodetection fails. 792 with open(tarname, "rb") as fobj: 793 data = fobj.read() 794 795 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 796 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 797 fobj.write(data) 798 799 self._testfunc_file(tmpname, "r|*") 800 801class LzmaDetectReadTest(LzmaTest, DetectReadTest): 802 pass 803 804 805class MemberReadTest(ReadTest, unittest.TestCase): 806 807 def _test_member(self, tarinfo, chksum=None, **kwargs): 808 if chksum is not None: 809 with self.tar.extractfile(tarinfo) as f: 810 self.assertEqual(sha256sum(f.read()), chksum, 811 "wrong sha256sum for %s" % tarinfo.name) 812 813 kwargs["mtime"] = 0o7606136617 814 kwargs["uid"] = 1000 815 kwargs["gid"] = 100 816 if "old-v7" not in tarinfo.name: 817 # V7 tar can't handle alphabetic owners. 818 kwargs["uname"] = "tarfile" 819 kwargs["gname"] = "tarfile" 820 for k, v in kwargs.items(): 821 self.assertEqual(getattr(tarinfo, k), v, 822 "wrong value in %s field of %s" % (k, tarinfo.name)) 823 824 def test_find_regtype(self): 825 tarinfo = self.tar.getmember("ustar/regtype") 826 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 827 828 def test_find_conttype(self): 829 tarinfo = self.tar.getmember("ustar/conttype") 830 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 831 832 def test_find_dirtype(self): 833 tarinfo = self.tar.getmember("ustar/dirtype") 834 self._test_member(tarinfo, size=0) 835 836 def test_find_dirtype_with_size(self): 837 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 838 self._test_member(tarinfo, size=255) 839 840 def test_find_lnktype(self): 841 tarinfo = self.tar.getmember("ustar/lnktype") 842 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 843 844 def test_find_symtype(self): 845 tarinfo = self.tar.getmember("ustar/symtype") 846 self._test_member(tarinfo, size=0, linkname="regtype") 847 848 def test_find_blktype(self): 849 tarinfo = self.tar.getmember("ustar/blktype") 850 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 851 852 def test_find_chrtype(self): 853 tarinfo = self.tar.getmember("ustar/chrtype") 854 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 855 856 def test_find_fifotype(self): 857 tarinfo = self.tar.getmember("ustar/fifotype") 858 self._test_member(tarinfo, size=0) 859 860 def test_find_sparse(self): 861 tarinfo = self.tar.getmember("ustar/sparse") 862 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 863 864 def test_find_gnusparse(self): 865 tarinfo = self.tar.getmember("gnu/sparse") 866 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 867 868 def test_find_gnusparse_00(self): 869 tarinfo = self.tar.getmember("gnu/sparse-0.0") 870 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 871 872 def test_find_gnusparse_01(self): 873 tarinfo = self.tar.getmember("gnu/sparse-0.1") 874 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 875 876 def test_find_gnusparse_10(self): 877 tarinfo = self.tar.getmember("gnu/sparse-1.0") 878 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 879 880 def test_find_umlauts(self): 881 tarinfo = self.tar.getmember("ustar/umlauts-" 882 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 883 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 884 885 def test_find_ustar_longname(self): 886 name = "ustar/" + "12345/" * 39 + "1234567/longname" 887 self.assertIn(name, self.tar.getnames()) 888 889 def test_find_regtype_oldv7(self): 890 tarinfo = self.tar.getmember("misc/regtype-old-v7") 891 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 892 893 def test_find_pax_umlauts(self): 894 self.tar.close() 895 self.tar = tarfile.open(self.tarname, mode=self.mode, 896 encoding="iso8859-1") 897 tarinfo = self.tar.getmember("pax/umlauts-" 898 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 899 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 900 901 902class LongnameTest: 903 904 def test_read_longname(self): 905 # Test reading of longname (bug #1471427). 906 longname = self.subdir + "/" + "123/" * 125 + "longname" 907 try: 908 tarinfo = self.tar.getmember(longname) 909 except KeyError: 910 self.fail("longname not found") 911 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 912 "read longname as dirtype") 913 914 def test_read_longlink(self): 915 longname = self.subdir + "/" + "123/" * 125 + "longname" 916 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 917 try: 918 tarinfo = self.tar.getmember(longlink) 919 except KeyError: 920 self.fail("longlink not found") 921 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 922 923 def test_truncated_longname(self): 924 longname = self.subdir + "/" + "123/" * 125 + "longname" 925 tarinfo = self.tar.getmember(longname) 926 offset = tarinfo.offset 927 self.tar.fileobj.seek(offset) 928 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 929 with self.assertRaises(tarfile.ReadError): 930 tarfile.open(name="foo.tar", fileobj=fobj) 931 932 def test_header_offset(self): 933 # Test if the start offset of the TarInfo object includes 934 # the preceding extended header. 935 longname = self.subdir + "/" + "123/" * 125 + "longname" 936 offset = self.tar.getmember(longname).offset 937 with open(tarname, "rb") as fobj: 938 fobj.seek(offset) 939 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 940 "iso8859-1", "strict") 941 self.assertEqual(tarinfo.type, self.longnametype) 942 943 944class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 945 946 subdir = "gnu" 947 longnametype = tarfile.GNUTYPE_LONGNAME 948 949 # Since 3.2 tarfile is supposed to accurately restore sparse members and 950 # produce files with holes. This is what we actually want to test here. 951 # Unfortunately, not all platforms/filesystems support sparse files, and 952 # even on platforms that do it is non-trivial to make reliable assertions 953 # about holes in files. Therefore, we first do one basic test which works 954 # an all platforms, and after that a test that will work only on 955 # platforms/filesystems that prove to support sparse files. 956 def _test_sparse_file(self, name): 957 self.tar.extract(name, TEMPDIR) 958 filename = os.path.join(TEMPDIR, name) 959 with open(filename, "rb") as fobj: 960 data = fobj.read() 961 self.assertEqual(sha256sum(data), sha256_sparse, 962 "wrong sha256sum for %s" % name) 963 964 if self._fs_supports_holes(): 965 s = os.stat(filename) 966 self.assertLess(s.st_blocks * 512, s.st_size) 967 968 def test_sparse_file_old(self): 969 self._test_sparse_file("gnu/sparse") 970 971 def test_sparse_file_00(self): 972 self._test_sparse_file("gnu/sparse-0.0") 973 974 def test_sparse_file_01(self): 975 self._test_sparse_file("gnu/sparse-0.1") 976 977 def test_sparse_file_10(self): 978 self._test_sparse_file("gnu/sparse-1.0") 979 980 @staticmethod 981 def _fs_supports_holes(): 982 # Return True if the platform knows the st_blocks stat attribute and 983 # uses st_blocks units of 512 bytes, and if the filesystem is able to 984 # store holes of 4 KiB in files. 985 # 986 # The function returns False if page size is larger than 4 KiB. 987 # For example, ppc64 uses pages of 64 KiB. 988 if sys.platform.startswith("linux"): 989 # Linux evidentially has 512 byte st_blocks units. 990 name = os.path.join(TEMPDIR, "sparse-test") 991 with open(name, "wb") as fobj: 992 # Seek to "punch a hole" of 4 KiB 993 fobj.seek(4096) 994 fobj.write(b'x' * 4096) 995 fobj.truncate() 996 s = os.stat(name) 997 support.unlink(name) 998 return (s.st_blocks * 512 < s.st_size) 999 else: 1000 return False 1001 1002 1003class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 1004 1005 subdir = "pax" 1006 longnametype = tarfile.XHDTYPE 1007 1008 def test_pax_global_headers(self): 1009 tar = tarfile.open(tarname, encoding="iso8859-1") 1010 try: 1011 tarinfo = tar.getmember("pax/regtype1") 1012 self.assertEqual(tarinfo.uname, "foo") 1013 self.assertEqual(tarinfo.gname, "bar") 1014 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1015 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1016 1017 tarinfo = tar.getmember("pax/regtype2") 1018 self.assertEqual(tarinfo.uname, "") 1019 self.assertEqual(tarinfo.gname, "bar") 1020 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1021 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1022 1023 tarinfo = tar.getmember("pax/regtype3") 1024 self.assertEqual(tarinfo.uname, "tarfile") 1025 self.assertEqual(tarinfo.gname, "tarfile") 1026 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1027 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1028 finally: 1029 tar.close() 1030 1031 def test_pax_number_fields(self): 1032 # All following number fields are read from the pax header. 1033 tar = tarfile.open(tarname, encoding="iso8859-1") 1034 try: 1035 tarinfo = tar.getmember("pax/regtype4") 1036 self.assertEqual(tarinfo.size, 7011) 1037 self.assertEqual(tarinfo.uid, 123) 1038 self.assertEqual(tarinfo.gid, 123) 1039 self.assertEqual(tarinfo.mtime, 1041808783.0) 1040 self.assertEqual(type(tarinfo.mtime), float) 1041 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1042 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1043 finally: 1044 tar.close() 1045 1046 1047class WriteTestBase(TarTest): 1048 # Put all write tests in here that are supposed to be tested 1049 # in all possible mode combinations. 1050 1051 def test_fileobj_no_close(self): 1052 fobj = io.BytesIO() 1053 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1054 tar.addfile(tarfile.TarInfo("foo")) 1055 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1056 # Issue #20238: Incomplete gzip output with mode="w:gz" 1057 data = fobj.getvalue() 1058 del tar 1059 support.gc_collect() 1060 self.assertFalse(fobj.closed) 1061 self.assertEqual(data, fobj.getvalue()) 1062 1063 def test_eof_marker(self): 1064 # Make sure an end of archive marker is written (two zero blocks). 1065 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1066 # So, we create an archive that has exactly 10240 bytes without the 1067 # marker, and has 20480 bytes once the marker is written. 1068 with tarfile.open(tmpname, self.mode) as tar: 1069 t = tarfile.TarInfo("foo") 1070 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1071 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1072 1073 with self.open(tmpname, "rb") as fobj: 1074 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1075 1076 1077class WriteTest(WriteTestBase, unittest.TestCase): 1078 1079 prefix = "w:" 1080 1081 def test_100_char_name(self): 1082 # The name field in a tar header stores strings of at most 100 chars. 1083 # If a string is shorter than 100 chars it has to be padded with '\0', 1084 # which implies that a string of exactly 100 chars is stored without 1085 # a trailing '\0'. 1086 name = "0123456789" * 10 1087 tar = tarfile.open(tmpname, self.mode) 1088 try: 1089 t = tarfile.TarInfo(name) 1090 tar.addfile(t) 1091 finally: 1092 tar.close() 1093 1094 tar = tarfile.open(tmpname) 1095 try: 1096 self.assertEqual(tar.getnames()[0], name, 1097 "failed to store 100 char filename") 1098 finally: 1099 tar.close() 1100 1101 def test_tar_size(self): 1102 # Test for bug #1013882. 1103 tar = tarfile.open(tmpname, self.mode) 1104 try: 1105 path = os.path.join(TEMPDIR, "file") 1106 with open(path, "wb") as fobj: 1107 fobj.write(b"aaa") 1108 tar.add(path) 1109 finally: 1110 tar.close() 1111 self.assertGreater(os.path.getsize(tmpname), 0, 1112 "tarfile is empty") 1113 1114 # The test_*_size tests test for bug #1167128. 1115 def test_file_size(self): 1116 tar = tarfile.open(tmpname, self.mode) 1117 try: 1118 path = os.path.join(TEMPDIR, "file") 1119 with open(path, "wb"): 1120 pass 1121 tarinfo = tar.gettarinfo(path) 1122 self.assertEqual(tarinfo.size, 0) 1123 1124 with open(path, "wb") as fobj: 1125 fobj.write(b"aaa") 1126 tarinfo = tar.gettarinfo(path) 1127 self.assertEqual(tarinfo.size, 3) 1128 finally: 1129 tar.close() 1130 1131 def test_directory_size(self): 1132 path = os.path.join(TEMPDIR, "directory") 1133 os.mkdir(path) 1134 try: 1135 tar = tarfile.open(tmpname, self.mode) 1136 try: 1137 tarinfo = tar.gettarinfo(path) 1138 self.assertEqual(tarinfo.size, 0) 1139 finally: 1140 tar.close() 1141 finally: 1142 support.rmdir(path) 1143 1144 # mock the following: 1145 # os.listdir: so we know that files are in the wrong order 1146 def test_ordered_recursion(self): 1147 path = os.path.join(TEMPDIR, "directory") 1148 os.mkdir(path) 1149 open(os.path.join(path, "1"), "a").close() 1150 open(os.path.join(path, "2"), "a").close() 1151 try: 1152 tar = tarfile.open(tmpname, self.mode) 1153 try: 1154 with unittest.mock.patch('os.listdir') as mock_listdir: 1155 mock_listdir.return_value = ["2", "1"] 1156 tar.add(path) 1157 paths = [] 1158 for m in tar.getmembers(): 1159 paths.append(os.path.split(m.name)[-1]) 1160 self.assertEqual(paths, ["directory", "1", "2"]); 1161 finally: 1162 tar.close() 1163 finally: 1164 support.unlink(os.path.join(path, "1")) 1165 support.unlink(os.path.join(path, "2")) 1166 support.rmdir(path) 1167 1168 def test_gettarinfo_pathlike_name(self): 1169 with tarfile.open(tmpname, self.mode) as tar: 1170 path = pathlib.Path(TEMPDIR) / "file" 1171 with open(path, "wb") as fobj: 1172 fobj.write(b"aaa") 1173 tarinfo = tar.gettarinfo(path) 1174 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1175 self.assertIsInstance(tarinfo.name, str) 1176 self.assertEqual(tarinfo.name, tarinfo2.name) 1177 self.assertEqual(tarinfo.size, 3) 1178 1179 @unittest.skipUnless(hasattr(os, "link"), 1180 "Missing hardlink implementation") 1181 def test_link_size(self): 1182 link = os.path.join(TEMPDIR, "link") 1183 target = os.path.join(TEMPDIR, "link_target") 1184 with open(target, "wb") as fobj: 1185 fobj.write(b"aaa") 1186 try: 1187 os.link(target, link) 1188 except PermissionError as e: 1189 self.skipTest('os.link(): %s' % e) 1190 try: 1191 tar = tarfile.open(tmpname, self.mode) 1192 try: 1193 # Record the link target in the inodes list. 1194 tar.gettarinfo(target) 1195 tarinfo = tar.gettarinfo(link) 1196 self.assertEqual(tarinfo.size, 0) 1197 finally: 1198 tar.close() 1199 finally: 1200 support.unlink(target) 1201 support.unlink(link) 1202 1203 @support.skip_unless_symlink 1204 def test_symlink_size(self): 1205 path = os.path.join(TEMPDIR, "symlink") 1206 os.symlink("link_target", path) 1207 try: 1208 tar = tarfile.open(tmpname, self.mode) 1209 try: 1210 tarinfo = tar.gettarinfo(path) 1211 self.assertEqual(tarinfo.size, 0) 1212 finally: 1213 tar.close() 1214 finally: 1215 support.unlink(path) 1216 1217 def test_add_self(self): 1218 # Test for #1257255. 1219 dstname = os.path.abspath(tmpname) 1220 tar = tarfile.open(tmpname, self.mode) 1221 try: 1222 self.assertEqual(tar.name, dstname, 1223 "archive name must be absolute") 1224 tar.add(dstname) 1225 self.assertEqual(tar.getnames(), [], 1226 "added the archive to itself") 1227 1228 with support.change_cwd(TEMPDIR): 1229 tar.add(dstname) 1230 self.assertEqual(tar.getnames(), [], 1231 "added the archive to itself") 1232 finally: 1233 tar.close() 1234 1235 def test_filter(self): 1236 tempdir = os.path.join(TEMPDIR, "filter") 1237 os.mkdir(tempdir) 1238 try: 1239 for name in ("foo", "bar", "baz"): 1240 name = os.path.join(tempdir, name) 1241 support.create_empty_file(name) 1242 1243 def filter(tarinfo): 1244 if os.path.basename(tarinfo.name) == "bar": 1245 return 1246 tarinfo.uid = 123 1247 tarinfo.uname = "foo" 1248 return tarinfo 1249 1250 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1251 try: 1252 tar.add(tempdir, arcname="empty_dir", filter=filter) 1253 finally: 1254 tar.close() 1255 1256 # Verify that filter is a keyword-only argument 1257 with self.assertRaises(TypeError): 1258 tar.add(tempdir, "empty_dir", True, None, filter) 1259 1260 tar = tarfile.open(tmpname, "r") 1261 try: 1262 for tarinfo in tar: 1263 self.assertEqual(tarinfo.uid, 123) 1264 self.assertEqual(tarinfo.uname, "foo") 1265 self.assertEqual(len(tar.getmembers()), 3) 1266 finally: 1267 tar.close() 1268 finally: 1269 support.rmtree(tempdir) 1270 1271 # Guarantee that stored pathnames are not modified. Don't 1272 # remove ./ or ../ or double slashes. Still make absolute 1273 # pathnames relative. 1274 # For details see bug #6054. 1275 def _test_pathname(self, path, cmp_path=None, dir=False): 1276 # Create a tarfile with an empty member named path 1277 # and compare the stored name with the original. 1278 foo = os.path.join(TEMPDIR, "foo") 1279 if not dir: 1280 support.create_empty_file(foo) 1281 else: 1282 os.mkdir(foo) 1283 1284 tar = tarfile.open(tmpname, self.mode) 1285 try: 1286 tar.add(foo, arcname=path) 1287 finally: 1288 tar.close() 1289 1290 tar = tarfile.open(tmpname, "r") 1291 try: 1292 t = tar.next() 1293 finally: 1294 tar.close() 1295 1296 if not dir: 1297 support.unlink(foo) 1298 else: 1299 support.rmdir(foo) 1300 1301 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1302 1303 1304 @support.skip_unless_symlink 1305 def test_extractall_symlinks(self): 1306 # Test if extractall works properly when tarfile contains symlinks 1307 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1308 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1309 os.mkdir(tempdir) 1310 try: 1311 source_file = os.path.join(tempdir,'source') 1312 target_file = os.path.join(tempdir,'symlink') 1313 with open(source_file,'w') as f: 1314 f.write('something\n') 1315 os.symlink(source_file, target_file) 1316 with tarfile.open(temparchive, 'w') as tar: 1317 tar.add(source_file, arcname="source") 1318 tar.add(target_file, arcname="symlink") 1319 # Let's extract it to the location which contains the symlink 1320 with tarfile.open(temparchive, errorlevel=2) as tar: 1321 # this should not raise OSError: [Errno 17] File exists 1322 try: 1323 tar.extractall(path=tempdir) 1324 except OSError: 1325 self.fail("extractall failed with symlinked files") 1326 finally: 1327 support.unlink(temparchive) 1328 support.rmtree(tempdir) 1329 1330 def test_pathnames(self): 1331 self._test_pathname("foo") 1332 self._test_pathname(os.path.join("foo", ".", "bar")) 1333 self._test_pathname(os.path.join("foo", "..", "bar")) 1334 self._test_pathname(os.path.join(".", "foo")) 1335 self._test_pathname(os.path.join(".", "foo", ".")) 1336 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1337 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1338 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1339 self._test_pathname(os.path.join("..", "foo")) 1340 self._test_pathname(os.path.join("..", "foo", "..")) 1341 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1342 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1343 1344 self._test_pathname("foo" + os.sep + os.sep + "bar") 1345 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1346 1347 def test_abs_pathnames(self): 1348 if sys.platform == "win32": 1349 self._test_pathname("C:\\foo", "foo") 1350 else: 1351 self._test_pathname("/foo", "foo") 1352 self._test_pathname("///foo", "foo") 1353 1354 def test_cwd(self): 1355 # Test adding the current working directory. 1356 with support.change_cwd(TEMPDIR): 1357 tar = tarfile.open(tmpname, self.mode) 1358 try: 1359 tar.add(".") 1360 finally: 1361 tar.close() 1362 1363 tar = tarfile.open(tmpname, "r") 1364 try: 1365 for t in tar: 1366 if t.name != ".": 1367 self.assertTrue(t.name.startswith("./"), t.name) 1368 finally: 1369 tar.close() 1370 1371 def test_open_nonwritable_fileobj(self): 1372 for exctype in OSError, EOFError, RuntimeError: 1373 class BadFile(io.BytesIO): 1374 first = True 1375 def write(self, data): 1376 if self.first: 1377 self.first = False 1378 raise exctype 1379 1380 f = BadFile() 1381 with self.assertRaises(exctype): 1382 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1383 format=tarfile.PAX_FORMAT, 1384 pax_headers={'non': 'empty'}) 1385 self.assertFalse(f.closed) 1386 1387 1388class GzipWriteTest(GzipTest, WriteTest): 1389 pass 1390 1391 1392class Bz2WriteTest(Bz2Test, WriteTest): 1393 pass 1394 1395 1396class LzmaWriteTest(LzmaTest, WriteTest): 1397 pass 1398 1399 1400class StreamWriteTest(WriteTestBase, unittest.TestCase): 1401 1402 prefix = "w|" 1403 decompressor = None 1404 1405 def test_stream_padding(self): 1406 # Test for bug #1543303. 1407 tar = tarfile.open(tmpname, self.mode) 1408 tar.close() 1409 if self.decompressor: 1410 dec = self.decompressor() 1411 with open(tmpname, "rb") as fobj: 1412 data = fobj.read() 1413 data = dec.decompress(data) 1414 self.assertFalse(dec.unused_data, "found trailing data") 1415 else: 1416 with self.open(tmpname) as fobj: 1417 data = fobj.read() 1418 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1419 "incorrect zero padding") 1420 1421 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1422 "Missing umask implementation") 1423 def test_file_mode(self): 1424 # Test for issue #8464: Create files with correct 1425 # permissions. 1426 if os.path.exists(tmpname): 1427 support.unlink(tmpname) 1428 1429 original_umask = os.umask(0o022) 1430 try: 1431 tar = tarfile.open(tmpname, self.mode) 1432 tar.close() 1433 mode = os.stat(tmpname).st_mode & 0o777 1434 self.assertEqual(mode, 0o644, "wrong file permissions") 1435 finally: 1436 os.umask(original_umask) 1437 1438 1439class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1440 def test_source_directory_not_leaked(self): 1441 """ 1442 Ensure the source directory is not included in the tar header 1443 per bpo-41316. 1444 """ 1445 tarfile.open(tmpname, self.mode).close() 1446 payload = pathlib.Path(tmpname).read_text(encoding='latin-1') 1447 assert os.path.dirname(tmpname) not in payload 1448 1449 1450class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1451 decompressor = bz2.BZ2Decompressor if bz2 else None 1452 1453class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1454 decompressor = lzma.LZMADecompressor if lzma else None 1455 1456 1457class GNUWriteTest(unittest.TestCase): 1458 # This testcase checks for correct creation of GNU Longname 1459 # and Longlink extended headers (cp. bug #812325). 1460 1461 def _length(self, s): 1462 blocks = len(s) // 512 + 1 1463 return blocks * 512 1464 1465 def _calc_size(self, name, link=None): 1466 # Initial tar header 1467 count = 512 1468 1469 if len(name) > tarfile.LENGTH_NAME: 1470 # GNU longname extended header + longname 1471 count += 512 1472 count += self._length(name) 1473 if link is not None and len(link) > tarfile.LENGTH_LINK: 1474 # GNU longlink extended header + longlink 1475 count += 512 1476 count += self._length(link) 1477 return count 1478 1479 def _test(self, name, link=None): 1480 tarinfo = tarfile.TarInfo(name) 1481 if link: 1482 tarinfo.linkname = link 1483 tarinfo.type = tarfile.LNKTYPE 1484 1485 tar = tarfile.open(tmpname, "w") 1486 try: 1487 tar.format = tarfile.GNU_FORMAT 1488 tar.addfile(tarinfo) 1489 1490 v1 = self._calc_size(name, link) 1491 v2 = tar.offset 1492 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1493 finally: 1494 tar.close() 1495 1496 tar = tarfile.open(tmpname) 1497 try: 1498 member = tar.next() 1499 self.assertIsNotNone(member, 1500 "unable to read longname member") 1501 self.assertEqual(tarinfo.name, member.name, 1502 "unable to read longname member") 1503 self.assertEqual(tarinfo.linkname, member.linkname, 1504 "unable to read longname member") 1505 finally: 1506 tar.close() 1507 1508 def test_longname_1023(self): 1509 self._test(("longnam/" * 127) + "longnam") 1510 1511 def test_longname_1024(self): 1512 self._test(("longnam/" * 127) + "longname") 1513 1514 def test_longname_1025(self): 1515 self._test(("longnam/" * 127) + "longname_") 1516 1517 def test_longlink_1023(self): 1518 self._test("name", ("longlnk/" * 127) + "longlnk") 1519 1520 def test_longlink_1024(self): 1521 self._test("name", ("longlnk/" * 127) + "longlink") 1522 1523 def test_longlink_1025(self): 1524 self._test("name", ("longlnk/" * 127) + "longlink_") 1525 1526 def test_longnamelink_1023(self): 1527 self._test(("longnam/" * 127) + "longnam", 1528 ("longlnk/" * 127) + "longlnk") 1529 1530 def test_longnamelink_1024(self): 1531 self._test(("longnam/" * 127) + "longname", 1532 ("longlnk/" * 127) + "longlink") 1533 1534 def test_longnamelink_1025(self): 1535 self._test(("longnam/" * 127) + "longname_", 1536 ("longlnk/" * 127) + "longlink_") 1537 1538 1539class CreateTest(WriteTestBase, unittest.TestCase): 1540 1541 prefix = "x:" 1542 1543 file_path = os.path.join(TEMPDIR, "spameggs42") 1544 1545 def setUp(self): 1546 support.unlink(tmpname) 1547 1548 @classmethod 1549 def setUpClass(cls): 1550 with open(cls.file_path, "wb") as fobj: 1551 fobj.write(b"aaa") 1552 1553 @classmethod 1554 def tearDownClass(cls): 1555 support.unlink(cls.file_path) 1556 1557 def test_create(self): 1558 with tarfile.open(tmpname, self.mode) as tobj: 1559 tobj.add(self.file_path) 1560 1561 with self.taropen(tmpname) as tobj: 1562 names = tobj.getnames() 1563 self.assertEqual(len(names), 1) 1564 self.assertIn('spameggs42', names[0]) 1565 1566 def test_create_existing(self): 1567 with tarfile.open(tmpname, self.mode) as tobj: 1568 tobj.add(self.file_path) 1569 1570 with self.assertRaises(FileExistsError): 1571 tobj = tarfile.open(tmpname, self.mode) 1572 1573 with self.taropen(tmpname) as tobj: 1574 names = tobj.getnames() 1575 self.assertEqual(len(names), 1) 1576 self.assertIn('spameggs42', names[0]) 1577 1578 def test_create_taropen(self): 1579 with self.taropen(tmpname, "x") as tobj: 1580 tobj.add(self.file_path) 1581 1582 with self.taropen(tmpname) as tobj: 1583 names = tobj.getnames() 1584 self.assertEqual(len(names), 1) 1585 self.assertIn('spameggs42', names[0]) 1586 1587 def test_create_existing_taropen(self): 1588 with self.taropen(tmpname, "x") as tobj: 1589 tobj.add(self.file_path) 1590 1591 with self.assertRaises(FileExistsError): 1592 with self.taropen(tmpname, "x"): 1593 pass 1594 1595 with self.taropen(tmpname) as tobj: 1596 names = tobj.getnames() 1597 self.assertEqual(len(names), 1) 1598 self.assertIn("spameggs42", names[0]) 1599 1600 def test_create_pathlike_name(self): 1601 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1602 self.assertIsInstance(tobj.name, str) 1603 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1604 tobj.add(pathlib.Path(self.file_path)) 1605 names = tobj.getnames() 1606 self.assertEqual(len(names), 1) 1607 self.assertIn('spameggs42', names[0]) 1608 1609 with self.taropen(tmpname) as tobj: 1610 names = tobj.getnames() 1611 self.assertEqual(len(names), 1) 1612 self.assertIn('spameggs42', names[0]) 1613 1614 def test_create_taropen_pathlike_name(self): 1615 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1616 self.assertIsInstance(tobj.name, str) 1617 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1618 tobj.add(pathlib.Path(self.file_path)) 1619 names = tobj.getnames() 1620 self.assertEqual(len(names), 1) 1621 self.assertIn('spameggs42', names[0]) 1622 1623 with self.taropen(tmpname) as tobj: 1624 names = tobj.getnames() 1625 self.assertEqual(len(names), 1) 1626 self.assertIn('spameggs42', names[0]) 1627 1628 1629class GzipCreateTest(GzipTest, CreateTest): 1630 pass 1631 1632 1633class Bz2CreateTest(Bz2Test, CreateTest): 1634 pass 1635 1636 1637class LzmaCreateTest(LzmaTest, CreateTest): 1638 pass 1639 1640 1641class CreateWithXModeTest(CreateTest): 1642 1643 prefix = "x" 1644 1645 test_create_taropen = None 1646 test_create_existing_taropen = None 1647 1648 1649@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1650class HardlinkTest(unittest.TestCase): 1651 # Test the creation of LNKTYPE (hardlink) members in an archive. 1652 1653 def setUp(self): 1654 self.foo = os.path.join(TEMPDIR, "foo") 1655 self.bar = os.path.join(TEMPDIR, "bar") 1656 1657 with open(self.foo, "wb") as fobj: 1658 fobj.write(b"foo") 1659 1660 try: 1661 os.link(self.foo, self.bar) 1662 except PermissionError as e: 1663 self.skipTest('os.link(): %s' % e) 1664 1665 self.tar = tarfile.open(tmpname, "w") 1666 self.tar.add(self.foo) 1667 1668 def tearDown(self): 1669 self.tar.close() 1670 support.unlink(self.foo) 1671 support.unlink(self.bar) 1672 1673 def test_add_twice(self): 1674 # The same name will be added as a REGTYPE every 1675 # time regardless of st_nlink. 1676 tarinfo = self.tar.gettarinfo(self.foo) 1677 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1678 "add file as regular failed") 1679 1680 def test_add_hardlink(self): 1681 tarinfo = self.tar.gettarinfo(self.bar) 1682 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1683 "add file as hardlink failed") 1684 1685 def test_dereference_hardlink(self): 1686 self.tar.dereference = True 1687 tarinfo = self.tar.gettarinfo(self.bar) 1688 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1689 "dereferencing hardlink failed") 1690 1691 1692class PaxWriteTest(GNUWriteTest): 1693 1694 def _test(self, name, link=None): 1695 # See GNUWriteTest. 1696 tarinfo = tarfile.TarInfo(name) 1697 if link: 1698 tarinfo.linkname = link 1699 tarinfo.type = tarfile.LNKTYPE 1700 1701 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1702 try: 1703 tar.addfile(tarinfo) 1704 finally: 1705 tar.close() 1706 1707 tar = tarfile.open(tmpname) 1708 try: 1709 if link: 1710 l = tar.getmembers()[0].linkname 1711 self.assertEqual(link, l, "PAX longlink creation failed") 1712 else: 1713 n = tar.getmembers()[0].name 1714 self.assertEqual(name, n, "PAX longname creation failed") 1715 finally: 1716 tar.close() 1717 1718 def test_pax_global_header(self): 1719 pax_headers = { 1720 "foo": "bar", 1721 "uid": "0", 1722 "mtime": "1.23", 1723 "test": "\xe4\xf6\xfc", 1724 "\xe4\xf6\xfc": "test"} 1725 1726 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1727 pax_headers=pax_headers) 1728 try: 1729 tar.addfile(tarfile.TarInfo("test")) 1730 finally: 1731 tar.close() 1732 1733 # Test if the global header was written correctly. 1734 tar = tarfile.open(tmpname, encoding="iso8859-1") 1735 try: 1736 self.assertEqual(tar.pax_headers, pax_headers) 1737 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1738 # Test if all the fields are strings. 1739 for key, val in tar.pax_headers.items(): 1740 self.assertIsNot(type(key), bytes) 1741 self.assertIsNot(type(val), bytes) 1742 if key in tarfile.PAX_NUMBER_FIELDS: 1743 try: 1744 tarfile.PAX_NUMBER_FIELDS[key](val) 1745 except (TypeError, ValueError): 1746 self.fail("unable to convert pax header field") 1747 finally: 1748 tar.close() 1749 1750 def test_pax_extended_header(self): 1751 # The fields from the pax header have priority over the 1752 # TarInfo. 1753 pax_headers = {"path": "foo", "uid": "123"} 1754 1755 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1756 encoding="iso8859-1") 1757 try: 1758 t = tarfile.TarInfo() 1759 t.name = "\xe4\xf6\xfc" # non-ASCII 1760 t.uid = 8**8 # too large 1761 t.pax_headers = pax_headers 1762 tar.addfile(t) 1763 finally: 1764 tar.close() 1765 1766 tar = tarfile.open(tmpname, encoding="iso8859-1") 1767 try: 1768 t = tar.getmembers()[0] 1769 self.assertEqual(t.pax_headers, pax_headers) 1770 self.assertEqual(t.name, "foo") 1771 self.assertEqual(t.uid, 123) 1772 finally: 1773 tar.close() 1774 1775 1776class UnicodeTest: 1777 1778 def test_iso8859_1_filename(self): 1779 self._test_unicode_filename("iso8859-1") 1780 1781 def test_utf7_filename(self): 1782 self._test_unicode_filename("utf7") 1783 1784 def test_utf8_filename(self): 1785 self._test_unicode_filename("utf-8") 1786 1787 def _test_unicode_filename(self, encoding): 1788 tar = tarfile.open(tmpname, "w", format=self.format, 1789 encoding=encoding, errors="strict") 1790 try: 1791 name = "\xe4\xf6\xfc" 1792 tar.addfile(tarfile.TarInfo(name)) 1793 finally: 1794 tar.close() 1795 1796 tar = tarfile.open(tmpname, encoding=encoding) 1797 try: 1798 self.assertEqual(tar.getmembers()[0].name, name) 1799 finally: 1800 tar.close() 1801 1802 def test_unicode_filename_error(self): 1803 tar = tarfile.open(tmpname, "w", format=self.format, 1804 encoding="ascii", errors="strict") 1805 try: 1806 tarinfo = tarfile.TarInfo() 1807 1808 tarinfo.name = "\xe4\xf6\xfc" 1809 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1810 1811 tarinfo.name = "foo" 1812 tarinfo.uname = "\xe4\xf6\xfc" 1813 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1814 finally: 1815 tar.close() 1816 1817 def test_unicode_argument(self): 1818 tar = tarfile.open(tarname, "r", 1819 encoding="iso8859-1", errors="strict") 1820 try: 1821 for t in tar: 1822 self.assertIs(type(t.name), str) 1823 self.assertIs(type(t.linkname), str) 1824 self.assertIs(type(t.uname), str) 1825 self.assertIs(type(t.gname), str) 1826 finally: 1827 tar.close() 1828 1829 def test_uname_unicode(self): 1830 t = tarfile.TarInfo("foo") 1831 t.uname = "\xe4\xf6\xfc" 1832 t.gname = "\xe4\xf6\xfc" 1833 1834 tar = tarfile.open(tmpname, mode="w", format=self.format, 1835 encoding="iso8859-1") 1836 try: 1837 tar.addfile(t) 1838 finally: 1839 tar.close() 1840 1841 tar = tarfile.open(tmpname, encoding="iso8859-1") 1842 try: 1843 t = tar.getmember("foo") 1844 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1845 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1846 1847 if self.format != tarfile.PAX_FORMAT: 1848 tar.close() 1849 tar = tarfile.open(tmpname, encoding="ascii") 1850 t = tar.getmember("foo") 1851 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1852 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1853 finally: 1854 tar.close() 1855 1856 1857class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1858 1859 format = tarfile.USTAR_FORMAT 1860 1861 # Test whether the utf-8 encoded version of a filename exceeds the 100 1862 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1863 # bytes). 1864 def test_unicode_name1(self): 1865 self._test_ustar_name("0123456789" * 10) 1866 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1867 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1868 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1869 1870 def test_unicode_name2(self): 1871 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1872 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1873 1874 # Test whether the utf-8 encoded version of a filename exceeds the 155 1875 # bytes prefix + '/' + 100 bytes name limit. 1876 def test_unicode_longname1(self): 1877 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1878 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1879 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1880 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1881 1882 def test_unicode_longname2(self): 1883 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1884 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1885 1886 def test_unicode_longname3(self): 1887 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1888 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1889 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1890 1891 def test_unicode_longname4(self): 1892 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 1893 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 1894 1895 def _test_ustar_name(self, name, exc=None): 1896 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1897 t = tarfile.TarInfo(name) 1898 if exc is None: 1899 tar.addfile(t) 1900 else: 1901 self.assertRaises(exc, tar.addfile, t) 1902 1903 if exc is None: 1904 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1905 for t in tar: 1906 self.assertEqual(name, t.name) 1907 break 1908 1909 # Test the same as above for the 100 bytes link field. 1910 def test_unicode_link1(self): 1911 self._test_ustar_link("0123456789" * 10) 1912 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 1913 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 1914 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 1915 1916 def test_unicode_link2(self): 1917 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 1918 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 1919 1920 def _test_ustar_link(self, name, exc=None): 1921 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1922 t = tarfile.TarInfo("foo") 1923 t.linkname = name 1924 if exc is None: 1925 tar.addfile(t) 1926 else: 1927 self.assertRaises(exc, tar.addfile, t) 1928 1929 if exc is None: 1930 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1931 for t in tar: 1932 self.assertEqual(name, t.linkname) 1933 break 1934 1935 1936class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 1937 1938 format = tarfile.GNU_FORMAT 1939 1940 def test_bad_pax_header(self): 1941 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1942 # without a hdrcharset=BINARY header. 1943 for encoding, name in ( 1944 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1945 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1946 with tarfile.open(tarname, encoding=encoding, 1947 errors="surrogateescape") as tar: 1948 try: 1949 t = tar.getmember(name) 1950 except KeyError: 1951 self.fail("unable to read bad GNU tar pax header") 1952 1953 1954class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 1955 1956 format = tarfile.PAX_FORMAT 1957 1958 # PAX_FORMAT ignores encoding in write mode. 1959 test_unicode_filename_error = None 1960 1961 def test_binary_header(self): 1962 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1963 for encoding, name in ( 1964 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1965 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1966 with tarfile.open(tarname, encoding=encoding, 1967 errors="surrogateescape") as tar: 1968 try: 1969 t = tar.getmember(name) 1970 except KeyError: 1971 self.fail("unable to read POSIX.1-2008 binary header") 1972 1973 1974class AppendTestBase: 1975 # Test append mode (cp. patch #1652681). 1976 1977 def setUp(self): 1978 self.tarname = tmpname 1979 if os.path.exists(self.tarname): 1980 support.unlink(self.tarname) 1981 1982 def _create_testtar(self, mode="w:"): 1983 with tarfile.open(tarname, encoding="iso8859-1") as src: 1984 t = src.getmember("ustar/regtype") 1985 t.name = "foo" 1986 with src.extractfile(t) as f: 1987 with tarfile.open(self.tarname, mode) as tar: 1988 tar.addfile(t, f) 1989 1990 def test_append_compressed(self): 1991 self._create_testtar("w:" + self.suffix) 1992 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1993 1994class AppendTest(AppendTestBase, unittest.TestCase): 1995 test_append_compressed = None 1996 1997 def _add_testfile(self, fileobj=None): 1998 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1999 tar.addfile(tarfile.TarInfo("bar")) 2000 2001 def _test(self, names=["bar"], fileobj=None): 2002 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2003 self.assertEqual(tar.getnames(), names) 2004 2005 def test_non_existing(self): 2006 self._add_testfile() 2007 self._test() 2008 2009 def test_empty(self): 2010 tarfile.open(self.tarname, "w:").close() 2011 self._add_testfile() 2012 self._test() 2013 2014 def test_empty_fileobj(self): 2015 fobj = io.BytesIO(b"\0" * 1024) 2016 self._add_testfile(fobj) 2017 fobj.seek(0) 2018 self._test(fileobj=fobj) 2019 2020 def test_fileobj(self): 2021 self._create_testtar() 2022 with open(self.tarname, "rb") as fobj: 2023 data = fobj.read() 2024 fobj = io.BytesIO(data) 2025 self._add_testfile(fobj) 2026 fobj.seek(0) 2027 self._test(names=["foo", "bar"], fileobj=fobj) 2028 2029 def test_existing(self): 2030 self._create_testtar() 2031 self._add_testfile() 2032 self._test(names=["foo", "bar"]) 2033 2034 # Append mode is supposed to fail if the tarfile to append to 2035 # does not end with a zero block. 2036 def _test_error(self, data): 2037 with open(self.tarname, "wb") as fobj: 2038 fobj.write(data) 2039 self.assertRaises(tarfile.ReadError, self._add_testfile) 2040 2041 def test_null(self): 2042 self._test_error(b"") 2043 2044 def test_incomplete(self): 2045 self._test_error(b"\0" * 13) 2046 2047 def test_premature_eof(self): 2048 data = tarfile.TarInfo("foo").tobuf() 2049 self._test_error(data) 2050 2051 def test_trailing_garbage(self): 2052 data = tarfile.TarInfo("foo").tobuf() 2053 self._test_error(data + b"\0" * 13) 2054 2055 def test_invalid(self): 2056 self._test_error(b"a" * 512) 2057 2058class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2059 pass 2060 2061class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2062 pass 2063 2064class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2065 pass 2066 2067 2068class LimitsTest(unittest.TestCase): 2069 2070 def test_ustar_limits(self): 2071 # 100 char name 2072 tarinfo = tarfile.TarInfo("0123456789" * 10) 2073 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2074 2075 # 101 char name that cannot be stored 2076 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2077 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2078 2079 # 256 char name with a slash at pos 156 2080 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2081 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2082 2083 # 256 char name that cannot be stored 2084 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2085 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2086 2087 # 512 char name 2088 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2089 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2090 2091 # 512 char linkname 2092 tarinfo = tarfile.TarInfo("longlink") 2093 tarinfo.linkname = "123/" * 126 + "longname" 2094 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2095 2096 # uid > 8 digits 2097 tarinfo = tarfile.TarInfo("name") 2098 tarinfo.uid = 0o10000000 2099 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2100 2101 def test_gnu_limits(self): 2102 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2103 tarinfo.tobuf(tarfile.GNU_FORMAT) 2104 2105 tarinfo = tarfile.TarInfo("longlink") 2106 tarinfo.linkname = "123/" * 126 + "longname" 2107 tarinfo.tobuf(tarfile.GNU_FORMAT) 2108 2109 # uid >= 256 ** 7 2110 tarinfo = tarfile.TarInfo("name") 2111 tarinfo.uid = 0o4000000000000000000 2112 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2113 2114 def test_pax_limits(self): 2115 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2116 tarinfo.tobuf(tarfile.PAX_FORMAT) 2117 2118 tarinfo = tarfile.TarInfo("longlink") 2119 tarinfo.linkname = "123/" * 126 + "longname" 2120 tarinfo.tobuf(tarfile.PAX_FORMAT) 2121 2122 tarinfo = tarfile.TarInfo("name") 2123 tarinfo.uid = 0o4000000000000000000 2124 tarinfo.tobuf(tarfile.PAX_FORMAT) 2125 2126 2127class MiscTest(unittest.TestCase): 2128 2129 def test_char_fields(self): 2130 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2131 b"foo\0\0\0\0\0") 2132 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2133 b"foo") 2134 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2135 "foo") 2136 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2137 "foo") 2138 2139 def test_read_number_fields(self): 2140 # Issue 13158: Test if GNU tar specific base-256 number fields 2141 # are decoded correctly. 2142 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2143 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2144 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2145 0o10000000) 2146 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2147 0xffffffff) 2148 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2149 -1) 2150 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2151 -100) 2152 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2153 -0x100000000000000) 2154 2155 # Issue 24514: Test if empty number fields are converted to zero. 2156 self.assertEqual(tarfile.nti(b"\0"), 0) 2157 self.assertEqual(tarfile.nti(b" \0"), 0) 2158 2159 def test_write_number_fields(self): 2160 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2161 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2162 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2163 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2164 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2165 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2166 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2167 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2168 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2169 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2170 self.assertEqual(tarfile.itn(-0x100000000000000, 2171 format=tarfile.GNU_FORMAT), 2172 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2173 2174 # Issue 32713: Test if itn() supports float values outside the 2175 # non-GNU format range 2176 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2177 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2178 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2179 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2180 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2181 2182 def test_number_field_limits(self): 2183 with self.assertRaises(ValueError): 2184 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 2185 with self.assertRaises(ValueError): 2186 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2187 with self.assertRaises(ValueError): 2188 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2189 with self.assertRaises(ValueError): 2190 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2191 2192 def test__all__(self): 2193 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2194 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2195 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2196 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2197 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2198 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2199 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2200 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2201 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2202 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2203 'filemode', 2204 'EmptyHeaderError', 'TruncatedHeaderError', 2205 'EOFHeaderError', 'InvalidHeaderError', 2206 'SubsequentHeaderError', 'ExFileObject', 2207 'main'} 2208 support.check__all__(self, tarfile, blacklist=blacklist) 2209 2210 2211class CommandLineTest(unittest.TestCase): 2212 2213 def tarfilecmd(self, *args, **kwargs): 2214 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2215 **kwargs) 2216 return out.replace(os.linesep.encode(), b'\n') 2217 2218 def tarfilecmd_failure(self, *args): 2219 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2220 2221 def make_simple_tarfile(self, tar_name): 2222 files = [support.findfile('tokenize_tests.txt'), 2223 support.findfile('tokenize_tests-no-coding-cookie-' 2224 'and-utf8-bom-sig-only.txt')] 2225 self.addCleanup(support.unlink, tar_name) 2226 with tarfile.open(tar_name, 'w') as tf: 2227 for tardata in files: 2228 tf.add(tardata, arcname=os.path.basename(tardata)) 2229 2230 def test_bad_use(self): 2231 rc, out, err = self.tarfilecmd_failure() 2232 self.assertEqual(out, b'') 2233 self.assertIn(b'usage', err.lower()) 2234 self.assertIn(b'error', err.lower()) 2235 self.assertIn(b'required', err.lower()) 2236 rc, out, err = self.tarfilecmd_failure('-l', '') 2237 self.assertEqual(out, b'') 2238 self.assertNotEqual(err.strip(), b'') 2239 2240 def test_test_command(self): 2241 for tar_name in testtarnames: 2242 for opt in '-t', '--test': 2243 out = self.tarfilecmd(opt, tar_name) 2244 self.assertEqual(out, b'') 2245 2246 def test_test_command_verbose(self): 2247 for tar_name in testtarnames: 2248 for opt in '-v', '--verbose': 2249 out = self.tarfilecmd(opt, '-t', tar_name) 2250 self.assertIn(b'is a tar archive.\n', out) 2251 2252 def test_test_command_invalid_file(self): 2253 zipname = support.findfile('zipdir.zip') 2254 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2255 self.assertIn(b' is not a tar archive.', err) 2256 self.assertEqual(out, b'') 2257 self.assertEqual(rc, 1) 2258 2259 for tar_name in testtarnames: 2260 with self.subTest(tar_name=tar_name): 2261 with open(tar_name, 'rb') as f: 2262 data = f.read() 2263 try: 2264 with open(tmpname, 'wb') as f: 2265 f.write(data[:511]) 2266 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2267 self.assertEqual(out, b'') 2268 self.assertEqual(rc, 1) 2269 finally: 2270 support.unlink(tmpname) 2271 2272 def test_list_command(self): 2273 for tar_name in testtarnames: 2274 with support.captured_stdout() as t: 2275 with tarfile.open(tar_name, 'r') as tf: 2276 tf.list(verbose=False) 2277 expected = t.getvalue().encode('ascii', 'backslashreplace') 2278 for opt in '-l', '--list': 2279 out = self.tarfilecmd(opt, tar_name, 2280 PYTHONIOENCODING='ascii') 2281 self.assertEqual(out, expected) 2282 2283 def test_list_command_verbose(self): 2284 for tar_name in testtarnames: 2285 with support.captured_stdout() as t: 2286 with tarfile.open(tar_name, 'r') as tf: 2287 tf.list(verbose=True) 2288 expected = t.getvalue().encode('ascii', 'backslashreplace') 2289 for opt in '-v', '--verbose': 2290 out = self.tarfilecmd(opt, '-l', tar_name, 2291 PYTHONIOENCODING='ascii') 2292 self.assertEqual(out, expected) 2293 2294 def test_list_command_invalid_file(self): 2295 zipname = support.findfile('zipdir.zip') 2296 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2297 self.assertIn(b' is not a tar archive.', err) 2298 self.assertEqual(out, b'') 2299 self.assertEqual(rc, 1) 2300 2301 def test_create_command(self): 2302 files = [support.findfile('tokenize_tests.txt'), 2303 support.findfile('tokenize_tests-no-coding-cookie-' 2304 'and-utf8-bom-sig-only.txt')] 2305 for opt in '-c', '--create': 2306 try: 2307 out = self.tarfilecmd(opt, tmpname, *files) 2308 self.assertEqual(out, b'') 2309 with tarfile.open(tmpname) as tar: 2310 tar.getmembers() 2311 finally: 2312 support.unlink(tmpname) 2313 2314 def test_create_command_verbose(self): 2315 files = [support.findfile('tokenize_tests.txt'), 2316 support.findfile('tokenize_tests-no-coding-cookie-' 2317 'and-utf8-bom-sig-only.txt')] 2318 for opt in '-v', '--verbose': 2319 try: 2320 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2321 self.assertIn(b' file created.', out) 2322 with tarfile.open(tmpname) as tar: 2323 tar.getmembers() 2324 finally: 2325 support.unlink(tmpname) 2326 2327 def test_create_command_dotless_filename(self): 2328 files = [support.findfile('tokenize_tests.txt')] 2329 try: 2330 out = self.tarfilecmd('-c', dotlessname, *files) 2331 self.assertEqual(out, b'') 2332 with tarfile.open(dotlessname) as tar: 2333 tar.getmembers() 2334 finally: 2335 support.unlink(dotlessname) 2336 2337 def test_create_command_dot_started_filename(self): 2338 tar_name = os.path.join(TEMPDIR, ".testtar") 2339 files = [support.findfile('tokenize_tests.txt')] 2340 try: 2341 out = self.tarfilecmd('-c', tar_name, *files) 2342 self.assertEqual(out, b'') 2343 with tarfile.open(tar_name) as tar: 2344 tar.getmembers() 2345 finally: 2346 support.unlink(tar_name) 2347 2348 def test_create_command_compressed(self): 2349 files = [support.findfile('tokenize_tests.txt'), 2350 support.findfile('tokenize_tests-no-coding-cookie-' 2351 'and-utf8-bom-sig-only.txt')] 2352 for filetype in (GzipTest, Bz2Test, LzmaTest): 2353 if not filetype.open: 2354 continue 2355 try: 2356 tar_name = tmpname + '.' + filetype.suffix 2357 out = self.tarfilecmd('-c', tar_name, *files) 2358 with filetype.taropen(tar_name) as tar: 2359 tar.getmembers() 2360 finally: 2361 support.unlink(tar_name) 2362 2363 def test_extract_command(self): 2364 self.make_simple_tarfile(tmpname) 2365 for opt in '-e', '--extract': 2366 try: 2367 with support.temp_cwd(tarextdir): 2368 out = self.tarfilecmd(opt, tmpname) 2369 self.assertEqual(out, b'') 2370 finally: 2371 support.rmtree(tarextdir) 2372 2373 def test_extract_command_verbose(self): 2374 self.make_simple_tarfile(tmpname) 2375 for opt in '-v', '--verbose': 2376 try: 2377 with support.temp_cwd(tarextdir): 2378 out = self.tarfilecmd(opt, '-e', tmpname) 2379 self.assertIn(b' file is extracted.', out) 2380 finally: 2381 support.rmtree(tarextdir) 2382 2383 def test_extract_command_different_directory(self): 2384 self.make_simple_tarfile(tmpname) 2385 try: 2386 with support.temp_cwd(tarextdir): 2387 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2388 self.assertEqual(out, b'') 2389 finally: 2390 support.rmtree(tarextdir) 2391 2392 def test_extract_command_invalid_file(self): 2393 zipname = support.findfile('zipdir.zip') 2394 with support.temp_cwd(tarextdir): 2395 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2396 self.assertIn(b' is not a tar archive.', err) 2397 self.assertEqual(out, b'') 2398 self.assertEqual(rc, 1) 2399 2400 2401class ContextManagerTest(unittest.TestCase): 2402 2403 def test_basic(self): 2404 with tarfile.open(tarname) as tar: 2405 self.assertFalse(tar.closed, "closed inside runtime context") 2406 self.assertTrue(tar.closed, "context manager failed") 2407 2408 def test_closed(self): 2409 # The __enter__() method is supposed to raise OSError 2410 # if the TarFile object is already closed. 2411 tar = tarfile.open(tarname) 2412 tar.close() 2413 with self.assertRaises(OSError): 2414 with tar: 2415 pass 2416 2417 def test_exception(self): 2418 # Test if the OSError exception is passed through properly. 2419 with self.assertRaises(Exception) as exc: 2420 with tarfile.open(tarname) as tar: 2421 raise OSError 2422 self.assertIsInstance(exc.exception, OSError, 2423 "wrong exception raised in context manager") 2424 self.assertTrue(tar.closed, "context manager failed") 2425 2426 def test_no_eof(self): 2427 # __exit__() must not write end-of-archive blocks if an 2428 # exception was raised. 2429 try: 2430 with tarfile.open(tmpname, "w") as tar: 2431 raise Exception 2432 except: 2433 pass 2434 self.assertEqual(os.path.getsize(tmpname), 0, 2435 "context manager wrote an end-of-archive block") 2436 self.assertTrue(tar.closed, "context manager failed") 2437 2438 def test_eof(self): 2439 # __exit__() must write end-of-archive blocks, i.e. call 2440 # TarFile.close() if there was no error. 2441 with tarfile.open(tmpname, "w"): 2442 pass 2443 self.assertNotEqual(os.path.getsize(tmpname), 0, 2444 "context manager wrote no end-of-archive block") 2445 2446 def test_fileobj(self): 2447 # Test that __exit__() did not close the external file 2448 # object. 2449 with open(tmpname, "wb") as fobj: 2450 try: 2451 with tarfile.open(fileobj=fobj, mode="w") as tar: 2452 raise Exception 2453 except: 2454 pass 2455 self.assertFalse(fobj.closed, "external file object was closed") 2456 self.assertTrue(tar.closed, "context manager failed") 2457 2458 2459@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2460class LinkEmulationTest(ReadTest, unittest.TestCase): 2461 2462 # Test for issue #8741 regression. On platforms that do not support 2463 # symbolic or hard links tarfile tries to extract these types of members 2464 # as the regular files they point to. 2465 def _test_link_extraction(self, name): 2466 self.tar.extract(name, TEMPDIR) 2467 with open(os.path.join(TEMPDIR, name), "rb") as f: 2468 data = f.read() 2469 self.assertEqual(sha256sum(data), sha256_regtype) 2470 2471 # See issues #1578269, #8879, and #17689 for some history on these skips 2472 @unittest.skipIf(hasattr(os.path, "islink"), 2473 "Skip emulation - has os.path.islink but not os.link") 2474 def test_hardlink_extraction1(self): 2475 self._test_link_extraction("ustar/lnktype") 2476 2477 @unittest.skipIf(hasattr(os.path, "islink"), 2478 "Skip emulation - has os.path.islink but not os.link") 2479 def test_hardlink_extraction2(self): 2480 self._test_link_extraction("./ustar/linktest2/lnktype") 2481 2482 @unittest.skipIf(hasattr(os, "symlink"), 2483 "Skip emulation if symlink exists") 2484 def test_symlink_extraction1(self): 2485 self._test_link_extraction("ustar/symtype") 2486 2487 @unittest.skipIf(hasattr(os, "symlink"), 2488 "Skip emulation if symlink exists") 2489 def test_symlink_extraction2(self): 2490 self._test_link_extraction("./ustar/linktest2/symtype") 2491 2492 2493class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2494 # Issue5068: The _BZ2Proxy.read() method loops forever 2495 # on an empty or partial bzipped file. 2496 2497 def _test_partial_input(self, mode): 2498 class MyBytesIO(io.BytesIO): 2499 hit_eof = False 2500 def read(self, n): 2501 if self.hit_eof: 2502 raise AssertionError("infinite loop detected in " 2503 "tarfile.open()") 2504 self.hit_eof = self.tell() == len(self.getvalue()) 2505 return super(MyBytesIO, self).read(n) 2506 def seek(self, *args): 2507 self.hit_eof = False 2508 return super(MyBytesIO, self).seek(*args) 2509 2510 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2511 for x in range(len(data) + 1): 2512 try: 2513 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2514 except tarfile.ReadError: 2515 pass # we have no interest in ReadErrors 2516 2517 def test_partial_input(self): 2518 self._test_partial_input("r") 2519 2520 def test_partial_input_bz2(self): 2521 self._test_partial_input("r:bz2") 2522 2523 2524def root_is_uid_gid_0(): 2525 try: 2526 import pwd, grp 2527 except ImportError: 2528 return False 2529 if pwd.getpwuid(0)[0] != 'root': 2530 return False 2531 if grp.getgrgid(0)[0] != 'root': 2532 return False 2533 return True 2534 2535 2536@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2537@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2538class NumericOwnerTest(unittest.TestCase): 2539 # mock the following: 2540 # os.chown: so we can test what's being called 2541 # os.chmod: so the modes are not actually changed. if they are, we can't 2542 # delete the files/directories 2543 # os.geteuid: so we can lie and say we're root (uid = 0) 2544 2545 @staticmethod 2546 def _make_test_archive(filename_1, dirname_1, filename_2): 2547 # the file contents to write 2548 fobj = io.BytesIO(b"content") 2549 2550 # create a tar file with a file, a directory, and a file within that 2551 # directory. Assign various .uid/.gid values to them 2552 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2553 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2554 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2555 ] 2556 with tarfile.open(tmpname, 'w') as tarfl: 2557 for name, uid, gid, typ, contents in items: 2558 t = tarfile.TarInfo(name) 2559 t.uid = uid 2560 t.gid = gid 2561 t.uname = 'root' 2562 t.gname = 'root' 2563 t.type = typ 2564 tarfl.addfile(t, contents) 2565 2566 # return the full pathname to the tar file 2567 return tmpname 2568 2569 @staticmethod 2570 @contextmanager 2571 def _setup_test(mock_geteuid): 2572 mock_geteuid.return_value = 0 # lie and say we're root 2573 fname = 'numeric-owner-testfile' 2574 dirname = 'dir' 2575 2576 # the names we want stored in the tarfile 2577 filename_1 = fname 2578 dirname_1 = dirname 2579 filename_2 = os.path.join(dirname, fname) 2580 2581 # create the tarfile with the contents we're after 2582 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2583 dirname_1, 2584 filename_2) 2585 2586 # open the tarfile for reading. yield it and the names of the items 2587 # we stored into the file 2588 with tarfile.open(tar_filename) as tarfl: 2589 yield tarfl, filename_1, dirname_1, filename_2 2590 2591 @unittest.mock.patch('os.chown') 2592 @unittest.mock.patch('os.chmod') 2593 @unittest.mock.patch('os.geteuid') 2594 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2595 mock_chown): 2596 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2597 filename_2): 2598 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2599 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2600 2601 # convert to filesystem paths 2602 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2603 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2604 2605 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2606 unittest.mock.call(f_filename_2, 88, 87), 2607 ], 2608 any_order=True) 2609 2610 @unittest.mock.patch('os.chown') 2611 @unittest.mock.patch('os.chmod') 2612 @unittest.mock.patch('os.geteuid') 2613 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2614 mock_chown): 2615 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2616 filename_2): 2617 tarfl.extractall(TEMPDIR, numeric_owner=True) 2618 2619 # convert to filesystem paths 2620 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2621 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2622 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2623 2624 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2625 unittest.mock.call(f_dirname_1, 77, 76), 2626 unittest.mock.call(f_filename_2, 88, 87), 2627 ], 2628 any_order=True) 2629 2630 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2631 # because the uname and gname in the test file are 'root', and extract() 2632 # will look them up using pwd and grp to find their uid and gid, which we 2633 # test here to be 0. 2634 @unittest.skipUnless(root_is_uid_gid_0(), 2635 'uid=0,gid=0 must be named "root"') 2636 @unittest.mock.patch('os.chown') 2637 @unittest.mock.patch('os.chmod') 2638 @unittest.mock.patch('os.geteuid') 2639 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2640 mock_chown): 2641 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2642 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2643 2644 # convert to filesystem paths 2645 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2646 2647 mock_chown.assert_called_with(f_filename_1, 0, 0) 2648 2649 @unittest.mock.patch('os.geteuid') 2650 def test_keyword_only(self, mock_geteuid): 2651 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2652 self.assertRaises(TypeError, 2653 tarfl.extract, filename_1, TEMPDIR, False, True) 2654 2655 2656def setUpModule(): 2657 support.unlink(TEMPDIR) 2658 os.makedirs(TEMPDIR) 2659 2660 global testtarnames 2661 testtarnames = [tarname] 2662 with open(tarname, "rb") as fobj: 2663 data = fobj.read() 2664 2665 # Create compressed tarfiles. 2666 for c in GzipTest, Bz2Test, LzmaTest: 2667 if c.open: 2668 support.unlink(c.tarname) 2669 testtarnames.append(c.tarname) 2670 with c.open(c.tarname, "wb") as tar: 2671 tar.write(data) 2672 2673def tearDownModule(): 2674 if os.path.exists(TEMPDIR): 2675 support.rmtree(TEMPDIR) 2676 2677if __name__ == "__main__": 2678 unittest.main() 2679