import sys
import os
import io
from hashlib import sha256
from contextlib import contextmanager
from random import Random
import pathlib

import unittest
import unittest.mock
import tarfile

from test import support
from test.support import os_helper
from test.support import script_helper

# Check for our compression modules.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import zlib
except ImportError:
    zlib = None
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import lzma
except ImportError:
    lzma = None


def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes *data*."""
    return sha256(data).hexdigest()


# Scratch locations used by the tests; everything lives below TEMPDIR.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected digests of the regular-file and sparse-file members.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)


class TarTest:
    """Mixin holding the settings for the uncompressed test archive.

    Subclasses provide ``prefix`` (for example ``"r:"``); ``mode`` joins it
    with ``suffix`` to form the mode string passed to tarfile.open().
    """

    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        return self.prefix + self.suffix


@support.requires_gzip()
class GzipTest:
    """Mixin holding the settings for the gzip-compressed test archive."""

    tarname = gzipname
    suffix = 'gz'
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen


@support.requires_bz2()
class Bz2Test:
    """Mixin holding the settings for the bz2-compressed test archive."""

    tarname = bz2name
    suffix = 'bz2'
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open


@support.requires_lzma()
class LzmaTest:
    """Mixin holding the settings for the xz-compressed test archive."""

    tarname = xzname
    suffix = 'xz'
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen


class ReadTest(TarTest):
    """Base for read tests: opens self.tarname as self.tar for each test."""

    prefix = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest, unittest.TestCase):
    """Tests for the file objects returned by TarFile.extractfile()."""

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the whole member first so that the following seek(0)
            # really has to move the position back.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        """Check that extracting *lnktype* yields the file named by *regtype*."""
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")


class GzipUstarReadTest(GzipTest, UstarReadTest):
    pass


class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    pass


class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    pass


class ListTest(ReadTest, unittest.TestCase):
    """Tests for the listing written to sys.stdout by TarFile.list()."""

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def test_list(self):
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(verbose=False)
        out = tio.detach().getvalue()
        self.assertIn(b'ustar/conttype', out)
        self.assertIn(b'ustar/regtype', out)
        self.assertIn(b'ustar/lnktype', out)
        self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
        self.assertIn(b'./ustar/linktest2/symtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype', out)
        # Make sure it puts trailing slash for directory
        self.assertIn(b'ustar/dirtype/', out)
        self.assertIn(b'ustar/dirtype-with-size/', out)
        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')
        self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
        self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        self.assertNotIn(b'link to', out)
        self.assertNotIn(b'->', out)

    def test_list_verbose(self):
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(verbose=True)
        out = tio.detach().getvalue()
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                               br'ustar/\w+type ?\r?\n') * 2)
        # Make sure it prints the source of link with verbose flag
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
                      (b'/123' * 125) + b'/longname', out)
        self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
                      (b'/123' * 125) + b'/longname', out)

    def test_list_members(self):
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(verbose=False, members=members(self.tar))
        out = tio.detach().getvalue()
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)


class GzipListTest(GzipTest, ListTest):
    pass


class Bz2ListTest(Bz2Test, ListTest):
    pass


class LzmaListTest(LzmaTest, ListTest):
    pass


class CommonReadTest(ReadTest):
    """Read tests shared by the seekable ("r:") and stream ("r|") modes."""

    def test_is_tarfile_erroneous(self):
        with open(tmpname, "wb"):
            pass

        # is_tarfile works on filenames
        self.assertFalse(tarfile.is_tarfile(tmpname))

        # is_tarfile works on path-like objects
        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))

        # is_tarfile works on file objects
        with open(tmpname, "rb") as fobj:
            self.assertFalse(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))

    def test_is_tarfile_valid(self):
        # is_tarfile works on filenames
        self.assertTrue(tarfile.is_tarfile(self.tarname))

        # is_tarfile works on path-like objects
        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))

        # is_tarfile works on file objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # The context manager closes the archive only if it was actually
        # opened, so a failing open() reports the intended message instead
        # of tripping over an unbound name during cleanup.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                members = tar.getmembers()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        self.assertListEqual(members, [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).randbytes(512)
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')):
                pass


class MiscReadTestBase(CommonReadTest):
    """Assorted read tests, run for each compression variant."""

    def requires_name_attribute(self):
        """Hook for subclasses to skip tests needing a fileobj 'name'.

        This base implementation does nothing.
        """
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use the context manager so the archive is closed again.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        if self.suffix == '':
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        with open(tmpname, 'wb'):
            pass
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, 'q')
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, 'rw')
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, '')

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @os_helper.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            # Open the archive only once the directory exists, so nothing is
            # left open if mkdir() fails.
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            os_helper.rmtree(DIR)

    def test_extract_directory(self):
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            os_helper.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with os_helper.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with os_helper.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os_helper.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())

    @unittest.skipIf(zlib is None, "requires zlib")
    def test_zlib_error_does_not_leak(self):
        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
        # parsing certain types of invalid data
        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
            mock.side_effect = zlib.error
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname)


class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Disabled for the uncompressed variant; see the comment in
    # MiscReadTestBase.test_fail_comp.
    test_fail_comp = None


class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    pass


class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    def requires_name_attribute(self):
        self.skipTest("BZ2File have no name attribute")


class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    def requires_name_attribute(self):
        self.skipTest("LZMAFile have no name attribute")


class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Read tests for archives opened in stream mode ("r|")."""

    prefix="r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for tarinfo in self.tar:
            if not tarinfo.isreg():
                continue
            with self.tar.extractfile(tarinfo) as fobj:
                while True:
                    try:
                        buf = fobj.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")
                    if not buf:
                        break

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
        self.assertEqual(len(data), tarinfo.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(data), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        tarinfos = self.tar.getmembers()
        with self.tar.extractfile(tarinfos[0]) as f: # read the first member
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a seekable and a stream view of the same archive in lockstep.
        with tarfile.open(tarname, encoding="iso8859-1") as tar1:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    with self.assertRaises(tarfile.StreamError):
                        tar2.extractfile(t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertIsNotNone(v2, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(),
                        "stream extraction failed")


class GzipStreamReadTest(GzipTest, StreamReadTest):
    pass


class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    pass


class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    pass


class DetectReadTest(TarTest, unittest.TestCase):
    """Tests for compression autodetection in tarfile.open()."""

    def _testfunc_file(self, name, mode):
        """Open *name* by filename with *mode*; fail the test on ReadError."""
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        """Open *name* through a file object with *mode*; fail on ReadError."""
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as e:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                          "ReadError: %s" % (name, mode, e))
            else:
                # Close the archive while its file object is still open.
                tar.close()

    def _test_modes(self, testfunc):
        """Run *testfunc* on self.tarname with every applicable read mode.

        For compressed variants, first check that mismatched combinations
        of archive and mode raise ReadError.
        """
        if self.suffix:
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)


class GzipDetectReadTest(GzipTest, DetectReadTest):
    pass


class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as fobj:
            data = fobj.read()

        # Compress with blocksize 100,000 bytes, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
            fobj.write(data)

        self._testfunc_file(tmpname, "r|*")


class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    pass


