1# As a test suite for the os module, this is woefully inadequate, but this 2# does add tests for a few functions which have been determined to be more 3# portable than they had been thought to be. 4 5import asynchat 6import asyncore 7import codecs 8import contextlib 9import decimal 10import errno 11import fnmatch 12import fractions 13import itertools 14import locale 15import mmap 16import os 17import pickle 18import shutil 19import signal 20import socket 21import stat 22import subprocess 23import sys 24import sysconfig 25import tempfile 26import threading 27import time 28import unittest 29import uuid 30import warnings 31from test import support 32from platform import win32_is_iot 33 34try: 35 import resource 36except ImportError: 37 resource = None 38try: 39 import fcntl 40except ImportError: 41 fcntl = None 42try: 43 import _winapi 44except ImportError: 45 _winapi = None 46try: 47 import pwd 48 all_users = [u.pw_uid for u in pwd.getpwall()] 49except (ImportError, AttributeError): 50 all_users = [] 51try: 52 from _testcapi import INT_MAX, PY_SSIZE_T_MAX 53except ImportError: 54 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize 55 56from test.support.script_helper import assert_python_ok 57from test.support import unix_shell, FakePath 58 59 60root_in_posix = False 61if hasattr(os, 'geteuid'): 62 root_in_posix = (os.geteuid() == 0) 63 64# Detect whether we're on a Linux system that uses the (now outdated 65# and unmaintained) linuxthreads threading library. There's an issue 66# when combining linuxthreads with a failed execv call: see 67# http://bugs.python.org/issue4970. 68if hasattr(sys, 'thread_info') and sys.thread_info.version: 69 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 70else: 71 USING_LINUXTHREADS = False 72 73# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 74HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 75 76 77def requires_os_func(name): 78 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) 79 80 81def create_file(filename, content=b'content'): 82 with open(filename, "xb", 0) as fp: 83 fp.write(content) 84 85 86class MiscTests(unittest.TestCase): 87 def test_getcwd(self): 88 cwd = os.getcwd() 89 self.assertIsInstance(cwd, str) 90 91 def test_getcwd_long_path(self): 92 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On 93 # Windows, MAX_PATH is defined as 260 characters, but Windows supports 94 # longer path if longer paths support is enabled. Internally, the os 95 # module uses MAXPATHLEN which is at least 1024. 96 # 97 # Use a directory name of 200 characters to fit into Windows MAX_PATH 98 # limit. 99 # 100 # On Windows, the test can stop when trying to create a path longer 101 # than MAX_PATH if long paths support is disabled: 102 # see RtlAreLongPathsEnabled(). 103 min_len = 2000 # characters 104 dirlen = 200 # characters 105 dirname = 'python_test_dir_' 106 dirname = dirname + ('a' * (dirlen - len(dirname))) 107 108 with tempfile.TemporaryDirectory() as tmpdir: 109 with support.change_cwd(tmpdir) as path: 110 expected = path 111 112 while True: 113 cwd = os.getcwd() 114 self.assertEqual(cwd, expected) 115 116 need = min_len - (len(cwd) + len(os.path.sep)) 117 if need <= 0: 118 break 119 if len(dirname) > need and need > 0: 120 dirname = dirname[:need] 121 122 path = os.path.join(path, dirname) 123 try: 124 os.mkdir(path) 125 # On Windows, chdir() can fail 126 # even if mkdir() succeeded 127 os.chdir(path) 128 except FileNotFoundError: 129 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and 130 # ERROR_FILENAME_EXCED_RANGE (206) errors 131 # ("The filename or extension is too long") 132 break 133 except OSError as exc: 134 if exc.errno == errno.ENAMETOOLONG: 135 break 136 else: 137 raise 138 139 expected = path 140 141 if support.verbose: 142 print(f"Tested current directory length: {len(cwd)}") 143 144 def test_getcwdb(self): 145 cwd = os.getcwdb() 146 self.assertIsInstance(cwd, bytes) 147 self.assertEqual(os.fsdecode(cwd), os.getcwd()) 148 149 150# Tests creating TESTFN 151class FileTests(unittest.TestCase): 152 def setUp(self): 153 if os.path.lexists(support.TESTFN): 154 os.unlink(support.TESTFN) 155 tearDown = setUp 156 157 def test_access(self): 158 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 159 os.close(f) 160 self.assertTrue(os.access(support.TESTFN, os.W_OK)) 161 162 def test_closerange(self): 163 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 164 # We must allocate two consecutive file descriptors, otherwise 165 # it will mess up other file descriptors (perhaps even the three 166 # standard ones). 167 second = os.dup(first) 168 try: 169 retries = 0 170 while second != first + 1: 171 os.close(first) 172 retries += 1 173 if retries > 10: 174 # XXX test skipped 175 self.skipTest("couldn't allocate two consecutive fds") 176 first, second = second, os.dup(second) 177 finally: 178 os.close(second) 179 # close a fd that is open, and one that isn't 180 os.closerange(first, first + 2) 181 self.assertRaises(OSError, os.write, first, b"a") 182 183 @support.cpython_only 184 def test_rename(self): 185 path = support.TESTFN 186 old = sys.getrefcount(path) 187 self.assertRaises(TypeError, os.rename, path, 0) 188 new = sys.getrefcount(path) 189 self.assertEqual(old, new) 190 191 def test_read(self): 192 with open(support.TESTFN, "w+b") as fobj: 193 fobj.write(b"spam") 194 fobj.flush() 195 fd = fobj.fileno() 196 os.lseek(fd, 0, 0) 197 s = os.read(fd, 4) 198 self.assertEqual(type(s), bytes) 199 self.assertEqual(s, b"spam") 200 201 @support.cpython_only 202 # Skip the test on 32-bit platforms: the number of bytes must fit in a 203 # Py_ssize_t type 204 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, 205 "needs INT_MAX < PY_SSIZE_T_MAX") 206 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) 207 def test_large_read(self, size): 208 self.addCleanup(support.unlink, support.TESTFN) 209 create_file(support.TESTFN, b'test') 210 211 # Issue #21932: Make sure that os.read() does not raise an 212 # OverflowError for size larger than INT_MAX 213 with open(support.TESTFN, "rb") as fp: 214 data = os.read(fp.fileno(), size) 215 216 # The test does not try to read more than 2 GiB at once because the 217 # operating system is free to return less bytes than requested. 218 self.assertEqual(data, b'test') 219 220 def test_write(self): 221 # os.write() accepts bytes- and buffer-like objects but not strings 222 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY) 223 self.assertRaises(TypeError, os.write, fd, "beans") 224 os.write(fd, b"bacon\n") 225 os.write(fd, bytearray(b"eggs\n")) 226 os.write(fd, memoryview(b"spam\n")) 227 os.close(fd) 228 with open(support.TESTFN, "rb") as fobj: 229 self.assertEqual(fobj.read().splitlines(), 230 [b"bacon", b"eggs", b"spam"]) 231 232 def write_windows_console(self, *args): 233 retcode = subprocess.call(args, 234 # use a new console to not flood the test output 235 creationflags=subprocess.CREATE_NEW_CONSOLE, 236 # use a shell to hide the console window (SW_HIDE) 237 shell=True) 238 self.assertEqual(retcode, 0) 239 240 @unittest.skipUnless(sys.platform == 'win32', 241 'test specific to the Windows console') 242 def test_write_windows_console(self): 243 # Issue #11395: the Windows console returns an error (12: not enough 244 # space error) on writing into stdout if stdout mode is binary and the 245 # length is greater than 66,000 bytes (or less, depending on heap 246 # usage). 247 code = "print('x' * 100000)" 248 self.write_windows_console(sys.executable, "-c", code) 249 self.write_windows_console(sys.executable, "-u", "-c", code) 250 251 def fdopen_helper(self, *args): 252 fd = os.open(support.TESTFN, os.O_RDONLY) 253 f = os.fdopen(fd, *args) 254 f.close() 255 256 def test_fdopen(self): 257 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 258 os.close(fd) 259 260 self.fdopen_helper() 261 self.fdopen_helper('r') 262 self.fdopen_helper('r', 100) 263 264 def test_replace(self): 265 TESTFN2 = support.TESTFN + ".2" 266 self.addCleanup(support.unlink, support.TESTFN) 267 self.addCleanup(support.unlink, TESTFN2) 268 269 create_file(support.TESTFN, b"1") 270 create_file(TESTFN2, b"2") 271 272 os.replace(support.TESTFN, TESTFN2) 273 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) 274 with open(TESTFN2, 'r') as f: 275 self.assertEqual(f.read(), "1") 276 277 def test_open_keywords(self): 278 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, 279 dir_fd=None) 280 os.close(f) 281 282 def test_symlink_keywords(self): 283 symlink = support.get_attribute(os, "symlink") 284 try: 285 symlink(src='target', dst=support.TESTFN, 286 target_is_directory=False, dir_fd=None) 287 except (NotImplementedError, OSError): 288 pass # No OS support or unprivileged user 289 290 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 291 def test_copy_file_range_invalid_values(self): 292 with self.assertRaises(ValueError): 293 os.copy_file_range(0, 1, -10) 294 295 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 296 def test_copy_file_range(self): 297 TESTFN2 = support.TESTFN + ".3" 298 data = b'0123456789' 299 300 create_file(support.TESTFN, data) 301 self.addCleanup(support.unlink, support.TESTFN) 302 303 in_file = open(support.TESTFN, 'rb') 304 self.addCleanup(in_file.close) 305 in_fd = in_file.fileno() 306 307 out_file = open(TESTFN2, 'w+b') 308 self.addCleanup(support.unlink, TESTFN2) 309 self.addCleanup(out_file.close) 310 out_fd = out_file.fileno() 311 312 try: 313 i = os.copy_file_range(in_fd, out_fd, 5) 314 except OSError as e: 315 # Handle the case in which Python was compiled 316 # in a system with the syscall but without support 317 # in the kernel. 318 if e.errno != errno.ENOSYS: 319 raise 320 self.skipTest(e) 321 else: 322 # The number of copied bytes can be less than 323 # the number of bytes originally requested. 324 self.assertIn(i, range(0, 6)); 325 326 with open(TESTFN2, 'rb') as in_file: 327 self.assertEqual(in_file.read(), data[:i]) 328 329 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 330 def test_copy_file_range_offset(self): 331 TESTFN4 = support.TESTFN + ".4" 332 data = b'0123456789' 333 bytes_to_copy = 6 334 in_skip = 3 335 out_seek = 5 336 337 create_file(support.TESTFN, data) 338 self.addCleanup(support.unlink, support.TESTFN) 339 340 in_file = open(support.TESTFN, 'rb') 341 self.addCleanup(in_file.close) 342 in_fd = in_file.fileno() 343 344 out_file = open(TESTFN4, 'w+b') 345 self.addCleanup(support.unlink, TESTFN4) 346 self.addCleanup(out_file.close) 347 out_fd = out_file.fileno() 348 349 try: 350 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy, 351 offset_src=in_skip, 352 offset_dst=out_seek) 353 except OSError as e: 354 # Handle the case in which Python was compiled 355 # in a system with the syscall but without support 356 # in the kernel. 357 if e.errno != errno.ENOSYS: 358 raise 359 self.skipTest(e) 360 else: 361 # The number of copied bytes can be less than 362 # the number of bytes originally requested. 363 self.assertIn(i, range(0, bytes_to_copy+1)); 364 365 with open(TESTFN4, 'rb') as in_file: 366 read = in_file.read() 367 # seeked bytes (5) are zero'ed 368 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 369 # 012 are skipped (in_skip) 370 # 345678 are copied in the file (in_skip + bytes_to_copy) 371 self.assertEqual(read[out_seek:], 372 data[in_skip:in_skip+i]) 373 374# Test attributes on return values from os.*stat* family. 375class StatAttributeTests(unittest.TestCase): 376 def setUp(self): 377 self.fname = support.TESTFN 378 self.addCleanup(support.unlink, self.fname) 379 create_file(self.fname, b"ABC") 380 381 def check_stat_attributes(self, fname): 382 result = os.stat(fname) 383 384 # Make sure direct access works 385 self.assertEqual(result[stat.ST_SIZE], 3) 386 self.assertEqual(result.st_size, 3) 387 388 # Make sure all the attributes are there 389 members = dir(result) 390 for name in dir(stat): 391 if name[:3] == 'ST_': 392 attr = name.lower() 393 if name.endswith("TIME"): 394 def trunc(x): return int(x) 395 else: 396 def trunc(x): return x 397 self.assertEqual(trunc(getattr(result, attr)), 398 result[getattr(stat, name)]) 399 self.assertIn(attr, members) 400 401 # Make sure that the st_?time and st_?time_ns fields roughly agree 402 # (they should always agree up to around tens-of-microseconds) 403 for name in 'st_atime st_mtime st_ctime'.split(): 404 floaty = int(getattr(result, name) * 100000) 405 nanosecondy = getattr(result, name + "_ns") // 10000 406 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 407 408 try: 409 result[200] 410 self.fail("No exception raised") 411 except IndexError: 412 pass 413 414 # Make sure that assignment fails 415 try: 416 result.st_mode = 1 417 self.fail("No exception raised") 418 except AttributeError: 419 pass 420 421 try: 422 result.st_rdev = 1 423 self.fail("No exception raised") 424 except (AttributeError, TypeError): 425 pass 426 427 try: 428 result.parrot = 1 429 self.fail("No exception raised") 430 except AttributeError: 431 pass 432 433 # Use the stat_result constructor with a too-short tuple. 434 try: 435 result2 = os.stat_result((10,)) 436 self.fail("No exception raised") 437 except TypeError: 438 pass 439 440 # Use the constructor with a too-long tuple. 441 try: 442 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 443 except TypeError: 444 pass 445 446 def test_stat_attributes(self): 447 self.check_stat_attributes(self.fname) 448 449 def test_stat_attributes_bytes(self): 450 try: 451 fname = self.fname.encode(sys.getfilesystemencoding()) 452 except UnicodeEncodeError: 453 self.skipTest("cannot encode %a for the filesystem" % self.fname) 454 self.check_stat_attributes(fname) 455 456 def test_stat_result_pickle(self): 457 result = os.stat(self.fname) 458 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 459 p = pickle.dumps(result, proto) 460 self.assertIn(b'stat_result', p) 461 if proto < 4: 462 self.assertIn(b'cos\nstat_result\n', p) 463 unpickled = pickle.loads(p) 464 self.assertEqual(result, unpickled) 465 466 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 467 def test_statvfs_attributes(self): 468 result = os.statvfs(self.fname) 469 470 # Make sure direct access works 471 self.assertEqual(result.f_bfree, result[3]) 472 473 # Make sure all the attributes are there. 474 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 475 'ffree', 'favail', 'flag', 'namemax') 476 for value, member in enumerate(members): 477 self.assertEqual(getattr(result, 'f_' + member), result[value]) 478 479 self.assertTrue(isinstance(result.f_fsid, int)) 480 481 # Test that the size of the tuple doesn't change 482 self.assertEqual(len(result), 10) 483 484 # Make sure that assignment really fails 485 try: 486 result.f_bfree = 1 487 self.fail("No exception raised") 488 except AttributeError: 489 pass 490 491 try: 492 result.parrot = 1 493 self.fail("No exception raised") 494 except AttributeError: 495 pass 496 497 # Use the constructor with a too-short tuple. 498 try: 499 result2 = os.statvfs_result((10,)) 500 self.fail("No exception raised") 501 except TypeError: 502 pass 503 504 # Use the constructor with a too-long tuple. 505 try: 506 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 507 except TypeError: 508 pass 509 510 @unittest.skipUnless(hasattr(os, 'statvfs'), 511 "need os.statvfs()") 512 def test_statvfs_result_pickle(self): 513 result = os.statvfs(self.fname) 514 515 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 516 p = pickle.dumps(result, proto) 517 self.assertIn(b'statvfs_result', p) 518 if proto < 4: 519 self.assertIn(b'cos\nstatvfs_result\n', p) 520 unpickled = pickle.loads(p) 521 self.assertEqual(result, unpickled) 522 523 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 524 def test_1686475(self): 525 # Verify that an open file can be stat'ed 526 try: 527 os.stat(r"c:\pagefile.sys") 528 except FileNotFoundError: 529 self.skipTest(r'c:\pagefile.sys does not exist') 530 except OSError as e: 531 self.fail("Could not stat pagefile.sys") 532 533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 534 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 535 def test_15261(self): 536 # Verify that stat'ing a closed fd does not cause crash 537 r, w = os.pipe() 538 try: 539 os.stat(r) # should not raise error 540 finally: 541 os.close(r) 542 os.close(w) 543 with self.assertRaises(OSError) as ctx: 544 os.stat(r) 545 self.assertEqual(ctx.exception.errno, errno.EBADF) 546 547 def check_file_attributes(self, result): 548 self.assertTrue(hasattr(result, 'st_file_attributes')) 549 self.assertTrue(isinstance(result.st_file_attributes, int)) 550 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 551 552 @unittest.skipUnless(sys.platform == "win32", 553 "st_file_attributes is Win32 specific") 554 def test_file_attributes(self): 555 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 556 result = os.stat(self.fname) 557 self.check_file_attributes(result) 558 self.assertEqual( 559 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 560 0) 561 562 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) 563 dirname = support.TESTFN + "dir" 564 os.mkdir(dirname) 565 self.addCleanup(os.rmdir, dirname) 566 567 result = os.stat(dirname) 568 self.check_file_attributes(result) 569 self.assertEqual( 570 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 571 stat.FILE_ATTRIBUTE_DIRECTORY) 572 573 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 574 def test_access_denied(self): 575 # Default to FindFirstFile WIN32_FIND_DATA when access is 576 # denied. See issue 28075. 577 # os.environ['TEMP'] should be located on a volume that 578 # supports file ACLs. 579 fname = os.path.join(os.environ['TEMP'], self.fname) 580 self.addCleanup(support.unlink, fname) 581 create_file(fname, b'ABC') 582 # Deny the right to [S]YNCHRONIZE on the file to 583 # force CreateFile to fail with ERROR_ACCESS_DENIED. 584 DETACHED_PROCESS = 8 585 subprocess.check_call( 586 # bpo-30584: Use security identifier *S-1-5-32-545 instead 587 # of localized "Users" to not depend on the locale. 588 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'], 589 creationflags=DETACHED_PROCESS 590 ) 591 result = os.stat(fname) 592 self.assertNotEqual(result.st_size, 0) 593 594 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 595 def test_stat_block_device(self): 596 # bpo-38030: os.stat fails for block devices 597 # Test a filename like "//./C:" 598 fname = "//./" + os.path.splitdrive(os.getcwd())[0] 599 result = os.stat(fname) 600 self.assertEqual(result.st_mode, stat.S_IFBLK) 601 602 603class UtimeTests(unittest.TestCase): 604 def setUp(self): 605 self.dirname = support.TESTFN 606 self.fname = os.path.join(self.dirname, "f1") 607 608 self.addCleanup(support.rmtree, self.dirname) 609 os.mkdir(self.dirname) 610 create_file(self.fname) 611 612 def support_subsecond(self, filename): 613 # Heuristic to check if the filesystem supports timestamp with 614 # subsecond resolution: check if float and int timestamps are different 615 st = os.stat(filename) 616 return ((st.st_atime != st[7]) 617 or (st.st_mtime != st[8]) 618 or (st.st_ctime != st[9])) 619 620 def _test_utime(self, set_time, filename=None): 621 if not filename: 622 filename = self.fname 623 624 support_subsecond = self.support_subsecond(filename) 625 if support_subsecond: 626 # Timestamp with a resolution of 1 microsecond (10^-6). 627 # 628 # The resolution of the C internal function used by os.utime() 629 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 630 # test with a resolution of 1 ns requires more work: 631 # see the issue #15745. 632 atime_ns = 1002003000 # 1.002003 seconds 633 mtime_ns = 4005006000 # 4.005006 seconds 634 else: 635 # use a resolution of 1 second 636 atime_ns = 5 * 10**9 637 mtime_ns = 8 * 10**9 638 639 set_time(filename, (atime_ns, mtime_ns)) 640 st = os.stat(filename) 641 642 if support_subsecond: 643 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 644 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 645 else: 646 self.assertEqual(st.st_atime, atime_ns * 1e-9) 647 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 648 self.assertEqual(st.st_atime_ns, atime_ns) 649 self.assertEqual(st.st_mtime_ns, mtime_ns) 650 651 def test_utime(self): 652 def set_time(filename, ns): 653 # test the ns keyword parameter 654 os.utime(filename, ns=ns) 655 self._test_utime(set_time) 656 657 @staticmethod 658 def ns_to_sec(ns): 659 # Convert a number of nanosecond (int) to a number of seconds (float). 660 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 661 # issue, os.utime() rounds towards minus infinity. 662 return (ns * 1e-9) + 0.5e-9 663 664 def test_utime_by_indexed(self): 665 # pass times as floating point seconds as the second indexed parameter 666 def set_time(filename, ns): 667 atime_ns, mtime_ns = ns 668 atime = self.ns_to_sec(atime_ns) 669 mtime = self.ns_to_sec(mtime_ns) 670 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 671 # or utime(time_t) 672 os.utime(filename, (atime, mtime)) 673 self._test_utime(set_time) 674 675 def test_utime_by_times(self): 676 def set_time(filename, ns): 677 atime_ns, mtime_ns = ns 678 atime = self.ns_to_sec(atime_ns) 679 mtime = self.ns_to_sec(mtime_ns) 680 # test the times keyword parameter 681 os.utime(filename, times=(atime, mtime)) 682 self._test_utime(set_time) 683 684 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 685 "follow_symlinks support for utime required " 686 "for this test.") 687 def test_utime_nofollow_symlinks(self): 688 def set_time(filename, ns): 689 # use follow_symlinks=False to test utimensat(timespec) 690 # or lutimes(timeval) 691 os.utime(filename, ns=ns, follow_symlinks=False) 692 self._test_utime(set_time) 693 694 @unittest.skipUnless(os.utime in os.supports_fd, 695 "fd support for utime required for this test.") 696 def test_utime_fd(self): 697 def set_time(filename, ns): 698 with open(filename, 'wb', 0) as fp: 699 # use a file descriptor to test futimens(timespec) 700 # or futimes(timeval) 701 os.utime(fp.fileno(), ns=ns) 702 self._test_utime(set_time) 703 704 @unittest.skipUnless(os.utime in os.supports_dir_fd, 705 "dir_fd support for utime required for this test.") 706 def test_utime_dir_fd(self): 707 def set_time(filename, ns): 708 dirname, name = os.path.split(filename) 709 dirfd = os.open(dirname, os.O_RDONLY) 710 try: 711 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 712 os.utime(name, dir_fd=dirfd, ns=ns) 713 finally: 714 os.close(dirfd) 715 self._test_utime(set_time) 716 717 def test_utime_directory(self): 718 def set_time(filename, ns): 719 # test calling os.utime() on a directory 720 os.utime(filename, ns=ns) 721 self._test_utime(set_time, filename=self.dirname) 722 723 def _test_utime_current(self, set_time): 724 # Get the system clock 725 current = time.time() 726 727 # Call os.utime() to set the timestamp to the current system clock 728 set_time(self.fname) 729 730 if not self.support_subsecond(self.fname): 731 delta = 1.0 732 else: 733 # On Windows, the usual resolution of time.time() is 15.6 ms. 734 # bpo-30649: Tolerate 50 ms for slow Windows buildbots. 735 # 736 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use 737 # also 50 ms on other platforms. 738 delta = 0.050 739 st = os.stat(self.fname) 740 msg = ("st_time=%r, current=%r, dt=%r" 741 % (st.st_mtime, current, st.st_mtime - current)) 742 self.assertAlmostEqual(st.st_mtime, current, 743 delta=delta, msg=msg) 744 745 def test_utime_current(self): 746 def set_time(filename): 747 # Set to the current time in the new way 748 os.utime(self.fname) 749 self._test_utime_current(set_time) 750 751 def test_utime_current_old(self): 752 def set_time(filename): 753 # Set to the current time in the old explicit way. 754 os.utime(self.fname, None) 755 self._test_utime_current(set_time) 756 757 def get_file_system(self, path): 758 if sys.platform == 'win32': 759 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 760 import ctypes 761 kernel32 = ctypes.windll.kernel32 762 buf = ctypes.create_unicode_buffer("", 100) 763 ok = kernel32.GetVolumeInformationW(root, None, 0, 764 None, None, None, 765 buf, len(buf)) 766 if ok: 767 return buf.value 768 # return None if the filesystem is unknown 769 770 def test_large_time(self): 771 # Many filesystems are limited to the year 2038. At least, the test 772 # pass with NTFS filesystem. 773 if self.get_file_system(self.dirname) != "NTFS": 774 self.skipTest("requires NTFS") 775 776 large = 5000000000 # some day in 2128 777 os.utime(self.fname, (large, large)) 778 self.assertEqual(os.stat(self.fname).st_mtime, large) 779 780 def test_utime_invalid_arguments(self): 781 # seconds and nanoseconds parameters are mutually exclusive 782 with self.assertRaises(ValueError): 783 os.utime(self.fname, (5, 5), ns=(5, 5)) 784 with self.assertRaises(TypeError): 785 os.utime(self.fname, [5, 5]) 786 with self.assertRaises(TypeError): 787 os.utime(self.fname, (5,)) 788 with self.assertRaises(TypeError): 789 os.utime(self.fname, (5, 5, 5)) 790 with self.assertRaises(TypeError): 791 os.utime(self.fname, ns=[5, 5]) 792 with self.assertRaises(TypeError): 793 os.utime(self.fname, ns=(5,)) 794 with self.assertRaises(TypeError): 795 os.utime(self.fname, ns=(5, 5, 5)) 796 797 if os.utime not in os.supports_follow_symlinks: 798 with self.assertRaises(NotImplementedError): 799 os.utime(self.fname, (5, 5), follow_symlinks=False) 800 if os.utime not in os.supports_fd: 801 with open(self.fname, 'wb', 0) as fp: 802 with self.assertRaises(TypeError): 803 os.utime(fp.fileno(), (5, 5)) 804 if os.utime not in os.supports_dir_fd: 805 with self.assertRaises(NotImplementedError): 806 os.utime(self.fname, (5, 5), dir_fd=0) 807 808 @support.cpython_only 809 def test_issue31577(self): 810 # The interpreter shouldn't crash in case utime() received a bad 811 # ns argument. 812 def get_bad_int(divmod_ret_val): 813 class BadInt: 814 def __divmod__(*args): 815 return divmod_ret_val 816 return BadInt() 817 with self.assertRaises(TypeError): 818 os.utime(self.fname, ns=(get_bad_int(42), 1)) 819 with self.assertRaises(TypeError): 820 os.utime(self.fname, ns=(get_bad_int(()), 1)) 821 with self.assertRaises(TypeError): 822 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1)) 823 824 825from test import mapping_tests 826 827class EnvironTests(mapping_tests.BasicTestMappingProtocol): 828 """check that os.environ object conform to mapping protocol""" 829 type2test = None 830 831 def setUp(self): 832 self.__save = dict(os.environ) 833 if os.supports_bytes_environ: 834 self.__saveb = dict(os.environb) 835 for key, value in self._reference().items(): 836 os.environ[key] = value 837 838 def tearDown(self): 839 os.environ.clear() 840 os.environ.update(self.__save) 841 if os.supports_bytes_environ: 842 os.environb.clear() 843 os.environb.update(self.__saveb) 844 845 def _reference(self): 846 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 847 848 def _empty_mapping(self): 849 os.environ.clear() 850 return os.environ 851 852 # Bug 1110478 853 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 854 'requires a shell') 855 def test_update2(self): 856 os.environ.clear() 857 os.environ.update(HELLO="World") 858 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 859 value = popen.read().strip() 860 self.assertEqual(value, "World") 861 862 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 863 'requires a shell') 864 def test_os_popen_iter(self): 865 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 866 % unix_shell) as popen: 867 it = iter(popen) 868 self.assertEqual(next(it), "line1\n") 869 self.assertEqual(next(it), "line2\n") 870 self.assertEqual(next(it), "line3\n") 871 self.assertRaises(StopIteration, next, it) 872 873 # Verify environ keys and values from the OS are of the 874 # correct str type. 875 def test_keyvalue_types(self): 876 for key, val in os.environ.items(): 877 self.assertEqual(type(key), str) 878 self.assertEqual(type(val), str) 879 880 def test_items(self): 881 for key, value in self._reference().items(): 882 self.assertEqual(os.environ.get(key), value) 883 884 # Issue 7310 885 def test___repr__(self): 886 """Check that the repr() of os.environ looks like environ({...}).""" 887 env = os.environ 888 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join( 889 '{!r}: {!r}'.format(key, value) 890 for key, value in env.items()))) 891 892 def test_get_exec_path(self): 893 defpath_list = os.defpath.split(os.pathsep) 894 test_path = ['/monty', '/python', '', '/flying/circus'] 895 test_env = {'PATH': os.pathsep.join(test_path)} 896 897 saved_environ = os.environ 898 try: 899 os.environ = dict(test_env) 900 # Test that defaulting to os.environ works. 901 self.assertSequenceEqual(test_path, os.get_exec_path()) 902 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 903 finally: 904 os.environ = saved_environ 905 906 # No PATH environment variable 907 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 908 # Empty PATH environment variable 909 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 910 # Supplied PATH environment variable 911 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 912 913 if os.supports_bytes_environ: 914 # env cannot contain 'PATH' and b'PATH' keys 915 try: 916 # ignore BytesWarning warning 917 with warnings.catch_warnings(record=True): 918 mixed_env = {'PATH': '1', b'PATH': b'2'} 919 except BytesWarning: 920 # mixed_env cannot be created with python -bb 921 pass 922 else: 923 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 924 925 # bytes key and/or value 926 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 927 ['abc']) 928 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 929 ['abc']) 930 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), 931 ['abc']) 932 933 @unittest.skipUnless(os.supports_bytes_environ, 934 "os.environb required for this test.") 935 def test_environb(self): 936 # os.environ -> os.environb 937 value = 'euro\u20ac' 938 try: 939 value_bytes = value.encode(sys.getfilesystemencoding(), 940 'surrogateescape') 941 except UnicodeEncodeError: 942 msg = "U+20AC character is not encodable to %s" % ( 943 sys.getfilesystemencoding(),) 944 self.skipTest(msg) 945 os.environ['unicode'] = value 946 self.assertEqual(os.environ['unicode'], value) 947 self.assertEqual(os.environb[b'unicode'], value_bytes) 948 949 # os.environb -> os.environ 950 value = b'\xff' 951 os.environb[b'bytes'] = value 952 self.assertEqual(os.environb[b'bytes'], value) 953 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 954 self.assertEqual(os.environ['bytes'], value_str) 955 956 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). 957 @support.requires_mac_ver(10, 6) 958 def test_unset_error(self): 959 if sys.platform == "win32": 960 # an environment variable is limited to 32,767 characters 961 key = 'x' * 50000 962 self.assertRaises(ValueError, os.environ.__delitem__, key) 963 else: 964 # "=" is not allowed in a variable name 965 key = 'key=' 966 self.assertRaises(OSError, os.environ.__delitem__, key) 967 968 def test_key_type(self): 969 missing = 'missingkey' 970 self.assertNotIn(missing, os.environ) 971 972 with self.assertRaises(KeyError) as cm: 973 os.environ[missing] 974 self.assertIs(cm.exception.args[0], missing) 975 self.assertTrue(cm.exception.__suppress_context__) 976 977 with self.assertRaises(KeyError) as cm: 978 del os.environ[missing] 979 self.assertIs(cm.exception.args[0], missing) 980 self.assertTrue(cm.exception.__suppress_context__) 981 982 def _test_environ_iteration(self, collection): 983 iterator = iter(collection) 984 new_key = "__new_key__" 985 986 next(iterator) # start iteration over os.environ.items 987 988 # add a new key in os.environ mapping 989 os.environ[new_key] = "test_environ_iteration" 990 991 try: 992 next(iterator) # force iteration over modified mapping 993 self.assertEqual(os.environ[new_key], "test_environ_iteration") 994 finally: 995 del os.environ[new_key] 996 997 def test_iter_error_when_changing_os_environ(self): 998 self._test_environ_iteration(os.environ) 999 1000 def test_iter_error_when_changing_os_environ_items(self): 1001 self._test_environ_iteration(os.environ.items()) 1002 1003 def test_iter_error_when_changing_os_environ_values(self): 1004 self._test_environ_iteration(os.environ.values()) 1005 1006 1007class WalkTests(unittest.TestCase): 1008 """Tests for os.walk().""" 1009 1010 # Wrapper to hide minor differences between os.walk and os.fwalk 1011 # to tests both functions with the same code base 1012 def walk(self, top, **kwargs): 1013 if 'follow_symlinks' in kwargs: 1014 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1015 return os.walk(top, **kwargs) 1016 1017 def setUp(self): 1018 join = os.path.join 1019 self.addCleanup(support.rmtree, support.TESTFN) 1020 1021 # Build: 1022 # TESTFN/ 1023 # TEST1/ a file kid and two directory kids 1024 # tmp1 1025 # SUB1/ a file kid and a directory kid 1026 # tmp2 1027 # SUB11/ no kids 1028 # SUB2/ a file kid and a dirsymlink kid 1029 # tmp3 1030 # SUB21/ not readable 1031 # tmp5 1032 # link/ a symlink to TESTFN.2 1033 # broken_link 1034 # broken_link2 1035 # broken_link3 1036 # TEST2/ 1037 # tmp4 a lone file 1038 self.walk_path = join(support.TESTFN, "TEST1") 1039 self.sub1_path = join(self.walk_path, "SUB1") 1040 self.sub11_path = join(self.sub1_path, "SUB11") 1041 sub2_path = join(self.walk_path, "SUB2") 1042 sub21_path = join(sub2_path, "SUB21") 1043 tmp1_path = join(self.walk_path, "tmp1") 1044 tmp2_path = join(self.sub1_path, "tmp2") 1045 tmp3_path = join(sub2_path, "tmp3") 1046 tmp5_path = join(sub21_path, "tmp3") 1047 self.link_path = join(sub2_path, "link") 1048 t2_path = join(support.TESTFN, "TEST2") 1049 tmp4_path = join(support.TESTFN, "TEST2", "tmp4") 1050 broken_link_path = join(sub2_path, "broken_link") 1051 broken_link2_path = join(sub2_path, "broken_link2") 1052 broken_link3_path = join(sub2_path, "broken_link3") 1053 1054 # Create stuff. 1055 os.makedirs(self.sub11_path) 1056 os.makedirs(sub2_path) 1057 os.makedirs(sub21_path) 1058 os.makedirs(t2_path) 1059 1060 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 1061 with open(path, "x") as f: 1062 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 1063 1064 if support.can_symlink(): 1065 os.symlink(os.path.abspath(t2_path), self.link_path) 1066 os.symlink('broken', broken_link_path, True) 1067 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 1068 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 1069 self.sub2_tree = (sub2_path, ["SUB21", "link"], 1070 ["broken_link", "broken_link2", "broken_link3", 1071 "tmp3"]) 1072 else: 1073 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"]) 1074 1075 os.chmod(sub21_path, 0) 1076 try: 1077 os.listdir(sub21_path) 1078 except PermissionError: 1079 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 1080 else: 1081 os.chmod(sub21_path, stat.S_IRWXU) 1082 os.unlink(tmp5_path) 1083 os.rmdir(sub21_path) 1084 del self.sub2_tree[1][:1] 1085 1086 def test_walk_topdown(self): 1087 # Walk top-down. 1088 all = list(self.walk(self.walk_path)) 1089 1090 self.assertEqual(len(all), 4) 1091 # We can't know which order SUB1 and SUB2 will appear in. 1092 # Not flipped: TESTFN, SUB1, SUB11, SUB2 1093 # flipped: TESTFN, SUB2, SUB1, SUB11 1094 flipped = all[0][1][0] != "SUB1" 1095 all[0][1].sort() 1096 all[3 - 2 * flipped][-1].sort() 1097 all[3 - 2 * flipped][1].sort() 1098 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1099 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 1100 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 1101 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 1102 1103 def test_walk_prune(self, walk_path=None): 1104 if walk_path is None: 1105 walk_path = self.walk_path 1106 # Prune the search. 1107 all = [] 1108 for root, dirs, files in self.walk(walk_path): 1109 all.append((root, dirs, files)) 1110 # Don't descend into SUB1. 1111 if 'SUB1' in dirs: 1112 # Note that this also mutates the dirs we appended to all! 1113 dirs.remove('SUB1') 1114 1115 self.assertEqual(len(all), 2) 1116 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) 1117 1118 all[1][-1].sort() 1119 all[1][1].sort() 1120 self.assertEqual(all[1], self.sub2_tree) 1121 1122 def test_file_like_path(self): 1123 self.test_walk_prune(FakePath(self.walk_path)) 1124 1125 def test_walk_bottom_up(self): 1126 # Walk bottom-up. 1127 all = list(self.walk(self.walk_path, topdown=False)) 1128 1129 self.assertEqual(len(all), 4, all) 1130 # We can't know which order SUB1 and SUB2 will appear in. 1131 # Not flipped: SUB11, SUB1, SUB2, TESTFN 1132 # flipped: SUB2, SUB11, SUB1, TESTFN 1133 flipped = all[3][1][0] != "SUB1" 1134 all[3][1].sort() 1135 all[2 - 2 * flipped][-1].sort() 1136 all[2 - 2 * flipped][1].sort() 1137 self.assertEqual(all[3], 1138 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1139 self.assertEqual(all[flipped], 1140 (self.sub11_path, [], [])) 1141 self.assertEqual(all[flipped + 1], 1142 (self.sub1_path, ["SUB11"], ["tmp2"])) 1143 self.assertEqual(all[2 - 2 * flipped], 1144 self.sub2_tree) 1145 1146 def test_walk_symlink(self): 1147 if not support.can_symlink(): 1148 self.skipTest("need symlink support") 1149 1150 # Walk, following symlinks. 1151 walk_it = self.walk(self.walk_path, follow_symlinks=True) 1152 for root, dirs, files in walk_it: 1153 if root == self.link_path: 1154 self.assertEqual(dirs, []) 1155 self.assertEqual(files, ["tmp4"]) 1156 break 1157 else: 1158 self.fail("Didn't follow symlink with followlinks=True") 1159 1160 def test_walk_bad_dir(self): 1161 # Walk top-down. 1162 errors = [] 1163 walk_it = self.walk(self.walk_path, onerror=errors.append) 1164 root, dirs, files = next(walk_it) 1165 self.assertEqual(errors, []) 1166 dir1 = 'SUB1' 1167 path1 = os.path.join(root, dir1) 1168 path1new = os.path.join(root, dir1 + '.new') 1169 os.rename(path1, path1new) 1170 try: 1171 roots = [r for r, d, f in walk_it] 1172 self.assertTrue(errors) 1173 self.assertNotIn(path1, roots) 1174 self.assertNotIn(path1new, roots) 1175 for dir2 in dirs: 1176 if dir2 != dir1: 1177 self.assertIn(os.path.join(root, dir2), roots) 1178 finally: 1179 os.rename(path1new, path1) 1180 1181 def test_walk_many_open_files(self): 1182 depth = 30 1183 base = os.path.join(support.TESTFN, 'deep') 1184 p = os.path.join(base, *(['d']*depth)) 1185 os.makedirs(p) 1186 1187 iters = [self.walk(base, topdown=False) for j in range(100)] 1188 for i in range(depth + 1): 1189 expected = (p, ['d'] if i else [], []) 1190 for it in iters: 1191 self.assertEqual(next(it), expected) 1192 p = os.path.dirname(p) 1193 1194 iters = [self.walk(base, topdown=True) for j in range(100)] 1195 p = base 1196 for i in range(depth + 1): 1197 expected = (p, ['d'] if i < depth else [], []) 1198 for it in iters: 1199 self.assertEqual(next(it), expected) 1200 p = os.path.join(p, 'd') 1201 1202 1203@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1204class FwalkTests(WalkTests): 1205 """Tests for os.fwalk().""" 1206 1207 def walk(self, top, **kwargs): 1208 for root, dirs, files, root_fd in self.fwalk(top, **kwargs): 1209 yield (root, dirs, files) 1210 1211 def fwalk(self, *args, **kwargs): 1212 return os.fwalk(*args, **kwargs) 1213 1214 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1215 """ 1216 compare with walk() results. 1217 """ 1218 walk_kwargs = walk_kwargs.copy() 1219 fwalk_kwargs = fwalk_kwargs.copy() 1220 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1221 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1222 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks) 1223 1224 expected = {} 1225 for root, dirs, files in os.walk(**walk_kwargs): 1226 expected[root] = (set(dirs), set(files)) 1227 1228 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs): 1229 self.assertIn(root, expected) 1230 self.assertEqual(expected[root], (set(dirs), set(files))) 1231 1232 def test_compare_to_walk(self): 1233 kwargs = {'top': support.TESTFN} 1234 self._compare_to_walk(kwargs, kwargs) 1235 1236 def test_dir_fd(self): 1237 try: 1238 fd = os.open(".", os.O_RDONLY) 1239 walk_kwargs = {'top': support.TESTFN} 1240 fwalk_kwargs = walk_kwargs.copy() 1241 fwalk_kwargs['dir_fd'] = fd 1242 self._compare_to_walk(walk_kwargs, fwalk_kwargs) 1243 finally: 1244 os.close(fd) 1245 1246 def test_yields_correct_dir_fd(self): 1247 # check returned file descriptors 1248 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1249 args = support.TESTFN, topdown, None 1250 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): 1251 # check that the FD is valid 1252 os.fstat(rootfd) 1253 # redundant check 1254 os.stat(rootfd) 1255 # check that listdir() returns consistent information 1256 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) 1257 1258 def test_fd_leak(self): 1259 # Since we're opening a lot of FDs, we must be careful to avoid leaks: 1260 # we both check that calling fwalk() a large number of times doesn't 1261 # yield EMFILE, and that the minimum allocated FD hasn't changed. 1262 minfd = os.dup(1) 1263 os.close(minfd) 1264 for i in range(256): 1265 for x in self.fwalk(support.TESTFN): 1266 pass 1267 newfd = os.dup(1) 1268 self.addCleanup(os.close, newfd) 1269 self.assertEqual(newfd, minfd) 1270 1271 # fwalk() keeps file descriptors open 1272 test_walk_many_open_files = None 1273 1274 1275class BytesWalkTests(WalkTests): 1276 """Tests for os.walk() with bytes.""" 1277 def walk(self, top, **kwargs): 1278 if 'follow_symlinks' in kwargs: 1279 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1280 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs): 1281 root = os.fsdecode(broot) 1282 dirs = list(map(os.fsdecode, bdirs)) 1283 files = list(map(os.fsdecode, bfiles)) 1284 yield (root, dirs, files) 1285 bdirs[:] = list(map(os.fsencode, dirs)) 1286 bfiles[:] = list(map(os.fsencode, files)) 1287 1288@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1289class BytesFwalkTests(FwalkTests): 1290 """Tests for os.walk() with bytes.""" 1291 def fwalk(self, top='.', *args, **kwargs): 1292 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): 1293 root = os.fsdecode(broot) 1294 dirs = list(map(os.fsdecode, bdirs)) 1295 files = list(map(os.fsdecode, bfiles)) 1296 yield (root, dirs, files, topfd) 1297 bdirs[:] = list(map(os.fsencode, dirs)) 1298 bfiles[:] = list(map(os.fsencode, files)) 1299 1300 1301class MakedirTests(unittest.TestCase): 1302 def setUp(self): 1303 os.mkdir(support.TESTFN) 1304 1305 def test_makedir(self): 1306 base = support.TESTFN 1307 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 1308 os.makedirs(path) # Should work 1309 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') 1310 os.makedirs(path) 1311 1312 # Try paths with a '.' in them 1313 self.assertRaises(OSError, os.makedirs, os.curdir) 1314 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 1315 os.makedirs(path) 1316 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 1317 'dir5', 'dir6') 1318 os.makedirs(path) 1319 1320 def test_mode(self): 1321 with support.temp_umask(0o002): 1322 base = support.TESTFN 1323 parent = os.path.join(base, 'dir1') 1324 path = os.path.join(parent, 'dir2') 1325 os.makedirs(path, 0o555) 1326 self.assertTrue(os.path.exists(path)) 1327 self.assertTrue(os.path.isdir(path)) 1328 if os.name != 'nt': 1329 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) 1330 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) 1331 1332 def test_exist_ok_existing_directory(self): 1333 path = os.path.join(support.TESTFN, 'dir1') 1334 mode = 0o777 1335 old_mask = os.umask(0o022) 1336 os.makedirs(path, mode) 1337 self.assertRaises(OSError, os.makedirs, path, mode) 1338 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1339 os.makedirs(path, 0o776, exist_ok=True) 1340 os.makedirs(path, mode=mode, exist_ok=True) 1341 os.umask(old_mask) 1342 1343 # Issue #25583: A drive root could raise PermissionError on Windows 1344 os.makedirs(os.path.abspath('/'), exist_ok=True) 1345 1346 def test_exist_ok_s_isgid_directory(self): 1347 path = os.path.join(support.TESTFN, 'dir1') 1348 S_ISGID = stat.S_ISGID 1349 mode = 0o777 1350 old_mask = os.umask(0o022) 1351 try: 1352 existing_testfn_mode = stat.S_IMODE( 1353 os.lstat(support.TESTFN).st_mode) 1354 try: 1355 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID) 1356 except PermissionError: 1357 raise unittest.SkipTest('Cannot set S_ISGID for dir.') 1358 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID): 1359 raise unittest.SkipTest('No support for S_ISGID dir mode.') 1360 # The os should apply S_ISGID from the parent dir for us, but 1361 # this test need not depend on that behavior. Be explicit. 1362 os.makedirs(path, mode | S_ISGID) 1363 # http://bugs.python.org/issue14992 1364 # Should not fail when the bit is already set. 1365 os.makedirs(path, mode, exist_ok=True) 1366 # remove the bit. 1367 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID) 1368 # May work even when the bit is not already set when demanded. 1369 os.makedirs(path, mode | S_ISGID, exist_ok=True) 1370 finally: 1371 os.umask(old_mask) 1372 1373 def test_exist_ok_existing_regular_file(self): 1374 base = support.TESTFN 1375 path = os.path.join(support.TESTFN, 'dir1') 1376 with open(path, 'w') as f: 1377 f.write('abc') 1378 self.assertRaises(OSError, os.makedirs, path) 1379 self.assertRaises(OSError, os.makedirs, path, exist_ok=False) 1380 self.assertRaises(OSError, os.makedirs, path, exist_ok=True) 1381 os.remove(path) 1382 1383 def tearDown(self): 1384 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3', 1385 'dir4', 'dir5', 'dir6') 1386 # If the tests failed, the bottom-most directory ('../dir6') 1387 # may not have been created, so we look for the outermost directory 1388 # that exists. 1389 while not os.path.exists(path) and path != support.TESTFN: 1390 path = os.path.dirname(path) 1391 1392 os.removedirs(path) 1393 1394 1395@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown") 1396class ChownFileTests(unittest.TestCase): 1397 1398 @classmethod 1399 def setUpClass(cls): 1400 os.mkdir(support.TESTFN) 1401 1402 def test_chown_uid_gid_arguments_must_be_index(self): 1403 stat = os.stat(support.TESTFN) 1404 uid = stat.st_uid 1405 gid = stat.st_gid 1406 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): 1407 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) 1408 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) 1409 self.assertIsNone(os.chown(support.TESTFN, uid, gid)) 1410 self.assertIsNone(os.chown(support.TESTFN, -1, -1)) 1411 1412 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups') 1413 def test_chown_gid(self): 1414 groups = os.getgroups() 1415 if len(groups) < 2: 1416 self.skipTest("test needs at least 2 groups") 1417 1418 gid_1, gid_2 = groups[:2] 1419 uid = os.stat(support.TESTFN).st_uid 1420 1421 os.chown(support.TESTFN, uid, gid_1) 1422 gid = os.stat(support.TESTFN).st_gid 1423 self.assertEqual(gid, gid_1) 1424 1425 os.chown(support.TESTFN, uid, gid_2) 1426 gid = os.stat(support.TESTFN).st_gid 1427 self.assertEqual(gid, gid_2) 1428 1429 @unittest.skipUnless(root_in_posix and len(all_users) > 1, 1430 "test needs root privilege and more than one user") 1431 def test_chown_with_root(self): 1432 uid_1, uid_2 = all_users[:2] 1433 gid = os.stat(support.TESTFN).st_gid 1434 os.chown(support.TESTFN, uid_1, gid) 1435 uid = os.stat(support.TESTFN).st_uid 1436 self.assertEqual(uid, uid_1) 1437 os.chown(support.TESTFN, uid_2, gid) 1438 uid = os.stat(support.TESTFN).st_uid 1439 self.assertEqual(uid, uid_2) 1440 1441 @unittest.skipUnless(not root_in_posix and len(all_users) > 1, 1442 "test needs non-root account and more than one user") 1443 def test_chown_without_permission(self): 1444 uid_1, uid_2 = all_users[:2] 1445 gid = os.stat(support.TESTFN).st_gid 1446 with self.assertRaises(PermissionError): 1447 os.chown(support.TESTFN, uid_1, gid) 1448 os.chown(support.TESTFN, uid_2, gid) 1449 1450 @classmethod 1451 def tearDownClass(cls): 1452 os.rmdir(support.TESTFN) 1453 1454 1455class RemoveDirsTests(unittest.TestCase): 1456 def setUp(self): 1457 os.makedirs(support.TESTFN) 1458 1459 def tearDown(self): 1460 support.rmtree(support.TESTFN) 1461 1462 def test_remove_all(self): 1463 dira = os.path.join(support.TESTFN, 'dira') 1464 os.mkdir(dira) 1465 dirb = os.path.join(dira, 'dirb') 1466 os.mkdir(dirb) 1467 os.removedirs(dirb) 1468 self.assertFalse(os.path.exists(dirb)) 1469 self.assertFalse(os.path.exists(dira)) 1470 self.assertFalse(os.path.exists(support.TESTFN)) 1471 1472 def test_remove_partial(self): 1473 dira = os.path.join(support.TESTFN, 'dira') 1474 os.mkdir(dira) 1475 dirb = os.path.join(dira, 'dirb') 1476 os.mkdir(dirb) 1477 create_file(os.path.join(dira, 'file.txt')) 1478 os.removedirs(dirb) 1479 self.assertFalse(os.path.exists(dirb)) 1480 self.assertTrue(os.path.exists(dira)) 1481 self.assertTrue(os.path.exists(support.TESTFN)) 1482 1483 def test_remove_nothing(self): 1484 dira = os.path.join(support.TESTFN, 'dira') 1485 os.mkdir(dira) 1486 dirb = os.path.join(dira, 'dirb') 1487 os.mkdir(dirb) 1488 create_file(os.path.join(dirb, 'file.txt')) 1489 with self.assertRaises(OSError): 1490 os.removedirs(dirb) 1491 self.assertTrue(os.path.exists(dirb)) 1492 self.assertTrue(os.path.exists(dira)) 1493 self.assertTrue(os.path.exists(support.TESTFN)) 1494 1495 1496class DevNullTests(unittest.TestCase): 1497 def test_devnull(self): 1498 with open(os.devnull, 'wb', 0) as f: 1499 f.write(b'hello') 1500 f.close() 1501 with open(os.devnull, 'rb') as f: 1502 self.assertEqual(f.read(), b'') 1503 1504 1505class URandomTests(unittest.TestCase): 1506 def test_urandom_length(self): 1507 self.assertEqual(len(os.urandom(0)), 0) 1508 self.assertEqual(len(os.urandom(1)), 1) 1509 self.assertEqual(len(os.urandom(10)), 10) 1510 self.assertEqual(len(os.urandom(100)), 100) 1511 self.assertEqual(len(os.urandom(1000)), 1000) 1512 1513 def test_urandom_value(self): 1514 data1 = os.urandom(16) 1515 self.assertIsInstance(data1, bytes) 1516 data2 = os.urandom(16) 1517 self.assertNotEqual(data1, data2) 1518 1519 def get_urandom_subprocess(self, count): 1520 code = '\n'.join(( 1521 'import os, sys', 1522 'data = os.urandom(%s)' % count, 1523 'sys.stdout.buffer.write(data)', 1524 'sys.stdout.buffer.flush()')) 1525 out = assert_python_ok('-c', code) 1526 stdout = out[1] 1527 self.assertEqual(len(stdout), count) 1528 return stdout 1529 1530 def test_urandom_subprocess(self): 1531 data1 = self.get_urandom_subprocess(16) 1532 data2 = self.get_urandom_subprocess(16) 1533 self.assertNotEqual(data1, data2) 1534 1535 1536@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') 1537class GetRandomTests(unittest.TestCase): 1538 @classmethod 1539 def setUpClass(cls): 1540 try: 1541 os.getrandom(1) 1542 except OSError as exc: 1543 if exc.errno == errno.ENOSYS: 1544 # Python compiled on a more recent Linux version 1545 # than the current Linux kernel 1546 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") 1547 else: 1548 raise 1549 1550 def test_getrandom_type(self): 1551 data = os.getrandom(16) 1552 self.assertIsInstance(data, bytes) 1553 self.assertEqual(len(data), 16) 1554 1555 def test_getrandom0(self): 1556 empty = os.getrandom(0) 1557 self.assertEqual(empty, b'') 1558 1559 def test_getrandom_random(self): 1560 self.assertTrue(hasattr(os, 'GRND_RANDOM')) 1561 1562 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare 1563 # resource /dev/random 1564 1565 def test_getrandom_nonblock(self): 1566 # The call must not fail. Check also that the flag exists 1567 try: 1568 os.getrandom(1, os.GRND_NONBLOCK) 1569 except BlockingIOError: 1570 # System urandom is not initialized yet 1571 pass 1572 1573 def test_getrandom_value(self): 1574 data1 = os.getrandom(16) 1575 data2 = os.getrandom(16) 1576 self.assertNotEqual(data1, data2) 1577 1578 1579# os.urandom() doesn't use a file descriptor when it is implemented with the 1580# getentropy() function, the getrandom() function or the getrandom() syscall 1581OS_URANDOM_DONT_USE_FD = ( 1582 sysconfig.get_config_var('HAVE_GETENTROPY') == 1 1583 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 1584 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) 1585 1586@unittest.skipIf(OS_URANDOM_DONT_USE_FD , 1587 "os.random() does not use a file descriptor") 1588@unittest.skipIf(sys.platform == "vxworks", 1589 "VxWorks can't set RLIMIT_NOFILE to 1") 1590class URandomFDTests(unittest.TestCase): 1591 @unittest.skipUnless(resource, "test requires the resource module") 1592 def test_urandom_failure(self): 1593 # Check urandom() failing when it is not able to open /dev/random. 1594 # We spawn a new process to make the test more robust (if getrlimit() 1595 # failed to restore the file descriptor limit after this, the whole 1596 # test suite would crash; this actually happened on the OS X Tiger 1597 # buildbot). 1598 code = """if 1: 1599 import errno 1600 import os 1601 import resource 1602 1603 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 1604 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 1605 try: 1606 os.urandom(16) 1607 except OSError as e: 1608 assert e.errno == errno.EMFILE, e.errno 1609 else: 1610 raise AssertionError("OSError not raised") 1611 """ 1612 assert_python_ok('-c', code) 1613 1614 def test_urandom_fd_closed(self): 1615 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1616 # closed. 1617 code = """if 1: 1618 import os 1619 import sys 1620 import test.support 1621 os.urandom(4) 1622 with test.support.SuppressCrashReport(): 1623 os.closerange(3, 256) 1624 sys.stdout.buffer.write(os.urandom(4)) 1625 """ 1626 rc, out, err = assert_python_ok('-Sc', code) 1627 1628 def test_urandom_fd_reopened(self): 1629 # Issue #21207: urandom() should detect its fd to /dev/urandom 1630 # changed to something else, and reopen it. 1631 self.addCleanup(support.unlink, support.TESTFN) 1632 create_file(support.TESTFN, b"x" * 256) 1633 1634 code = """if 1: 1635 import os 1636 import sys 1637 import test.support 1638 os.urandom(4) 1639 with test.support.SuppressCrashReport(): 1640 for fd in range(3, 256): 1641 try: 1642 os.close(fd) 1643 except OSError: 1644 pass 1645 else: 1646 # Found the urandom fd (XXX hopefully) 1647 break 1648 os.closerange(3, 256) 1649 with open({TESTFN!r}, 'rb') as f: 1650 new_fd = f.fileno() 1651 # Issue #26935: posix allows new_fd and fd to be equal but 1652 # some libc implementations have dup2 return an error in this 1653 # case. 1654 if new_fd != fd: 1655 os.dup2(new_fd, fd) 1656 sys.stdout.buffer.write(os.urandom(4)) 1657 sys.stdout.buffer.write(os.urandom(4)) 1658 """.format(TESTFN=support.TESTFN) 1659 rc, out, err = assert_python_ok('-Sc', code) 1660 self.assertEqual(len(out), 8) 1661 self.assertNotEqual(out[0:4], out[4:8]) 1662 rc, out2, err2 = assert_python_ok('-Sc', code) 1663 self.assertEqual(len(out2), 8) 1664 self.assertNotEqual(out2, out) 1665 1666 1667@contextlib.contextmanager 1668def _execvpe_mockup(defpath=None): 1669 """ 1670 Stubs out execv and execve functions when used as context manager. 1671 Records exec calls. The mock execv and execve functions always raise an 1672 exception as they would normally never return. 1673 """ 1674 # A list of tuples containing (function name, first arg, args) 1675 # of calls to execv or execve that have been made. 1676 calls = [] 1677 1678 def mock_execv(name, *args): 1679 calls.append(('execv', name, args)) 1680 raise RuntimeError("execv called") 1681 1682 def mock_execve(name, *args): 1683 calls.append(('execve', name, args)) 1684 raise OSError(errno.ENOTDIR, "execve called") 1685 1686 try: 1687 orig_execv = os.execv 1688 orig_execve = os.execve 1689 orig_defpath = os.defpath 1690 os.execv = mock_execv 1691 os.execve = mock_execve 1692 if defpath is not None: 1693 os.defpath = defpath 1694 yield calls 1695 finally: 1696 os.execv = orig_execv 1697 os.execve = orig_execve 1698 os.defpath = orig_defpath 1699 1700@unittest.skipUnless(hasattr(os, 'execv'), 1701 "need os.execv()") 1702class ExecTests(unittest.TestCase): 1703 @unittest.skipIf(USING_LINUXTHREADS, 1704 "avoid triggering a linuxthreads bug: see issue #4970") 1705 def test_execvpe_with_bad_program(self): 1706 self.assertRaises(OSError, os.execvpe, 'no such app-', 1707 ['no such app-'], None) 1708 1709 def test_execv_with_bad_arglist(self): 1710 self.assertRaises(ValueError, os.execv, 'notepad', ()) 1711 self.assertRaises(ValueError, os.execv, 'notepad', []) 1712 self.assertRaises(ValueError, os.execv, 'notepad', ('',)) 1713 self.assertRaises(ValueError, os.execv, 'notepad', ['']) 1714 1715 def test_execvpe_with_bad_arglist(self): 1716 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 1717 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {}) 1718 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {}) 1719 1720 @unittest.skipUnless(hasattr(os, '_execvpe'), 1721 "No internal os._execvpe function to test.") 1722 def _test_internal_execvpe(self, test_type): 1723 program_path = os.sep + 'absolutepath' 1724 if test_type is bytes: 1725 program = b'executable' 1726 fullpath = os.path.join(os.fsencode(program_path), program) 1727 native_fullpath = fullpath 1728 arguments = [b'progname', 'arg1', 'arg2'] 1729 else: 1730 program = 'executable' 1731 arguments = ['progname', 'arg1', 'arg2'] 1732 fullpath = os.path.join(program_path, program) 1733 if os.name != "nt": 1734 native_fullpath = os.fsencode(fullpath) 1735 else: 1736 native_fullpath = fullpath 1737 env = {'spam': 'beans'} 1738 1739 # test os._execvpe() with an absolute path 1740 with _execvpe_mockup() as calls: 1741 self.assertRaises(RuntimeError, 1742 os._execvpe, fullpath, arguments) 1743 self.assertEqual(len(calls), 1) 1744 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 1745 1746 # test os._execvpe() with a relative path: 1747 # os.get_exec_path() returns defpath 1748 with _execvpe_mockup(defpath=program_path) as calls: 1749 self.assertRaises(OSError, 1750 os._execvpe, program, arguments, env=env) 1751 self.assertEqual(len(calls), 1) 1752 self.assertSequenceEqual(calls[0], 1753 ('execve', native_fullpath, (arguments, env))) 1754 1755 # test os._execvpe() with a relative path: 1756 # os.get_exec_path() reads the 'PATH' variable 1757 with _execvpe_mockup() as calls: 1758 env_path = env.copy() 1759 if test_type is bytes: 1760 env_path[b'PATH'] = program_path 1761 else: 1762 env_path['PATH'] = program_path 1763 self.assertRaises(OSError, 1764 os._execvpe, program, arguments, env=env_path) 1765 self.assertEqual(len(calls), 1) 1766 self.assertSequenceEqual(calls[0], 1767 ('execve', native_fullpath, (arguments, env_path))) 1768 1769 def test_internal_execvpe_str(self): 1770 self._test_internal_execvpe(str) 1771 if os.name != "nt": 1772 self._test_internal_execvpe(bytes) 1773 1774 def test_execve_invalid_env(self): 1775 args = [sys.executable, '-c', 'pass'] 1776 1777 # null character in the environment variable name 1778 newenv = os.environ.copy() 1779 newenv["FRUIT\0VEGETABLE"] = "cabbage" 1780 with self.assertRaises(ValueError): 1781 os.execve(args[0], args, newenv) 1782 1783 # null character in the environment variable value 1784 newenv = os.environ.copy() 1785 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 1786 with self.assertRaises(ValueError): 1787 os.execve(args[0], args, newenv) 1788 1789 # equal character in the environment variable name 1790 newenv = os.environ.copy() 1791 newenv["FRUIT=ORANGE"] = "lemon" 1792 with self.assertRaises(ValueError): 1793 os.execve(args[0], args, newenv) 1794 1795 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") 1796 def test_execve_with_empty_path(self): 1797 # bpo-32890: Check GetLastError() misuse 1798 try: 1799 os.execve('', ['arg'], {}) 1800 except OSError as e: 1801 self.assertTrue(e.winerror is None or e.winerror != 0) 1802 else: 1803 self.fail('No OSError raised') 1804 1805 1806@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1807class Win32ErrorTests(unittest.TestCase): 1808 def setUp(self): 1809 try: 1810 os.stat(support.TESTFN) 1811 except FileNotFoundError: 1812 exists = False 1813 except OSError as exc: 1814 exists = True 1815 self.fail("file %s must not exist; os.stat failed with %s" 1816 % (support.TESTFN, exc)) 1817 else: 1818 self.fail("file %s must not exist" % support.TESTFN) 1819 1820 def test_rename(self): 1821 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") 1822 1823 def test_remove(self): 1824 self.assertRaises(OSError, os.remove, support.TESTFN) 1825 1826 def test_chdir(self): 1827 self.assertRaises(OSError, os.chdir, support.TESTFN) 1828 1829 def test_mkdir(self): 1830 self.addCleanup(support.unlink, support.TESTFN) 1831 1832 with open(support.TESTFN, "x") as f: 1833 self.assertRaises(OSError, os.mkdir, support.TESTFN) 1834 1835 def test_utime(self): 1836 self.assertRaises(OSError, os.utime, support.TESTFN, None) 1837 1838 def test_chmod(self): 1839 self.assertRaises(OSError, os.chmod, support.TESTFN, 0) 1840 1841 1842class TestInvalidFD(unittest.TestCase): 1843 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", 1844 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 1845 #singles.append("close") 1846 #We omit close because it doesn't raise an exception on some platforms 1847 def get_single(f): 1848 def helper(self): 1849 if hasattr(os, f): 1850 self.check(getattr(os, f)) 1851 return helper 1852 for f in singles: 1853 locals()["test_"+f] = get_single(f) 1854 1855 def check(self, f, *args): 1856 try: 1857 f(support.make_bad_fd(), *args) 1858 except OSError as e: 1859 self.assertEqual(e.errno, errno.EBADF) 1860 else: 1861 self.fail("%r didn't raise an OSError with a bad file descriptor" 1862 % f) 1863 1864 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 1865 def test_isatty(self): 1866 self.assertEqual(os.isatty(support.make_bad_fd()), False) 1867 1868 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 1869 def test_closerange(self): 1870 fd = support.make_bad_fd() 1871 # Make sure none of the descriptors we are about to close are 1872 # currently valid (issue 6542). 1873 for i in range(10): 1874 try: os.fstat(fd+i) 1875 except OSError: 1876 pass 1877 else: 1878 break 1879 if i < 2: 1880 raise unittest.SkipTest( 1881 "Unable to acquire a range of invalid file descriptors") 1882 self.assertEqual(os.closerange(fd, fd + i-1), None) 1883 1884 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 1885 def test_dup2(self): 1886 self.check(os.dup2, 20) 1887 1888 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 1889 def test_fchmod(self): 1890 self.check(os.fchmod, 0) 1891 1892 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 1893 def test_fchown(self): 1894 self.check(os.fchown, -1, -1) 1895 1896 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 1897 def test_fpathconf(self): 1898 self.check(os.pathconf, "PC_NAME_MAX") 1899 self.check(os.fpathconf, "PC_NAME_MAX") 1900 1901 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 1902 def test_ftruncate(self): 1903 self.check(os.truncate, 0) 1904 self.check(os.ftruncate, 0) 1905 1906 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 1907 def test_lseek(self): 1908 self.check(os.lseek, 0, 0) 1909 1910 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') 1911 def test_read(self): 1912 self.check(os.read, 1) 1913 1914 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') 1915 def test_readv(self): 1916 buf = bytearray(10) 1917 self.check(os.readv, [buf]) 1918 1919 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 1920 def test_tcsetpgrpt(self): 1921 self.check(os.tcsetpgrp, 0) 1922 1923 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 1924 def test_write(self): 1925 self.check(os.write, b" ") 1926 1927 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 1928 def test_writev(self): 1929 self.check(os.writev, [b'abc']) 1930 1931 def test_inheritable(self): 1932 self.check(os.get_inheritable) 1933 self.check(os.set_inheritable, True) 1934 1935 @unittest.skipUnless(hasattr(os, 'get_blocking'), 1936 'needs os.get_blocking() and os.set_blocking()') 1937 def test_blocking(self): 1938 self.check(os.get_blocking) 1939 self.check(os.set_blocking, True) 1940 1941 1942class LinkTests(unittest.TestCase): 1943 def setUp(self): 1944 self.file1 = support.TESTFN 1945 self.file2 = os.path.join(support.TESTFN + "2") 1946 1947 def tearDown(self): 1948 for file in (self.file1, self.file2): 1949 if os.path.exists(file): 1950 os.unlink(file) 1951 1952 def _test_link(self, file1, file2): 1953 create_file(file1) 1954 1955 try: 1956 os.link(file1, file2) 1957 except PermissionError as e: 1958 self.skipTest('os.link(): %s' % e) 1959 with open(file1, "r") as f1, open(file2, "r") as f2: 1960 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 1961 1962 def test_link(self): 1963 self._test_link(self.file1, self.file2) 1964 1965 def test_link_bytes(self): 1966 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 1967 bytes(self.file2, sys.getfilesystemencoding())) 1968 1969 def test_unicode_name(self): 1970 try: 1971 os.fsencode("\xf1") 1972 except UnicodeError: 1973 raise unittest.SkipTest("Unable to encode for this platform.") 1974 1975 self.file1 += "\xf1" 1976 self.file2 = self.file1 + "2" 1977 self._test_link(self.file1, self.file2) 1978 1979@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 1980class PosixUidGidTests(unittest.TestCase): 1981 # uid_t and gid_t are 32-bit unsigned integers on Linux 1982 UID_OVERFLOW = (1 << 32) 1983 GID_OVERFLOW = (1 << 32) 1984 1985 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 1986 def test_setuid(self): 1987 if os.getuid() != 0: 1988 self.assertRaises(OSError, os.setuid, 0) 1989 self.assertRaises(TypeError, os.setuid, 'not an int') 1990 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW) 1991 1992 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 1993 def test_setgid(self): 1994 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 1995 self.assertRaises(OSError, os.setgid, 0) 1996 self.assertRaises(TypeError, os.setgid, 'not an int') 1997 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW) 1998 1999 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 2000 def test_seteuid(self): 2001 if os.getuid() != 0: 2002 self.assertRaises(OSError, os.seteuid, 0) 2003 self.assertRaises(TypeError, os.setegid, 'not an int') 2004 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW) 2005 2006 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 2007 def test_setegid(self): 2008 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2009 self.assertRaises(OSError, os.setegid, 0) 2010 self.assertRaises(TypeError, os.setegid, 'not an int') 2011 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW) 2012 2013 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2014 def test_setreuid(self): 2015 if os.getuid() != 0: 2016 self.assertRaises(OSError, os.setreuid, 0, 0) 2017 self.assertRaises(TypeError, os.setreuid, 'not an int', 0) 2018 self.assertRaises(TypeError, os.setreuid, 0, 'not an int') 2019 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0) 2020 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW) 2021 2022 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2023 def test_setreuid_neg1(self): 2024 # Needs to accept -1. We run this in a subprocess to avoid 2025 # altering the test runner's process state (issue8045). 2026 subprocess.check_call([ 2027 sys.executable, '-c', 2028 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 2029 2030 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2031 def test_setregid(self): 2032 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2033 self.assertRaises(OSError, os.setregid, 0, 0) 2034 self.assertRaises(TypeError, os.setregid, 'not an int', 0) 2035 self.assertRaises(TypeError, os.setregid, 0, 'not an int') 2036 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0) 2037 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW) 2038 2039 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2040 def test_setregid_neg1(self): 2041 # Needs to accept -1. We run this in a subprocess to avoid 2042 # altering the test runner's process state (issue8045). 2043 subprocess.check_call([ 2044 sys.executable, '-c', 2045 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 2046 2047@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2048class Pep383Tests(unittest.TestCase): 2049 def setUp(self): 2050 if support.TESTFN_UNENCODABLE: 2051 self.dir = support.TESTFN_UNENCODABLE 2052 elif support.TESTFN_NONASCII: 2053 self.dir = support.TESTFN_NONASCII 2054 else: 2055 self.dir = support.TESTFN 2056 self.bdir = os.fsencode(self.dir) 2057 2058 bytesfn = [] 2059 def add_filename(fn): 2060 try: 2061 fn = os.fsencode(fn) 2062 except UnicodeEncodeError: 2063 return 2064 bytesfn.append(fn) 2065 add_filename(support.TESTFN_UNICODE) 2066 if support.TESTFN_UNENCODABLE: 2067 add_filename(support.TESTFN_UNENCODABLE) 2068 if support.TESTFN_NONASCII: 2069 add_filename(support.TESTFN_NONASCII) 2070 if not bytesfn: 2071 self.skipTest("couldn't create any non-ascii filename") 2072 2073 self.unicodefn = set() 2074 os.mkdir(self.dir) 2075 try: 2076 for fn in bytesfn: 2077 support.create_empty_file(os.path.join(self.bdir, fn)) 2078 fn = os.fsdecode(fn) 2079 if fn in self.unicodefn: 2080 raise ValueError("duplicate filename") 2081 self.unicodefn.add(fn) 2082 except: 2083 shutil.rmtree(self.dir) 2084 raise 2085 2086 def tearDown(self): 2087 shutil.rmtree(self.dir) 2088 2089 def test_listdir(self): 2090 expected = self.unicodefn 2091 found = set(os.listdir(self.dir)) 2092 self.assertEqual(found, expected) 2093 # test listdir without arguments 2094 current_directory = os.getcwd() 2095 try: 2096 os.chdir(os.sep) 2097 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 2098 finally: 2099 os.chdir(current_directory) 2100 2101 def test_open(self): 2102 for fn in self.unicodefn: 2103 f = open(os.path.join(self.dir, fn), 'rb') 2104 f.close() 2105 2106 @unittest.skipUnless(hasattr(os, 'statvfs'), 2107 "need os.statvfs()") 2108 def test_statvfs(self): 2109 # issue #9645 2110 for fn in self.unicodefn: 2111 # should not fail with file not found error 2112 fullname = os.path.join(self.dir, fn) 2113 os.statvfs(fullname) 2114 2115 def test_stat(self): 2116 for fn in self.unicodefn: 2117 os.stat(os.path.join(self.dir, fn)) 2118 2119@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2120class Win32KillTests(unittest.TestCase): 2121 def _kill(self, sig): 2122 # Start sys.executable as a subprocess and communicate from the 2123 # subprocess to the parent that the interpreter is ready. When it 2124 # becomes ready, send *sig* via os.kill to the subprocess and check 2125 # that the return code is equal to *sig*. 2126 import ctypes 2127 from ctypes import wintypes 2128 import msvcrt 2129 2130 # Since we can't access the contents of the process' stdout until the 2131 # process has exited, use PeekNamedPipe to see what's inside stdout 2132 # without waiting. This is done so we can tell that the interpreter 2133 # is started and running at a point where it could handle a signal. 2134 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 2135 PeekNamedPipe.restype = wintypes.BOOL 2136 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 2137 ctypes.POINTER(ctypes.c_char), # stdout buf 2138 wintypes.DWORD, # Buffer size 2139 ctypes.POINTER(wintypes.DWORD), # bytes read 2140 ctypes.POINTER(wintypes.DWORD), # bytes avail 2141 ctypes.POINTER(wintypes.DWORD)) # bytes left 2142 msg = "running" 2143 proc = subprocess.Popen([sys.executable, "-c", 2144 "import sys;" 2145 "sys.stdout.write('{}');" 2146 "sys.stdout.flush();" 2147 "input()".format(msg)], 2148 stdout=subprocess.PIPE, 2149 stderr=subprocess.PIPE, 2150 stdin=subprocess.PIPE) 2151 self.addCleanup(proc.stdout.close) 2152 self.addCleanup(proc.stderr.close) 2153 self.addCleanup(proc.stdin.close) 2154 2155 count, max = 0, 100 2156 while count < max and proc.poll() is None: 2157 # Create a string buffer to store the result of stdout from the pipe 2158 buf = ctypes.create_string_buffer(len(msg)) 2159 # Obtain the text currently in proc.stdout 2160 # Bytes read/avail/left are left as NULL and unused 2161 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 2162 buf, ctypes.sizeof(buf), None, None, None) 2163 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 2164 if buf.value: 2165 self.assertEqual(msg, buf.value.decode()) 2166 break 2167 time.sleep(0.1) 2168 count += 1 2169 else: 2170 self.fail("Did not receive communication from the subprocess") 2171 2172 os.kill(proc.pid, sig) 2173 self.assertEqual(proc.wait(), sig) 2174 2175 def test_kill_sigterm(self): 2176 # SIGTERM doesn't mean anything special, but make sure it works 2177 self._kill(signal.SIGTERM) 2178 2179 def test_kill_int(self): 2180 # os.kill on Windows can take an int which gets set as the exit code 2181 self._kill(100) 2182 2183 def _kill_with_event(self, event, name): 2184 tagname = "test_os_%s" % uuid.uuid1() 2185 m = mmap.mmap(-1, 1, tagname) 2186 m[0] = 0 2187 # Run a script which has console control handling enabled. 2188 proc = subprocess.Popen([sys.executable, 2189 os.path.join(os.path.dirname(__file__), 2190 "win_console_handler.py"), tagname], 2191 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 2192 # Let the interpreter startup before we send signals. See #3137. 2193 count, max = 0, 100 2194 while count < max and proc.poll() is None: 2195 if m[0] == 1: 2196 break 2197 time.sleep(0.1) 2198 count += 1 2199 else: 2200 # Forcefully kill the process if we weren't able to signal it. 2201 os.kill(proc.pid, signal.SIGINT) 2202 self.fail("Subprocess didn't finish initialization") 2203 os.kill(proc.pid, event) 2204 # proc.send_signal(event) could also be done here. 2205 # Allow time for the signal to be passed and the process to exit. 2206 time.sleep(0.5) 2207 if not proc.poll(): 2208 # Forcefully kill the process if we weren't able to signal it. 2209 os.kill(proc.pid, signal.SIGINT) 2210 self.fail("subprocess did not stop on {}".format(name)) 2211 2212 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 2213 def test_CTRL_C_EVENT(self): 2214 from ctypes import wintypes 2215 import ctypes 2216 2217 # Make a NULL value by creating a pointer with no argument. 2218 NULL = ctypes.POINTER(ctypes.c_int)() 2219 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 2220 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 2221 wintypes.BOOL) 2222 SetConsoleCtrlHandler.restype = wintypes.BOOL 2223 2224 # Calling this with NULL and FALSE causes the calling process to 2225 # handle Ctrl+C, rather than ignore it. This property is inherited 2226 # by subprocesses. 2227 SetConsoleCtrlHandler(NULL, 0) 2228 2229 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 2230 2231 def test_CTRL_BREAK_EVENT(self): 2232 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 2233 2234 2235@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2236class Win32ListdirTests(unittest.TestCase): 2237 """Test listdir on Windows.""" 2238 2239 def setUp(self): 2240 self.created_paths = [] 2241 for i in range(2): 2242 dir_name = 'SUB%d' % i 2243 dir_path = os.path.join(support.TESTFN, dir_name) 2244 file_name = 'FILE%d' % i 2245 file_path = os.path.join(support.TESTFN, file_name) 2246 os.makedirs(dir_path) 2247 with open(file_path, 'w') as f: 2248 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) 2249 self.created_paths.extend([dir_name, file_name]) 2250 self.created_paths.sort() 2251 2252 def tearDown(self): 2253 shutil.rmtree(support.TESTFN) 2254 2255 def test_listdir_no_extended_path(self): 2256 """Test when the path is not an "extended" path.""" 2257 # unicode 2258 self.assertEqual( 2259 sorted(os.listdir(support.TESTFN)), 2260 self.created_paths) 2261 2262 # bytes 2263 self.assertEqual( 2264 sorted(os.listdir(os.fsencode(support.TESTFN))), 2265 [os.fsencode(path) for path in self.created_paths]) 2266 2267 def test_listdir_extended_path(self): 2268 """Test when the path starts with '\\\\?\\'.""" 2269 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath 2270 # unicode 2271 path = '\\\\?\\' + os.path.abspath(support.TESTFN) 2272 self.assertEqual( 2273 sorted(os.listdir(path)), 2274 self.created_paths) 2275 2276 # bytes 2277 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) 2278 self.assertEqual( 2279 sorted(os.listdir(path)), 2280 [os.fsencode(path) for path in self.created_paths]) 2281 2282 2283@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()') 2284class ReadlinkTests(unittest.TestCase): 2285 filelink = 'readlinktest' 2286 filelink_target = os.path.abspath(__file__) 2287 filelinkb = os.fsencode(filelink) 2288 filelinkb_target = os.fsencode(filelink_target) 2289 2290 def assertPathEqual(self, left, right): 2291 left = os.path.normcase(left) 2292 right = os.path.normcase(right) 2293 if sys.platform == 'win32': 2294 # Bad practice to blindly strip the prefix as it may be required to 2295 # correctly refer to the file, but we're only comparing paths here. 2296 has_prefix = lambda p: p.startswith( 2297 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\') 2298 if has_prefix(left): 2299 left = left[4:] 2300 if has_prefix(right): 2301 right = right[4:] 2302 self.assertEqual(left, right) 2303 2304 def setUp(self): 2305 self.assertTrue(os.path.exists(self.filelink_target)) 2306 self.assertTrue(os.path.exists(self.filelinkb_target)) 2307 self.assertFalse(os.path.exists(self.filelink)) 2308 self.assertFalse(os.path.exists(self.filelinkb)) 2309 2310 def test_not_symlink(self): 2311 filelink_target = FakePath(self.filelink_target) 2312 self.assertRaises(OSError, os.readlink, self.filelink_target) 2313 self.assertRaises(OSError, os.readlink, filelink_target) 2314 2315 def test_missing_link(self): 2316 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link') 2317 self.assertRaises(FileNotFoundError, os.readlink, 2318 FakePath('missing-link')) 2319 2320 @support.skip_unless_symlink 2321 def test_pathlike(self): 2322 os.symlink(self.filelink_target, self.filelink) 2323 self.addCleanup(support.unlink, self.filelink) 2324 filelink = FakePath(self.filelink) 2325 self.assertPathEqual(os.readlink(filelink), self.filelink_target) 2326 2327 @support.skip_unless_symlink 2328 def test_pathlike_bytes(self): 2329 os.symlink(self.filelinkb_target, self.filelinkb) 2330 self.addCleanup(support.unlink, self.filelinkb) 2331 path = os.readlink(FakePath(self.filelinkb)) 2332 self.assertPathEqual(path, self.filelinkb_target) 2333 self.assertIsInstance(path, bytes) 2334 2335 @support.skip_unless_symlink 2336 def test_bytes(self): 2337 os.symlink(self.filelinkb_target, self.filelinkb) 2338 self.addCleanup(support.unlink, self.filelinkb) 2339 path = os.readlink(self.filelinkb) 2340 self.assertPathEqual(path, self.filelinkb_target) 2341 self.assertIsInstance(path, bytes) 2342 2343 2344@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2345@support.skip_unless_symlink 2346class Win32SymlinkTests(unittest.TestCase): 2347 filelink = 'filelinktest' 2348 filelink_target = os.path.abspath(__file__) 2349 dirlink = 'dirlinktest' 2350 dirlink_target = os.path.dirname(filelink_target) 2351 missing_link = 'missing link' 2352 2353 def setUp(self): 2354 assert os.path.exists(self.dirlink_target) 2355 assert os.path.exists(self.filelink_target) 2356 assert not os.path.exists(self.dirlink) 2357 assert not os.path.exists(self.filelink) 2358 assert not os.path.exists(self.missing_link) 2359 2360 def tearDown(self): 2361 if os.path.exists(self.filelink): 2362 os.remove(self.filelink) 2363 if os.path.exists(self.dirlink): 2364 os.rmdir(self.dirlink) 2365 if os.path.lexists(self.missing_link): 2366 os.remove(self.missing_link) 2367 2368 def test_directory_link(self): 2369 os.symlink(self.dirlink_target, self.dirlink) 2370 self.assertTrue(os.path.exists(self.dirlink)) 2371 self.assertTrue(os.path.isdir(self.dirlink)) 2372 self.assertTrue(os.path.islink(self.dirlink)) 2373 self.check_stat(self.dirlink, self.dirlink_target) 2374 2375 def test_file_link(self): 2376 os.symlink(self.filelink_target, self.filelink) 2377 self.assertTrue(os.path.exists(self.filelink)) 2378 self.assertTrue(os.path.isfile(self.filelink)) 2379 self.assertTrue(os.path.islink(self.filelink)) 2380 self.check_stat(self.filelink, self.filelink_target) 2381 2382 def _create_missing_dir_link(self): 2383 'Create a "directory" link to a non-existent target' 2384 linkname = self.missing_link 2385 if os.path.lexists(linkname): 2386 os.remove(linkname) 2387 target = r'c:\\target does not exist.29r3c740' 2388 assert not os.path.exists(target) 2389 target_is_dir = True 2390 os.symlink(target, linkname, target_is_dir) 2391 2392 def test_remove_directory_link_to_missing_target(self): 2393 self._create_missing_dir_link() 2394 # For compatibility with Unix, os.remove will check the 2395 # directory status and call RemoveDirectory if the symlink 2396 # was created with target_is_dir==True. 2397 os.remove(self.missing_link) 2398 2399 def test_isdir_on_directory_link_to_missing_target(self): 2400 self._create_missing_dir_link() 2401 self.assertFalse(os.path.isdir(self.missing_link)) 2402 2403 def test_rmdir_on_directory_link_to_missing_target(self): 2404 self._create_missing_dir_link() 2405 os.rmdir(self.missing_link) 2406 2407 def check_stat(self, link, target): 2408 self.assertEqual(os.stat(link), os.stat(target)) 2409 self.assertNotEqual(os.lstat(link), os.stat(link)) 2410 2411 bytes_link = os.fsencode(link) 2412 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2413 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2414 2415 def test_12084(self): 2416 level1 = os.path.abspath(support.TESTFN) 2417 level2 = os.path.join(level1, "level2") 2418 level3 = os.path.join(level2, "level3") 2419 self.addCleanup(support.rmtree, level1) 2420 2421 os.mkdir(level1) 2422 os.mkdir(level2) 2423 os.mkdir(level3) 2424 2425 file1 = os.path.abspath(os.path.join(level1, "file1")) 2426 create_file(file1) 2427 2428 orig_dir = os.getcwd() 2429 try: 2430 os.chdir(level2) 2431 link = os.path.join(level2, "link") 2432 os.symlink(os.path.relpath(file1), "link") 2433 self.assertIn("link", os.listdir(os.getcwd())) 2434 2435 # Check os.stat calls from the same dir as the link 2436 self.assertEqual(os.stat(file1), os.stat("link")) 2437 2438 # Check os.stat calls from a dir below the link 2439 os.chdir(level1) 2440 self.assertEqual(os.stat(file1), 2441 os.stat(os.path.relpath(link))) 2442 2443 # Check os.stat calls from a dir above the link 2444 os.chdir(level3) 2445 self.assertEqual(os.stat(file1), 2446 os.stat(os.path.relpath(link))) 2447 finally: 2448 os.chdir(orig_dir) 2449 2450 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') 2451 and os.path.exists(r'C:\ProgramData'), 2452 'Test directories not found') 2453 def test_29248(self): 2454 # os.symlink() calls CreateSymbolicLink, which creates 2455 # the reparse data buffer with the print name stored 2456 # first, so the offset is always 0. CreateSymbolicLink 2457 # stores the "PrintName" DOS path (e.g. "C:\") first, 2458 # with an offset of 0, followed by the "SubstituteName" 2459 # NT path (e.g. "\??\C:\"). The "All Users" link, on 2460 # the other hand, seems to have been created manually 2461 # with an inverted order. 2462 target = os.readlink(r'C:\Users\All Users') 2463 self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) 2464 2465 def test_buffer_overflow(self): 2466 # Older versions would have a buffer overflow when detecting 2467 # whether a link source was a directory. This test ensures we 2468 # no longer crash, but does not otherwise validate the behavior 2469 segment = 'X' * 27 2470 path = os.path.join(*[segment] * 10) 2471 test_cases = [ 2472 # overflow with absolute src 2473 ('\\' + path, segment), 2474 # overflow dest with relative src 2475 (segment, path), 2476 # overflow when joining src 2477 (path[:180], path[:180]), 2478 ] 2479 for src, dest in test_cases: 2480 try: 2481 os.symlink(src, dest) 2482 except FileNotFoundError: 2483 pass 2484 else: 2485 try: 2486 os.remove(dest) 2487 except OSError: 2488 pass 2489 # Also test with bytes, since that is a separate code path. 2490 try: 2491 os.symlink(os.fsencode(src), os.fsencode(dest)) 2492 except FileNotFoundError: 2493 pass 2494 else: 2495 try: 2496 os.remove(dest) 2497 except OSError: 2498 pass 2499 2500 def test_appexeclink(self): 2501 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') 2502 if not os.path.isdir(root): 2503 self.skipTest("test requires a WindowsApps directory") 2504 2505 aliases = [os.path.join(root, a) 2506 for a in fnmatch.filter(os.listdir(root), '*.exe')] 2507 2508 for alias in aliases: 2509 if support.verbose: 2510 print() 2511 print("Testing with", alias) 2512 st = os.lstat(alias) 2513 self.assertEqual(st, os.stat(alias)) 2514 self.assertFalse(stat.S_ISLNK(st.st_mode)) 2515 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) 2516 # testing the first one we see is sufficient 2517 break 2518 else: 2519 self.skipTest("test requires an app execution alias") 2520 2521@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2522class Win32JunctionTests(unittest.TestCase): 2523 junction = 'junctiontest' 2524 junction_target = os.path.dirname(os.path.abspath(__file__)) 2525 2526 def setUp(self): 2527 assert os.path.exists(self.junction_target) 2528 assert not os.path.lexists(self.junction) 2529 2530 def tearDown(self): 2531 if os.path.lexists(self.junction): 2532 os.unlink(self.junction) 2533 2534 def test_create_junction(self): 2535 _winapi.CreateJunction(self.junction_target, self.junction) 2536 self.assertTrue(os.path.lexists(self.junction)) 2537 self.assertTrue(os.path.exists(self.junction)) 2538 self.assertTrue(os.path.isdir(self.junction)) 2539 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) 2540 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) 2541 2542 # bpo-37834: Junctions are not recognized as links. 2543 self.assertFalse(os.path.islink(self.junction)) 2544 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), 2545 os.path.normcase(os.readlink(self.junction))) 2546 2547 def test_unlink_removes_junction(self): 2548 _winapi.CreateJunction(self.junction_target, self.junction) 2549 self.assertTrue(os.path.exists(self.junction)) 2550 self.assertTrue(os.path.lexists(self.junction)) 2551 2552 os.unlink(self.junction) 2553 self.assertFalse(os.path.exists(self.junction)) 2554 2555@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2556class Win32NtTests(unittest.TestCase): 2557 def test_getfinalpathname_handles(self): 2558 nt = support.import_module('nt') 2559 ctypes = support.import_module('ctypes') 2560 import ctypes.wintypes 2561 2562 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) 2563 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE 2564 2565 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL 2566 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, 2567 ctypes.wintypes.LPDWORD) 2568 2569 # This is a pseudo-handle that doesn't need to be closed 2570 hproc = kernel.GetCurrentProcess() 2571 2572 handle_count = ctypes.wintypes.DWORD() 2573 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2574 self.assertEqual(1, ok) 2575 2576 before_count = handle_count.value 2577 2578 # The first two test the error path, __file__ tests the success path 2579 filenames = [ 2580 r'\\?\C:', 2581 r'\\?\NUL', 2582 r'\\?\CONIN', 2583 __file__, 2584 ] 2585 2586 for _ in range(10): 2587 for name in filenames: 2588 try: 2589 nt._getfinalpathname(name) 2590 except Exception: 2591 # Failure is expected 2592 pass 2593 try: 2594 os.stat(name) 2595 except Exception: 2596 pass 2597 2598 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2599 self.assertEqual(1, ok) 2600 2601 handle_delta = handle_count.value - before_count 2602 2603 self.assertEqual(0, handle_delta) 2604 2605@support.skip_unless_symlink 2606class NonLocalSymlinkTests(unittest.TestCase): 2607 2608 def setUp(self): 2609 r""" 2610 Create this structure: 2611 2612 base 2613 \___ some_dir 2614 """ 2615 os.makedirs('base/some_dir') 2616 2617 def tearDown(self): 2618 shutil.rmtree('base') 2619 2620 def test_directory_link_nonlocal(self): 2621 """ 2622 The symlink target should resolve relative to the link, not relative 2623 to the current directory. 2624 2625 Then, link base/some_link -> base/some_dir and ensure that some_link 2626 is resolved as a directory. 2627 2628 In issue13772, it was discovered that directory detection failed if 2629 the symlink target was not specified relative to the current 2630 directory, which was a defect in the implementation. 2631 """ 2632 src = os.path.join('base', 'some_link') 2633 os.symlink('some_dir', src) 2634 assert os.path.isdir(src) 2635 2636 2637class FSEncodingTests(unittest.TestCase): 2638 def test_nop(self): 2639 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 2640 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 2641 2642 def test_identity(self): 2643 # assert fsdecode(fsencode(x)) == x 2644 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 2645 try: 2646 bytesfn = os.fsencode(fn) 2647 except UnicodeEncodeError: 2648 continue 2649 self.assertEqual(os.fsdecode(bytesfn), fn) 2650 2651 2652 2653class DeviceEncodingTests(unittest.TestCase): 2654 2655 def test_bad_fd(self): 2656 # Return None when an fd doesn't actually exist. 2657 self.assertIsNone(os.device_encoding(123456)) 2658 2659 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or 2660 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))), 2661 'test requires a tty and either Windows or nl_langinfo(CODESET)') 2662 def test_device_encoding(self): 2663 encoding = os.device_encoding(0) 2664 self.assertIsNotNone(encoding) 2665 self.assertTrue(codecs.lookup(encoding)) 2666 2667 2668class PidTests(unittest.TestCase): 2669 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 2670 def test_getppid(self): 2671 p = subprocess.Popen([sys.executable, '-c', 2672 'import os; print(os.getppid())'], 2673 stdout=subprocess.PIPE) 2674 stdout, _ = p.communicate() 2675 # We are the parent of our subprocess 2676 self.assertEqual(int(stdout), os.getpid()) 2677 2678 def check_waitpid(self, code, exitcode): 2679 if sys.platform == 'win32': 2680 # On Windows, os.spawnv() simply joins arguments with spaces: 2681 # arguments need to be quoted 2682 args = [f'"{sys.executable}"', '-c', f'"{code}"'] 2683 else: 2684 args = [sys.executable, '-c', code] 2685 pid = os.spawnv(os.P_NOWAIT, sys.executable, args) 2686 2687 pid2, status = os.waitpid(pid, 0) 2688 if sys.platform == 'win32': 2689 self.assertEqual(status, exitcode << 8) 2690 else: 2691 self.assertTrue(os.WIFEXITED(status), status) 2692 self.assertEqual(os.WEXITSTATUS(status), exitcode) 2693 self.assertEqual(pid2, pid) 2694 2695 def test_waitpid(self): 2696 self.check_waitpid(code='pass', exitcode=0) 2697 2698 def test_waitpid_exitcode(self): 2699 exitcode = 23 2700 code = f'import sys; sys.exit({exitcode})' 2701 self.check_waitpid(code, exitcode=exitcode) 2702 2703 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 2704 def test_waitpid_windows(self): 2705 # bpo-40138: test os.waitpid() with exit code larger than INT_MAX. 2706 STATUS_CONTROL_C_EXIT = 0xC000013A 2707 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' 2708 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) 2709 2710 2711class SpawnTests(unittest.TestCase): 2712 def create_args(self, *, with_env=False, use_bytes=False): 2713 self.exitcode = 17 2714 2715 filename = support.TESTFN 2716 self.addCleanup(support.unlink, filename) 2717 2718 if not with_env: 2719 code = 'import sys; sys.exit(%s)' % self.exitcode 2720 else: 2721 self.env = dict(os.environ) 2722 # create an unique key 2723 self.key = str(uuid.uuid4()) 2724 self.env[self.key] = self.key 2725 # read the variable from os.environ to check that it exists 2726 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' 2727 % (self.key, self.exitcode)) 2728 2729 with open(filename, "w") as fp: 2730 fp.write(code) 2731 2732 args = [sys.executable, filename] 2733 if use_bytes: 2734 args = [os.fsencode(a) for a in args] 2735 self.env = {os.fsencode(k): os.fsencode(v) 2736 for k, v in self.env.items()} 2737 2738 return args 2739 2740 @requires_os_func('spawnl') 2741 def test_spawnl(self): 2742 args = self.create_args() 2743 exitcode = os.spawnl(os.P_WAIT, args[0], *args) 2744 self.assertEqual(exitcode, self.exitcode) 2745 2746 @requires_os_func('spawnle') 2747 def test_spawnle(self): 2748 args = self.create_args(with_env=True) 2749 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env) 2750 self.assertEqual(exitcode, self.exitcode) 2751 2752 @requires_os_func('spawnlp') 2753 def test_spawnlp(self): 2754 args = self.create_args() 2755 exitcode = os.spawnlp(os.P_WAIT, args[0], *args) 2756 self.assertEqual(exitcode, self.exitcode) 2757 2758 @requires_os_func('spawnlpe') 2759 def test_spawnlpe(self): 2760 args = self.create_args(with_env=True) 2761 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env) 2762 self.assertEqual(exitcode, self.exitcode) 2763 2764 @requires_os_func('spawnv') 2765 def test_spawnv(self): 2766 args = self.create_args() 2767 exitcode = os.spawnv(os.P_WAIT, args[0], args) 2768 self.assertEqual(exitcode, self.exitcode) 2769 2770 @requires_os_func('spawnve') 2771 def test_spawnve(self): 2772 args = self.create_args(with_env=True) 2773 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 2774 self.assertEqual(exitcode, self.exitcode) 2775 2776 @requires_os_func('spawnvp') 2777 def test_spawnvp(self): 2778 args = self.create_args() 2779 exitcode = os.spawnvp(os.P_WAIT, args[0], args) 2780 self.assertEqual(exitcode, self.exitcode) 2781 2782 @requires_os_func('spawnvpe') 2783 def test_spawnvpe(self): 2784 args = self.create_args(with_env=True) 2785 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env) 2786 self.assertEqual(exitcode, self.exitcode) 2787 2788 @requires_os_func('spawnv') 2789 def test_nowait(self): 2790 args = self.create_args() 2791 pid = os.spawnv(os.P_NOWAIT, args[0], args) 2792 result = os.waitpid(pid, 0) 2793 self.assertEqual(result[0], pid) 2794 status = result[1] 2795 if hasattr(os, 'WIFEXITED'): 2796 self.assertTrue(os.WIFEXITED(status)) 2797 self.assertEqual(os.WEXITSTATUS(status), self.exitcode) 2798 else: 2799 self.assertEqual(status, self.exitcode << 8) 2800 2801 @requires_os_func('spawnve') 2802 def test_spawnve_bytes(self): 2803 # Test bytes handling in parse_arglist and parse_envlist (#28114) 2804 args = self.create_args(with_env=True, use_bytes=True) 2805 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 2806 self.assertEqual(exitcode, self.exitcode) 2807 2808 @requires_os_func('spawnl') 2809 def test_spawnl_noargs(self): 2810 args = self.create_args() 2811 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0]) 2812 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '') 2813 2814 @requires_os_func('spawnle') 2815 def test_spawnle_noargs(self): 2816 args = self.create_args() 2817 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {}) 2818 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {}) 2819 2820 @requires_os_func('spawnv') 2821 def test_spawnv_noargs(self): 2822 args = self.create_args() 2823 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ()) 2824 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], []) 2825 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',)) 2826 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ['']) 2827 2828 @requires_os_func('spawnve') 2829 def test_spawnve_noargs(self): 2830 args = self.create_args() 2831 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {}) 2832 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {}) 2833 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {}) 2834 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {}) 2835 2836 def _test_invalid_env(self, spawn): 2837 args = [sys.executable, '-c', 'pass'] 2838 2839 # null character in the environment variable name 2840 newenv = os.environ.copy() 2841 newenv["FRUIT\0VEGETABLE"] = "cabbage" 2842 try: 2843 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2844 except ValueError: 2845 pass 2846 else: 2847 self.assertEqual(exitcode, 127) 2848 2849 # null character in the environment variable value 2850 newenv = os.environ.copy() 2851 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 2852 try: 2853 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2854 except ValueError: 2855 pass 2856 else: 2857 self.assertEqual(exitcode, 127) 2858 2859 # equal character in the environment variable name 2860 newenv = os.environ.copy() 2861 newenv["FRUIT=ORANGE"] = "lemon" 2862 try: 2863 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2864 except ValueError: 2865 pass 2866 else: 2867 self.assertEqual(exitcode, 127) 2868 2869 # equal character in the environment variable value 2870 filename = support.TESTFN 2871 self.addCleanup(support.unlink, filename) 2872 with open(filename, "w") as fp: 2873 fp.write('import sys, os\n' 2874 'if os.getenv("FRUIT") != "orange=lemon":\n' 2875 ' raise AssertionError') 2876 args = [sys.executable, filename] 2877 newenv = os.environ.copy() 2878 newenv["FRUIT"] = "orange=lemon" 2879 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2880 self.assertEqual(exitcode, 0) 2881 2882 @requires_os_func('spawnve') 2883 def test_spawnve_invalid_env(self): 2884 self._test_invalid_env(os.spawnve) 2885 2886 @requires_os_func('spawnvpe') 2887 def test_spawnvpe_invalid_env(self): 2888 self._test_invalid_env(os.spawnvpe) 2889 2890 2891# The introduction of this TestCase caused at least two different errors on 2892# *nix buildbots. Temporarily skip this to let the buildbots move along. 2893@unittest.skip("Skip due to platform/environment differences on *NIX buildbots") 2894@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin") 2895class LoginTests(unittest.TestCase): 2896 def test_getlogin(self): 2897 user_name = os.getlogin() 2898 self.assertNotEqual(len(user_name), 0) 2899 2900 2901@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), 2902 "needs os.getpriority and os.setpriority") 2903class ProgramPriorityTests(unittest.TestCase): 2904 """Tests for os.getpriority() and os.setpriority().""" 2905 2906 def test_set_get_priority(self): 2907 2908 base = os.getpriority(os.PRIO_PROCESS, os.getpid()) 2909 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) 2910 try: 2911 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) 2912 if base >= 19 and new_prio <= 19: 2913 raise unittest.SkipTest("unable to reliably test setpriority " 2914 "at current nice level of %s" % base) 2915 else: 2916 self.assertEqual(new_prio, base + 1) 2917 finally: 2918 try: 2919 os.setpriority(os.PRIO_PROCESS, os.getpid(), base) 2920 except OSError as err: 2921 if err.errno != errno.EACCES: 2922 raise 2923 2924 2925class SendfileTestServer(asyncore.dispatcher, threading.Thread): 2926 2927 class Handler(asynchat.async_chat): 2928 2929 def __init__(self, conn): 2930 asynchat.async_chat.__init__(self, conn) 2931 self.in_buffer = [] 2932 self.accumulate = True 2933 self.closed = False 2934 self.push(b"220 ready\r\n") 2935 2936 def handle_read(self): 2937 data = self.recv(4096) 2938 if self.accumulate: 2939 self.in_buffer.append(data) 2940 2941 def get_data(self): 2942 return b''.join(self.in_buffer) 2943 2944 def handle_close(self): 2945 self.close() 2946 self.closed = True 2947 2948 def handle_error(self): 2949 raise 2950 2951 def __init__(self, address): 2952 threading.Thread.__init__(self) 2953 asyncore.dispatcher.__init__(self) 2954 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 2955 self.bind(address) 2956 self.listen(5) 2957 self.host, self.port = self.socket.getsockname()[:2] 2958 self.handler_instance = None 2959 self._active = False 2960 self._active_lock = threading.Lock() 2961 2962 # --- public API 2963 2964 @property 2965 def running(self): 2966 return self._active 2967 2968 def start(self): 2969 assert not self.running 2970 self.__flag = threading.Event() 2971 threading.Thread.start(self) 2972 self.__flag.wait() 2973 2974 def stop(self): 2975 assert self.running 2976 self._active = False 2977 self.join() 2978 2979 def wait(self): 2980 # wait for handler connection to be closed, then stop the server 2981 while not getattr(self.handler_instance, "closed", False): 2982 time.sleep(0.001) 2983 self.stop() 2984 2985 # --- internals 2986 2987 def run(self): 2988 self._active = True 2989 self.__flag.set() 2990 while self._active and asyncore.socket_map: 2991 self._active_lock.acquire() 2992 asyncore.loop(timeout=0.001, count=1) 2993 self._active_lock.release() 2994 asyncore.close_all() 2995 2996 def handle_accept(self): 2997 conn, addr = self.accept() 2998 self.handler_instance = self.Handler(conn) 2999 3000 def handle_connect(self): 3001 self.close() 3002 handle_read = handle_connect 3003 3004 def writable(self): 3005 return 0 3006 3007 def handle_error(self): 3008 raise 3009 3010 3011@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 3012class TestSendfile(unittest.TestCase): 3013 3014 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 3015 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 3016 not sys.platform.startswith("solaris") and \ 3017 not sys.platform.startswith("sunos") 3018 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 3019 'requires headers and trailers support') 3020 requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 3021 'test is only meaningful on 32-bit builds') 3022 3023 @classmethod 3024 def setUpClass(cls): 3025 cls.key = support.threading_setup() 3026 create_file(support.TESTFN, cls.DATA) 3027 3028 @classmethod 3029 def tearDownClass(cls): 3030 support.threading_cleanup(*cls.key) 3031 support.unlink(support.TESTFN) 3032 3033 def setUp(self): 3034 self.server = SendfileTestServer((support.HOST, 0)) 3035 self.server.start() 3036 self.client = socket.socket() 3037 self.client.connect((self.server.host, self.server.port)) 3038 self.client.settimeout(1) 3039 # synchronize by waiting for "220 ready" response 3040 self.client.recv(1024) 3041 self.sockno = self.client.fileno() 3042 self.file = open(support.TESTFN, 'rb') 3043 self.fileno = self.file.fileno() 3044 3045 def tearDown(self): 3046 self.file.close() 3047 self.client.close() 3048 if self.server.running: 3049 self.server.stop() 3050 self.server = None 3051 3052 def sendfile_wrapper(self, *args, **kwargs): 3053 """A higher level wrapper representing how an application is 3054 supposed to use sendfile(). 3055 """ 3056 while True: 3057 try: 3058 return os.sendfile(*args, **kwargs) 3059 except OSError as err: 3060 if err.errno == errno.ECONNRESET: 3061 # disconnected 3062 raise 3063 elif err.errno in (errno.EAGAIN, errno.EBUSY): 3064 # we have to retry send data 3065 continue 3066 else: 3067 raise 3068 3069 def test_send_whole_file(self): 3070 # normal send 3071 total_sent = 0 3072 offset = 0 3073 nbytes = 4096 3074 while total_sent < len(self.DATA): 3075 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 3076 if sent == 0: 3077 break 3078 offset += sent 3079 total_sent += sent 3080 self.assertTrue(sent <= nbytes) 3081 self.assertEqual(offset, total_sent) 3082 3083 self.assertEqual(total_sent, len(self.DATA)) 3084 self.client.shutdown(socket.SHUT_RDWR) 3085 self.client.close() 3086 self.server.wait() 3087 data = self.server.handler_instance.get_data() 3088 self.assertEqual(len(data), len(self.DATA)) 3089 self.assertEqual(data, self.DATA) 3090 3091 def test_send_at_certain_offset(self): 3092 # start sending a file at a certain offset 3093 total_sent = 0 3094 offset = len(self.DATA) // 2 3095 must_send = len(self.DATA) - offset 3096 nbytes = 4096 3097 while total_sent < must_send: 3098 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 3099 if sent == 0: 3100 break 3101 offset += sent 3102 total_sent += sent 3103 self.assertTrue(sent <= nbytes) 3104 3105 self.client.shutdown(socket.SHUT_RDWR) 3106 self.client.close() 3107 self.server.wait() 3108 data = self.server.handler_instance.get_data() 3109 expected = self.DATA[len(self.DATA) // 2:] 3110 self.assertEqual(total_sent, len(expected)) 3111 self.assertEqual(len(data), len(expected)) 3112 self.assertEqual(data, expected) 3113 3114 def test_offset_overflow(self): 3115 # specify an offset > file size 3116 offset = len(self.DATA) + 4096 3117 try: 3118 sent = os.sendfile(self.sockno, self.fileno, offset, 4096) 3119 except OSError as e: 3120 # Solaris can raise EINVAL if offset >= file length, ignore. 3121 if e.errno != errno.EINVAL: 3122 raise 3123 else: 3124 self.assertEqual(sent, 0) 3125 self.client.shutdown(socket.SHUT_RDWR) 3126 self.client.close() 3127 self.server.wait() 3128 data = self.server.handler_instance.get_data() 3129 self.assertEqual(data, b'') 3130 3131 def test_invalid_offset(self): 3132 with self.assertRaises(OSError) as cm: 3133 os.sendfile(self.sockno, self.fileno, -1, 4096) 3134 self.assertEqual(cm.exception.errno, errno.EINVAL) 3135 3136 def test_keywords(self): 3137 # Keyword arguments should be supported 3138 os.sendfile(out=self.sockno, offset=0, count=4096, 3139 **{'in': self.fileno}) 3140 if self.SUPPORT_HEADERS_TRAILERS: 3141 os.sendfile(self.sockno, self.fileno, offset=0, count=4096, 3142 headers=(), trailers=(), flags=0) 3143 3144 # --- headers / trailers tests 3145 3146 @requires_headers_trailers 3147 def test_headers(self): 3148 total_sent = 0 3149 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1] 3150 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, 3151 headers=[b"x" * 512, b"y" * 256]) 3152 self.assertLessEqual(sent, 512 + 256 + 4096) 3153 total_sent += sent 3154 offset = 4096 3155 while total_sent < len(expected_data): 3156 nbytes = min(len(expected_data) - total_sent, 4096) 3157 sent = self.sendfile_wrapper(self.sockno, self.fileno, 3158 offset, nbytes) 3159 if sent == 0: 3160 break 3161 self.assertLessEqual(sent, nbytes) 3162 total_sent += sent 3163 offset += sent 3164 3165 self.assertEqual(total_sent, len(expected_data)) 3166 self.client.close() 3167 self.server.wait() 3168 data = self.server.handler_instance.get_data() 3169 self.assertEqual(hash(data), hash(expected_data)) 3170 3171 @requires_headers_trailers 3172 def test_trailers(self): 3173 TESTFN2 = support.TESTFN + "2" 3174 file_data = b"abcdef" 3175 3176 self.addCleanup(support.unlink, TESTFN2) 3177 create_file(TESTFN2, file_data) 3178 3179 with open(TESTFN2, 'rb') as f: 3180 os.sendfile(self.sockno, f.fileno(), 0, 5, 3181 trailers=[b"123456", b"789"]) 3182 self.client.close() 3183 self.server.wait() 3184 data = self.server.handler_instance.get_data() 3185 self.assertEqual(data, b"abcde123456789") 3186 3187 @requires_headers_trailers 3188 @requires_32b 3189 def test_headers_overflow_32bits(self): 3190 self.server.handler_instance.accumulate = False 3191 with self.assertRaises(OSError) as cm: 3192 os.sendfile(self.sockno, self.fileno, 0, 0, 3193 headers=[b"x" * 2**16] * 2**15) 3194 self.assertEqual(cm.exception.errno, errno.EINVAL) 3195 3196 @requires_headers_trailers 3197 @requires_32b 3198 def test_trailers_overflow_32bits(self): 3199 self.server.handler_instance.accumulate = False 3200 with self.assertRaises(OSError) as cm: 3201 os.sendfile(self.sockno, self.fileno, 0, 0, 3202 trailers=[b"x" * 2**16] * 2**15) 3203 self.assertEqual(cm.exception.errno, errno.EINVAL) 3204 3205 @requires_headers_trailers 3206 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), 3207 'test needs os.SF_NODISKIO') 3208 def test_flags(self): 3209 try: 3210 os.sendfile(self.sockno, self.fileno, 0, 4096, 3211 flags=os.SF_NODISKIO) 3212 except OSError as err: 3213 if err.errno not in (errno.EBUSY, errno.EAGAIN): 3214 raise 3215 3216 3217def supports_extended_attributes(): 3218 if not hasattr(os, "setxattr"): 3219 return False 3220 3221 try: 3222 with open(support.TESTFN, "xb", 0) as fp: 3223 try: 3224 os.setxattr(fp.fileno(), b"user.test", b"") 3225 except OSError: 3226 return False 3227 finally: 3228 support.unlink(support.TESTFN) 3229 3230 return True 3231 3232 3233@unittest.skipUnless(supports_extended_attributes(), 3234 "no non-broken extended attribute support") 3235# Kernels < 2.6.39 don't respect setxattr flags. 3236@support.requires_linux_version(2, 6, 39) 3237class ExtendedAttributeTests(unittest.TestCase): 3238 3239 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): 3240 fn = support.TESTFN 3241 self.addCleanup(support.unlink, fn) 3242 create_file(fn) 3243 3244 with self.assertRaises(OSError) as cm: 3245 getxattr(fn, s("user.test"), **kwargs) 3246 self.assertEqual(cm.exception.errno, errno.ENODATA) 3247 3248 init_xattr = listxattr(fn) 3249 self.assertIsInstance(init_xattr, list) 3250 3251 setxattr(fn, s("user.test"), b"", **kwargs) 3252 xattr = set(init_xattr) 3253 xattr.add("user.test") 3254 self.assertEqual(set(listxattr(fn)), xattr) 3255 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") 3256 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) 3257 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") 3258 3259 with self.assertRaises(OSError) as cm: 3260 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) 3261 self.assertEqual(cm.exception.errno, errno.EEXIST) 3262 3263 with self.assertRaises(OSError) as cm: 3264 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) 3265 self.assertEqual(cm.exception.errno, errno.ENODATA) 3266 3267 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) 3268 xattr.add("user.test2") 3269 self.assertEqual(set(listxattr(fn)), xattr) 3270 removexattr(fn, s("user.test"), **kwargs) 3271 3272 with self.assertRaises(OSError) as cm: 3273 getxattr(fn, s("user.test"), **kwargs) 3274 self.assertEqual(cm.exception.errno, errno.ENODATA) 3275 3276 xattr.remove("user.test") 3277 self.assertEqual(set(listxattr(fn)), xattr) 3278 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") 3279 setxattr(fn, s("user.test"), b"a"*1024, **kwargs) 3280 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) 3281 removexattr(fn, s("user.test"), **kwargs) 3282 many = sorted("user.test{}".format(i) for i in range(100)) 3283 for thing in many: 3284 setxattr(fn, thing, b"x", **kwargs) 3285 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) 3286 3287 def _check_xattrs(self, *args, **kwargs): 3288 self._check_xattrs_str(str, *args, **kwargs) 3289 support.unlink(support.TESTFN) 3290 3291 self._check_xattrs_str(os.fsencode, *args, **kwargs) 3292 support.unlink(support.TESTFN) 3293 3294 def test_simple(self): 3295 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3296 os.listxattr) 3297 3298 def test_lpath(self): 3299 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3300 os.listxattr, follow_symlinks=False) 3301 3302 def test_fds(self): 3303 def getxattr(path, *args): 3304 with open(path, "rb") as fp: 3305 return os.getxattr(fp.fileno(), *args) 3306 def setxattr(path, *args): 3307 with open(path, "wb", 0) as fp: 3308 os.setxattr(fp.fileno(), *args) 3309 def removexattr(path, *args): 3310 with open(path, "wb", 0) as fp: 3311 os.removexattr(fp.fileno(), *args) 3312 def listxattr(path, *args): 3313 with open(path, "rb") as fp: 3314 return os.listxattr(fp.fileno(), *args) 3315 self._check_xattrs(getxattr, setxattr, removexattr, listxattr) 3316 3317 3318@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size") 3319class TermsizeTests(unittest.TestCase): 3320 def test_does_not_crash(self): 3321 """Check if get_terminal_size() returns a meaningful value. 3322 3323 There's no easy portable way to actually check the size of the 3324 terminal, so let's check if it returns something sensible instead. 3325 """ 3326 try: 3327 size = os.get_terminal_size() 3328 except OSError as e: 3329 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3330 # Under win32 a generic OSError can be thrown if the 3331 # handle cannot be retrieved 3332 self.skipTest("failed to query terminal size") 3333 raise 3334 3335 self.assertGreaterEqual(size.columns, 0) 3336 self.assertGreaterEqual(size.lines, 0) 3337 3338 def test_stty_match(self): 3339 """Check if stty returns the same results 3340 3341 stty actually tests stdin, so get_terminal_size is invoked on 3342 stdin explicitly. If stty succeeded, then get_terminal_size() 3343 should work too. 3344 """ 3345 try: 3346 size = subprocess.check_output(['stty', 'size']).decode().split() 3347 except (FileNotFoundError, subprocess.CalledProcessError, 3348 PermissionError): 3349 self.skipTest("stty invocation failed") 3350 expected = (int(size[1]), int(size[0])) # reversed order 3351 3352 try: 3353 actual = os.get_terminal_size(sys.__stdin__.fileno()) 3354 except OSError as e: 3355 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3356 # Under win32 a generic OSError can be thrown if the 3357 # handle cannot be retrieved 3358 self.skipTest("failed to query terminal size") 3359 raise 3360 self.assertEqual(expected, actual) 3361 3362 3363@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create') 3364@support.requires_linux_version(3, 17) 3365class MemfdCreateTests(unittest.TestCase): 3366 def test_memfd_create(self): 3367 fd = os.memfd_create("Hi", os.MFD_CLOEXEC) 3368 self.assertNotEqual(fd, -1) 3369 self.addCleanup(os.close, fd) 3370 self.assertFalse(os.get_inheritable(fd)) 3371 with open(fd, "wb", closefd=False) as f: 3372 f.write(b'memfd_create') 3373 self.assertEqual(f.tell(), 12) 3374 3375 fd2 = os.memfd_create("Hi") 3376 self.addCleanup(os.close, fd2) 3377 self.assertFalse(os.get_inheritable(fd2)) 3378 3379 3380class OSErrorTests(unittest.TestCase): 3381 def setUp(self): 3382 class Str(str): 3383 pass 3384 3385 self.bytes_filenames = [] 3386 self.unicode_filenames = [] 3387 if support.TESTFN_UNENCODABLE is not None: 3388 decoded = support.TESTFN_UNENCODABLE 3389 else: 3390 decoded = support.TESTFN 3391 self.unicode_filenames.append(decoded) 3392 self.unicode_filenames.append(Str(decoded)) 3393 if support.TESTFN_UNDECODABLE is not None: 3394 encoded = support.TESTFN_UNDECODABLE 3395 else: 3396 encoded = os.fsencode(support.TESTFN) 3397 self.bytes_filenames.append(encoded) 3398 self.bytes_filenames.append(bytearray(encoded)) 3399 self.bytes_filenames.append(memoryview(encoded)) 3400 3401 self.filenames = self.bytes_filenames + self.unicode_filenames 3402 3403 def test_oserror_filename(self): 3404 funcs = [ 3405 (self.filenames, os.chdir,), 3406 (self.filenames, os.chmod, 0o777), 3407 (self.filenames, os.lstat,), 3408 (self.filenames, os.open, os.O_RDONLY), 3409 (self.filenames, os.rmdir,), 3410 (self.filenames, os.stat,), 3411 (self.filenames, os.unlink,), 3412 ] 3413 if sys.platform == "win32": 3414 funcs.extend(( 3415 (self.bytes_filenames, os.rename, b"dst"), 3416 (self.bytes_filenames, os.replace, b"dst"), 3417 (self.unicode_filenames, os.rename, "dst"), 3418 (self.unicode_filenames, os.replace, "dst"), 3419 (self.unicode_filenames, os.listdir, ), 3420 )) 3421 else: 3422 funcs.extend(( 3423 (self.filenames, os.listdir,), 3424 (self.filenames, os.rename, "dst"), 3425 (self.filenames, os.replace, "dst"), 3426 )) 3427 if hasattr(os, "chown"): 3428 funcs.append((self.filenames, os.chown, 0, 0)) 3429 if hasattr(os, "lchown"): 3430 funcs.append((self.filenames, os.lchown, 0, 0)) 3431 if hasattr(os, "truncate"): 3432 funcs.append((self.filenames, os.truncate, 0)) 3433 if hasattr(os, "chflags"): 3434 funcs.append((self.filenames, os.chflags, 0)) 3435 if hasattr(os, "lchflags"): 3436 funcs.append((self.filenames, os.lchflags, 0)) 3437 if hasattr(os, "chroot"): 3438 funcs.append((self.filenames, os.chroot,)) 3439 if hasattr(os, "link"): 3440 if sys.platform == "win32": 3441 funcs.append((self.bytes_filenames, os.link, b"dst")) 3442 funcs.append((self.unicode_filenames, os.link, "dst")) 3443 else: 3444 funcs.append((self.filenames, os.link, "dst")) 3445 if hasattr(os, "listxattr"): 3446 funcs.extend(( 3447 (self.filenames, os.listxattr,), 3448 (self.filenames, os.getxattr, "user.test"), 3449 (self.filenames, os.setxattr, "user.test", b'user'), 3450 (self.filenames, os.removexattr, "user.test"), 3451 )) 3452 if hasattr(os, "lchmod"): 3453 funcs.append((self.filenames, os.lchmod, 0o777)) 3454 if hasattr(os, "readlink"): 3455 funcs.append((self.filenames, os.readlink,)) 3456 3457 3458 for filenames, func, *func_args in funcs: 3459 for name in filenames: 3460 try: 3461 if isinstance(name, (str, bytes)): 3462 func(name, *func_args) 3463 else: 3464 with self.assertWarnsRegex(DeprecationWarning, 'should be'): 3465 func(name, *func_args) 3466 except OSError as err: 3467 self.assertIs(err.filename, name, str(func)) 3468 except UnicodeDecodeError: 3469 pass 3470 else: 3471 self.fail("No exception thrown by {}".format(func)) 3472 3473class CPUCountTests(unittest.TestCase): 3474 def test_cpu_count(self): 3475 cpus = os.cpu_count() 3476 if cpus is not None: 3477 self.assertIsInstance(cpus, int) 3478 self.assertGreater(cpus, 0) 3479 else: 3480 self.skipTest("Could not determine the number of CPUs") 3481 3482 3483class FDInheritanceTests(unittest.TestCase): 3484 def test_get_set_inheritable(self): 3485 fd = os.open(__file__, os.O_RDONLY) 3486 self.addCleanup(os.close, fd) 3487 self.assertEqual(os.get_inheritable(fd), False) 3488 3489 os.set_inheritable(fd, True) 3490 self.assertEqual(os.get_inheritable(fd), True) 3491 3492 @unittest.skipIf(fcntl is None, "need fcntl") 3493 def test_get_inheritable_cloexec(self): 3494 fd = os.open(__file__, os.O_RDONLY) 3495 self.addCleanup(os.close, fd) 3496 self.assertEqual(os.get_inheritable(fd), False) 3497 3498 # clear FD_CLOEXEC flag 3499 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3500 flags &= ~fcntl.FD_CLOEXEC 3501 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 3502 3503 self.assertEqual(os.get_inheritable(fd), True) 3504 3505 @unittest.skipIf(fcntl is None, "need fcntl") 3506 def test_set_inheritable_cloexec(self): 3507 fd = os.open(__file__, os.O_RDONLY) 3508 self.addCleanup(os.close, fd) 3509 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3510 fcntl.FD_CLOEXEC) 3511 3512 os.set_inheritable(fd, True) 3513 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3514 0) 3515 3516 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH") 3517 def test_get_set_inheritable_o_path(self): 3518 fd = os.open(__file__, os.O_PATH) 3519 self.addCleanup(os.close, fd) 3520 self.assertEqual(os.get_inheritable(fd), False) 3521 3522 os.set_inheritable(fd, True) 3523 self.assertEqual(os.get_inheritable(fd), True) 3524 3525 os.set_inheritable(fd, False) 3526 self.assertEqual(os.get_inheritable(fd), False) 3527 3528 def test_get_set_inheritable_badf(self): 3529 fd = support.make_bad_fd() 3530 3531 with self.assertRaises(OSError) as ctx: 3532 os.get_inheritable(fd) 3533 self.assertEqual(ctx.exception.errno, errno.EBADF) 3534 3535 with self.assertRaises(OSError) as ctx: 3536 os.set_inheritable(fd, True) 3537 self.assertEqual(ctx.exception.errno, errno.EBADF) 3538 3539 with self.assertRaises(OSError) as ctx: 3540 os.set_inheritable(fd, False) 3541 self.assertEqual(ctx.exception.errno, errno.EBADF) 3542 3543 def test_open(self): 3544 fd = os.open(__file__, os.O_RDONLY) 3545 self.addCleanup(os.close, fd) 3546 self.assertEqual(os.get_inheritable(fd), False) 3547 3548 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") 3549 def test_pipe(self): 3550 rfd, wfd = os.pipe() 3551 self.addCleanup(os.close, rfd) 3552 self.addCleanup(os.close, wfd) 3553 self.assertEqual(os.get_inheritable(rfd), False) 3554 self.assertEqual(os.get_inheritable(wfd), False) 3555 3556 def test_dup(self): 3557 fd1 = os.open(__file__, os.O_RDONLY) 3558 self.addCleanup(os.close, fd1) 3559 3560 fd2 = os.dup(fd1) 3561 self.addCleanup(os.close, fd2) 3562 self.assertEqual(os.get_inheritable(fd2), False) 3563 3564 def test_dup_standard_stream(self): 3565 fd = os.dup(1) 3566 self.addCleanup(os.close, fd) 3567 self.assertGreater(fd, 0) 3568 3569 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3570 def test_dup_nul(self): 3571 # os.dup() was creating inheritable fds for character files. 3572 fd1 = os.open('NUL', os.O_RDONLY) 3573 self.addCleanup(os.close, fd1) 3574 fd2 = os.dup(fd1) 3575 self.addCleanup(os.close, fd2) 3576 self.assertFalse(os.get_inheritable(fd2)) 3577 3578 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") 3579 def test_dup2(self): 3580 fd = os.open(__file__, os.O_RDONLY) 3581 self.addCleanup(os.close, fd) 3582 3583 # inheritable by default 3584 fd2 = os.open(__file__, os.O_RDONLY) 3585 self.addCleanup(os.close, fd2) 3586 self.assertEqual(os.dup2(fd, fd2), fd2) 3587 self.assertTrue(os.get_inheritable(fd2)) 3588 3589 # force non-inheritable 3590 fd3 = os.open(__file__, os.O_RDONLY) 3591 self.addCleanup(os.close, fd3) 3592 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3) 3593 self.assertFalse(os.get_inheritable(fd3)) 3594 3595 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 3596 def test_openpty(self): 3597 master_fd, slave_fd = os.openpty() 3598 self.addCleanup(os.close, master_fd) 3599 self.addCleanup(os.close, slave_fd) 3600 self.assertEqual(os.get_inheritable(master_fd), False) 3601 self.assertEqual(os.get_inheritable(slave_fd), False) 3602 3603 3604class PathTConverterTests(unittest.TestCase): 3605 # tuples of (function name, allows fd arguments, additional arguments to 3606 # function, cleanup function) 3607 functions = [ 3608 ('stat', True, (), None), 3609 ('lstat', False, (), None), 3610 ('access', False, (os.F_OK,), None), 3611 ('chflags', False, (0,), None), 3612 ('lchflags', False, (0,), None), 3613 ('open', False, (0,), getattr(os, 'close', None)), 3614 ] 3615 3616 def test_path_t_converter(self): 3617 str_filename = support.TESTFN 3618 if os.name == 'nt': 3619 bytes_fspath = bytes_filename = None 3620 else: 3621 bytes_filename = support.TESTFN.encode('ascii') 3622 bytes_fspath = FakePath(bytes_filename) 3623 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT) 3624 self.addCleanup(support.unlink, support.TESTFN) 3625 self.addCleanup(os.close, fd) 3626 3627 int_fspath = FakePath(fd) 3628 str_fspath = FakePath(str_filename) 3629 3630 for name, allow_fd, extra_args, cleanup_fn in self.functions: 3631 with self.subTest(name=name): 3632 try: 3633 fn = getattr(os, name) 3634 except AttributeError: 3635 continue 3636 3637 for path in (str_filename, bytes_filename, str_fspath, 3638 bytes_fspath): 3639 if path is None: 3640 continue 3641 with self.subTest(name=name, path=path): 3642 result = fn(path, *extra_args) 3643 if cleanup_fn is not None: 3644 cleanup_fn(result) 3645 3646 with self.assertRaisesRegex( 3647 TypeError, 'to return str or bytes'): 3648 fn(int_fspath, *extra_args) 3649 3650 if allow_fd: 3651 result = fn(fd, *extra_args) # should not fail 3652 if cleanup_fn is not None: 3653 cleanup_fn(result) 3654 else: 3655 with self.assertRaisesRegex( 3656 TypeError, 3657 'os.PathLike'): 3658 fn(fd, *extra_args) 3659 3660 def test_path_t_converter_and_custom_class(self): 3661 msg = r'__fspath__\(\) to return str or bytes, not %s' 3662 with self.assertRaisesRegex(TypeError, msg % r'int'): 3663 os.stat(FakePath(2)) 3664 with self.assertRaisesRegex(TypeError, msg % r'float'): 3665 os.stat(FakePath(2.34)) 3666 with self.assertRaisesRegex(TypeError, msg % r'object'): 3667 os.stat(FakePath(object())) 3668 3669 3670@unittest.skipUnless(hasattr(os, 'get_blocking'), 3671 'needs os.get_blocking() and os.set_blocking()') 3672class BlockingTests(unittest.TestCase): 3673 def test_blocking(self): 3674 fd = os.open(__file__, os.O_RDONLY) 3675 self.addCleanup(os.close, fd) 3676 self.assertEqual(os.get_blocking(fd), True) 3677 3678 os.set_blocking(fd, False) 3679 self.assertEqual(os.get_blocking(fd), False) 3680 3681 os.set_blocking(fd, True) 3682 self.assertEqual(os.get_blocking(fd), True) 3683 3684 3685 3686class ExportsTests(unittest.TestCase): 3687 def test_os_all(self): 3688 self.assertIn('open', os.__all__) 3689 self.assertIn('walk', os.__all__) 3690 3691 3692class TestScandir(unittest.TestCase): 3693 check_no_resource_warning = support.check_no_resource_warning 3694 3695 def setUp(self): 3696 self.path = os.path.realpath(support.TESTFN) 3697 self.bytes_path = os.fsencode(self.path) 3698 self.addCleanup(support.rmtree, self.path) 3699 os.mkdir(self.path) 3700 3701 def create_file(self, name="file.txt"): 3702 path = self.bytes_path if isinstance(name, bytes) else self.path 3703 filename = os.path.join(path, name) 3704 create_file(filename, b'python') 3705 return filename 3706 3707 def get_entries(self, names): 3708 entries = dict((entry.name, entry) 3709 for entry in os.scandir(self.path)) 3710 self.assertEqual(sorted(entries.keys()), names) 3711 return entries 3712 3713 def assert_stat_equal(self, stat1, stat2, skip_fields): 3714 if skip_fields: 3715 for attr in dir(stat1): 3716 if not attr.startswith("st_"): 3717 continue 3718 if attr in ("st_dev", "st_ino", "st_nlink"): 3719 continue 3720 self.assertEqual(getattr(stat1, attr), 3721 getattr(stat2, attr), 3722 (stat1, stat2, attr)) 3723 else: 3724 self.assertEqual(stat1, stat2) 3725 3726 def check_entry(self, entry, name, is_dir, is_file, is_symlink): 3727 self.assertIsInstance(entry, os.DirEntry) 3728 self.assertEqual(entry.name, name) 3729 self.assertEqual(entry.path, os.path.join(self.path, name)) 3730 self.assertEqual(entry.inode(), 3731 os.stat(entry.path, follow_symlinks=False).st_ino) 3732 3733 entry_stat = os.stat(entry.path) 3734 self.assertEqual(entry.is_dir(), 3735 stat.S_ISDIR(entry_stat.st_mode)) 3736 self.assertEqual(entry.is_file(), 3737 stat.S_ISREG(entry_stat.st_mode)) 3738 self.assertEqual(entry.is_symlink(), 3739 os.path.islink(entry.path)) 3740 3741 entry_lstat = os.stat(entry.path, follow_symlinks=False) 3742 self.assertEqual(entry.is_dir(follow_symlinks=False), 3743 stat.S_ISDIR(entry_lstat.st_mode)) 3744 self.assertEqual(entry.is_file(follow_symlinks=False), 3745 stat.S_ISREG(entry_lstat.st_mode)) 3746 3747 self.assert_stat_equal(entry.stat(), 3748 entry_stat, 3749 os.name == 'nt' and not is_symlink) 3750 self.assert_stat_equal(entry.stat(follow_symlinks=False), 3751 entry_lstat, 3752 os.name == 'nt') 3753 3754 def test_attributes(self): 3755 link = hasattr(os, 'link') 3756 symlink = support.can_symlink() 3757 3758 dirname = os.path.join(self.path, "dir") 3759 os.mkdir(dirname) 3760 filename = self.create_file("file.txt") 3761 if link: 3762 try: 3763 os.link(filename, os.path.join(self.path, "link_file.txt")) 3764 except PermissionError as e: 3765 self.skipTest('os.link(): %s' % e) 3766 if symlink: 3767 os.symlink(dirname, os.path.join(self.path, "symlink_dir"), 3768 target_is_directory=True) 3769 os.symlink(filename, os.path.join(self.path, "symlink_file.txt")) 3770 3771 names = ['dir', 'file.txt'] 3772 if link: 3773 names.append('link_file.txt') 3774 if symlink: 3775 names.extend(('symlink_dir', 'symlink_file.txt')) 3776 entries = self.get_entries(names) 3777 3778 entry = entries['dir'] 3779 self.check_entry(entry, 'dir', True, False, False) 3780 3781 entry = entries['file.txt'] 3782 self.check_entry(entry, 'file.txt', False, True, False) 3783 3784 if link: 3785 entry = entries['link_file.txt'] 3786 self.check_entry(entry, 'link_file.txt', False, True, False) 3787 3788 if symlink: 3789 entry = entries['symlink_dir'] 3790 self.check_entry(entry, 'symlink_dir', True, False, True) 3791 3792 entry = entries['symlink_file.txt'] 3793 self.check_entry(entry, 'symlink_file.txt', False, True, True) 3794 3795 def get_entry(self, name): 3796 path = self.bytes_path if isinstance(name, bytes) else self.path 3797 entries = list(os.scandir(path)) 3798 self.assertEqual(len(entries), 1) 3799 3800 entry = entries[0] 3801 self.assertEqual(entry.name, name) 3802 return entry 3803 3804 def create_file_entry(self, name='file.txt'): 3805 filename = self.create_file(name=name) 3806 return self.get_entry(os.path.basename(filename)) 3807 3808 def test_current_directory(self): 3809 filename = self.create_file() 3810 old_dir = os.getcwd() 3811 try: 3812 os.chdir(self.path) 3813 3814 # call scandir() without parameter: it must list the content 3815 # of the current directory 3816 entries = dict((entry.name, entry) for entry in os.scandir()) 3817 self.assertEqual(sorted(entries.keys()), 3818 [os.path.basename(filename)]) 3819 finally: 3820 os.chdir(old_dir) 3821 3822 def test_repr(self): 3823 entry = self.create_file_entry() 3824 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") 3825 3826 def test_fspath_protocol(self): 3827 entry = self.create_file_entry() 3828 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) 3829 3830 def test_fspath_protocol_bytes(self): 3831 bytes_filename = os.fsencode('bytesfile.txt') 3832 bytes_entry = self.create_file_entry(name=bytes_filename) 3833 fspath = os.fspath(bytes_entry) 3834 self.assertIsInstance(fspath, bytes) 3835 self.assertEqual(fspath, 3836 os.path.join(os.fsencode(self.path),bytes_filename)) 3837 3838 def test_removed_dir(self): 3839 path = os.path.join(self.path, 'dir') 3840 3841 os.mkdir(path) 3842 entry = self.get_entry('dir') 3843 os.rmdir(path) 3844 3845 # On POSIX, is_dir() result depends if scandir() filled d_type or not 3846 if os.name == 'nt': 3847 self.assertTrue(entry.is_dir()) 3848 self.assertFalse(entry.is_file()) 3849 self.assertFalse(entry.is_symlink()) 3850 if os.name == 'nt': 3851 self.assertRaises(FileNotFoundError, entry.inode) 3852 # don't fail 3853 entry.stat() 3854 entry.stat(follow_symlinks=False) 3855 else: 3856 self.assertGreater(entry.inode(), 0) 3857 self.assertRaises(FileNotFoundError, entry.stat) 3858 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 3859 3860 def test_removed_file(self): 3861 entry = self.create_file_entry() 3862 os.unlink(entry.path) 3863 3864 self.assertFalse(entry.is_dir()) 3865 # On POSIX, is_dir() result depends if scandir() filled d_type or not 3866 if os.name == 'nt': 3867 self.assertTrue(entry.is_file()) 3868 self.assertFalse(entry.is_symlink()) 3869 if os.name == 'nt': 3870 self.assertRaises(FileNotFoundError, entry.inode) 3871 # don't fail 3872 entry.stat() 3873 entry.stat(follow_symlinks=False) 3874 else: 3875 self.assertGreater(entry.inode(), 0) 3876 self.assertRaises(FileNotFoundError, entry.stat) 3877 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 3878 3879 def test_broken_symlink(self): 3880 if not support.can_symlink(): 3881 return self.skipTest('cannot create symbolic link') 3882 3883 filename = self.create_file("file.txt") 3884 os.symlink(filename, 3885 os.path.join(self.path, "symlink.txt")) 3886 entries = self.get_entries(['file.txt', 'symlink.txt']) 3887 entry = entries['symlink.txt'] 3888 os.unlink(filename) 3889 3890 self.assertGreater(entry.inode(), 0) 3891 self.assertFalse(entry.is_dir()) 3892 self.assertFalse(entry.is_file()) # broken symlink returns False 3893 self.assertFalse(entry.is_dir(follow_symlinks=False)) 3894 self.assertFalse(entry.is_file(follow_symlinks=False)) 3895 self.assertTrue(entry.is_symlink()) 3896 self.assertRaises(FileNotFoundError, entry.stat) 3897 # don't fail 3898 entry.stat(follow_symlinks=False) 3899 3900 def test_bytes(self): 3901 self.create_file("file.txt") 3902 3903 path_bytes = os.fsencode(self.path) 3904 entries = list(os.scandir(path_bytes)) 3905 self.assertEqual(len(entries), 1, entries) 3906 entry = entries[0] 3907 3908 self.assertEqual(entry.name, b'file.txt') 3909 self.assertEqual(entry.path, 3910 os.fsencode(os.path.join(self.path, 'file.txt'))) 3911 3912 def test_bytes_like(self): 3913 self.create_file("file.txt") 3914 3915 for cls in bytearray, memoryview: 3916 path_bytes = cls(os.fsencode(self.path)) 3917 with self.assertWarns(DeprecationWarning): 3918 entries = list(os.scandir(path_bytes)) 3919 self.assertEqual(len(entries), 1, entries) 3920 entry = entries[0] 3921 3922 self.assertEqual(entry.name, b'file.txt') 3923 self.assertEqual(entry.path, 3924 os.fsencode(os.path.join(self.path, 'file.txt'))) 3925 self.assertIs(type(entry.name), bytes) 3926 self.assertIs(type(entry.path), bytes) 3927 3928 @unittest.skipUnless(os.listdir in os.supports_fd, 3929 'fd support for listdir required for this test.') 3930 def test_fd(self): 3931 self.assertIn(os.scandir, os.supports_fd) 3932 self.create_file('file.txt') 3933 expected_names = ['file.txt'] 3934 if support.can_symlink(): 3935 os.symlink('file.txt', os.path.join(self.path, 'link')) 3936 expected_names.append('link') 3937 3938 fd = os.open(self.path, os.O_RDONLY) 3939 try: 3940 with os.scandir(fd) as it: 3941 entries = list(it) 3942 names = [entry.name for entry in entries] 3943 self.assertEqual(sorted(names), expected_names) 3944 self.assertEqual(names, os.listdir(fd)) 3945 for entry in entries: 3946 self.assertEqual(entry.path, entry.name) 3947 self.assertEqual(os.fspath(entry), entry.name) 3948 self.assertEqual(entry.is_symlink(), entry.name == 'link') 3949 if os.stat in os.supports_dir_fd: 3950 st = os.stat(entry.name, dir_fd=fd) 3951 self.assertEqual(entry.stat(), st) 3952 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) 3953 self.assertEqual(entry.stat(follow_symlinks=False), st) 3954 finally: 3955 os.close(fd) 3956 3957 def test_empty_path(self): 3958 self.assertRaises(FileNotFoundError, os.scandir, '') 3959 3960 def test_consume_iterator_twice(self): 3961 self.create_file("file.txt") 3962 iterator = os.scandir(self.path) 3963 3964 entries = list(iterator) 3965 self.assertEqual(len(entries), 1, entries) 3966 3967 # check than consuming the iterator twice doesn't raise exception 3968 entries2 = list(iterator) 3969 self.assertEqual(len(entries2), 0, entries2) 3970 3971 def test_bad_path_type(self): 3972 for obj in [1.234, {}, []]: 3973 self.assertRaises(TypeError, os.scandir, obj) 3974 3975 def test_close(self): 3976 self.create_file("file.txt") 3977 self.create_file("file2.txt") 3978 iterator = os.scandir(self.path) 3979 next(iterator) 3980 iterator.close() 3981 # multiple closes 3982 iterator.close() 3983 with self.check_no_resource_warning(): 3984 del iterator 3985 3986 def test_context_manager(self): 3987 self.create_file("file.txt") 3988 self.create_file("file2.txt") 3989 with os.scandir(self.path) as iterator: 3990 next(iterator) 3991 with self.check_no_resource_warning(): 3992 del iterator 3993 3994 def test_context_manager_close(self): 3995 self.create_file("file.txt") 3996 self.create_file("file2.txt") 3997 with os.scandir(self.path) as iterator: 3998 next(iterator) 3999 iterator.close() 4000 4001 def test_context_manager_exception(self): 4002 self.create_file("file.txt") 4003 self.create_file("file2.txt") 4004 with self.assertRaises(ZeroDivisionError): 4005 with os.scandir(self.path) as iterator: 4006 next(iterator) 4007 1/0 4008 with self.check_no_resource_warning(): 4009 del iterator 4010 4011 def test_resource_warning(self): 4012 self.create_file("file.txt") 4013 self.create_file("file2.txt") 4014 iterator = os.scandir(self.path) 4015 next(iterator) 4016 with self.assertWarns(ResourceWarning): 4017 del iterator 4018 support.gc_collect() 4019 # exhausted iterator 4020 iterator = os.scandir(self.path) 4021 list(iterator) 4022 with self.check_no_resource_warning(): 4023 del iterator 4024 4025 4026class TestPEP519(unittest.TestCase): 4027 4028 # Abstracted so it can be overridden to test pure Python implementation 4029 # if a C version is provided. 4030 fspath = staticmethod(os.fspath) 4031 4032 def test_return_bytes(self): 4033 for b in b'hello', b'goodbye', b'some/path/and/file': 4034 self.assertEqual(b, self.fspath(b)) 4035 4036 def test_return_string(self): 4037 for s in 'hello', 'goodbye', 'some/path/and/file': 4038 self.assertEqual(s, self.fspath(s)) 4039 4040 def test_fsencode_fsdecode(self): 4041 for p in "path/like/object", b"path/like/object": 4042 pathlike = FakePath(p) 4043 4044 self.assertEqual(p, self.fspath(pathlike)) 4045 self.assertEqual(b"path/like/object", os.fsencode(pathlike)) 4046 self.assertEqual("path/like/object", os.fsdecode(pathlike)) 4047 4048 def test_pathlike(self): 4049 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil'))) 4050 self.assertTrue(issubclass(FakePath, os.PathLike)) 4051 self.assertTrue(isinstance(FakePath('x'), os.PathLike)) 4052 4053 def test_garbage_in_exception_out(self): 4054 vapor = type('blah', (), {}) 4055 for o in int, type, os, vapor(): 4056 self.assertRaises(TypeError, self.fspath, o) 4057 4058 def test_argument_required(self): 4059 self.assertRaises(TypeError, self.fspath) 4060 4061 def test_bad_pathlike(self): 4062 # __fspath__ returns a value other than str or bytes. 4063 self.assertRaises(TypeError, self.fspath, FakePath(42)) 4064 # __fspath__ attribute that is not callable. 4065 c = type('foo', (), {}) 4066 c.__fspath__ = 1 4067 self.assertRaises(TypeError, self.fspath, c()) 4068 # __fspath__ raises an exception. 4069 self.assertRaises(ZeroDivisionError, self.fspath, 4070 FakePath(ZeroDivisionError())) 4071 4072 def test_pathlike_subclasshook(self): 4073 # bpo-38878: subclasshook causes subclass checks 4074 # true on abstract implementation. 4075 class A(os.PathLike): 4076 pass 4077 self.assertFalse(issubclass(FakePath, A)) 4078 self.assertTrue(issubclass(FakePath, os.PathLike)) 4079 4080 4081class TimesTests(unittest.TestCase): 4082 def test_times(self): 4083 times = os.times() 4084 self.assertIsInstance(times, os.times_result) 4085 4086 for field in ('user', 'system', 'children_user', 'children_system', 4087 'elapsed'): 4088 value = getattr(times, field) 4089 self.assertIsInstance(value, float) 4090 4091 if os.name == 'nt': 4092 self.assertEqual(times.children_user, 0) 4093 self.assertEqual(times.children_system, 0) 4094 self.assertEqual(times.elapsed, 0) 4095 4096 4097# Only test if the C version is provided, otherwise TestPEP519 already tested 4098# the pure Python implementation. 4099if hasattr(os, "_fspath"): 4100 class TestPEP519PurePython(TestPEP519): 4101 4102 """Explicitly test the pure Python implementation of os.fspath().""" 4103 4104 fspath = staticmethod(os._fspath) 4105 4106 4107if __name__ == "__main__": 4108 unittest.main() 4109