import sys
import os
import io
from hashlib import sha256
from contextlib import contextmanager
from random import Random
import pathlib

import unittest
import unittest.mock
import tarfile

from test import support
from test.support import script_helper

# Check for our compression modules.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import zlib
except ImportError:
    zlib = None
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import lzma
except ImportError:
    lzma = None


def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes object *data*."""
    return sha256(data).hexdigest()


# Scratch locations used by the tests; every path below lives under TEMPDIR
# except `tarname`, which is the reference archive located by
# support.findfile().
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 digests of the member payloads the tests compare against.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)


class TarTest:
    """Mixin describing the uncompressed archive flavour.

    Subclasses supply ``prefix`` (e.g. ``"r:"``); the compression mixins
    below override ``tarname``, ``suffix``, ``open`` and ``taropen``.
    """
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        """Mode string handed to tarfile.open(): ``prefix + suffix``."""
        return self.prefix + self.suffix


@support.requires_gzip()
class GzipTest:
    """Mixin selecting the gzip-compressed copy of the test archive."""
    tarname = gzipname
    suffix = 'gz'
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen


@support.requires_bz2()
class Bz2Test:
    """Mixin selecting the bz2-compressed copy of the test archive."""
    tarname = bz2name
    suffix = 'bz2'
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open


@support.requires_lzma()
class LzmaTest:
    """Mixin selecting the xz-compressed copy of the test archive."""
    tarname = xzname
    suffix = 'xz'
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen


class ReadTest(TarTest):
    """Base for read tests: opens ``self.tarname`` as ``self.tar`` per test."""

    prefix = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest, unittest.TestCase):

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the whole member first so that the seeks below start
            # from the end of the file; the data itself is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
200 def _test_fileobj_link(self, lnktype, regtype): 201 with self.tar.extractfile(lnktype) as a, \ 202 self.tar.extractfile(regtype) as b: 203 self.assertEqual(a.name, b.name) 204 205 def test_fileobj_link1(self): 206 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 207 208 def test_fileobj_link2(self): 209 self._test_fileobj_link("./ustar/linktest2/lnktype", 210 "ustar/linktest1/regtype") 211 212 def test_fileobj_symlink1(self): 213 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 214 215 def test_fileobj_symlink2(self): 216 self._test_fileobj_link("./ustar/linktest2/symtype", 217 "ustar/linktest1/regtype") 218 219 def test_issue14160(self): 220 self._test_fileobj_link("symtype2", "ustar/regtype") 221 222class GzipUstarReadTest(GzipTest, UstarReadTest): 223 pass 224 225class Bz2UstarReadTest(Bz2Test, UstarReadTest): 226 pass 227 228class LzmaUstarReadTest(LzmaTest, UstarReadTest): 229 pass 230 231 232class ListTest(ReadTest, unittest.TestCase): 233 234 # Override setUp to use default encoding (UTF-8) 235 def setUp(self): 236 self.tar = tarfile.open(self.tarname, mode=self.mode) 237 238 def test_list(self): 239 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 240 with support.swap_attr(sys, 'stdout', tio): 241 self.tar.list(verbose=False) 242 out = tio.detach().getvalue() 243 self.assertIn(b'ustar/conttype', out) 244 self.assertIn(b'ustar/regtype', out) 245 self.assertIn(b'ustar/lnktype', out) 246 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 247 self.assertIn(b'./ustar/linktest2/symtype', out) 248 self.assertIn(b'./ustar/linktest2/lnktype', out) 249 # Make sure it puts trailing slash for directory 250 self.assertIn(b'ustar/dirtype/', out) 251 self.assertIn(b'ustar/dirtype-with-size/', out) 252 # Make sure it is able to print unencodable characters 253 def conv(b): 254 s = b.decode(self.tar.encoding, 'surrogateescape') 255 return s.encode('ascii', 'backslashreplace') 256 
self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 257 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 258 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 259 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 260 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 261 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 262 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 263 # Make sure it prints files separated by one newline without any 264 # 'ls -l'-like accessories if verbose flag is not being used 265 # ... 266 # ustar/conttype 267 # ustar/regtype 268 # ... 269 self.assertRegex(out, br'ustar/conttype ?\r?\n' 270 br'ustar/regtype ?\r?\n') 271 # Make sure it does not print the source of link without verbose flag 272 self.assertNotIn(b'link to', out) 273 self.assertNotIn(b'->', out) 274 275 def test_list_verbose(self): 276 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 277 with support.swap_attr(sys, 'stdout', tio): 278 self.tar.list(verbose=True) 279 out = tio.detach().getvalue() 280 # Make sure it prints files separated by one newline with 'ls -l'-like 281 # accessories if verbose flag is being used 282 # ... 283 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 284 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 285 # ... 
286 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 287 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 288 br'ustar/\w+type ?\r?\n') * 2) 289 # Make sure it prints the source of link with verbose flag 290 self.assertIn(b'ustar/symtype -> regtype', out) 291 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 292 self.assertIn(b'./ustar/linktest2/lnktype link to ' 293 b'./ustar/linktest1/regtype', out) 294 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 295 (b'/123' * 125) + b'/longname', out) 296 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 297 (b'/123' * 125) + b'/longname', out) 298 299 def test_list_members(self): 300 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 301 def members(tar): 302 for tarinfo in tar.getmembers(): 303 if 'reg' in tarinfo.name: 304 yield tarinfo 305 with support.swap_attr(sys, 'stdout', tio): 306 self.tar.list(verbose=False, members=members(self.tar)) 307 out = tio.detach().getvalue() 308 self.assertIn(b'ustar/regtype', out) 309 self.assertNotIn(b'ustar/conttype', out) 310 311 312class GzipListTest(GzipTest, ListTest): 313 pass 314 315 316class Bz2ListTest(Bz2Test, ListTest): 317 pass 318 319 320class LzmaListTest(LzmaTest, ListTest): 321 pass 322 323 324class CommonReadTest(ReadTest): 325 326 def test_is_tarfile_erroneous(self): 327 with open(tmpname, "wb"): 328 pass 329 330 # is_tarfile works on filenames 331 self.assertFalse(tarfile.is_tarfile(tmpname)) 332 333 # is_tarfile works on path-like objects 334 self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname))) 335 336 # is_tarfile works on file objects 337 with open(tmpname, "rb") as fobj: 338 self.assertFalse(tarfile.is_tarfile(fobj)) 339 340 # is_tarfile works on file-like objects 341 self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid"))) 342 343 def test_is_tarfile_valid(self): 344 # is_tarfile works on filenames 345 self.assertTrue(tarfile.is_tarfile(self.tarname)) 346 347 # is_tarfile works 
on path-like objects 348 self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname))) 349 350 # is_tarfile works on file objects 351 with open(self.tarname, "rb") as fobj: 352 self.assertTrue(tarfile.is_tarfile(fobj)) 353 354 # is_tarfile works on file-like objects 355 with open(self.tarname, "rb") as fobj: 356 self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read()))) 357 358 def test_empty_tarfile(self): 359 # Test for issue6123: Allow opening empty archives. 360 # This test checks if tarfile.open() is able to open an empty tar 361 # archive successfully. Note that an empty tar archive is not the 362 # same as an empty file! 363 with tarfile.open(tmpname, self.mode.replace("r", "w")): 364 pass 365 try: 366 tar = tarfile.open(tmpname, self.mode) 367 tar.getnames() 368 except tarfile.ReadError: 369 self.fail("tarfile.open() failed on empty archive") 370 else: 371 self.assertListEqual(tar.getmembers(), []) 372 finally: 373 tar.close() 374 375 def test_non_existent_tarfile(self): 376 # Test for issue11513: prevent non-existent gzipped tarfiles raising 377 # multiple exceptions. 378 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 379 tarfile.open("xxx", self.mode) 380 381 def test_null_tarfile(self): 382 # Test for issue6123: Allow opening empty archives. 383 # This test guarantees that tarfile.open() does not treat an empty 384 # file as an empty tar archive. 385 with open(tmpname, "wb"): 386 pass 387 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 388 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 389 390 def test_ignore_zeros(self): 391 # Test TarFile's ignore_zeros option. 392 # generate 512 pseudorandom bytes 393 data = Random(0).randbytes(512) 394 for char in (b'\0', b'a'): 395 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 396 # are ignored correctly. 
397 with self.open(tmpname, "w") as fobj: 398 fobj.write(char * 1024) 399 tarinfo = tarfile.TarInfo("foo") 400 tarinfo.size = len(data) 401 fobj.write(tarinfo.tobuf()) 402 fobj.write(data) 403 404 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 405 try: 406 self.assertListEqual(tar.getnames(), ["foo"], 407 "ignore_zeros=True should have skipped the %r-blocks" % 408 char) 409 finally: 410 tar.close() 411 412 def test_premature_end_of_archive(self): 413 for size in (512, 600, 1024, 1200): 414 with tarfile.open(tmpname, "w:") as tar: 415 t = tarfile.TarInfo("foo") 416 t.size = 1024 417 tar.addfile(t, io.BytesIO(b"a" * 1024)) 418 419 with open(tmpname, "r+b") as fobj: 420 fobj.truncate(size) 421 422 with tarfile.open(tmpname) as tar: 423 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 424 for t in tar: 425 pass 426 427 with tarfile.open(tmpname) as tar: 428 t = tar.next() 429 430 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 431 tar.extract(t, TEMPDIR) 432 433 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 434 tar.extractfile(t).read() 435 436 def test_length_zero_header(self): 437 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 438 # with an exception 439 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 440 with tarfile.open(support.findfile('recursion.tar')) as tar: 441 pass 442 443class MiscReadTestBase(CommonReadTest): 444 def requires_name_attribute(self): 445 pass 446 447 def test_no_name_argument(self): 448 self.requires_name_attribute() 449 with open(self.tarname, "rb") as fobj: 450 self.assertIsInstance(fobj.name, str) 451 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 452 self.assertIsInstance(tar.name, str) 453 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 454 455 def test_no_name_attribute(self): 456 with open(self.tarname, "rb") as fobj: 457 data = fobj.read() 458 fobj = io.BytesIO(data) 
459 self.assertRaises(AttributeError, getattr, fobj, "name") 460 tar = tarfile.open(fileobj=fobj, mode=self.mode) 461 self.assertIsNone(tar.name) 462 463 def test_empty_name_attribute(self): 464 with open(self.tarname, "rb") as fobj: 465 data = fobj.read() 466 fobj = io.BytesIO(data) 467 fobj.name = "" 468 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 469 self.assertIsNone(tar.name) 470 471 def test_int_name_attribute(self): 472 # Issue 21044: tarfile.open() should handle fileobj with an integer 473 # 'name' attribute. 474 fd = os.open(self.tarname, os.O_RDONLY) 475 with open(fd, 'rb') as fobj: 476 self.assertIsInstance(fobj.name, int) 477 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 478 self.assertIsNone(tar.name) 479 480 def test_bytes_name_attribute(self): 481 self.requires_name_attribute() 482 tarname = os.fsencode(self.tarname) 483 with open(tarname, 'rb') as fobj: 484 self.assertIsInstance(fobj.name, bytes) 485 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 486 self.assertIsInstance(tar.name, bytes) 487 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 488 489 def test_pathlike_name(self): 490 tarname = pathlib.Path(self.tarname) 491 with tarfile.open(tarname, mode=self.mode) as tar: 492 self.assertIsInstance(tar.name, str) 493 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 494 with self.taropen(tarname) as tar: 495 self.assertIsInstance(tar.name, str) 496 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 497 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 498 self.assertIsInstance(tar.name, str) 499 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 500 if self.suffix == '': 501 with tarfile.TarFile(tarname, mode='r') as tar: 502 self.assertIsInstance(tar.name, str) 503 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 504 505 def test_illegal_mode_arg(self): 506 with open(tmpname, 'wb'): 507 pass 508 with self.assertRaisesRegex(ValueError, 'mode must be 
'): 509 tar = self.taropen(tmpname, 'q') 510 with self.assertRaisesRegex(ValueError, 'mode must be '): 511 tar = self.taropen(tmpname, 'rw') 512 with self.assertRaisesRegex(ValueError, 'mode must be '): 513 tar = self.taropen(tmpname, '') 514 515 def test_fileobj_with_offset(self): 516 # Skip the first member and store values from the second member 517 # of the testtar. 518 tar = tarfile.open(self.tarname, mode=self.mode) 519 try: 520 tar.next() 521 t = tar.next() 522 name = t.name 523 offset = t.offset 524 with tar.extractfile(t) as f: 525 data = f.read() 526 finally: 527 tar.close() 528 529 # Open the testtar and seek to the offset of the second member. 530 with self.open(self.tarname) as fobj: 531 fobj.seek(offset) 532 533 # Test if the tarfile starts with the second member. 534 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 535 t = tar.next() 536 self.assertEqual(t.name, name) 537 # Read to the end of fileobj and test if seeking back to the 538 # beginning works. 539 tar.getmembers() 540 self.assertEqual(tar.extractfile(t).read(), data, 541 "seek back did not work") 542 543 def test_fail_comp(self): 544 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 545 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 546 with open(tarname, "rb") as fobj: 547 self.assertRaises(tarfile.ReadError, tarfile.open, 548 fileobj=fobj, mode=self.mode) 549 550 def test_v7_dirtype(self): 551 # Test old style dirtype member (bug #1336623): 552 # Old V7 tars create directory members using an AREGTYPE 553 # header with a "/" appended to the filename field. 554 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 555 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 556 "v7 dirtype failed") 557 558 def test_xstar_type(self): 559 # The xstar format stores extra atime and ctime fields inside the 560 # space reserved for the prefix field. The prefix field must be 561 # ignored in this case, otherwise it will mess up the name. 
562 try: 563 self.tar.getmember("misc/regtype-xstar") 564 except KeyError: 565 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 566 567 def test_check_members(self): 568 for tarinfo in self.tar: 569 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 570 "wrong mtime for %s" % tarinfo.name) 571 if not tarinfo.name.startswith("ustar/"): 572 continue 573 self.assertEqual(tarinfo.uname, "tarfile", 574 "wrong uname for %s" % tarinfo.name) 575 576 def test_find_members(self): 577 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 578 "could not find all members") 579 580 @unittest.skipUnless(hasattr(os, "link"), 581 "Missing hardlink implementation") 582 @support.skip_unless_symlink 583 def test_extract_hardlink(self): 584 # Test hardlink extraction (e.g. bug #857297). 585 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 586 tar.extract("ustar/regtype", TEMPDIR) 587 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 588 589 tar.extract("ustar/lnktype", TEMPDIR) 590 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 591 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 592 data = f.read() 593 self.assertEqual(sha256sum(data), sha256_regtype) 594 595 tar.extract("ustar/symtype", TEMPDIR) 596 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 597 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 598 data = f.read() 599 self.assertEqual(sha256sum(data), sha256_regtype) 600 601 def test_extractall(self): 602 # Test if extractall() correctly restores directory permissions 603 # and times (see issue1735). 
604 tar = tarfile.open(tarname, encoding="iso8859-1") 605 DIR = os.path.join(TEMPDIR, "extractall") 606 os.mkdir(DIR) 607 try: 608 directories = [t for t in tar if t.isdir()] 609 tar.extractall(DIR, directories) 610 for tarinfo in directories: 611 path = os.path.join(DIR, tarinfo.name) 612 if sys.platform != "win32": 613 # Win32 has no support for fine grained permissions. 614 self.assertEqual(tarinfo.mode & 0o777, 615 os.stat(path).st_mode & 0o777) 616 def format_mtime(mtime): 617 if isinstance(mtime, float): 618 return "{} ({})".format(mtime, mtime.hex()) 619 else: 620 return "{!r} (int)".format(mtime) 621 file_mtime = os.path.getmtime(path) 622 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 623 format_mtime(tarinfo.mtime), 624 format_mtime(file_mtime), 625 path) 626 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 627 finally: 628 tar.close() 629 support.rmtree(DIR) 630 631 def test_extract_directory(self): 632 dirtype = "ustar/dirtype" 633 DIR = os.path.join(TEMPDIR, "extractdir") 634 os.mkdir(DIR) 635 try: 636 with tarfile.open(tarname, encoding="iso8859-1") as tar: 637 tarinfo = tar.getmember(dirtype) 638 tar.extract(tarinfo, path=DIR) 639 extracted = os.path.join(DIR, dirtype) 640 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 641 if sys.platform != "win32": 642 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 643 finally: 644 support.rmtree(DIR) 645 646 def test_extractall_pathlike_name(self): 647 DIR = pathlib.Path(TEMPDIR) / "extractall" 648 with support.temp_dir(DIR), \ 649 tarfile.open(tarname, encoding="iso8859-1") as tar: 650 directories = [t for t in tar if t.isdir()] 651 tar.extractall(DIR, directories) 652 for tarinfo in directories: 653 path = DIR / tarinfo.name 654 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 655 656 def test_extract_pathlike_name(self): 657 dirtype = "ustar/dirtype" 658 DIR = pathlib.Path(TEMPDIR) / "extractall" 659 with support.temp_dir(DIR), \ 660 tarfile.open(tarname, 
encoding="iso8859-1") as tar: 661 tarinfo = tar.getmember(dirtype) 662 tar.extract(tarinfo, path=DIR) 663 extracted = DIR / dirtype 664 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 665 666 def test_init_close_fobj(self): 667 # Issue #7341: Close the internal file object in the TarFile 668 # constructor in case of an error. For the test we rely on 669 # the fact that opening an empty file raises a ReadError. 670 empty = os.path.join(TEMPDIR, "empty") 671 with open(empty, "wb") as fobj: 672 fobj.write(b"") 673 674 try: 675 tar = object.__new__(tarfile.TarFile) 676 try: 677 tar.__init__(empty) 678 except tarfile.ReadError: 679 self.assertTrue(tar.fileobj.closed) 680 else: 681 self.fail("ReadError not raised") 682 finally: 683 support.unlink(empty) 684 685 def test_parallel_iteration(self): 686 # Issue #16601: Restarting iteration over tarfile continued 687 # from where it left off. 688 with tarfile.open(self.tarname) as tar: 689 for m1, m2 in zip(tar, tar): 690 self.assertEqual(m1.offset, m2.offset) 691 self.assertEqual(m1.get_info(), m2.get_info()) 692 693 @unittest.skipIf(zlib is None, "requires zlib") 694 def test_zlib_error_does_not_leak(self): 695 # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when 696 # parsing certain types of invalid data 697 with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock: 698 mock.side_effect = zlib.error 699 with self.assertRaises(tarfile.ReadError): 700 tarfile.open(self.tarname) 701 702 703class MiscReadTest(MiscReadTestBase, unittest.TestCase): 704 test_fail_comp = None 705 706class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 707 pass 708 709class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 710 def requires_name_attribute(self): 711 self.skipTest("BZ2File have no name attribute") 712 713class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 714 def requires_name_attribute(self): 715 self.skipTest("LZMAFile have no name attribute") 716 
717 718class StreamReadTest(CommonReadTest, unittest.TestCase): 719 720 prefix="r|" 721 722 def test_read_through(self): 723 # Issue #11224: A poorly designed _FileInFile.read() method 724 # caused seeking errors with stream tar files. 725 for tarinfo in self.tar: 726 if not tarinfo.isreg(): 727 continue 728 with self.tar.extractfile(tarinfo) as fobj: 729 while True: 730 try: 731 buf = fobj.read(512) 732 except tarfile.StreamError: 733 self.fail("simple read-through using " 734 "TarFile.extractfile() failed") 735 if not buf: 736 break 737 738 def test_fileobj_regular_file(self): 739 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 740 with self.tar.extractfile(tarinfo) as fobj: 741 data = fobj.read() 742 self.assertEqual(len(data), tarinfo.size, 743 "regular file extraction failed") 744 self.assertEqual(sha256sum(data), sha256_regtype, 745 "regular file extraction failed") 746 747 def test_provoke_stream_error(self): 748 tarinfos = self.tar.getmembers() 749 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 750 self.assertRaises(tarfile.StreamError, f.read) 751 752 def test_compare_members(self): 753 tar1 = tarfile.open(tarname, encoding="iso8859-1") 754 try: 755 tar2 = self.tar 756 757 while True: 758 t1 = tar1.next() 759 t2 = tar2.next() 760 if t1 is None: 761 break 762 self.assertIsNotNone(t2, "stream.next() failed.") 763 764 if t2.islnk() or t2.issym(): 765 with self.assertRaises(tarfile.StreamError): 766 tar2.extractfile(t2) 767 continue 768 769 v1 = tar1.extractfile(t1) 770 v2 = tar2.extractfile(t2) 771 if v1 is None: 772 continue 773 self.assertIsNotNone(v2, "stream.extractfile() failed") 774 self.assertEqual(v1.read(), v2.read(), 775 "stream extraction failed") 776 finally: 777 tar1.close() 778 779class GzipStreamReadTest(GzipTest, StreamReadTest): 780 pass 781 782class Bz2StreamReadTest(Bz2Test, StreamReadTest): 783 pass 784 785class LzmaStreamReadTest(LzmaTest, StreamReadTest): 786 pass 787 788 789class 
DetectReadTest(TarTest, unittest.TestCase): 790 def _testfunc_file(self, name, mode): 791 try: 792 tar = tarfile.open(name, mode) 793 except tarfile.ReadError as e: 794 self.fail() 795 else: 796 tar.close() 797 798 def _testfunc_fileobj(self, name, mode): 799 try: 800 with open(name, "rb") as f: 801 tar = tarfile.open(name, mode, fileobj=f) 802 except tarfile.ReadError as e: 803 self.fail() 804 else: 805 tar.close() 806 807 def _test_modes(self, testfunc): 808 if self.suffix: 809 with self.assertRaises(tarfile.ReadError): 810 tarfile.open(tarname, mode="r:" + self.suffix) 811 with self.assertRaises(tarfile.ReadError): 812 tarfile.open(tarname, mode="r|" + self.suffix) 813 with self.assertRaises(tarfile.ReadError): 814 tarfile.open(self.tarname, mode="r:") 815 with self.assertRaises(tarfile.ReadError): 816 tarfile.open(self.tarname, mode="r|") 817 testfunc(self.tarname, "r") 818 testfunc(self.tarname, "r:" + self.suffix) 819 testfunc(self.tarname, "r:*") 820 testfunc(self.tarname, "r|" + self.suffix) 821 testfunc(self.tarname, "r|*") 822 823 def test_detect_file(self): 824 self._test_modes(self._testfunc_file) 825 826 def test_detect_fileobj(self): 827 self._test_modes(self._testfunc_fileobj) 828 829class GzipDetectReadTest(GzipTest, DetectReadTest): 830 pass 831 832class Bz2DetectReadTest(Bz2Test, DetectReadTest): 833 def test_detect_stream_bz2(self): 834 # Originally, tarfile's stream detection looked for the string 835 # "BZh91" at the start of the file. This is incorrect because 836 # the '9' represents the blocksize (900,000 bytes). If the file was 837 # compressed using another blocksize autodetection fails. 838 with open(tarname, "rb") as fobj: 839 data = fobj.read() 840 841 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
842 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 843 fobj.write(data) 844 845 self._testfunc_file(tmpname, "r|*") 846 847class LzmaDetectReadTest(LzmaTest, DetectReadTest): 848 pass 849 850 851class MemberReadTest(ReadTest, unittest.TestCase): 852 853 def _test_member(self, tarinfo, chksum=None, **kwargs): 854 if chksum is not None: 855 with self.tar.extractfile(tarinfo) as f: 856 self.assertEqual(sha256sum(f.read()), chksum, 857 "wrong sha256sum for %s" % tarinfo.name) 858 859 kwargs["mtime"] = 0o7606136617 860 kwargs["uid"] = 1000 861 kwargs["gid"] = 100 862 if "old-v7" not in tarinfo.name: 863 # V7 tar can't handle alphabetic owners. 864 kwargs["uname"] = "tarfile" 865 kwargs["gname"] = "tarfile" 866 for k, v in kwargs.items(): 867 self.assertEqual(getattr(tarinfo, k), v, 868 "wrong value in %s field of %s" % (k, tarinfo.name)) 869 870 def test_find_regtype(self): 871 tarinfo = self.tar.getmember("ustar/regtype") 872 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 873 874 def test_find_conttype(self): 875 tarinfo = self.tar.getmember("ustar/conttype") 876 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 877 878 def test_find_dirtype(self): 879 tarinfo = self.tar.getmember("ustar/dirtype") 880 self._test_member(tarinfo, size=0) 881 882 def test_find_dirtype_with_size(self): 883 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 884 self._test_member(tarinfo, size=255) 885 886 def test_find_lnktype(self): 887 tarinfo = self.tar.getmember("ustar/lnktype") 888 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 889 890 def test_find_symtype(self): 891 tarinfo = self.tar.getmember("ustar/symtype") 892 self._test_member(tarinfo, size=0, linkname="regtype") 893 894 def test_find_blktype(self): 895 tarinfo = self.tar.getmember("ustar/blktype") 896 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 897 898 def test_find_chrtype(self): 899 tarinfo = self.tar.getmember("ustar/chrtype") 900 
self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 901 902 def test_find_fifotype(self): 903 tarinfo = self.tar.getmember("ustar/fifotype") 904 self._test_member(tarinfo, size=0) 905 906 def test_find_sparse(self): 907 tarinfo = self.tar.getmember("ustar/sparse") 908 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 909 910 def test_find_gnusparse(self): 911 tarinfo = self.tar.getmember("gnu/sparse") 912 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 913 914 def test_find_gnusparse_00(self): 915 tarinfo = self.tar.getmember("gnu/sparse-0.0") 916 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 917 918 def test_find_gnusparse_01(self): 919 tarinfo = self.tar.getmember("gnu/sparse-0.1") 920 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 921 922 def test_find_gnusparse_10(self): 923 tarinfo = self.tar.getmember("gnu/sparse-1.0") 924 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 925 926 def test_find_umlauts(self): 927 tarinfo = self.tar.getmember("ustar/umlauts-" 928 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 929 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 930 931 def test_find_ustar_longname(self): 932 name = "ustar/" + "12345/" * 39 + "1234567/longname" 933 self.assertIn(name, self.tar.getnames()) 934 935 def test_find_regtype_oldv7(self): 936 tarinfo = self.tar.getmember("misc/regtype-old-v7") 937 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 938 939 def test_find_pax_umlauts(self): 940 self.tar.close() 941 self.tar = tarfile.open(self.tarname, mode=self.mode, 942 encoding="iso8859-1") 943 tarinfo = self.tar.getmember("pax/umlauts-" 944 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 945 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 946 947 948class LongnameTest: 949 950 def test_read_longname(self): 951 # Test reading of longname (bug #1471427). 
952 longname = self.subdir + "/" + "123/" * 125 + "longname" 953 try: 954 tarinfo = self.tar.getmember(longname) 955 except KeyError: 956 self.fail("longname not found") 957 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 958 "read longname as dirtype") 959 960 def test_read_longlink(self): 961 longname = self.subdir + "/" + "123/" * 125 + "longname" 962 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 963 try: 964 tarinfo = self.tar.getmember(longlink) 965 except KeyError: 966 self.fail("longlink not found") 967 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 968 969 def test_truncated_longname(self): 970 longname = self.subdir + "/" + "123/" * 125 + "longname" 971 tarinfo = self.tar.getmember(longname) 972 offset = tarinfo.offset 973 self.tar.fileobj.seek(offset) 974 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 975 with self.assertRaises(tarfile.ReadError): 976 tarfile.open(name="foo.tar", fileobj=fobj) 977 978 def test_header_offset(self): 979 # Test if the start offset of the TarInfo object includes 980 # the preceding extended header. 981 longname = self.subdir + "/" + "123/" * 125 + "longname" 982 offset = self.tar.getmember(longname).offset 983 with open(tarname, "rb") as fobj: 984 fobj.seek(offset) 985 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 986 "iso8859-1", "strict") 987 self.assertEqual(tarinfo.type, self.longnametype) 988 989 990class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 991 992 subdir = "gnu" 993 longnametype = tarfile.GNUTYPE_LONGNAME 994 995 # Since 3.2 tarfile is supposed to accurately restore sparse members and 996 # produce files with holes. This is what we actually want to test here. 997 # Unfortunately, not all platforms/filesystems support sparse files, and 998 # even on platforms that do it is non-trivial to make reliable assertions 999 # about holes in files. 
Therefore, we first do one basic test which works 1000 # an all platforms, and after that a test that will work only on 1001 # platforms/filesystems that prove to support sparse files. 1002 def _test_sparse_file(self, name): 1003 self.tar.extract(name, TEMPDIR) 1004 filename = os.path.join(TEMPDIR, name) 1005 with open(filename, "rb") as fobj: 1006 data = fobj.read() 1007 self.assertEqual(sha256sum(data), sha256_sparse, 1008 "wrong sha256sum for %s" % name) 1009 1010 if self._fs_supports_holes(): 1011 s = os.stat(filename) 1012 self.assertLess(s.st_blocks * 512, s.st_size) 1013 1014 def test_sparse_file_old(self): 1015 self._test_sparse_file("gnu/sparse") 1016 1017 def test_sparse_file_00(self): 1018 self._test_sparse_file("gnu/sparse-0.0") 1019 1020 def test_sparse_file_01(self): 1021 self._test_sparse_file("gnu/sparse-0.1") 1022 1023 def test_sparse_file_10(self): 1024 self._test_sparse_file("gnu/sparse-1.0") 1025 1026 @staticmethod 1027 def _fs_supports_holes(): 1028 # Return True if the platform knows the st_blocks stat attribute and 1029 # uses st_blocks units of 512 bytes, and if the filesystem is able to 1030 # store holes of 4 KiB in files. 1031 # 1032 # The function returns False if page size is larger than 4 KiB. 1033 # For example, ppc64 uses pages of 64 KiB. 1034 if sys.platform.startswith("linux"): 1035 # Linux evidentially has 512 byte st_blocks units. 
1036 name = os.path.join(TEMPDIR, "sparse-test") 1037 with open(name, "wb") as fobj: 1038 # Seek to "punch a hole" of 4 KiB 1039 fobj.seek(4096) 1040 fobj.write(b'x' * 4096) 1041 fobj.truncate() 1042 s = os.stat(name) 1043 support.unlink(name) 1044 return (s.st_blocks * 512 < s.st_size) 1045 else: 1046 return False 1047 1048 1049class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 1050 1051 subdir = "pax" 1052 longnametype = tarfile.XHDTYPE 1053 1054 def test_pax_global_headers(self): 1055 tar = tarfile.open(tarname, encoding="iso8859-1") 1056 try: 1057 tarinfo = tar.getmember("pax/regtype1") 1058 self.assertEqual(tarinfo.uname, "foo") 1059 self.assertEqual(tarinfo.gname, "bar") 1060 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1061 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1062 1063 tarinfo = tar.getmember("pax/regtype2") 1064 self.assertEqual(tarinfo.uname, "") 1065 self.assertEqual(tarinfo.gname, "bar") 1066 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1067 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1068 1069 tarinfo = tar.getmember("pax/regtype3") 1070 self.assertEqual(tarinfo.uname, "tarfile") 1071 self.assertEqual(tarinfo.gname, "tarfile") 1072 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1073 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1074 finally: 1075 tar.close() 1076 1077 def test_pax_number_fields(self): 1078 # All following number fields are read from the pax header. 
1079 tar = tarfile.open(tarname, encoding="iso8859-1") 1080 try: 1081 tarinfo = tar.getmember("pax/regtype4") 1082 self.assertEqual(tarinfo.size, 7011) 1083 self.assertEqual(tarinfo.uid, 123) 1084 self.assertEqual(tarinfo.gid, 123) 1085 self.assertEqual(tarinfo.mtime, 1041808783.0) 1086 self.assertEqual(type(tarinfo.mtime), float) 1087 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1088 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1089 finally: 1090 tar.close() 1091 1092 1093class WriteTestBase(TarTest): 1094 # Put all write tests in here that are supposed to be tested 1095 # in all possible mode combinations. 1096 1097 def test_fileobj_no_close(self): 1098 fobj = io.BytesIO() 1099 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1100 tar.addfile(tarfile.TarInfo("foo")) 1101 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1102 # Issue #20238: Incomplete gzip output with mode="w:gz" 1103 data = fobj.getvalue() 1104 del tar 1105 support.gc_collect() 1106 self.assertFalse(fobj.closed) 1107 self.assertEqual(data, fobj.getvalue()) 1108 1109 def test_eof_marker(self): 1110 # Make sure an end of archive marker is written (two zero blocks). 1111 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1112 # So, we create an archive that has exactly 10240 bytes without the 1113 # marker, and has 20480 bytes once the marker is written. 1114 with tarfile.open(tmpname, self.mode) as tar: 1115 t = tarfile.TarInfo("foo") 1116 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1117 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1118 1119 with self.open(tmpname, "rb") as fobj: 1120 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1121 1122 1123class WriteTest(WriteTestBase, unittest.TestCase): 1124 1125 prefix = "w:" 1126 1127 def test_100_char_name(self): 1128 # The name field in a tar header stores strings of at most 100 chars. 
1129 # If a string is shorter than 100 chars it has to be padded with '\0', 1130 # which implies that a string of exactly 100 chars is stored without 1131 # a trailing '\0'. 1132 name = "0123456789" * 10 1133 tar = tarfile.open(tmpname, self.mode) 1134 try: 1135 t = tarfile.TarInfo(name) 1136 tar.addfile(t) 1137 finally: 1138 tar.close() 1139 1140 tar = tarfile.open(tmpname) 1141 try: 1142 self.assertEqual(tar.getnames()[0], name, 1143 "failed to store 100 char filename") 1144 finally: 1145 tar.close() 1146 1147 def test_tar_size(self): 1148 # Test for bug #1013882. 1149 tar = tarfile.open(tmpname, self.mode) 1150 try: 1151 path = os.path.join(TEMPDIR, "file") 1152 with open(path, "wb") as fobj: 1153 fobj.write(b"aaa") 1154 tar.add(path) 1155 finally: 1156 tar.close() 1157 self.assertGreater(os.path.getsize(tmpname), 0, 1158 "tarfile is empty") 1159 1160 # The test_*_size tests test for bug #1167128. 1161 def test_file_size(self): 1162 tar = tarfile.open(tmpname, self.mode) 1163 try: 1164 path = os.path.join(TEMPDIR, "file") 1165 with open(path, "wb"): 1166 pass 1167 tarinfo = tar.gettarinfo(path) 1168 self.assertEqual(tarinfo.size, 0) 1169 1170 with open(path, "wb") as fobj: 1171 fobj.write(b"aaa") 1172 tarinfo = tar.gettarinfo(path) 1173 self.assertEqual(tarinfo.size, 3) 1174 finally: 1175 tar.close() 1176 1177 def test_directory_size(self): 1178 path = os.path.join(TEMPDIR, "directory") 1179 os.mkdir(path) 1180 try: 1181 tar = tarfile.open(tmpname, self.mode) 1182 try: 1183 tarinfo = tar.gettarinfo(path) 1184 self.assertEqual(tarinfo.size, 0) 1185 finally: 1186 tar.close() 1187 finally: 1188 support.rmdir(path) 1189 1190 # mock the following: 1191 # os.listdir: so we know that files are in the wrong order 1192 def test_ordered_recursion(self): 1193 path = os.path.join(TEMPDIR, "directory") 1194 os.mkdir(path) 1195 open(os.path.join(path, "1"), "a").close() 1196 open(os.path.join(path, "2"), "a").close() 1197 try: 1198 tar = tarfile.open(tmpname, self.mode) 1199 
try: 1200 with unittest.mock.patch('os.listdir') as mock_listdir: 1201 mock_listdir.return_value = ["2", "1"] 1202 tar.add(path) 1203 paths = [] 1204 for m in tar.getmembers(): 1205 paths.append(os.path.split(m.name)[-1]) 1206 self.assertEqual(paths, ["directory", "1", "2"]); 1207 finally: 1208 tar.close() 1209 finally: 1210 support.unlink(os.path.join(path, "1")) 1211 support.unlink(os.path.join(path, "2")) 1212 support.rmdir(path) 1213 1214 def test_gettarinfo_pathlike_name(self): 1215 with tarfile.open(tmpname, self.mode) as tar: 1216 path = pathlib.Path(TEMPDIR) / "file" 1217 with open(path, "wb") as fobj: 1218 fobj.write(b"aaa") 1219 tarinfo = tar.gettarinfo(path) 1220 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1221 self.assertIsInstance(tarinfo.name, str) 1222 self.assertEqual(tarinfo.name, tarinfo2.name) 1223 self.assertEqual(tarinfo.size, 3) 1224 1225 @unittest.skipUnless(hasattr(os, "link"), 1226 "Missing hardlink implementation") 1227 def test_link_size(self): 1228 link = os.path.join(TEMPDIR, "link") 1229 target = os.path.join(TEMPDIR, "link_target") 1230 with open(target, "wb") as fobj: 1231 fobj.write(b"aaa") 1232 try: 1233 os.link(target, link) 1234 except PermissionError as e: 1235 self.skipTest('os.link(): %s' % e) 1236 try: 1237 tar = tarfile.open(tmpname, self.mode) 1238 try: 1239 # Record the link target in the inodes list. 1240 tar.gettarinfo(target) 1241 tarinfo = tar.gettarinfo(link) 1242 self.assertEqual(tarinfo.size, 0) 1243 finally: 1244 tar.close() 1245 finally: 1246 support.unlink(target) 1247 support.unlink(link) 1248 1249 @support.skip_unless_symlink 1250 def test_symlink_size(self): 1251 path = os.path.join(TEMPDIR, "symlink") 1252 os.symlink("link_target", path) 1253 try: 1254 tar = tarfile.open(tmpname, self.mode) 1255 try: 1256 tarinfo = tar.gettarinfo(path) 1257 self.assertEqual(tarinfo.size, 0) 1258 finally: 1259 tar.close() 1260 finally: 1261 support.unlink(path) 1262 1263 def test_add_self(self): 1264 # Test for #1257255. 
1265 dstname = os.path.abspath(tmpname) 1266 tar = tarfile.open(tmpname, self.mode) 1267 try: 1268 self.assertEqual(tar.name, dstname, 1269 "archive name must be absolute") 1270 tar.add(dstname) 1271 self.assertEqual(tar.getnames(), [], 1272 "added the archive to itself") 1273 1274 with support.change_cwd(TEMPDIR): 1275 tar.add(dstname) 1276 self.assertEqual(tar.getnames(), [], 1277 "added the archive to itself") 1278 finally: 1279 tar.close() 1280 1281 def test_filter(self): 1282 tempdir = os.path.join(TEMPDIR, "filter") 1283 os.mkdir(tempdir) 1284 try: 1285 for name in ("foo", "bar", "baz"): 1286 name = os.path.join(tempdir, name) 1287 support.create_empty_file(name) 1288 1289 def filter(tarinfo): 1290 if os.path.basename(tarinfo.name) == "bar": 1291 return 1292 tarinfo.uid = 123 1293 tarinfo.uname = "foo" 1294 return tarinfo 1295 1296 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1297 try: 1298 tar.add(tempdir, arcname="empty_dir", filter=filter) 1299 finally: 1300 tar.close() 1301 1302 # Verify that filter is a keyword-only argument 1303 with self.assertRaises(TypeError): 1304 tar.add(tempdir, "empty_dir", True, None, filter) 1305 1306 tar = tarfile.open(tmpname, "r") 1307 try: 1308 for tarinfo in tar: 1309 self.assertEqual(tarinfo.uid, 123) 1310 self.assertEqual(tarinfo.uname, "foo") 1311 self.assertEqual(len(tar.getmembers()), 3) 1312 finally: 1313 tar.close() 1314 finally: 1315 support.rmtree(tempdir) 1316 1317 # Guarantee that stored pathnames are not modified. Don't 1318 # remove ./ or ../ or double slashes. Still make absolute 1319 # pathnames relative. 1320 # For details see bug #6054. 1321 def _test_pathname(self, path, cmp_path=None, dir=False): 1322 # Create a tarfile with an empty member named path 1323 # and compare the stored name with the original. 
1324 foo = os.path.join(TEMPDIR, "foo") 1325 if not dir: 1326 support.create_empty_file(foo) 1327 else: 1328 os.mkdir(foo) 1329 1330 tar = tarfile.open(tmpname, self.mode) 1331 try: 1332 tar.add(foo, arcname=path) 1333 finally: 1334 tar.close() 1335 1336 tar = tarfile.open(tmpname, "r") 1337 try: 1338 t = tar.next() 1339 finally: 1340 tar.close() 1341 1342 if not dir: 1343 support.unlink(foo) 1344 else: 1345 support.rmdir(foo) 1346 1347 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1348 1349 1350 @support.skip_unless_symlink 1351 def test_extractall_symlinks(self): 1352 # Test if extractall works properly when tarfile contains symlinks 1353 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1354 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1355 os.mkdir(tempdir) 1356 try: 1357 source_file = os.path.join(tempdir,'source') 1358 target_file = os.path.join(tempdir,'symlink') 1359 with open(source_file,'w') as f: 1360 f.write('something\n') 1361 os.symlink(source_file, target_file) 1362 with tarfile.open(temparchive, 'w') as tar: 1363 tar.add(source_file, arcname="source") 1364 tar.add(target_file, arcname="symlink") 1365 # Let's extract it to the location which contains the symlink 1366 with tarfile.open(temparchive, errorlevel=2) as tar: 1367 # this should not raise OSError: [Errno 17] File exists 1368 try: 1369 tar.extractall(path=tempdir) 1370 except OSError: 1371 self.fail("extractall failed with symlinked files") 1372 finally: 1373 support.unlink(temparchive) 1374 support.rmtree(tempdir) 1375 1376 def test_pathnames(self): 1377 self._test_pathname("foo") 1378 self._test_pathname(os.path.join("foo", ".", "bar")) 1379 self._test_pathname(os.path.join("foo", "..", "bar")) 1380 self._test_pathname(os.path.join(".", "foo")) 1381 self._test_pathname(os.path.join(".", "foo", ".")) 1382 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1383 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1384 
self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1385 self._test_pathname(os.path.join("..", "foo")) 1386 self._test_pathname(os.path.join("..", "foo", "..")) 1387 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1388 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1389 1390 self._test_pathname("foo" + os.sep + os.sep + "bar") 1391 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1392 1393 def test_abs_pathnames(self): 1394 if sys.platform == "win32": 1395 self._test_pathname("C:\\foo", "foo") 1396 else: 1397 self._test_pathname("/foo", "foo") 1398 self._test_pathname("///foo", "foo") 1399 1400 def test_cwd(self): 1401 # Test adding the current working directory. 1402 with support.change_cwd(TEMPDIR): 1403 tar = tarfile.open(tmpname, self.mode) 1404 try: 1405 tar.add(".") 1406 finally: 1407 tar.close() 1408 1409 tar = tarfile.open(tmpname, "r") 1410 try: 1411 for t in tar: 1412 if t.name != ".": 1413 self.assertTrue(t.name.startswith("./"), t.name) 1414 finally: 1415 tar.close() 1416 1417 def test_open_nonwritable_fileobj(self): 1418 for exctype in OSError, EOFError, RuntimeError: 1419 class BadFile(io.BytesIO): 1420 first = True 1421 def write(self, data): 1422 if self.first: 1423 self.first = False 1424 raise exctype 1425 1426 f = BadFile() 1427 with self.assertRaises(exctype): 1428 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1429 format=tarfile.PAX_FORMAT, 1430 pax_headers={'non': 'empty'}) 1431 self.assertFalse(f.closed) 1432 1433 1434class GzipWriteTest(GzipTest, WriteTest): 1435 pass 1436 1437 1438class Bz2WriteTest(Bz2Test, WriteTest): 1439 pass 1440 1441 1442class LzmaWriteTest(LzmaTest, WriteTest): 1443 pass 1444 1445 1446class StreamWriteTest(WriteTestBase, unittest.TestCase): 1447 1448 prefix = "w|" 1449 decompressor = None 1450 1451 def test_stream_padding(self): 1452 # Test for bug #1543303. 
1453 tar = tarfile.open(tmpname, self.mode) 1454 tar.close() 1455 if self.decompressor: 1456 dec = self.decompressor() 1457 with open(tmpname, "rb") as fobj: 1458 data = fobj.read() 1459 data = dec.decompress(data) 1460 self.assertFalse(dec.unused_data, "found trailing data") 1461 else: 1462 with self.open(tmpname) as fobj: 1463 data = fobj.read() 1464 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1465 "incorrect zero padding") 1466 1467 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1468 "Missing umask implementation") 1469 def test_file_mode(self): 1470 # Test for issue #8464: Create files with correct 1471 # permissions. 1472 if os.path.exists(tmpname): 1473 support.unlink(tmpname) 1474 1475 original_umask = os.umask(0o022) 1476 try: 1477 tar = tarfile.open(tmpname, self.mode) 1478 tar.close() 1479 mode = os.stat(tmpname).st_mode & 0o777 1480 self.assertEqual(mode, 0o644, "wrong file permissions") 1481 finally: 1482 os.umask(original_umask) 1483 1484 1485class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1486 def test_source_directory_not_leaked(self): 1487 """ 1488 Ensure the source directory is not included in the tar header 1489 per bpo-41316. 1490 """ 1491 tarfile.open(tmpname, self.mode).close() 1492 payload = pathlib.Path(tmpname).read_text(encoding='latin-1') 1493 assert os.path.dirname(tmpname) not in payload 1494 1495 1496class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1497 decompressor = bz2.BZ2Decompressor if bz2 else None 1498 1499class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1500 decompressor = lzma.LZMADecompressor if lzma else None 1501 1502 1503class GNUWriteTest(unittest.TestCase): 1504 # This testcase checks for correct creation of GNU Longname 1505 # and Longlink extended headers (cp. bug #812325). 
1506 1507 def _length(self, s): 1508 blocks = len(s) // 512 + 1 1509 return blocks * 512 1510 1511 def _calc_size(self, name, link=None): 1512 # Initial tar header 1513 count = 512 1514 1515 if len(name) > tarfile.LENGTH_NAME: 1516 # GNU longname extended header + longname 1517 count += 512 1518 count += self._length(name) 1519 if link is not None and len(link) > tarfile.LENGTH_LINK: 1520 # GNU longlink extended header + longlink 1521 count += 512 1522 count += self._length(link) 1523 return count 1524 1525 def _test(self, name, link=None): 1526 tarinfo = tarfile.TarInfo(name) 1527 if link: 1528 tarinfo.linkname = link 1529 tarinfo.type = tarfile.LNKTYPE 1530 1531 tar = tarfile.open(tmpname, "w") 1532 try: 1533 tar.format = tarfile.GNU_FORMAT 1534 tar.addfile(tarinfo) 1535 1536 v1 = self._calc_size(name, link) 1537 v2 = tar.offset 1538 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1539 finally: 1540 tar.close() 1541 1542 tar = tarfile.open(tmpname) 1543 try: 1544 member = tar.next() 1545 self.assertIsNotNone(member, 1546 "unable to read longname member") 1547 self.assertEqual(tarinfo.name, member.name, 1548 "unable to read longname member") 1549 self.assertEqual(tarinfo.linkname, member.linkname, 1550 "unable to read longname member") 1551 finally: 1552 tar.close() 1553 1554 def test_longname_1023(self): 1555 self._test(("longnam/" * 127) + "longnam") 1556 1557 def test_longname_1024(self): 1558 self._test(("longnam/" * 127) + "longname") 1559 1560 def test_longname_1025(self): 1561 self._test(("longnam/" * 127) + "longname_") 1562 1563 def test_longlink_1023(self): 1564 self._test("name", ("longlnk/" * 127) + "longlnk") 1565 1566 def test_longlink_1024(self): 1567 self._test("name", ("longlnk/" * 127) + "longlink") 1568 1569 def test_longlink_1025(self): 1570 self._test("name", ("longlnk/" * 127) + "longlink_") 1571 1572 def test_longnamelink_1023(self): 1573 self._test(("longnam/" * 127) + "longnam", 1574 ("longlnk/" * 127) + "longlnk") 1575 
1576 def test_longnamelink_1024(self): 1577 self._test(("longnam/" * 127) + "longname", 1578 ("longlnk/" * 127) + "longlink") 1579 1580 def test_longnamelink_1025(self): 1581 self._test(("longnam/" * 127) + "longname_", 1582 ("longlnk/" * 127) + "longlink_") 1583 1584 1585class DeviceHeaderTest(WriteTestBase, unittest.TestCase): 1586 1587 prefix = "w:" 1588 1589 def test_headers_written_only_for_device_files(self): 1590 # Regression test for bpo-18819. 1591 tempdir = os.path.join(TEMPDIR, "device_header_test") 1592 os.mkdir(tempdir) 1593 try: 1594 tar = tarfile.open(tmpname, self.mode) 1595 try: 1596 input_blk = tarfile.TarInfo(name="my_block_device") 1597 input_reg = tarfile.TarInfo(name="my_regular_file") 1598 input_blk.type = tarfile.BLKTYPE 1599 input_reg.type = tarfile.REGTYPE 1600 tar.addfile(input_blk) 1601 tar.addfile(input_reg) 1602 finally: 1603 tar.close() 1604 1605 # devmajor and devminor should be *interpreted* as 0 in both... 1606 tar = tarfile.open(tmpname, "r") 1607 try: 1608 output_blk = tar.getmember("my_block_device") 1609 output_reg = tar.getmember("my_regular_file") 1610 finally: 1611 tar.close() 1612 self.assertEqual(output_blk.devmajor, 0) 1613 self.assertEqual(output_blk.devminor, 0) 1614 self.assertEqual(output_reg.devmajor, 0) 1615 self.assertEqual(output_reg.devminor, 0) 1616 1617 # ...but the fields should not actually be set on regular files: 1618 with open(tmpname, "rb") as infile: 1619 buf = infile.read() 1620 buf_blk = buf[output_blk.offset:output_blk.offset_data] 1621 buf_reg = buf[output_reg.offset:output_reg.offset_data] 1622 # See `struct posixheader` in GNU docs for byte offsets: 1623 # <https://www.gnu.org/software/tar/manual/html_node/Standard.html> 1624 device_headers = slice(329, 329 + 16) 1625 self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2) 1626 self.assertEqual(buf_reg[device_headers], b"\0" * 16) 1627 finally: 1628 support.rmtree(tempdir) 1629 1630 1631class CreateTest(WriteTestBase, unittest.TestCase): 1632 
1633 prefix = "x:" 1634 1635 file_path = os.path.join(TEMPDIR, "spameggs42") 1636 1637 def setUp(self): 1638 support.unlink(tmpname) 1639 1640 @classmethod 1641 def setUpClass(cls): 1642 with open(cls.file_path, "wb") as fobj: 1643 fobj.write(b"aaa") 1644 1645 @classmethod 1646 def tearDownClass(cls): 1647 support.unlink(cls.file_path) 1648 1649 def test_create(self): 1650 with tarfile.open(tmpname, self.mode) as tobj: 1651 tobj.add(self.file_path) 1652 1653 with self.taropen(tmpname) as tobj: 1654 names = tobj.getnames() 1655 self.assertEqual(len(names), 1) 1656 self.assertIn('spameggs42', names[0]) 1657 1658 def test_create_existing(self): 1659 with tarfile.open(tmpname, self.mode) as tobj: 1660 tobj.add(self.file_path) 1661 1662 with self.assertRaises(FileExistsError): 1663 tobj = tarfile.open(tmpname, self.mode) 1664 1665 with self.taropen(tmpname) as tobj: 1666 names = tobj.getnames() 1667 self.assertEqual(len(names), 1) 1668 self.assertIn('spameggs42', names[0]) 1669 1670 def test_create_taropen(self): 1671 with self.taropen(tmpname, "x") as tobj: 1672 tobj.add(self.file_path) 1673 1674 with self.taropen(tmpname) as tobj: 1675 names = tobj.getnames() 1676 self.assertEqual(len(names), 1) 1677 self.assertIn('spameggs42', names[0]) 1678 1679 def test_create_existing_taropen(self): 1680 with self.taropen(tmpname, "x") as tobj: 1681 tobj.add(self.file_path) 1682 1683 with self.assertRaises(FileExistsError): 1684 with self.taropen(tmpname, "x"): 1685 pass 1686 1687 with self.taropen(tmpname) as tobj: 1688 names = tobj.getnames() 1689 self.assertEqual(len(names), 1) 1690 self.assertIn("spameggs42", names[0]) 1691 1692 def test_create_pathlike_name(self): 1693 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1694 self.assertIsInstance(tobj.name, str) 1695 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1696 tobj.add(pathlib.Path(self.file_path)) 1697 names = tobj.getnames() 1698 self.assertEqual(len(names), 1) 1699 self.assertIn('spameggs42', 
names[0]) 1700 1701 with self.taropen(tmpname) as tobj: 1702 names = tobj.getnames() 1703 self.assertEqual(len(names), 1) 1704 self.assertIn('spameggs42', names[0]) 1705 1706 def test_create_taropen_pathlike_name(self): 1707 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1708 self.assertIsInstance(tobj.name, str) 1709 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1710 tobj.add(pathlib.Path(self.file_path)) 1711 names = tobj.getnames() 1712 self.assertEqual(len(names), 1) 1713 self.assertIn('spameggs42', names[0]) 1714 1715 with self.taropen(tmpname) as tobj: 1716 names = tobj.getnames() 1717 self.assertEqual(len(names), 1) 1718 self.assertIn('spameggs42', names[0]) 1719 1720 1721class GzipCreateTest(GzipTest, CreateTest): 1722 1723 def test_create_with_compresslevel(self): 1724 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 1725 tobj.add(self.file_path) 1726 with tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj: 1727 pass 1728 1729 1730class Bz2CreateTest(Bz2Test, CreateTest): 1731 1732 def test_create_with_compresslevel(self): 1733 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 1734 tobj.add(self.file_path) 1735 with tarfile.open(tmpname, 'r:bz2', compresslevel=1) as tobj: 1736 pass 1737 1738 1739class LzmaCreateTest(LzmaTest, CreateTest): 1740 1741 # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel. 1742 # It does not allow for preset to be specified when reading. 1743 def test_create_with_preset(self): 1744 with tarfile.open(tmpname, self.mode, preset=1) as tobj: 1745 tobj.add(self.file_path) 1746 1747 1748class CreateWithXModeTest(CreateTest): 1749 1750 prefix = "x" 1751 1752 test_create_taropen = None 1753 test_create_existing_taropen = None 1754 1755 1756@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1757class HardlinkTest(unittest.TestCase): 1758 # Test the creation of LNKTYPE (hardlink) members in an archive. 
1759 1760 def setUp(self): 1761 self.foo = os.path.join(TEMPDIR, "foo") 1762 self.bar = os.path.join(TEMPDIR, "bar") 1763 1764 with open(self.foo, "wb") as fobj: 1765 fobj.write(b"foo") 1766 1767 try: 1768 os.link(self.foo, self.bar) 1769 except PermissionError as e: 1770 self.skipTest('os.link(): %s' % e) 1771 1772 self.tar = tarfile.open(tmpname, "w") 1773 self.tar.add(self.foo) 1774 1775 def tearDown(self): 1776 self.tar.close() 1777 support.unlink(self.foo) 1778 support.unlink(self.bar) 1779 1780 def test_add_twice(self): 1781 # The same name will be added as a REGTYPE every 1782 # time regardless of st_nlink. 1783 tarinfo = self.tar.gettarinfo(self.foo) 1784 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1785 "add file as regular failed") 1786 1787 def test_add_hardlink(self): 1788 tarinfo = self.tar.gettarinfo(self.bar) 1789 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1790 "add file as hardlink failed") 1791 1792 def test_dereference_hardlink(self): 1793 self.tar.dereference = True 1794 tarinfo = self.tar.gettarinfo(self.bar) 1795 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1796 "dereferencing hardlink failed") 1797 1798 1799class PaxWriteTest(GNUWriteTest): 1800 1801 def _test(self, name, link=None): 1802 # See GNUWriteTest. 
1803 tarinfo = tarfile.TarInfo(name) 1804 if link: 1805 tarinfo.linkname = link 1806 tarinfo.type = tarfile.LNKTYPE 1807 1808 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1809 try: 1810 tar.addfile(tarinfo) 1811 finally: 1812 tar.close() 1813 1814 tar = tarfile.open(tmpname) 1815 try: 1816 if link: 1817 l = tar.getmembers()[0].linkname 1818 self.assertEqual(link, l, "PAX longlink creation failed") 1819 else: 1820 n = tar.getmembers()[0].name 1821 self.assertEqual(name, n, "PAX longname creation failed") 1822 finally: 1823 tar.close() 1824 1825 def test_pax_global_header(self): 1826 pax_headers = { 1827 "foo": "bar", 1828 "uid": "0", 1829 "mtime": "1.23", 1830 "test": "\xe4\xf6\xfc", 1831 "\xe4\xf6\xfc": "test"} 1832 1833 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1834 pax_headers=pax_headers) 1835 try: 1836 tar.addfile(tarfile.TarInfo("test")) 1837 finally: 1838 tar.close() 1839 1840 # Test if the global header was written correctly. 1841 tar = tarfile.open(tmpname, encoding="iso8859-1") 1842 try: 1843 self.assertEqual(tar.pax_headers, pax_headers) 1844 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1845 # Test if all the fields are strings. 1846 for key, val in tar.pax_headers.items(): 1847 self.assertIsNot(type(key), bytes) 1848 self.assertIsNot(type(val), bytes) 1849 if key in tarfile.PAX_NUMBER_FIELDS: 1850 try: 1851 tarfile.PAX_NUMBER_FIELDS[key](val) 1852 except (TypeError, ValueError): 1853 self.fail("unable to convert pax header field") 1854 finally: 1855 tar.close() 1856 1857 def test_pax_extended_header(self): 1858 # The fields from the pax header have priority over the 1859 # TarInfo. 
1860 pax_headers = {"path": "foo", "uid": "123"} 1861 1862 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1863 encoding="iso8859-1") 1864 try: 1865 t = tarfile.TarInfo() 1866 t.name = "\xe4\xf6\xfc" # non-ASCII 1867 t.uid = 8**8 # too large 1868 t.pax_headers = pax_headers 1869 tar.addfile(t) 1870 finally: 1871 tar.close() 1872 1873 tar = tarfile.open(tmpname, encoding="iso8859-1") 1874 try: 1875 t = tar.getmembers()[0] 1876 self.assertEqual(t.pax_headers, pax_headers) 1877 self.assertEqual(t.name, "foo") 1878 self.assertEqual(t.uid, 123) 1879 finally: 1880 tar.close() 1881 1882 1883class UnicodeTest: 1884 1885 def test_iso8859_1_filename(self): 1886 self._test_unicode_filename("iso8859-1") 1887 1888 def test_utf7_filename(self): 1889 self._test_unicode_filename("utf7") 1890 1891 def test_utf8_filename(self): 1892 self._test_unicode_filename("utf-8") 1893 1894 def _test_unicode_filename(self, encoding): 1895 tar = tarfile.open(tmpname, "w", format=self.format, 1896 encoding=encoding, errors="strict") 1897 try: 1898 name = "\xe4\xf6\xfc" 1899 tar.addfile(tarfile.TarInfo(name)) 1900 finally: 1901 tar.close() 1902 1903 tar = tarfile.open(tmpname, encoding=encoding) 1904 try: 1905 self.assertEqual(tar.getmembers()[0].name, name) 1906 finally: 1907 tar.close() 1908 1909 def test_unicode_filename_error(self): 1910 tar = tarfile.open(tmpname, "w", format=self.format, 1911 encoding="ascii", errors="strict") 1912 try: 1913 tarinfo = tarfile.TarInfo() 1914 1915 tarinfo.name = "\xe4\xf6\xfc" 1916 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1917 1918 tarinfo.name = "foo" 1919 tarinfo.uname = "\xe4\xf6\xfc" 1920 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1921 finally: 1922 tar.close() 1923 1924 def test_unicode_argument(self): 1925 tar = tarfile.open(tarname, "r", 1926 encoding="iso8859-1", errors="strict") 1927 try: 1928 for t in tar: 1929 self.assertIs(type(t.name), str) 1930 self.assertIs(type(t.linkname), str) 1931 
self.assertIs(type(t.uname), str) 1932 self.assertIs(type(t.gname), str) 1933 finally: 1934 tar.close() 1935 1936 def test_uname_unicode(self): 1937 t = tarfile.TarInfo("foo") 1938 t.uname = "\xe4\xf6\xfc" 1939 t.gname = "\xe4\xf6\xfc" 1940 1941 tar = tarfile.open(tmpname, mode="w", format=self.format, 1942 encoding="iso8859-1") 1943 try: 1944 tar.addfile(t) 1945 finally: 1946 tar.close() 1947 1948 tar = tarfile.open(tmpname, encoding="iso8859-1") 1949 try: 1950 t = tar.getmember("foo") 1951 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1952 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1953 1954 if self.format != tarfile.PAX_FORMAT: 1955 tar.close() 1956 tar = tarfile.open(tmpname, encoding="ascii") 1957 t = tar.getmember("foo") 1958 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1959 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1960 finally: 1961 tar.close() 1962 1963 1964class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1965 1966 format = tarfile.USTAR_FORMAT 1967 1968 # Test whether the utf-8 encoded version of a filename exceeds the 100 1969 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1970 # bytes). 1971 def test_unicode_name1(self): 1972 self._test_ustar_name("0123456789" * 10) 1973 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1974 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1975 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1976 1977 def test_unicode_name2(self): 1978 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1979 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1980 1981 # Test whether the utf-8 encoded version of a filename exceeds the 155 1982 # bytes prefix + '/' + 100 bytes name limit. 
1983 def test_unicode_longname1(self): 1984 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1985 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1986 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1987 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1988 1989 def test_unicode_longname2(self): 1990 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1991 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1992 1993 def test_unicode_longname3(self): 1994 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1995 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1996 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1997 1998 def test_unicode_longname4(self): 1999 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 2000 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 2001 2002 def _test_ustar_name(self, name, exc=None): 2003 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2004 t = tarfile.TarInfo(name) 2005 if exc is None: 2006 tar.addfile(t) 2007 else: 2008 self.assertRaises(exc, tar.addfile, t) 2009 2010 if exc is None: 2011 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2012 for t in tar: 2013 self.assertEqual(name, t.name) 2014 break 2015 2016 # Test the same as above for the 100 bytes link field. 
2017 def test_unicode_link1(self): 2018 self._test_ustar_link("0123456789" * 10) 2019 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 2020 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 2021 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 2022 2023 def test_unicode_link2(self): 2024 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 2025 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 2026 2027 def _test_ustar_link(self, name, exc=None): 2028 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2029 t = tarfile.TarInfo("foo") 2030 t.linkname = name 2031 if exc is None: 2032 tar.addfile(t) 2033 else: 2034 self.assertRaises(exc, tar.addfile, t) 2035 2036 if exc is None: 2037 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2038 for t in tar: 2039 self.assertEqual(name, t.linkname) 2040 break 2041 2042 2043class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 2044 2045 format = tarfile.GNU_FORMAT 2046 2047 def test_bad_pax_header(self): 2048 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 2049 # without a hdrcharset=BINARY header. 2050 for encoding, name in ( 2051 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 2052 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 2053 with tarfile.open(tarname, encoding=encoding, 2054 errors="surrogateescape") as tar: 2055 try: 2056 t = tar.getmember(name) 2057 except KeyError: 2058 self.fail("unable to read bad GNU tar pax header") 2059 2060 2061class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 2062 2063 format = tarfile.PAX_FORMAT 2064 2065 # PAX_FORMAT ignores encoding in write mode. 2066 test_unicode_filename_error = None 2067 2068 def test_binary_header(self): 2069 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 
2070 for encoding, name in ( 2071 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2072 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2073 with tarfile.open(tarname, encoding=encoding, 2074 errors="surrogateescape") as tar: 2075 try: 2076 t = tar.getmember(name) 2077 except KeyError: 2078 self.fail("unable to read POSIX.1-2008 binary header") 2079 2080 2081class AppendTestBase: 2082 # Test append mode (cp. patch #1652681). 2083 2084 def setUp(self): 2085 self.tarname = tmpname 2086 if os.path.exists(self.tarname): 2087 support.unlink(self.tarname) 2088 2089 def _create_testtar(self, mode="w:"): 2090 with tarfile.open(tarname, encoding="iso8859-1") as src: 2091 t = src.getmember("ustar/regtype") 2092 t.name = "foo" 2093 with src.extractfile(t) as f: 2094 with tarfile.open(self.tarname, mode) as tar: 2095 tar.addfile(t, f) 2096 2097 def test_append_compressed(self): 2098 self._create_testtar("w:" + self.suffix) 2099 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2100 2101class AppendTest(AppendTestBase, unittest.TestCase): 2102 test_append_compressed = None 2103 2104 def _add_testfile(self, fileobj=None): 2105 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2106 tar.addfile(tarfile.TarInfo("bar")) 2107 2108 def _test(self, names=["bar"], fileobj=None): 2109 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2110 self.assertEqual(tar.getnames(), names) 2111 2112 def test_non_existing(self): 2113 self._add_testfile() 2114 self._test() 2115 2116 def test_empty(self): 2117 tarfile.open(self.tarname, "w:").close() 2118 self._add_testfile() 2119 self._test() 2120 2121 def test_empty_fileobj(self): 2122 fobj = io.BytesIO(b"\0" * 1024) 2123 self._add_testfile(fobj) 2124 fobj.seek(0) 2125 self._test(fileobj=fobj) 2126 2127 def test_fileobj(self): 2128 self._create_testtar() 2129 with open(self.tarname, "rb") as fobj: 2130 data = fobj.read() 2131 fobj = io.BytesIO(data) 2132 self._add_testfile(fobj) 2133 fobj.seek(0) 2134 
self._test(names=["foo", "bar"], fileobj=fobj) 2135 2136 def test_existing(self): 2137 self._create_testtar() 2138 self._add_testfile() 2139 self._test(names=["foo", "bar"]) 2140 2141 # Append mode is supposed to fail if the tarfile to append to 2142 # does not end with a zero block. 2143 def _test_error(self, data): 2144 with open(self.tarname, "wb") as fobj: 2145 fobj.write(data) 2146 self.assertRaises(tarfile.ReadError, self._add_testfile) 2147 2148 def test_null(self): 2149 self._test_error(b"") 2150 2151 def test_incomplete(self): 2152 self._test_error(b"\0" * 13) 2153 2154 def test_premature_eof(self): 2155 data = tarfile.TarInfo("foo").tobuf() 2156 self._test_error(data) 2157 2158 def test_trailing_garbage(self): 2159 data = tarfile.TarInfo("foo").tobuf() 2160 self._test_error(data + b"\0" * 13) 2161 2162 def test_invalid(self): 2163 self._test_error(b"a" * 512) 2164 2165class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2166 pass 2167 2168class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2169 pass 2170 2171class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2172 pass 2173 2174 2175class LimitsTest(unittest.TestCase): 2176 2177 def test_ustar_limits(self): 2178 # 100 char name 2179 tarinfo = tarfile.TarInfo("0123456789" * 10) 2180 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2181 2182 # 101 char name that cannot be stored 2183 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2184 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2185 2186 # 256 char name with a slash at pos 156 2187 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2188 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2189 2190 # 256 char name that cannot be stored 2191 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2192 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2193 2194 # 512 char name 2195 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2196 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 2197 2198 # 512 char linkname 2199 tarinfo = tarfile.TarInfo("longlink") 2200 tarinfo.linkname = "123/" * 126 + "longname" 2201 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2202 2203 # uid > 8 digits 2204 tarinfo = tarfile.TarInfo("name") 2205 tarinfo.uid = 0o10000000 2206 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2207 2208 def test_gnu_limits(self): 2209 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2210 tarinfo.tobuf(tarfile.GNU_FORMAT) 2211 2212 tarinfo = tarfile.TarInfo("longlink") 2213 tarinfo.linkname = "123/" * 126 + "longname" 2214 tarinfo.tobuf(tarfile.GNU_FORMAT) 2215 2216 # uid >= 256 ** 7 2217 tarinfo = tarfile.TarInfo("name") 2218 tarinfo.uid = 0o4000000000000000000 2219 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2220 2221 def test_pax_limits(self): 2222 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2223 tarinfo.tobuf(tarfile.PAX_FORMAT) 2224 2225 tarinfo = tarfile.TarInfo("longlink") 2226 tarinfo.linkname = "123/" * 126 + "longname" 2227 tarinfo.tobuf(tarfile.PAX_FORMAT) 2228 2229 tarinfo = tarfile.TarInfo("name") 2230 tarinfo.uid = 0o4000000000000000000 2231 tarinfo.tobuf(tarfile.PAX_FORMAT) 2232 2233 2234class MiscTest(unittest.TestCase): 2235 2236 def test_char_fields(self): 2237 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2238 b"foo\0\0\0\0\0") 2239 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2240 b"foo") 2241 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2242 "foo") 2243 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2244 "foo") 2245 2246 def test_read_number_fields(self): 2247 # Issue 13158: Test if GNU tar specific base-256 number fields 2248 # are decoded correctly. 
2249 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2250 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2251 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2252 0o10000000) 2253 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2254 0xffffffff) 2255 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2256 -1) 2257 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2258 -100) 2259 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2260 -0x100000000000000) 2261 2262 # Issue 24514: Test if empty number fields are converted to zero. 2263 self.assertEqual(tarfile.nti(b"\0"), 0) 2264 self.assertEqual(tarfile.nti(b" \0"), 0) 2265 2266 def test_write_number_fields(self): 2267 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2268 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2269 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2270 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2271 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2272 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2273 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2274 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2275 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2276 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2277 self.assertEqual(tarfile.itn(-0x100000000000000, 2278 format=tarfile.GNU_FORMAT), 2279 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2280 2281 # Issue 32713: Test if itn() supports float values outside the 2282 # non-GNU format range 2283 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2284 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2285 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2286 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2287 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2288 2289 def test_number_field_limits(self): 2290 with self.assertRaises(ValueError): 2291 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2292 with self.assertRaises(ValueError): 2293 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2294 with self.assertRaises(ValueError): 2295 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2296 with self.assertRaises(ValueError): 2297 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2298 2299 def test__all__(self): 2300 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2301 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2302 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2303 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2304 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2305 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2306 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2307 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2308 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2309 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2310 'filemode', 2311 'EmptyHeaderError', 'TruncatedHeaderError', 2312 'EOFHeaderError', 'InvalidHeaderError', 2313 'SubsequentHeaderError', 'ExFileObject', 2314 'main'} 2315 support.check__all__(self, tarfile, blacklist=blacklist) 2316 2317 2318class CommandLineTest(unittest.TestCase): 2319 2320 def tarfilecmd(self, *args, **kwargs): 2321 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2322 **kwargs) 2323 return out.replace(os.linesep.encode(), b'\n') 2324 2325 def tarfilecmd_failure(self, *args): 2326 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2327 2328 def make_simple_tarfile(self, tar_name): 2329 files = [support.findfile('tokenize_tests.txt'), 2330 support.findfile('tokenize_tests-no-coding-cookie-' 2331 'and-utf8-bom-sig-only.txt')] 2332 self.addCleanup(support.unlink, tar_name) 2333 with tarfile.open(tar_name, 'w') as tf: 2334 for tardata in files: 2335 tf.add(tardata, arcname=os.path.basename(tardata)) 2336 2337 def test_bad_use(self): 2338 rc, out, err = self.tarfilecmd_failure() 2339 
self.assertEqual(out, b'') 2340 self.assertIn(b'usage', err.lower()) 2341 self.assertIn(b'error', err.lower()) 2342 self.assertIn(b'required', err.lower()) 2343 rc, out, err = self.tarfilecmd_failure('-l', '') 2344 self.assertEqual(out, b'') 2345 self.assertNotEqual(err.strip(), b'') 2346 2347 def test_test_command(self): 2348 for tar_name in testtarnames: 2349 for opt in '-t', '--test': 2350 out = self.tarfilecmd(opt, tar_name) 2351 self.assertEqual(out, b'') 2352 2353 def test_test_command_verbose(self): 2354 for tar_name in testtarnames: 2355 for opt in '-v', '--verbose': 2356 out = self.tarfilecmd(opt, '-t', tar_name, 2357 PYTHONIOENCODING='utf-8') 2358 self.assertIn(b'is a tar archive.\n', out) 2359 2360 def test_test_command_invalid_file(self): 2361 zipname = support.findfile('zipdir.zip') 2362 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2363 self.assertIn(b' is not a tar archive.', err) 2364 self.assertEqual(out, b'') 2365 self.assertEqual(rc, 1) 2366 2367 for tar_name in testtarnames: 2368 with self.subTest(tar_name=tar_name): 2369 with open(tar_name, 'rb') as f: 2370 data = f.read() 2371 try: 2372 with open(tmpname, 'wb') as f: 2373 f.write(data[:511]) 2374 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2375 self.assertEqual(out, b'') 2376 self.assertEqual(rc, 1) 2377 finally: 2378 support.unlink(tmpname) 2379 2380 def test_list_command(self): 2381 for tar_name in testtarnames: 2382 with support.captured_stdout() as t: 2383 with tarfile.open(tar_name, 'r') as tf: 2384 tf.list(verbose=False) 2385 expected = t.getvalue().encode('ascii', 'backslashreplace') 2386 for opt in '-l', '--list': 2387 out = self.tarfilecmd(opt, tar_name, 2388 PYTHONIOENCODING='ascii') 2389 self.assertEqual(out, expected) 2390 2391 def test_list_command_verbose(self): 2392 for tar_name in testtarnames: 2393 with support.captured_stdout() as t: 2394 with tarfile.open(tar_name, 'r') as tf: 2395 tf.list(verbose=True) 2396 expected = t.getvalue().encode('ascii', 
'backslashreplace') 2397 for opt in '-v', '--verbose': 2398 out = self.tarfilecmd(opt, '-l', tar_name, 2399 PYTHONIOENCODING='ascii') 2400 self.assertEqual(out, expected) 2401 2402 def test_list_command_invalid_file(self): 2403 zipname = support.findfile('zipdir.zip') 2404 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2405 self.assertIn(b' is not a tar archive.', err) 2406 self.assertEqual(out, b'') 2407 self.assertEqual(rc, 1) 2408 2409 def test_create_command(self): 2410 files = [support.findfile('tokenize_tests.txt'), 2411 support.findfile('tokenize_tests-no-coding-cookie-' 2412 'and-utf8-bom-sig-only.txt')] 2413 for opt in '-c', '--create': 2414 try: 2415 out = self.tarfilecmd(opt, tmpname, *files) 2416 self.assertEqual(out, b'') 2417 with tarfile.open(tmpname) as tar: 2418 tar.getmembers() 2419 finally: 2420 support.unlink(tmpname) 2421 2422 def test_create_command_verbose(self): 2423 files = [support.findfile('tokenize_tests.txt'), 2424 support.findfile('tokenize_tests-no-coding-cookie-' 2425 'and-utf8-bom-sig-only.txt')] 2426 for opt in '-v', '--verbose': 2427 try: 2428 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2429 PYTHONIOENCODING='utf-8') 2430 self.assertIn(b' file created.', out) 2431 with tarfile.open(tmpname) as tar: 2432 tar.getmembers() 2433 finally: 2434 support.unlink(tmpname) 2435 2436 def test_create_command_dotless_filename(self): 2437 files = [support.findfile('tokenize_tests.txt')] 2438 try: 2439 out = self.tarfilecmd('-c', dotlessname, *files) 2440 self.assertEqual(out, b'') 2441 with tarfile.open(dotlessname) as tar: 2442 tar.getmembers() 2443 finally: 2444 support.unlink(dotlessname) 2445 2446 def test_create_command_dot_started_filename(self): 2447 tar_name = os.path.join(TEMPDIR, ".testtar") 2448 files = [support.findfile('tokenize_tests.txt')] 2449 try: 2450 out = self.tarfilecmd('-c', tar_name, *files) 2451 self.assertEqual(out, b'') 2452 with tarfile.open(tar_name) as tar: 2453 tar.getmembers() 2454 finally: 2455 
support.unlink(tar_name) 2456 2457 def test_create_command_compressed(self): 2458 files = [support.findfile('tokenize_tests.txt'), 2459 support.findfile('tokenize_tests-no-coding-cookie-' 2460 'and-utf8-bom-sig-only.txt')] 2461 for filetype in (GzipTest, Bz2Test, LzmaTest): 2462 if not filetype.open: 2463 continue 2464 try: 2465 tar_name = tmpname + '.' + filetype.suffix 2466 out = self.tarfilecmd('-c', tar_name, *files) 2467 with filetype.taropen(tar_name) as tar: 2468 tar.getmembers() 2469 finally: 2470 support.unlink(tar_name) 2471 2472 def test_extract_command(self): 2473 self.make_simple_tarfile(tmpname) 2474 for opt in '-e', '--extract': 2475 try: 2476 with support.temp_cwd(tarextdir): 2477 out = self.tarfilecmd(opt, tmpname) 2478 self.assertEqual(out, b'') 2479 finally: 2480 support.rmtree(tarextdir) 2481 2482 def test_extract_command_verbose(self): 2483 self.make_simple_tarfile(tmpname) 2484 for opt in '-v', '--verbose': 2485 try: 2486 with support.temp_cwd(tarextdir): 2487 out = self.tarfilecmd(opt, '-e', tmpname, 2488 PYTHONIOENCODING='utf-8') 2489 self.assertIn(b' file is extracted.', out) 2490 finally: 2491 support.rmtree(tarextdir) 2492 2493 def test_extract_command_different_directory(self): 2494 self.make_simple_tarfile(tmpname) 2495 try: 2496 with support.temp_cwd(tarextdir): 2497 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2498 self.assertEqual(out, b'') 2499 finally: 2500 support.rmtree(tarextdir) 2501 2502 def test_extract_command_invalid_file(self): 2503 zipname = support.findfile('zipdir.zip') 2504 with support.temp_cwd(tarextdir): 2505 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2506 self.assertIn(b' is not a tar archive.', err) 2507 self.assertEqual(out, b'') 2508 self.assertEqual(rc, 1) 2509 2510 2511class ContextManagerTest(unittest.TestCase): 2512 2513 def test_basic(self): 2514 with tarfile.open(tarname) as tar: 2515 self.assertFalse(tar.closed, "closed inside runtime context") 2516 self.assertTrue(tar.closed, "context 
manager failed") 2517 2518 def test_closed(self): 2519 # The __enter__() method is supposed to raise OSError 2520 # if the TarFile object is already closed. 2521 tar = tarfile.open(tarname) 2522 tar.close() 2523 with self.assertRaises(OSError): 2524 with tar: 2525 pass 2526 2527 def test_exception(self): 2528 # Test if the OSError exception is passed through properly. 2529 with self.assertRaises(Exception) as exc: 2530 with tarfile.open(tarname) as tar: 2531 raise OSError 2532 self.assertIsInstance(exc.exception, OSError, 2533 "wrong exception raised in context manager") 2534 self.assertTrue(tar.closed, "context manager failed") 2535 2536 def test_no_eof(self): 2537 # __exit__() must not write end-of-archive blocks if an 2538 # exception was raised. 2539 try: 2540 with tarfile.open(tmpname, "w") as tar: 2541 raise Exception 2542 except: 2543 pass 2544 self.assertEqual(os.path.getsize(tmpname), 0, 2545 "context manager wrote an end-of-archive block") 2546 self.assertTrue(tar.closed, "context manager failed") 2547 2548 def test_eof(self): 2549 # __exit__() must write end-of-archive blocks, i.e. call 2550 # TarFile.close() if there was no error. 2551 with tarfile.open(tmpname, "w"): 2552 pass 2553 self.assertNotEqual(os.path.getsize(tmpname), 0, 2554 "context manager wrote no end-of-archive block") 2555 2556 def test_fileobj(self): 2557 # Test that __exit__() did not close the external file 2558 # object. 2559 with open(tmpname, "wb") as fobj: 2560 try: 2561 with tarfile.open(fileobj=fobj, mode="w") as tar: 2562 raise Exception 2563 except: 2564 pass 2565 self.assertFalse(fobj.closed, "external file object was closed") 2566 self.assertTrue(tar.closed, "context manager failed") 2567 2568 2569@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2570class LinkEmulationTest(ReadTest, unittest.TestCase): 2571 2572 # Test for issue #8741 regression. 
On platforms that do not support 2573 # symbolic or hard links tarfile tries to extract these types of members 2574 # as the regular files they point to. 2575 def _test_link_extraction(self, name): 2576 self.tar.extract(name, TEMPDIR) 2577 with open(os.path.join(TEMPDIR, name), "rb") as f: 2578 data = f.read() 2579 self.assertEqual(sha256sum(data), sha256_regtype) 2580 2581 # See issues #1578269, #8879, and #17689 for some history on these skips 2582 @unittest.skipIf(hasattr(os.path, "islink"), 2583 "Skip emulation - has os.path.islink but not os.link") 2584 def test_hardlink_extraction1(self): 2585 self._test_link_extraction("ustar/lnktype") 2586 2587 @unittest.skipIf(hasattr(os.path, "islink"), 2588 "Skip emulation - has os.path.islink but not os.link") 2589 def test_hardlink_extraction2(self): 2590 self._test_link_extraction("./ustar/linktest2/lnktype") 2591 2592 @unittest.skipIf(hasattr(os, "symlink"), 2593 "Skip emulation if symlink exists") 2594 def test_symlink_extraction1(self): 2595 self._test_link_extraction("ustar/symtype") 2596 2597 @unittest.skipIf(hasattr(os, "symlink"), 2598 "Skip emulation if symlink exists") 2599 def test_symlink_extraction2(self): 2600 self._test_link_extraction("./ustar/linktest2/symtype") 2601 2602 2603class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2604 # Issue5068: The _BZ2Proxy.read() method loops forever 2605 # on an empty or partial bzipped file. 
2606 2607 def _test_partial_input(self, mode): 2608 class MyBytesIO(io.BytesIO): 2609 hit_eof = False 2610 def read(self, n): 2611 if self.hit_eof: 2612 raise AssertionError("infinite loop detected in " 2613 "tarfile.open()") 2614 self.hit_eof = self.tell() == len(self.getvalue()) 2615 return super(MyBytesIO, self).read(n) 2616 def seek(self, *args): 2617 self.hit_eof = False 2618 return super(MyBytesIO, self).seek(*args) 2619 2620 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2621 for x in range(len(data) + 1): 2622 try: 2623 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2624 except tarfile.ReadError: 2625 pass # we have no interest in ReadErrors 2626 2627 def test_partial_input(self): 2628 self._test_partial_input("r") 2629 2630 def test_partial_input_bz2(self): 2631 self._test_partial_input("r:bz2") 2632 2633 2634def root_is_uid_gid_0(): 2635 try: 2636 import pwd, grp 2637 except ImportError: 2638 return False 2639 if pwd.getpwuid(0)[0] != 'root': 2640 return False 2641 if grp.getgrgid(0)[0] != 'root': 2642 return False 2643 return True 2644 2645 2646@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2647@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2648class NumericOwnerTest(unittest.TestCase): 2649 # mock the following: 2650 # os.chown: so we can test what's being called 2651 # os.chmod: so the modes are not actually changed. if they are, we can't 2652 # delete the files/directories 2653 # os.geteuid: so we can lie and say we're root (uid = 0) 2654 2655 @staticmethod 2656 def _make_test_archive(filename_1, dirname_1, filename_2): 2657 # the file contents to write 2658 fobj = io.BytesIO(b"content") 2659 2660 # create a tar file with a file, a directory, and a file within that 2661 # directory. 
Assign various .uid/.gid values to them 2662 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2663 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2664 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2665 ] 2666 with tarfile.open(tmpname, 'w') as tarfl: 2667 for name, uid, gid, typ, contents in items: 2668 t = tarfile.TarInfo(name) 2669 t.uid = uid 2670 t.gid = gid 2671 t.uname = 'root' 2672 t.gname = 'root' 2673 t.type = typ 2674 tarfl.addfile(t, contents) 2675 2676 # return the full pathname to the tar file 2677 return tmpname 2678 2679 @staticmethod 2680 @contextmanager 2681 def _setup_test(mock_geteuid): 2682 mock_geteuid.return_value = 0 # lie and say we're root 2683 fname = 'numeric-owner-testfile' 2684 dirname = 'dir' 2685 2686 # the names we want stored in the tarfile 2687 filename_1 = fname 2688 dirname_1 = dirname 2689 filename_2 = os.path.join(dirname, fname) 2690 2691 # create the tarfile with the contents we're after 2692 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2693 dirname_1, 2694 filename_2) 2695 2696 # open the tarfile for reading. 
yield it and the names of the items 2697 # we stored into the file 2698 with tarfile.open(tar_filename) as tarfl: 2699 yield tarfl, filename_1, dirname_1, filename_2 2700 2701 @unittest.mock.patch('os.chown') 2702 @unittest.mock.patch('os.chmod') 2703 @unittest.mock.patch('os.geteuid') 2704 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2705 mock_chown): 2706 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2707 filename_2): 2708 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2709 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2710 2711 # convert to filesystem paths 2712 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2713 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2714 2715 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2716 unittest.mock.call(f_filename_2, 88, 87), 2717 ], 2718 any_order=True) 2719 2720 @unittest.mock.patch('os.chown') 2721 @unittest.mock.patch('os.chmod') 2722 @unittest.mock.patch('os.geteuid') 2723 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2724 mock_chown): 2725 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2726 filename_2): 2727 tarfl.extractall(TEMPDIR, numeric_owner=True) 2728 2729 # convert to filesystem paths 2730 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2731 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2732 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2733 2734 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2735 unittest.mock.call(f_dirname_1, 77, 76), 2736 unittest.mock.call(f_filename_2, 88, 87), 2737 ], 2738 any_order=True) 2739 2740 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2741 # because the uname and gname in the test file are 'root', and extract() 2742 # will look them up using pwd and grp to find their uid and gid, which we 2743 # test here to be 0. 
2744 @unittest.skipUnless(root_is_uid_gid_0(), 2745 'uid=0,gid=0 must be named "root"') 2746 @unittest.mock.patch('os.chown') 2747 @unittest.mock.patch('os.chmod') 2748 @unittest.mock.patch('os.geteuid') 2749 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2750 mock_chown): 2751 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2752 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2753 2754 # convert to filesystem paths 2755 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2756 2757 mock_chown.assert_called_with(f_filename_1, 0, 0) 2758 2759 @unittest.mock.patch('os.geteuid') 2760 def test_keyword_only(self, mock_geteuid): 2761 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2762 self.assertRaises(TypeError, 2763 tarfl.extract, filename_1, TEMPDIR, False, True) 2764 2765 2766def setUpModule(): 2767 support.unlink(TEMPDIR) 2768 os.makedirs(TEMPDIR) 2769 2770 global testtarnames 2771 testtarnames = [tarname] 2772 with open(tarname, "rb") as fobj: 2773 data = fobj.read() 2774 2775 # Create compressed tarfiles. 2776 for c in GzipTest, Bz2Test, LzmaTest: 2777 if c.open: 2778 support.unlink(c.tarname) 2779 testtarnames.append(c.tarname) 2780 with c.open(c.tarname, "wb") as tar: 2781 tar.write(data) 2782 2783def tearDownModule(): 2784 if os.path.exists(TEMPDIR): 2785 support.rmtree(TEMPDIR) 2786 2787if __name__ == "__main__": 2788 unittest.main() 2789