class MemberReadTest(ReadTest, unittest.TestCase):
    """Tests that look up individual members and check their header fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Check the content digest and header fields of *tarinfo*.

        If *chksum* is given, the member's data must hash to it.  Each
        keyword argument names a TarInfo attribute and its expected value;
        mtime, uid and gid (and, except for old-v7 members, uname and gname)
        are always checked against the fixed values used by the test archive.
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                self.assertEqual(sha256sum(f.read()), chksum,
                        "wrong sha256sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        tarinfo = self.tar.getmember("gnu/sparse")
        self._test_member(tarinfo, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        tarinfo = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(tarinfo, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        tarinfo = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(tarinfo, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        tarinfo = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(tarinfo, size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-"
                                     "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-"
                                     "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=sha256_regtype)


class LongnameTest:
    """Mixin with long name/link tests.

    Users of the mixin provide ``subdir`` and ``longnametype`` as well as
    the open archive ``self.tar``.
    """

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        offset = self.tar.getmember(longname).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512),
                                              "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)


class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files.
Therefore, we first do one basic test which works 1001 # an all platforms, and after that a test that will work only on 1002 # platforms/filesystems that prove to support sparse files. 1003 def _test_sparse_file(self, name): 1004 self.tar.extract(name, TEMPDIR) 1005 filename = os.path.join(TEMPDIR, name) 1006 with open(filename, "rb") as fobj: 1007 data = fobj.read() 1008 self.assertEqual(sha256sum(data), sha256_sparse, 1009 "wrong sha256sum for %s" % name) 1010 1011 if self._fs_supports_holes(): 1012 s = os.stat(filename) 1013 self.assertLess(s.st_blocks * 512, s.st_size) 1014 1015 def test_sparse_file_old(self): 1016 self._test_sparse_file("gnu/sparse") 1017 1018 def test_sparse_file_00(self): 1019 self._test_sparse_file("gnu/sparse-0.0") 1020 1021 def test_sparse_file_01(self): 1022 self._test_sparse_file("gnu/sparse-0.1") 1023 1024 def test_sparse_file_10(self): 1025 self._test_sparse_file("gnu/sparse-1.0") 1026 1027 @staticmethod 1028 def _fs_supports_holes(): 1029 # Return True if the platform knows the st_blocks stat attribute and 1030 # uses st_blocks units of 512 bytes, and if the filesystem is able to 1031 # store holes of 4 KiB in files. 1032 # 1033 # The function returns False if page size is larger than 4 KiB. 1034 # For example, ppc64 uses pages of 64 KiB. 1035 if sys.platform.startswith("linux"): 1036 # Linux evidentially has 512 byte st_blocks units. 
1037 name = os.path.join(TEMPDIR, "sparse-test") 1038 with open(name, "wb") as fobj: 1039 # Seek to "punch a hole" of 4 KiB 1040 fobj.seek(4096) 1041 fobj.write(b'x' * 4096) 1042 fobj.truncate() 1043 s = os.stat(name) 1044 os_helper.unlink(name) 1045 return (s.st_blocks * 512 < s.st_size) 1046 else: 1047 return False 1048 1049 1050class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 1051 1052 subdir = "pax" 1053 longnametype = tarfile.XHDTYPE 1054 1055 def test_pax_global_headers(self): 1056 tar = tarfile.open(tarname, encoding="iso8859-1") 1057 try: 1058 tarinfo = tar.getmember("pax/regtype1") 1059 self.assertEqual(tarinfo.uname, "foo") 1060 self.assertEqual(tarinfo.gname, "bar") 1061 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1062 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1063 1064 tarinfo = tar.getmember("pax/regtype2") 1065 self.assertEqual(tarinfo.uname, "") 1066 self.assertEqual(tarinfo.gname, "bar") 1067 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1068 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1069 1070 tarinfo = tar.getmember("pax/regtype3") 1071 self.assertEqual(tarinfo.uname, "tarfile") 1072 self.assertEqual(tarinfo.gname, "tarfile") 1073 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1074 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1075 finally: 1076 tar.close() 1077 1078 def test_pax_number_fields(self): 1079 # All following number fields are read from the pax header. 
1080 tar = tarfile.open(tarname, encoding="iso8859-1") 1081 try: 1082 tarinfo = tar.getmember("pax/regtype4") 1083 self.assertEqual(tarinfo.size, 7011) 1084 self.assertEqual(tarinfo.uid, 123) 1085 self.assertEqual(tarinfo.gid, 123) 1086 self.assertEqual(tarinfo.mtime, 1041808783.0) 1087 self.assertEqual(type(tarinfo.mtime), float) 1088 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1089 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1090 finally: 1091 tar.close() 1092 1093 1094class WriteTestBase(TarTest): 1095 # Put all write tests in here that are supposed to be tested 1096 # in all possible mode combinations. 1097 1098 def test_fileobj_no_close(self): 1099 fobj = io.BytesIO() 1100 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1101 tar.addfile(tarfile.TarInfo("foo")) 1102 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1103 # Issue #20238: Incomplete gzip output with mode="w:gz" 1104 data = fobj.getvalue() 1105 del tar 1106 support.gc_collect() 1107 self.assertFalse(fobj.closed) 1108 self.assertEqual(data, fobj.getvalue()) 1109 1110 def test_eof_marker(self): 1111 # Make sure an end of archive marker is written (two zero blocks). 1112 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1113 # So, we create an archive that has exactly 10240 bytes without the 1114 # marker, and has 20480 bytes once the marker is written. 1115 with tarfile.open(tmpname, self.mode) as tar: 1116 t = tarfile.TarInfo("foo") 1117 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1118 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1119 1120 with self.open(tmpname, "rb") as fobj: 1121 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1122 1123 1124class WriteTest(WriteTestBase, unittest.TestCase): 1125 1126 prefix = "w:" 1127 1128 def test_100_char_name(self): 1129 # The name field in a tar header stores strings of at most 100 chars. 
1130 # If a string is shorter than 100 chars it has to be padded with '\0', 1131 # which implies that a string of exactly 100 chars is stored without 1132 # a trailing '\0'. 1133 name = "0123456789" * 10 1134 tar = tarfile.open(tmpname, self.mode) 1135 try: 1136 t = tarfile.TarInfo(name) 1137 tar.addfile(t) 1138 finally: 1139 tar.close() 1140 1141 tar = tarfile.open(tmpname) 1142 try: 1143 self.assertEqual(tar.getnames()[0], name, 1144 "failed to store 100 char filename") 1145 finally: 1146 tar.close() 1147 1148 def test_tar_size(self): 1149 # Test for bug #1013882. 1150 tar = tarfile.open(tmpname, self.mode) 1151 try: 1152 path = os.path.join(TEMPDIR, "file") 1153 with open(path, "wb") as fobj: 1154 fobj.write(b"aaa") 1155 tar.add(path) 1156 finally: 1157 tar.close() 1158 self.assertGreater(os.path.getsize(tmpname), 0, 1159 "tarfile is empty") 1160 1161 # The test_*_size tests test for bug #1167128. 1162 def test_file_size(self): 1163 tar = tarfile.open(tmpname, self.mode) 1164 try: 1165 path = os.path.join(TEMPDIR, "file") 1166 with open(path, "wb"): 1167 pass 1168 tarinfo = tar.gettarinfo(path) 1169 self.assertEqual(tarinfo.size, 0) 1170 1171 with open(path, "wb") as fobj: 1172 fobj.write(b"aaa") 1173 tarinfo = tar.gettarinfo(path) 1174 self.assertEqual(tarinfo.size, 3) 1175 finally: 1176 tar.close() 1177 1178 def test_directory_size(self): 1179 path = os.path.join(TEMPDIR, "directory") 1180 os.mkdir(path) 1181 try: 1182 tar = tarfile.open(tmpname, self.mode) 1183 try: 1184 tarinfo = tar.gettarinfo(path) 1185 self.assertEqual(tarinfo.size, 0) 1186 finally: 1187 tar.close() 1188 finally: 1189 os_helper.rmdir(path) 1190 1191 # mock the following: 1192 # os.listdir: so we know that files are in the wrong order 1193 def test_ordered_recursion(self): 1194 path = os.path.join(TEMPDIR, "directory") 1195 os.mkdir(path) 1196 open(os.path.join(path, "1"), "a").close() 1197 open(os.path.join(path, "2"), "a").close() 1198 try: 1199 tar = tarfile.open(tmpname, self.mode) 
1200 try: 1201 with unittest.mock.patch('os.listdir') as mock_listdir: 1202 mock_listdir.return_value = ["2", "1"] 1203 tar.add(path) 1204 paths = [] 1205 for m in tar.getmembers(): 1206 paths.append(os.path.split(m.name)[-1]) 1207 self.assertEqual(paths, ["directory", "1", "2"]); 1208 finally: 1209 tar.close() 1210 finally: 1211 os_helper.unlink(os.path.join(path, "1")) 1212 os_helper.unlink(os.path.join(path, "2")) 1213 os_helper.rmdir(path) 1214 1215 def test_gettarinfo_pathlike_name(self): 1216 with tarfile.open(tmpname, self.mode) as tar: 1217 path = pathlib.Path(TEMPDIR) / "file" 1218 with open(path, "wb") as fobj: 1219 fobj.write(b"aaa") 1220 tarinfo = tar.gettarinfo(path) 1221 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1222 self.assertIsInstance(tarinfo.name, str) 1223 self.assertEqual(tarinfo.name, tarinfo2.name) 1224 self.assertEqual(tarinfo.size, 3) 1225 1226 @unittest.skipUnless(hasattr(os, "link"), 1227 "Missing hardlink implementation") 1228 def test_link_size(self): 1229 link = os.path.join(TEMPDIR, "link") 1230 target = os.path.join(TEMPDIR, "link_target") 1231 with open(target, "wb") as fobj: 1232 fobj.write(b"aaa") 1233 try: 1234 os.link(target, link) 1235 except PermissionError as e: 1236 self.skipTest('os.link(): %s' % e) 1237 try: 1238 tar = tarfile.open(tmpname, self.mode) 1239 try: 1240 # Record the link target in the inodes list. 
1241 tar.gettarinfo(target) 1242 tarinfo = tar.gettarinfo(link) 1243 self.assertEqual(tarinfo.size, 0) 1244 finally: 1245 tar.close() 1246 finally: 1247 os_helper.unlink(target) 1248 os_helper.unlink(link) 1249 1250 @os_helper.skip_unless_symlink 1251 def test_symlink_size(self): 1252 path = os.path.join(TEMPDIR, "symlink") 1253 os.symlink("link_target", path) 1254 try: 1255 tar = tarfile.open(tmpname, self.mode) 1256 try: 1257 tarinfo = tar.gettarinfo(path) 1258 self.assertEqual(tarinfo.size, 0) 1259 finally: 1260 tar.close() 1261 finally: 1262 os_helper.unlink(path) 1263 1264 def test_add_self(self): 1265 # Test for #1257255. 1266 dstname = os.path.abspath(tmpname) 1267 tar = tarfile.open(tmpname, self.mode) 1268 try: 1269 self.assertEqual(tar.name, dstname, 1270 "archive name must be absolute") 1271 tar.add(dstname) 1272 self.assertEqual(tar.getnames(), [], 1273 "added the archive to itself") 1274 1275 with os_helper.change_cwd(TEMPDIR): 1276 tar.add(dstname) 1277 self.assertEqual(tar.getnames(), [], 1278 "added the archive to itself") 1279 finally: 1280 tar.close() 1281 1282 def test_filter(self): 1283 tempdir = os.path.join(TEMPDIR, "filter") 1284 os.mkdir(tempdir) 1285 try: 1286 for name in ("foo", "bar", "baz"): 1287 name = os.path.join(tempdir, name) 1288 os_helper.create_empty_file(name) 1289 1290 def filter(tarinfo): 1291 if os.path.basename(tarinfo.name) == "bar": 1292 return 1293 tarinfo.uid = 123 1294 tarinfo.uname = "foo" 1295 return tarinfo 1296 1297 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1298 try: 1299 tar.add(tempdir, arcname="empty_dir", filter=filter) 1300 finally: 1301 tar.close() 1302 1303 # Verify that filter is a keyword-only argument 1304 with self.assertRaises(TypeError): 1305 tar.add(tempdir, "empty_dir", True, None, filter) 1306 1307 tar = tarfile.open(tmpname, "r") 1308 try: 1309 for tarinfo in tar: 1310 self.assertEqual(tarinfo.uid, 123) 1311 self.assertEqual(tarinfo.uname, "foo") 1312 
self.assertEqual(len(tar.getmembers()), 3) 1313 finally: 1314 tar.close() 1315 finally: 1316 os_helper.rmtree(tempdir) 1317 1318 # Guarantee that stored pathnames are not modified. Don't 1319 # remove ./ or ../ or double slashes. Still make absolute 1320 # pathnames relative. 1321 # For details see bug #6054. 1322 def _test_pathname(self, path, cmp_path=None, dir=False): 1323 # Create a tarfile with an empty member named path 1324 # and compare the stored name with the original. 1325 foo = os.path.join(TEMPDIR, "foo") 1326 if not dir: 1327 os_helper.create_empty_file(foo) 1328 else: 1329 os.mkdir(foo) 1330 1331 tar = tarfile.open(tmpname, self.mode) 1332 try: 1333 tar.add(foo, arcname=path) 1334 finally: 1335 tar.close() 1336 1337 tar = tarfile.open(tmpname, "r") 1338 try: 1339 t = tar.next() 1340 finally: 1341 tar.close() 1342 1343 if not dir: 1344 os_helper.unlink(foo) 1345 else: 1346 os_helper.rmdir(foo) 1347 1348 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1349 1350 1351 @os_helper.skip_unless_symlink 1352 def test_extractall_symlinks(self): 1353 # Test if extractall works properly when tarfile contains symlinks 1354 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1355 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1356 os.mkdir(tempdir) 1357 try: 1358 source_file = os.path.join(tempdir,'source') 1359 target_file = os.path.join(tempdir,'symlink') 1360 with open(source_file,'w') as f: 1361 f.write('something\n') 1362 os.symlink(source_file, target_file) 1363 with tarfile.open(temparchive, 'w') as tar: 1364 tar.add(source_file, arcname="source") 1365 tar.add(target_file, arcname="symlink") 1366 # Let's extract it to the location which contains the symlink 1367 with tarfile.open(temparchive, errorlevel=2) as tar: 1368 # this should not raise OSError: [Errno 17] File exists 1369 try: 1370 tar.extractall(path=tempdir) 1371 except OSError: 1372 self.fail("extractall failed with symlinked files") 1373 finally: 1374 
os_helper.unlink(temparchive) 1375 os_helper.rmtree(tempdir) 1376 1377 def test_pathnames(self): 1378 self._test_pathname("foo") 1379 self._test_pathname(os.path.join("foo", ".", "bar")) 1380 self._test_pathname(os.path.join("foo", "..", "bar")) 1381 self._test_pathname(os.path.join(".", "foo")) 1382 self._test_pathname(os.path.join(".", "foo", ".")) 1383 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1384 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1385 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1386 self._test_pathname(os.path.join("..", "foo")) 1387 self._test_pathname(os.path.join("..", "foo", "..")) 1388 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1389 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1390 1391 self._test_pathname("foo" + os.sep + os.sep + "bar") 1392 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1393 1394 def test_abs_pathnames(self): 1395 if sys.platform == "win32": 1396 self._test_pathname("C:\\foo", "foo") 1397 else: 1398 self._test_pathname("/foo", "foo") 1399 self._test_pathname("///foo", "foo") 1400 1401 def test_cwd(self): 1402 # Test adding the current working directory. 
1403 with os_helper.change_cwd(TEMPDIR): 1404 tar = tarfile.open(tmpname, self.mode) 1405 try: 1406 tar.add(".") 1407 finally: 1408 tar.close() 1409 1410 tar = tarfile.open(tmpname, "r") 1411 try: 1412 for t in tar: 1413 if t.name != ".": 1414 self.assertTrue(t.name.startswith("./"), t.name) 1415 finally: 1416 tar.close() 1417 1418 def test_open_nonwritable_fileobj(self): 1419 for exctype in OSError, EOFError, RuntimeError: 1420 class BadFile(io.BytesIO): 1421 first = True 1422 def write(self, data): 1423 if self.first: 1424 self.first = False 1425 raise exctype 1426 1427 f = BadFile() 1428 with self.assertRaises(exctype): 1429 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1430 format=tarfile.PAX_FORMAT, 1431 pax_headers={'non': 'empty'}) 1432 self.assertFalse(f.closed) 1433 1434 1435class GzipWriteTest(GzipTest, WriteTest): 1436 pass 1437 1438 1439class Bz2WriteTest(Bz2Test, WriteTest): 1440 pass 1441 1442 1443class LzmaWriteTest(LzmaTest, WriteTest): 1444 pass 1445 1446 1447class StreamWriteTest(WriteTestBase, unittest.TestCase): 1448 1449 prefix = "w|" 1450 decompressor = None 1451 1452 def test_stream_padding(self): 1453 # Test for bug #1543303. 1454 tar = tarfile.open(tmpname, self.mode) 1455 tar.close() 1456 if self.decompressor: 1457 dec = self.decompressor() 1458 with open(tmpname, "rb") as fobj: 1459 data = fobj.read() 1460 data = dec.decompress(data) 1461 self.assertFalse(dec.unused_data, "found trailing data") 1462 else: 1463 with self.open(tmpname) as fobj: 1464 data = fobj.read() 1465 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1466 "incorrect zero padding") 1467 1468 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1469 "Missing umask implementation") 1470 def test_file_mode(self): 1471 # Test for issue #8464: Create files with correct 1472 # permissions. 
1473 if os.path.exists(tmpname): 1474 os_helper.unlink(tmpname) 1475 1476 original_umask = os.umask(0o022) 1477 try: 1478 tar = tarfile.open(tmpname, self.mode) 1479 tar.close() 1480 mode = os.stat(tmpname).st_mode & 0o777 1481 self.assertEqual(mode, 0o644, "wrong file permissions") 1482 finally: 1483 os.umask(original_umask) 1484 1485 1486class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1487 def test_source_directory_not_leaked(self): 1488 """ 1489 Ensure the source directory is not included in the tar header 1490 per bpo-41316. 1491 """ 1492 tarfile.open(tmpname, self.mode).close() 1493 payload = pathlib.Path(tmpname).read_text(encoding='latin-1') 1494 assert os.path.dirname(tmpname) not in payload 1495 1496 1497class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1498 decompressor = bz2.BZ2Decompressor if bz2 else None 1499 1500class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1501 decompressor = lzma.LZMADecompressor if lzma else None 1502 1503 1504class GNUWriteTest(unittest.TestCase): 1505 # This testcase checks for correct creation of GNU Longname 1506 # and Longlink extended headers (cp. bug #812325). 
1507 1508 def _length(self, s): 1509 blocks = len(s) // 512 + 1 1510 return blocks * 512 1511 1512 def _calc_size(self, name, link=None): 1513 # Initial tar header 1514 count = 512 1515 1516 if len(name) > tarfile.LENGTH_NAME: 1517 # GNU longname extended header + longname 1518 count += 512 1519 count += self._length(name) 1520 if link is not None and len(link) > tarfile.LENGTH_LINK: 1521 # GNU longlink extended header + longlink 1522 count += 512 1523 count += self._length(link) 1524 return count 1525 1526 def _test(self, name, link=None): 1527 tarinfo = tarfile.TarInfo(name) 1528 if link: 1529 tarinfo.linkname = link 1530 tarinfo.type = tarfile.LNKTYPE 1531 1532 tar = tarfile.open(tmpname, "w") 1533 try: 1534 tar.format = tarfile.GNU_FORMAT 1535 tar.addfile(tarinfo) 1536 1537 v1 = self._calc_size(name, link) 1538 v2 = tar.offset 1539 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1540 finally: 1541 tar.close() 1542 1543 tar = tarfile.open(tmpname) 1544 try: 1545 member = tar.next() 1546 self.assertIsNotNone(member, 1547 "unable to read longname member") 1548 self.assertEqual(tarinfo.name, member.name, 1549 "unable to read longname member") 1550 self.assertEqual(tarinfo.linkname, member.linkname, 1551 "unable to read longname member") 1552 finally: 1553 tar.close() 1554 1555 def test_longname_1023(self): 1556 self._test(("longnam/" * 127) + "longnam") 1557 1558 def test_longname_1024(self): 1559 self._test(("longnam/" * 127) + "longname") 1560 1561 def test_longname_1025(self): 1562 self._test(("longnam/" * 127) + "longname_") 1563 1564 def test_longlink_1023(self): 1565 self._test("name", ("longlnk/" * 127) + "longlnk") 1566 1567 def test_longlink_1024(self): 1568 self._test("name", ("longlnk/" * 127) + "longlink") 1569 1570 def test_longlink_1025(self): 1571 self._test("name", ("longlnk/" * 127) + "longlink_") 1572 1573 def test_longnamelink_1023(self): 1574 self._test(("longnam/" * 127) + "longnam", 1575 ("longlnk/" * 127) + "longlnk") 1576 
1577 def test_longnamelink_1024(self): 1578 self._test(("longnam/" * 127) + "longname", 1579 ("longlnk/" * 127) + "longlink") 1580 1581 def test_longnamelink_1025(self): 1582 self._test(("longnam/" * 127) + "longname_", 1583 ("longlnk/" * 127) + "longlink_") 1584 1585 1586class DeviceHeaderTest(WriteTestBase, unittest.TestCase): 1587 1588 prefix = "w:" 1589 1590 def test_headers_written_only_for_device_files(self): 1591 # Regression test for bpo-18819. 1592 tempdir = os.path.join(TEMPDIR, "device_header_test") 1593 os.mkdir(tempdir) 1594 try: 1595 tar = tarfile.open(tmpname, self.mode) 1596 try: 1597 input_blk = tarfile.TarInfo(name="my_block_device") 1598 input_reg = tarfile.TarInfo(name="my_regular_file") 1599 input_blk.type = tarfile.BLKTYPE 1600 input_reg.type = tarfile.REGTYPE 1601 tar.addfile(input_blk) 1602 tar.addfile(input_reg) 1603 finally: 1604 tar.close() 1605 1606 # devmajor and devminor should be *interpreted* as 0 in both... 1607 tar = tarfile.open(tmpname, "r") 1608 try: 1609 output_blk = tar.getmember("my_block_device") 1610 output_reg = tar.getmember("my_regular_file") 1611 finally: 1612 tar.close() 1613 self.assertEqual(output_blk.devmajor, 0) 1614 self.assertEqual(output_blk.devminor, 0) 1615 self.assertEqual(output_reg.devmajor, 0) 1616 self.assertEqual(output_reg.devminor, 0) 1617 1618 # ...but the fields should not actually be set on regular files: 1619 with open(tmpname, "rb") as infile: 1620 buf = infile.read() 1621 buf_blk = buf[output_blk.offset:output_blk.offset_data] 1622 buf_reg = buf[output_reg.offset:output_reg.offset_data] 1623 # See `struct posixheader` in GNU docs for byte offsets: 1624 # <https://www.gnu.org/software/tar/manual/html_node/Standard.html> 1625 device_headers = slice(329, 329 + 16) 1626 self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2) 1627 self.assertEqual(buf_reg[device_headers], b"\0" * 16) 1628 finally: 1629 os_helper.rmtree(tempdir) 1630 1631 1632class CreateTest(WriteTestBase, unittest.TestCase): 1633 
1634 prefix = "x:" 1635 1636 file_path = os.path.join(TEMPDIR, "spameggs42") 1637 1638 def setUp(self): 1639 os_helper.unlink(tmpname) 1640 1641 @classmethod 1642 def setUpClass(cls): 1643 with open(cls.file_path, "wb") as fobj: 1644 fobj.write(b"aaa") 1645 1646 @classmethod 1647 def tearDownClass(cls): 1648 os_helper.unlink(cls.file_path) 1649 1650 def test_create(self): 1651 with tarfile.open(tmpname, self.mode) as tobj: 1652 tobj.add(self.file_path) 1653 1654 with self.taropen(tmpname) as tobj: 1655 names = tobj.getnames() 1656 self.assertEqual(len(names), 1) 1657 self.assertIn('spameggs42', names[0]) 1658 1659 def test_create_existing(self): 1660 with tarfile.open(tmpname, self.mode) as tobj: 1661 tobj.add(self.file_path) 1662 1663 with self.assertRaises(FileExistsError): 1664 tobj = tarfile.open(tmpname, self.mode) 1665 1666 with self.taropen(tmpname) as tobj: 1667 names = tobj.getnames() 1668 self.assertEqual(len(names), 1) 1669 self.assertIn('spameggs42', names[0]) 1670 1671 def test_create_taropen(self): 1672 with self.taropen(tmpname, "x") as tobj: 1673 tobj.add(self.file_path) 1674 1675 with self.taropen(tmpname) as tobj: 1676 names = tobj.getnames() 1677 self.assertEqual(len(names), 1) 1678 self.assertIn('spameggs42', names[0]) 1679 1680 def test_create_existing_taropen(self): 1681 with self.taropen(tmpname, "x") as tobj: 1682 tobj.add(self.file_path) 1683 1684 with self.assertRaises(FileExistsError): 1685 with self.taropen(tmpname, "x"): 1686 pass 1687 1688 with self.taropen(tmpname) as tobj: 1689 names = tobj.getnames() 1690 self.assertEqual(len(names), 1) 1691 self.assertIn("spameggs42", names[0]) 1692 1693 def test_create_pathlike_name(self): 1694 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1695 self.assertIsInstance(tobj.name, str) 1696 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1697 tobj.add(pathlib.Path(self.file_path)) 1698 names = tobj.getnames() 1699 self.assertEqual(len(names), 1) 1700 self.assertIn('spameggs42', 
names[0]) 1701 1702 with self.taropen(tmpname) as tobj: 1703 names = tobj.getnames() 1704 self.assertEqual(len(names), 1) 1705 self.assertIn('spameggs42', names[0]) 1706 1707 def test_create_taropen_pathlike_name(self): 1708 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1709 self.assertIsInstance(tobj.name, str) 1710 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1711 tobj.add(pathlib.Path(self.file_path)) 1712 names = tobj.getnames() 1713 self.assertEqual(len(names), 1) 1714 self.assertIn('spameggs42', names[0]) 1715 1716 with self.taropen(tmpname) as tobj: 1717 names = tobj.getnames() 1718 self.assertEqual(len(names), 1) 1719 self.assertIn('spameggs42', names[0]) 1720 1721 1722class GzipCreateTest(GzipTest, CreateTest): 1723 1724 def test_create_with_compresslevel(self): 1725 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 1726 tobj.add(self.file_path) 1727 with tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj: 1728 pass 1729 1730 1731class Bz2CreateTest(Bz2Test, CreateTest): 1732 1733 def test_create_with_compresslevel(self): 1734 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 1735 tobj.add(self.file_path) 1736 with tarfile.open(tmpname, 'r:bz2', compresslevel=1) as tobj: 1737 pass 1738 1739 1740class LzmaCreateTest(LzmaTest, CreateTest): 1741 1742 # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel. 1743 # It does not allow for preset to be specified when reading. 1744 def test_create_with_preset(self): 1745 with tarfile.open(tmpname, self.mode, preset=1) as tobj: 1746 tobj.add(self.file_path) 1747 1748 1749class CreateWithXModeTest(CreateTest): 1750 1751 prefix = "x" 1752 1753 test_create_taropen = None 1754 test_create_existing_taropen = None 1755 1756 1757@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1758class HardlinkTest(unittest.TestCase): 1759 # Test the creation of LNKTYPE (hardlink) members in an archive. 
1760 1761 def setUp(self): 1762 self.foo = os.path.join(TEMPDIR, "foo") 1763 self.bar = os.path.join(TEMPDIR, "bar") 1764 1765 with open(self.foo, "wb") as fobj: 1766 fobj.write(b"foo") 1767 1768 try: 1769 os.link(self.foo, self.bar) 1770 except PermissionError as e: 1771 self.skipTest('os.link(): %s' % e) 1772 1773 self.tar = tarfile.open(tmpname, "w") 1774 self.tar.add(self.foo) 1775 1776 def tearDown(self): 1777 self.tar.close() 1778 os_helper.unlink(self.foo) 1779 os_helper.unlink(self.bar) 1780 1781 def test_add_twice(self): 1782 # The same name will be added as a REGTYPE every 1783 # time regardless of st_nlink. 1784 tarinfo = self.tar.gettarinfo(self.foo) 1785 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1786 "add file as regular failed") 1787 1788 def test_add_hardlink(self): 1789 tarinfo = self.tar.gettarinfo(self.bar) 1790 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1791 "add file as hardlink failed") 1792 1793 def test_dereference_hardlink(self): 1794 self.tar.dereference = True 1795 tarinfo = self.tar.gettarinfo(self.bar) 1796 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1797 "dereferencing hardlink failed") 1798 1799 1800class PaxWriteTest(GNUWriteTest): 1801 1802 def _test(self, name, link=None): 1803 # See GNUWriteTest. 
1804 tarinfo = tarfile.TarInfo(name) 1805 if link: 1806 tarinfo.linkname = link 1807 tarinfo.type = tarfile.LNKTYPE 1808 1809 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1810 try: 1811 tar.addfile(tarinfo) 1812 finally: 1813 tar.close() 1814 1815 tar = tarfile.open(tmpname) 1816 try: 1817 if link: 1818 l = tar.getmembers()[0].linkname 1819 self.assertEqual(link, l, "PAX longlink creation failed") 1820 else: 1821 n = tar.getmembers()[0].name 1822 self.assertEqual(name, n, "PAX longname creation failed") 1823 finally: 1824 tar.close() 1825 1826 def test_pax_global_header(self): 1827 pax_headers = { 1828 "foo": "bar", 1829 "uid": "0", 1830 "mtime": "1.23", 1831 "test": "\xe4\xf6\xfc", 1832 "\xe4\xf6\xfc": "test"} 1833 1834 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1835 pax_headers=pax_headers) 1836 try: 1837 tar.addfile(tarfile.TarInfo("test")) 1838 finally: 1839 tar.close() 1840 1841 # Test if the global header was written correctly. 1842 tar = tarfile.open(tmpname, encoding="iso8859-1") 1843 try: 1844 self.assertEqual(tar.pax_headers, pax_headers) 1845 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1846 # Test if all the fields are strings. 1847 for key, val in tar.pax_headers.items(): 1848 self.assertIsNot(type(key), bytes) 1849 self.assertIsNot(type(val), bytes) 1850 if key in tarfile.PAX_NUMBER_FIELDS: 1851 try: 1852 tarfile.PAX_NUMBER_FIELDS[key](val) 1853 except (TypeError, ValueError): 1854 self.fail("unable to convert pax header field") 1855 finally: 1856 tar.close() 1857 1858 def test_pax_extended_header(self): 1859 # The fields from the pax header have priority over the 1860 # TarInfo. 
1861 pax_headers = {"path": "foo", "uid": "123"} 1862 1863 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1864 encoding="iso8859-1") 1865 try: 1866 t = tarfile.TarInfo() 1867 t.name = "\xe4\xf6\xfc" # non-ASCII 1868 t.uid = 8**8 # too large 1869 t.pax_headers = pax_headers 1870 tar.addfile(t) 1871 finally: 1872 tar.close() 1873 1874 tar = tarfile.open(tmpname, encoding="iso8859-1") 1875 try: 1876 t = tar.getmembers()[0] 1877 self.assertEqual(t.pax_headers, pax_headers) 1878 self.assertEqual(t.name, "foo") 1879 self.assertEqual(t.uid, 123) 1880 finally: 1881 tar.close() 1882 1883 1884class UnicodeTest: 1885 1886 def test_iso8859_1_filename(self): 1887 self._test_unicode_filename("iso8859-1") 1888 1889 def test_utf7_filename(self): 1890 self._test_unicode_filename("utf7") 1891 1892 def test_utf8_filename(self): 1893 self._test_unicode_filename("utf-8") 1894 1895 def _test_unicode_filename(self, encoding): 1896 tar = tarfile.open(tmpname, "w", format=self.format, 1897 encoding=encoding, errors="strict") 1898 try: 1899 name = "\xe4\xf6\xfc" 1900 tar.addfile(tarfile.TarInfo(name)) 1901 finally: 1902 tar.close() 1903 1904 tar = tarfile.open(tmpname, encoding=encoding) 1905 try: 1906 self.assertEqual(tar.getmembers()[0].name, name) 1907 finally: 1908 tar.close() 1909 1910 def test_unicode_filename_error(self): 1911 tar = tarfile.open(tmpname, "w", format=self.format, 1912 encoding="ascii", errors="strict") 1913 try: 1914 tarinfo = tarfile.TarInfo() 1915 1916 tarinfo.name = "\xe4\xf6\xfc" 1917 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1918 1919 tarinfo.name = "foo" 1920 tarinfo.uname = "\xe4\xf6\xfc" 1921 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1922 finally: 1923 tar.close() 1924 1925 def test_unicode_argument(self): 1926 tar = tarfile.open(tarname, "r", 1927 encoding="iso8859-1", errors="strict") 1928 try: 1929 for t in tar: 1930 self.assertIs(type(t.name), str) 1931 self.assertIs(type(t.linkname), str) 1932 
self.assertIs(type(t.uname), str) 1933 self.assertIs(type(t.gname), str) 1934 finally: 1935 tar.close() 1936 1937 def test_uname_unicode(self): 1938 t = tarfile.TarInfo("foo") 1939 t.uname = "\xe4\xf6\xfc" 1940 t.gname = "\xe4\xf6\xfc" 1941 1942 tar = tarfile.open(tmpname, mode="w", format=self.format, 1943 encoding="iso8859-1") 1944 try: 1945 tar.addfile(t) 1946 finally: 1947 tar.close() 1948 1949 tar = tarfile.open(tmpname, encoding="iso8859-1") 1950 try: 1951 t = tar.getmember("foo") 1952 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1953 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1954 1955 if self.format != tarfile.PAX_FORMAT: 1956 tar.close() 1957 tar = tarfile.open(tmpname, encoding="ascii") 1958 t = tar.getmember("foo") 1959 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1960 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1961 finally: 1962 tar.close() 1963 1964 1965class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1966 1967 format = tarfile.USTAR_FORMAT 1968 1969 # Test whether the utf-8 encoded version of a filename exceeds the 100 1970 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1971 # bytes). 1972 def test_unicode_name1(self): 1973 self._test_ustar_name("0123456789" * 10) 1974 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1975 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1976 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1977 1978 def test_unicode_name2(self): 1979 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1980 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1981 1982 # Test whether the utf-8 encoded version of a filename exceeds the 155 1983 # bytes prefix + '/' + 100 bytes name limit. 
1984 def test_unicode_longname1(self): 1985 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1986 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1987 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1988 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1989 1990 def test_unicode_longname2(self): 1991 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1992 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1993 1994 def test_unicode_longname3(self): 1995 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1996 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1997 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1998 1999 def test_unicode_longname4(self): 2000 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 2001 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 2002 2003 def _test_ustar_name(self, name, exc=None): 2004 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2005 t = tarfile.TarInfo(name) 2006 if exc is None: 2007 tar.addfile(t) 2008 else: 2009 self.assertRaises(exc, tar.addfile, t) 2010 2011 if exc is None: 2012 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2013 for t in tar: 2014 self.assertEqual(name, t.name) 2015 break 2016 2017 # Test the same as above for the 100 bytes link field. 
2018 def test_unicode_link1(self): 2019 self._test_ustar_link("0123456789" * 10) 2020 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 2021 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 2022 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 2023 2024 def test_unicode_link2(self): 2025 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 2026 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 2027 2028 def _test_ustar_link(self, name, exc=None): 2029 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2030 t = tarfile.TarInfo("foo") 2031 t.linkname = name 2032 if exc is None: 2033 tar.addfile(t) 2034 else: 2035 self.assertRaises(exc, tar.addfile, t) 2036 2037 if exc is None: 2038 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2039 for t in tar: 2040 self.assertEqual(name, t.linkname) 2041 break 2042 2043 2044class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 2045 2046 format = tarfile.GNU_FORMAT 2047 2048 def test_bad_pax_header(self): 2049 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 2050 # without a hdrcharset=BINARY header. 2051 for encoding, name in ( 2052 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 2053 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 2054 with tarfile.open(tarname, encoding=encoding, 2055 errors="surrogateescape") as tar: 2056 try: 2057 t = tar.getmember(name) 2058 except KeyError: 2059 self.fail("unable to read bad GNU tar pax header") 2060 2061 2062class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 2063 2064 format = tarfile.PAX_FORMAT 2065 2066 # PAX_FORMAT ignores encoding in write mode. 2067 test_unicode_filename_error = None 2068 2069 def test_binary_header(self): 2070 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 
2071 for encoding, name in ( 2072 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2073 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2074 with tarfile.open(tarname, encoding=encoding, 2075 errors="surrogateescape") as tar: 2076 try: 2077 t = tar.getmember(name) 2078 except KeyError: 2079 self.fail("unable to read POSIX.1-2008 binary header") 2080 2081 2082class AppendTestBase: 2083 # Test append mode (cp. patch #1652681). 2084 2085 def setUp(self): 2086 self.tarname = tmpname 2087 if os.path.exists(self.tarname): 2088 os_helper.unlink(self.tarname) 2089 2090 def _create_testtar(self, mode="w:"): 2091 with tarfile.open(tarname, encoding="iso8859-1") as src: 2092 t = src.getmember("ustar/regtype") 2093 t.name = "foo" 2094 with src.extractfile(t) as f: 2095 with tarfile.open(self.tarname, mode) as tar: 2096 tar.addfile(t, f) 2097 2098 def test_append_compressed(self): 2099 self._create_testtar("w:" + self.suffix) 2100 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2101 2102class AppendTest(AppendTestBase, unittest.TestCase): 2103 test_append_compressed = None 2104 2105 def _add_testfile(self, fileobj=None): 2106 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2107 tar.addfile(tarfile.TarInfo("bar")) 2108 2109 def _test(self, names=["bar"], fileobj=None): 2110 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2111 self.assertEqual(tar.getnames(), names) 2112 2113 def test_non_existing(self): 2114 self._add_testfile() 2115 self._test() 2116 2117 def test_empty(self): 2118 tarfile.open(self.tarname, "w:").close() 2119 self._add_testfile() 2120 self._test() 2121 2122 def test_empty_fileobj(self): 2123 fobj = io.BytesIO(b"\0" * 1024) 2124 self._add_testfile(fobj) 2125 fobj.seek(0) 2126 self._test(fileobj=fobj) 2127 2128 def test_fileobj(self): 2129 self._create_testtar() 2130 with open(self.tarname, "rb") as fobj: 2131 data = fobj.read() 2132 fobj = io.BytesIO(data) 2133 self._add_testfile(fobj) 2134 fobj.seek(0) 2135 
self._test(names=["foo", "bar"], fileobj=fobj) 2136 2137 def test_existing(self): 2138 self._create_testtar() 2139 self._add_testfile() 2140 self._test(names=["foo", "bar"]) 2141 2142 # Append mode is supposed to fail if the tarfile to append to 2143 # does not end with a zero block. 2144 def _test_error(self, data): 2145 with open(self.tarname, "wb") as fobj: 2146 fobj.write(data) 2147 self.assertRaises(tarfile.ReadError, self._add_testfile) 2148 2149 def test_null(self): 2150 self._test_error(b"") 2151 2152 def test_incomplete(self): 2153 self._test_error(b"\0" * 13) 2154 2155 def test_premature_eof(self): 2156 data = tarfile.TarInfo("foo").tobuf() 2157 self._test_error(data) 2158 2159 def test_trailing_garbage(self): 2160 data = tarfile.TarInfo("foo").tobuf() 2161 self._test_error(data + b"\0" * 13) 2162 2163 def test_invalid(self): 2164 self._test_error(b"a" * 512) 2165 2166class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2167 pass 2168 2169class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2170 pass 2171 2172class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2173 pass 2174 2175 2176class LimitsTest(unittest.TestCase): 2177 2178 def test_ustar_limits(self): 2179 # 100 char name 2180 tarinfo = tarfile.TarInfo("0123456789" * 10) 2181 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2182 2183 # 101 char name that cannot be stored 2184 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2185 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2186 2187 # 256 char name with a slash at pos 156 2188 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2189 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2190 2191 # 256 char name that cannot be stored 2192 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2193 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2194 2195 # 512 char name 2196 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2197 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 2198 2199 # 512 char linkname 2200 tarinfo = tarfile.TarInfo("longlink") 2201 tarinfo.linkname = "123/" * 126 + "longname" 2202 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2203 2204 # uid > 8 digits 2205 tarinfo = tarfile.TarInfo("name") 2206 tarinfo.uid = 0o10000000 2207 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2208 2209 def test_gnu_limits(self): 2210 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2211 tarinfo.tobuf(tarfile.GNU_FORMAT) 2212 2213 tarinfo = tarfile.TarInfo("longlink") 2214 tarinfo.linkname = "123/" * 126 + "longname" 2215 tarinfo.tobuf(tarfile.GNU_FORMAT) 2216 2217 # uid >= 256 ** 7 2218 tarinfo = tarfile.TarInfo("name") 2219 tarinfo.uid = 0o4000000000000000000 2220 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2221 2222 def test_pax_limits(self): 2223 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2224 tarinfo.tobuf(tarfile.PAX_FORMAT) 2225 2226 tarinfo = tarfile.TarInfo("longlink") 2227 tarinfo.linkname = "123/" * 126 + "longname" 2228 tarinfo.tobuf(tarfile.PAX_FORMAT) 2229 2230 tarinfo = tarfile.TarInfo("name") 2231 tarinfo.uid = 0o4000000000000000000 2232 tarinfo.tobuf(tarfile.PAX_FORMAT) 2233 2234 2235class MiscTest(unittest.TestCase): 2236 2237 def test_char_fields(self): 2238 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2239 b"foo\0\0\0\0\0") 2240 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2241 b"foo") 2242 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2243 "foo") 2244 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2245 "foo") 2246 2247 def test_read_number_fields(self): 2248 # Issue 13158: Test if GNU tar specific base-256 number fields 2249 # are decoded correctly. 
2250 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2251 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2252 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2253 0o10000000) 2254 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2255 0xffffffff) 2256 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2257 -1) 2258 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2259 -100) 2260 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2261 -0x100000000000000) 2262 2263 # Issue 24514: Test if empty number fields are converted to zero. 2264 self.assertEqual(tarfile.nti(b"\0"), 0) 2265 self.assertEqual(tarfile.nti(b" \0"), 0) 2266 2267 def test_write_number_fields(self): 2268 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2269 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2270 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2271 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2272 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2273 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2274 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2275 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2276 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2277 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2278 self.assertEqual(tarfile.itn(-0x100000000000000, 2279 format=tarfile.GNU_FORMAT), 2280 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2281 2282 # Issue 32713: Test if itn() supports float values outside the 2283 # non-GNU format range 2284 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2285 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2286 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2287 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2288 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2289 2290 def test_number_field_limits(self): 2291 with self.assertRaises(ValueError): 2292 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2293 with self.assertRaises(ValueError): 2294 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2295 with self.assertRaises(ValueError): 2296 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2297 with self.assertRaises(ValueError): 2298 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2299 2300 def test__all__(self): 2301 not_exported = { 2302 'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE', 2303 'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME', 2304 'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2305 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE', 2306 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE', 2307 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES', 2308 'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS', 2309 'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 2310 'copyfileobj', 'filemode', 'EmptyHeaderError', 2311 'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError', 2312 'SubsequentHeaderError', 'ExFileObject', 'main'} 2313 support.check__all__(self, tarfile, not_exported=not_exported) 2314 2315 def test_useful_error_message_when_modules_missing(self): 2316 fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz') 2317 with self.assertRaises(tarfile.ReadError) as excinfo: 2318 error = tarfile.CompressionError('lzma module is not available'), 2319 with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error): 2320 tarfile.open(fname) 2321 2322 self.assertIn( 2323 "\n- method xz: CompressionError('lzma module is not available')\n", 2324 str(excinfo.exception), 2325 ) 2326 2327 2328class CommandLineTest(unittest.TestCase): 2329 2330 def tarfilecmd(self, *args, **kwargs): 2331 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2332 **kwargs) 2333 return out.replace(os.linesep.encode(), b'\n') 2334 2335 def tarfilecmd_failure(self, *args): 2336 return 
script_helper.assert_python_failure('-m', 'tarfile', *args) 2337 2338 def make_simple_tarfile(self, tar_name): 2339 files = [support.findfile('tokenize_tests.txt'), 2340 support.findfile('tokenize_tests-no-coding-cookie-' 2341 'and-utf8-bom-sig-only.txt')] 2342 self.addCleanup(os_helper.unlink, tar_name) 2343 with tarfile.open(tar_name, 'w') as tf: 2344 for tardata in files: 2345 tf.add(tardata, arcname=os.path.basename(tardata)) 2346 2347 def test_bad_use(self): 2348 rc, out, err = self.tarfilecmd_failure() 2349 self.assertEqual(out, b'') 2350 self.assertIn(b'usage', err.lower()) 2351 self.assertIn(b'error', err.lower()) 2352 self.assertIn(b'required', err.lower()) 2353 rc, out, err = self.tarfilecmd_failure('-l', '') 2354 self.assertEqual(out, b'') 2355 self.assertNotEqual(err.strip(), b'') 2356 2357 def test_test_command(self): 2358 for tar_name in testtarnames: 2359 for opt in '-t', '--test': 2360 out = self.tarfilecmd(opt, tar_name) 2361 self.assertEqual(out, b'') 2362 2363 def test_test_command_verbose(self): 2364 for tar_name in testtarnames: 2365 for opt in '-v', '--verbose': 2366 out = self.tarfilecmd(opt, '-t', tar_name, 2367 PYTHONIOENCODING='utf-8') 2368 self.assertIn(b'is a tar archive.\n', out) 2369 2370 def test_test_command_invalid_file(self): 2371 zipname = support.findfile('zipdir.zip') 2372 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2373 self.assertIn(b' is not a tar archive.', err) 2374 self.assertEqual(out, b'') 2375 self.assertEqual(rc, 1) 2376 2377 for tar_name in testtarnames: 2378 with self.subTest(tar_name=tar_name): 2379 with open(tar_name, 'rb') as f: 2380 data = f.read() 2381 try: 2382 with open(tmpname, 'wb') as f: 2383 f.write(data[:511]) 2384 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2385 self.assertEqual(out, b'') 2386 self.assertEqual(rc, 1) 2387 finally: 2388 os_helper.unlink(tmpname) 2389 2390 def test_list_command(self): 2391 for tar_name in testtarnames: 2392 with support.captured_stdout() as t: 2393 with 
tarfile.open(tar_name, 'r') as tf: 2394 tf.list(verbose=False) 2395 expected = t.getvalue().encode('ascii', 'backslashreplace') 2396 for opt in '-l', '--list': 2397 out = self.tarfilecmd(opt, tar_name, 2398 PYTHONIOENCODING='ascii') 2399 self.assertEqual(out, expected) 2400 2401 def test_list_command_verbose(self): 2402 for tar_name in testtarnames: 2403 with support.captured_stdout() as t: 2404 with tarfile.open(tar_name, 'r') as tf: 2405 tf.list(verbose=True) 2406 expected = t.getvalue().encode('ascii', 'backslashreplace') 2407 for opt in '-v', '--verbose': 2408 out = self.tarfilecmd(opt, '-l', tar_name, 2409 PYTHONIOENCODING='ascii') 2410 self.assertEqual(out, expected) 2411 2412 def test_list_command_invalid_file(self): 2413 zipname = support.findfile('zipdir.zip') 2414 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2415 self.assertIn(b' is not a tar archive.', err) 2416 self.assertEqual(out, b'') 2417 self.assertEqual(rc, 1) 2418 2419 def test_create_command(self): 2420 files = [support.findfile('tokenize_tests.txt'), 2421 support.findfile('tokenize_tests-no-coding-cookie-' 2422 'and-utf8-bom-sig-only.txt')] 2423 for opt in '-c', '--create': 2424 try: 2425 out = self.tarfilecmd(opt, tmpname, *files) 2426 self.assertEqual(out, b'') 2427 with tarfile.open(tmpname) as tar: 2428 tar.getmembers() 2429 finally: 2430 os_helper.unlink(tmpname) 2431 2432 def test_create_command_verbose(self): 2433 files = [support.findfile('tokenize_tests.txt'), 2434 support.findfile('tokenize_tests-no-coding-cookie-' 2435 'and-utf8-bom-sig-only.txt')] 2436 for opt in '-v', '--verbose': 2437 try: 2438 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2439 PYTHONIOENCODING='utf-8') 2440 self.assertIn(b' file created.', out) 2441 with tarfile.open(tmpname) as tar: 2442 tar.getmembers() 2443 finally: 2444 os_helper.unlink(tmpname) 2445 2446 def test_create_command_dotless_filename(self): 2447 files = [support.findfile('tokenize_tests.txt')] 2448 try: 2449 out = 
self.tarfilecmd('-c', dotlessname, *files) 2450 self.assertEqual(out, b'') 2451 with tarfile.open(dotlessname) as tar: 2452 tar.getmembers() 2453 finally: 2454 os_helper.unlink(dotlessname) 2455 2456 def test_create_command_dot_started_filename(self): 2457 tar_name = os.path.join(TEMPDIR, ".testtar") 2458 files = [support.findfile('tokenize_tests.txt')] 2459 try: 2460 out = self.tarfilecmd('-c', tar_name, *files) 2461 self.assertEqual(out, b'') 2462 with tarfile.open(tar_name) as tar: 2463 tar.getmembers() 2464 finally: 2465 os_helper.unlink(tar_name) 2466 2467 def test_create_command_compressed(self): 2468 files = [support.findfile('tokenize_tests.txt'), 2469 support.findfile('tokenize_tests-no-coding-cookie-' 2470 'and-utf8-bom-sig-only.txt')] 2471 for filetype in (GzipTest, Bz2Test, LzmaTest): 2472 if not filetype.open: 2473 continue 2474 try: 2475 tar_name = tmpname + '.' + filetype.suffix 2476 out = self.tarfilecmd('-c', tar_name, *files) 2477 with filetype.taropen(tar_name) as tar: 2478 tar.getmembers() 2479 finally: 2480 os_helper.unlink(tar_name) 2481 2482 def test_extract_command(self): 2483 self.make_simple_tarfile(tmpname) 2484 for opt in '-e', '--extract': 2485 try: 2486 with os_helper.temp_cwd(tarextdir): 2487 out = self.tarfilecmd(opt, tmpname) 2488 self.assertEqual(out, b'') 2489 finally: 2490 os_helper.rmtree(tarextdir) 2491 2492 def test_extract_command_verbose(self): 2493 self.make_simple_tarfile(tmpname) 2494 for opt in '-v', '--verbose': 2495 try: 2496 with os_helper.temp_cwd(tarextdir): 2497 out = self.tarfilecmd(opt, '-e', tmpname, 2498 PYTHONIOENCODING='utf-8') 2499 self.assertIn(b' file is extracted.', out) 2500 finally: 2501 os_helper.rmtree(tarextdir) 2502 2503 def test_extract_command_different_directory(self): 2504 self.make_simple_tarfile(tmpname) 2505 try: 2506 with os_helper.temp_cwd(tarextdir): 2507 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2508 self.assertEqual(out, b'') 2509 finally: 2510 os_helper.rmtree(tarextdir) 2511 2512 
def test_extract_command_invalid_file(self): 2513 zipname = support.findfile('zipdir.zip') 2514 with os_helper.temp_cwd(tarextdir): 2515 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2516 self.assertIn(b' is not a tar archive.', err) 2517 self.assertEqual(out, b'') 2518 self.assertEqual(rc, 1) 2519 2520 2521class ContextManagerTest(unittest.TestCase): 2522 2523 def test_basic(self): 2524 with tarfile.open(tarname) as tar: 2525 self.assertFalse(tar.closed, "closed inside runtime context") 2526 self.assertTrue(tar.closed, "context manager failed") 2527 2528 def test_closed(self): 2529 # The __enter__() method is supposed to raise OSError 2530 # if the TarFile object is already closed. 2531 tar = tarfile.open(tarname) 2532 tar.close() 2533 with self.assertRaises(OSError): 2534 with tar: 2535 pass 2536 2537 def test_exception(self): 2538 # Test if the OSError exception is passed through properly. 2539 with self.assertRaises(Exception) as exc: 2540 with tarfile.open(tarname) as tar: 2541 raise OSError 2542 self.assertIsInstance(exc.exception, OSError, 2543 "wrong exception raised in context manager") 2544 self.assertTrue(tar.closed, "context manager failed") 2545 2546 def test_no_eof(self): 2547 # __exit__() must not write end-of-archive blocks if an 2548 # exception was raised. 2549 try: 2550 with tarfile.open(tmpname, "w") as tar: 2551 raise Exception 2552 except: 2553 pass 2554 self.assertEqual(os.path.getsize(tmpname), 0, 2555 "context manager wrote an end-of-archive block") 2556 self.assertTrue(tar.closed, "context manager failed") 2557 2558 def test_eof(self): 2559 # __exit__() must write end-of-archive blocks, i.e. call 2560 # TarFile.close() if there was no error. 2561 with tarfile.open(tmpname, "w"): 2562 pass 2563 self.assertNotEqual(os.path.getsize(tmpname), 0, 2564 "context manager wrote no end-of-archive block") 2565 2566 def test_fileobj(self): 2567 # Test that __exit__() did not close the external file 2568 # object. 
2569 with open(tmpname, "wb") as fobj: 2570 try: 2571 with tarfile.open(fileobj=fobj, mode="w") as tar: 2572 raise Exception 2573 except: 2574 pass 2575 self.assertFalse(fobj.closed, "external file object was closed") 2576 self.assertTrue(tar.closed, "context manager failed") 2577 2578 2579@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2580class LinkEmulationTest(ReadTest, unittest.TestCase): 2581 2582 # Test for issue #8741 regression. On platforms that do not support 2583 # symbolic or hard links tarfile tries to extract these types of members 2584 # as the regular files they point to. 2585 def _test_link_extraction(self, name): 2586 self.tar.extract(name, TEMPDIR) 2587 with open(os.path.join(TEMPDIR, name), "rb") as f: 2588 data = f.read() 2589 self.assertEqual(sha256sum(data), sha256_regtype) 2590 2591 # See issues #1578269, #8879, and #17689 for some history on these skips 2592 @unittest.skipIf(hasattr(os.path, "islink"), 2593 "Skip emulation - has os.path.islink but not os.link") 2594 def test_hardlink_extraction1(self): 2595 self._test_link_extraction("ustar/lnktype") 2596 2597 @unittest.skipIf(hasattr(os.path, "islink"), 2598 "Skip emulation - has os.path.islink but not os.link") 2599 def test_hardlink_extraction2(self): 2600 self._test_link_extraction("./ustar/linktest2/lnktype") 2601 2602 @unittest.skipIf(hasattr(os, "symlink"), 2603 "Skip emulation if symlink exists") 2604 def test_symlink_extraction1(self): 2605 self._test_link_extraction("ustar/symtype") 2606 2607 @unittest.skipIf(hasattr(os, "symlink"), 2608 "Skip emulation if symlink exists") 2609 def test_symlink_extraction2(self): 2610 self._test_link_extraction("./ustar/linktest2/symtype") 2611 2612 2613class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2614 # Issue5068: The _BZ2Proxy.read() method loops forever 2615 # on an empty or partial bzipped file. 
2616 2617 def _test_partial_input(self, mode): 2618 class MyBytesIO(io.BytesIO): 2619 hit_eof = False 2620 def read(self, n): 2621 if self.hit_eof: 2622 raise AssertionError("infinite loop detected in " 2623 "tarfile.open()") 2624 self.hit_eof = self.tell() == len(self.getvalue()) 2625 return super(MyBytesIO, self).read(n) 2626 def seek(self, *args): 2627 self.hit_eof = False 2628 return super(MyBytesIO, self).seek(*args) 2629 2630 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2631 for x in range(len(data) + 1): 2632 try: 2633 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2634 except tarfile.ReadError: 2635 pass # we have no interest in ReadErrors 2636 2637 def test_partial_input(self): 2638 self._test_partial_input("r") 2639 2640 def test_partial_input_bz2(self): 2641 self._test_partial_input("r:bz2") 2642 2643 2644def root_is_uid_gid_0(): 2645 try: 2646 import pwd, grp 2647 except ImportError: 2648 return False 2649 if pwd.getpwuid(0)[0] != 'root': 2650 return False 2651 if grp.getgrgid(0)[0] != 'root': 2652 return False 2653 return True 2654 2655 2656@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2657@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2658class NumericOwnerTest(unittest.TestCase): 2659 # mock the following: 2660 # os.chown: so we can test what's being called 2661 # os.chmod: so the modes are not actually changed. if they are, we can't 2662 # delete the files/directories 2663 # os.geteuid: so we can lie and say we're root (uid = 0) 2664 2665 @staticmethod 2666 def _make_test_archive(filename_1, dirname_1, filename_2): 2667 # the file contents to write 2668 fobj = io.BytesIO(b"content") 2669 2670 # create a tar file with a file, a directory, and a file within that 2671 # directory. 
Assign various .uid/.gid values to them 2672 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2673 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2674 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2675 ] 2676 with tarfile.open(tmpname, 'w') as tarfl: 2677 for name, uid, gid, typ, contents in items: 2678 t = tarfile.TarInfo(name) 2679 t.uid = uid 2680 t.gid = gid 2681 t.uname = 'root' 2682 t.gname = 'root' 2683 t.type = typ 2684 tarfl.addfile(t, contents) 2685 2686 # return the full pathname to the tar file 2687 return tmpname 2688 2689 @staticmethod 2690 @contextmanager 2691 def _setup_test(mock_geteuid): 2692 mock_geteuid.return_value = 0 # lie and say we're root 2693 fname = 'numeric-owner-testfile' 2694 dirname = 'dir' 2695 2696 # the names we want stored in the tarfile 2697 filename_1 = fname 2698 dirname_1 = dirname 2699 filename_2 = os.path.join(dirname, fname) 2700 2701 # create the tarfile with the contents we're after 2702 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2703 dirname_1, 2704 filename_2) 2705 2706 # open the tarfile for reading. 
yield it and the names of the items 2707 # we stored into the file 2708 with tarfile.open(tar_filename) as tarfl: 2709 yield tarfl, filename_1, dirname_1, filename_2 2710 2711 @unittest.mock.patch('os.chown') 2712 @unittest.mock.patch('os.chmod') 2713 @unittest.mock.patch('os.geteuid') 2714 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2715 mock_chown): 2716 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2717 filename_2): 2718 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2719 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2720 2721 # convert to filesystem paths 2722 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2723 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2724 2725 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2726 unittest.mock.call(f_filename_2, 88, 87), 2727 ], 2728 any_order=True) 2729 2730 @unittest.mock.patch('os.chown') 2731 @unittest.mock.patch('os.chmod') 2732 @unittest.mock.patch('os.geteuid') 2733 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2734 mock_chown): 2735 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2736 filename_2): 2737 tarfl.extractall(TEMPDIR, numeric_owner=True) 2738 2739 # convert to filesystem paths 2740 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2741 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2742 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2743 2744 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2745 unittest.mock.call(f_dirname_1, 77, 76), 2746 unittest.mock.call(f_filename_2, 88, 87), 2747 ], 2748 any_order=True) 2749 2750 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2751 # because the uname and gname in the test file are 'root', and extract() 2752 # will look them up using pwd and grp to find their uid and gid, which we 2753 # test here to be 0. 
2754 @unittest.skipUnless(root_is_uid_gid_0(), 2755 'uid=0,gid=0 must be named "root"') 2756 @unittest.mock.patch('os.chown') 2757 @unittest.mock.patch('os.chmod') 2758 @unittest.mock.patch('os.geteuid') 2759 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2760 mock_chown): 2761 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2762 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2763 2764 # convert to filesystem paths 2765 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2766 2767 mock_chown.assert_called_with(f_filename_1, 0, 0) 2768 2769 @unittest.mock.patch('os.geteuid') 2770 def test_keyword_only(self, mock_geteuid): 2771 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2772 self.assertRaises(TypeError, 2773 tarfl.extract, filename_1, TEMPDIR, False, True) 2774 2775 2776def setUpModule(): 2777 os_helper.unlink(TEMPDIR) 2778 os.makedirs(TEMPDIR) 2779 2780 global testtarnames 2781 testtarnames = [tarname] 2782 with open(tarname, "rb") as fobj: 2783 data = fobj.read() 2784 2785 # Create compressed tarfiles. 2786 for c in GzipTest, Bz2Test, LzmaTest: 2787 if c.open: 2788 os_helper.unlink(c.tarname) 2789 testtarnames.append(c.tarname) 2790 with c.open(c.tarname, "wb") as tar: 2791 tar.write(data) 2792 2793def tearDownModule(): 2794 if os.path.exists(TEMPDIR): 2795 os_helper.rmtree(TEMPDIR) 2796 2797if __name__ == "__main__": 2798 unittest.main() 2799