# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.

import asynchat
import asyncore
import codecs
import contextlib
import decimal
import errno
import fractions
import getpass
import itertools
import locale
import mmap
import os
import pickle
import shutil
import signal
import socket
import stat
import subprocess
import sys
import sysconfig
import threading
import time
import unittest
import uuid
import warnings
from test import support

# Optional modules: each name falls back to None (or to an empty list / a
# substitute value) when the import is not possible on this platform.
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except (ImportError, AttributeError):
    all_users = []
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize

from test.support.script_helper import assert_python_ok
from test.support import unix_shell, FakePath


# True when the effective user id is 0 (only computed where os.geteuid exists).
root_in_posix = False
if hasattr(os, 'geteuid'):
    root_in_posix = (os.geteuid() == 0)

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0


def requires_os_func(name):
    """Return a decorator that skips a test unless ``os.<name>`` exists."""
    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)


def create_file(filename, content=b'content'):
    """Create *filename* and write *content* (bytes) to it.

    The file is opened unbuffered in exclusive-creation mode ("xb"), so the
    call fails if *filename* already exists.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)


# Tests creating TESTFN
class FileTests(unittest.TestCase):
    def setUp(self):
        # lexists() so that a dangling symlink left behind is removed too.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the refcount of its argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        # Close the descriptor even when an assertion or a write fails, so
        # that a failure here does not leak the fd into later tests.
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the fd with os.fdopen(*args)
        # and close the resulting file object again.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists: replace() must overwrite it.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user


# Test attributes on return values from os.*stat* family.
226class StatAttributeTests(unittest.TestCase): 227 def setUp(self): 228 self.fname = support.TESTFN 229 self.addCleanup(support.unlink, self.fname) 230 create_file(self.fname, b"ABC") 231 232 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 233 def check_stat_attributes(self, fname): 234 result = os.stat(fname) 235 236 # Make sure direct access works 237 self.assertEqual(result[stat.ST_SIZE], 3) 238 self.assertEqual(result.st_size, 3) 239 240 # Make sure all the attributes are there 241 members = dir(result) 242 for name in dir(stat): 243 if name[:3] == 'ST_': 244 attr = name.lower() 245 if name.endswith("TIME"): 246 def trunc(x): return int(x) 247 else: 248 def trunc(x): return x 249 self.assertEqual(trunc(getattr(result, attr)), 250 result[getattr(stat, name)]) 251 self.assertIn(attr, members) 252 253 # Make sure that the st_?time and st_?time_ns fields roughly agree 254 # (they should always agree up to around tens-of-microseconds) 255 for name in 'st_atime st_mtime st_ctime'.split(): 256 floaty = int(getattr(result, name) * 100000) 257 nanosecondy = getattr(result, name + "_ns") // 10000 258 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 259 260 try: 261 result[200] 262 self.fail("No exception raised") 263 except IndexError: 264 pass 265 266 # Make sure that assignment fails 267 try: 268 result.st_mode = 1 269 self.fail("No exception raised") 270 except AttributeError: 271 pass 272 273 try: 274 result.st_rdev = 1 275 self.fail("No exception raised") 276 except (AttributeError, TypeError): 277 pass 278 279 try: 280 result.parrot = 1 281 self.fail("No exception raised") 282 except AttributeError: 283 pass 284 285 # Use the stat_result constructor with a too-short tuple. 286 try: 287 result2 = os.stat_result((10,)) 288 self.fail("No exception raised") 289 except TypeError: 290 pass 291 292 # Use the constructor with a too-long tuple. 
293 try: 294 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 295 except TypeError: 296 pass 297 298 def test_stat_attributes(self): 299 self.check_stat_attributes(self.fname) 300 301 def test_stat_attributes_bytes(self): 302 try: 303 fname = self.fname.encode(sys.getfilesystemencoding()) 304 except UnicodeEncodeError: 305 self.skipTest("cannot encode %a for the filesystem" % self.fname) 306 self.check_stat_attributes(fname) 307 308 def test_stat_result_pickle(self): 309 result = os.stat(self.fname) 310 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 311 p = pickle.dumps(result, proto) 312 self.assertIn(b'stat_result', p) 313 if proto < 4: 314 self.assertIn(b'cos\nstat_result\n', p) 315 unpickled = pickle.loads(p) 316 self.assertEqual(result, unpickled) 317 318 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 319 def test_statvfs_attributes(self): 320 result = os.statvfs(self.fname) 321 322 # Make sure direct access works 323 self.assertEqual(result.f_bfree, result[3]) 324 325 # Make sure all the attributes are there. 326 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 327 'ffree', 'favail', 'flag', 'namemax') 328 for value, member in enumerate(members): 329 self.assertEqual(getattr(result, 'f_' + member), result[value]) 330 331 self.assertTrue(isinstance(result.f_fsid, int)) 332 333 # Test that the size of the tuple doesn't change 334 self.assertEqual(len(result), 10) 335 336 # Make sure that assignment really fails 337 try: 338 result.f_bfree = 1 339 self.fail("No exception raised") 340 except AttributeError: 341 pass 342 343 try: 344 result.parrot = 1 345 self.fail("No exception raised") 346 except AttributeError: 347 pass 348 349 # Use the constructor with a too-short tuple. 350 try: 351 result2 = os.statvfs_result((10,)) 352 self.fail("No exception raised") 353 except TypeError: 354 pass 355 356 # Use the constructor with a too-long tuple. 
357 try: 358 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 359 except TypeError: 360 pass 361 362 @unittest.skipUnless(hasattr(os, 'statvfs'), 363 "need os.statvfs()") 364 def test_statvfs_result_pickle(self): 365 result = os.statvfs(self.fname) 366 367 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 368 p = pickle.dumps(result, proto) 369 self.assertIn(b'statvfs_result', p) 370 if proto < 4: 371 self.assertIn(b'cos\nstatvfs_result\n', p) 372 unpickled = pickle.loads(p) 373 self.assertEqual(result, unpickled) 374 375 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 376 def test_1686475(self): 377 # Verify that an open file can be stat'ed 378 try: 379 os.stat(r"c:\pagefile.sys") 380 except FileNotFoundError: 381 self.skipTest(r'c:\pagefile.sys does not exist') 382 except OSError as e: 383 self.fail("Could not stat pagefile.sys") 384 385 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 386 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 387 def test_15261(self): 388 # Verify that stat'ing a closed fd does not cause crash 389 r, w = os.pipe() 390 try: 391 os.stat(r) # should not raise error 392 finally: 393 os.close(r) 394 os.close(w) 395 with self.assertRaises(OSError) as ctx: 396 os.stat(r) 397 self.assertEqual(ctx.exception.errno, errno.EBADF) 398 399 def check_file_attributes(self, result): 400 self.assertTrue(hasattr(result, 'st_file_attributes')) 401 self.assertTrue(isinstance(result.st_file_attributes, int)) 402 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 403 404 @unittest.skipUnless(sys.platform == "win32", 405 "st_file_attributes is Win32 specific") 406 def test_file_attributes(self): 407 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 408 result = os.stat(self.fname) 409 self.check_file_attributes(result) 410 self.assertEqual( 411 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 412 0) 413 414 # test directory st_file_attributes 
(FILE_ATTRIBUTE_DIRECTORY set) 415 dirname = support.TESTFN + "dir" 416 os.mkdir(dirname) 417 self.addCleanup(os.rmdir, dirname) 418 419 result = os.stat(dirname) 420 self.check_file_attributes(result) 421 self.assertEqual( 422 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 423 stat.FILE_ATTRIBUTE_DIRECTORY) 424 425 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 426 def test_access_denied(self): 427 # Default to FindFirstFile WIN32_FIND_DATA when access is 428 # denied. See issue 28075. 429 # os.environ['TEMP'] should be located on a volume that 430 # supports file ACLs. 431 fname = os.path.join(os.environ['TEMP'], self.fname) 432 self.addCleanup(support.unlink, fname) 433 create_file(fname, b'ABC') 434 # Deny the right to [S]YNCHRONIZE on the file to 435 # force CreateFile to fail with ERROR_ACCESS_DENIED. 436 DETACHED_PROCESS = 8 437 subprocess.check_call( 438 # bpo-30584: Use security identifier *S-1-5-32-545 instead 439 # of localized "Users" to not depend on the locale. 
440 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'], 441 creationflags=DETACHED_PROCESS 442 ) 443 result = os.stat(fname) 444 self.assertNotEqual(result.st_size, 0) 445 446 447class UtimeTests(unittest.TestCase): 448 def setUp(self): 449 self.dirname = support.TESTFN 450 self.fname = os.path.join(self.dirname, "f1") 451 452 self.addCleanup(support.rmtree, self.dirname) 453 os.mkdir(self.dirname) 454 create_file(self.fname) 455 456 def support_subsecond(self, filename): 457 # Heuristic to check if the filesystem supports timestamp with 458 # subsecond resolution: check if float and int timestamps are different 459 st = os.stat(filename) 460 return ((st.st_atime != st[7]) 461 or (st.st_mtime != st[8]) 462 or (st.st_ctime != st[9])) 463 464 def _test_utime(self, set_time, filename=None): 465 if not filename: 466 filename = self.fname 467 468 support_subsecond = self.support_subsecond(filename) 469 if support_subsecond: 470 # Timestamp with a resolution of 1 microsecond (10^-6). 471 # 472 # The resolution of the C internal function used by os.utime() 473 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 474 # test with a resolution of 1 ns requires more work: 475 # see the issue #15745. 
476 atime_ns = 1002003000 # 1.002003 seconds 477 mtime_ns = 4005006000 # 4.005006 seconds 478 else: 479 # use a resolution of 1 second 480 atime_ns = 5 * 10**9 481 mtime_ns = 8 * 10**9 482 483 set_time(filename, (atime_ns, mtime_ns)) 484 st = os.stat(filename) 485 486 if support_subsecond: 487 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 488 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 489 else: 490 self.assertEqual(st.st_atime, atime_ns * 1e-9) 491 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 492 self.assertEqual(st.st_atime_ns, atime_ns) 493 self.assertEqual(st.st_mtime_ns, mtime_ns) 494 495 def test_utime(self): 496 def set_time(filename, ns): 497 # test the ns keyword parameter 498 os.utime(filename, ns=ns) 499 self._test_utime(set_time) 500 501 @staticmethod 502 def ns_to_sec(ns): 503 # Convert a number of nanosecond (int) to a number of seconds (float). 504 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 505 # issue, os.utime() rounds towards minus infinity. 
506 return (ns * 1e-9) + 0.5e-9 507 508 def test_utime_by_indexed(self): 509 # pass times as floating point seconds as the second indexed parameter 510 def set_time(filename, ns): 511 atime_ns, mtime_ns = ns 512 atime = self.ns_to_sec(atime_ns) 513 mtime = self.ns_to_sec(mtime_ns) 514 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 515 # or utime(time_t) 516 os.utime(filename, (atime, mtime)) 517 self._test_utime(set_time) 518 519 def test_utime_by_times(self): 520 def set_time(filename, ns): 521 atime_ns, mtime_ns = ns 522 atime = self.ns_to_sec(atime_ns) 523 mtime = self.ns_to_sec(mtime_ns) 524 # test the times keyword parameter 525 os.utime(filename, times=(atime, mtime)) 526 self._test_utime(set_time) 527 528 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 529 "follow_symlinks support for utime required " 530 "for this test.") 531 def test_utime_nofollow_symlinks(self): 532 def set_time(filename, ns): 533 # use follow_symlinks=False to test utimensat(timespec) 534 # or lutimes(timeval) 535 os.utime(filename, ns=ns, follow_symlinks=False) 536 self._test_utime(set_time) 537 538 @unittest.skipUnless(os.utime in os.supports_fd, 539 "fd support for utime required for this test.") 540 def test_utime_fd(self): 541 def set_time(filename, ns): 542 with open(filename, 'wb', 0) as fp: 543 # use a file descriptor to test futimens(timespec) 544 # or futimes(timeval) 545 os.utime(fp.fileno(), ns=ns) 546 self._test_utime(set_time) 547 548 @unittest.skipUnless(os.utime in os.supports_dir_fd, 549 "dir_fd support for utime required for this test.") 550 def test_utime_dir_fd(self): 551 def set_time(filename, ns): 552 dirname, name = os.path.split(filename) 553 dirfd = os.open(dirname, os.O_RDONLY) 554 try: 555 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 556 os.utime(name, dir_fd=dirfd, ns=ns) 557 finally: 558 os.close(dirfd) 559 self._test_utime(set_time) 560 561 def test_utime_directory(self): 562 def set_time(filename, ns): 563 # 
test calling os.utime() on a directory 564 os.utime(filename, ns=ns) 565 self._test_utime(set_time, filename=self.dirname) 566 567 def _test_utime_current(self, set_time): 568 # Get the system clock 569 current = time.time() 570 571 # Call os.utime() to set the timestamp to the current system clock 572 set_time(self.fname) 573 574 if not self.support_subsecond(self.fname): 575 delta = 1.0 576 else: 577 # On Windows, the usual resolution of time.time() is 15.6 ms. 578 # bpo-30649: Tolerate 50 ms for slow Windows buildbots. 579 # 580 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use 581 # also 50 ms on other platforms. 582 delta = 0.050 583 st = os.stat(self.fname) 584 msg = ("st_time=%r, current=%r, dt=%r" 585 % (st.st_mtime, current, st.st_mtime - current)) 586 self.assertAlmostEqual(st.st_mtime, current, 587 delta=delta, msg=msg) 588 589 def test_utime_current(self): 590 def set_time(filename): 591 # Set to the current time in the new way 592 os.utime(self.fname) 593 self._test_utime_current(set_time) 594 595 def test_utime_current_old(self): 596 def set_time(filename): 597 # Set to the current time in the old explicit way. 598 os.utime(self.fname, None) 599 self._test_utime_current(set_time) 600 601 def get_file_system(self, path): 602 if sys.platform == 'win32': 603 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 604 import ctypes 605 kernel32 = ctypes.windll.kernel32 606 buf = ctypes.create_unicode_buffer("", 100) 607 ok = kernel32.GetVolumeInformationW(root, None, 0, 608 None, None, None, 609 buf, len(buf)) 610 if ok: 611 return buf.value 612 # return None if the filesystem is unknown 613 614 def test_large_time(self): 615 # Many filesystems are limited to the year 2038. At least, the test 616 # pass with NTFS filesystem. 
617 if self.get_file_system(self.dirname) != "NTFS": 618 self.skipTest("requires NTFS") 619 620 large = 5000000000 # some day in 2128 621 os.utime(self.fname, (large, large)) 622 self.assertEqual(os.stat(self.fname).st_mtime, large) 623 624 def test_utime_invalid_arguments(self): 625 # seconds and nanoseconds parameters are mutually exclusive 626 with self.assertRaises(ValueError): 627 os.utime(self.fname, (5, 5), ns=(5, 5)) 628 with self.assertRaises(TypeError): 629 os.utime(self.fname, [5, 5]) 630 with self.assertRaises(TypeError): 631 os.utime(self.fname, (5,)) 632 with self.assertRaises(TypeError): 633 os.utime(self.fname, (5, 5, 5)) 634 with self.assertRaises(TypeError): 635 os.utime(self.fname, ns=[5, 5]) 636 with self.assertRaises(TypeError): 637 os.utime(self.fname, ns=(5,)) 638 with self.assertRaises(TypeError): 639 os.utime(self.fname, ns=(5, 5, 5)) 640 641 if os.utime not in os.supports_follow_symlinks: 642 with self.assertRaises(NotImplementedError): 643 os.utime(self.fname, (5, 5), follow_symlinks=False) 644 if os.utime not in os.supports_fd: 645 with open(self.fname, 'wb', 0) as fp: 646 with self.assertRaises(TypeError): 647 os.utime(fp.fileno(), (5, 5)) 648 if os.utime not in os.supports_dir_fd: 649 with self.assertRaises(NotImplementedError): 650 os.utime(self.fname, (5, 5), dir_fd=0) 651 652 @support.cpython_only 653 def test_issue31577(self): 654 # The interpreter shouldn't crash in case utime() received a bad 655 # ns argument. 
656 def get_bad_int(divmod_ret_val): 657 class BadInt: 658 def __divmod__(*args): 659 return divmod_ret_val 660 return BadInt() 661 with self.assertRaises(TypeError): 662 os.utime(self.fname, ns=(get_bad_int(42), 1)) 663 with self.assertRaises(TypeError): 664 os.utime(self.fname, ns=(get_bad_int(()), 1)) 665 with self.assertRaises(TypeError): 666 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1)) 667 668 669from test import mapping_tests 670 671class EnvironTests(mapping_tests.BasicTestMappingProtocol): 672 """check that os.environ object conform to mapping protocol""" 673 type2test = None 674 675 def setUp(self): 676 self.__save = dict(os.environ) 677 if os.supports_bytes_environ: 678 self.__saveb = dict(os.environb) 679 for key, value in self._reference().items(): 680 os.environ[key] = value 681 682 def tearDown(self): 683 os.environ.clear() 684 os.environ.update(self.__save) 685 if os.supports_bytes_environ: 686 os.environb.clear() 687 os.environb.update(self.__saveb) 688 689 def _reference(self): 690 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 691 692 def _empty_mapping(self): 693 os.environ.clear() 694 return os.environ 695 696 # Bug 1110478 697 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 698 'requires a shell') 699 def test_update2(self): 700 os.environ.clear() 701 os.environ.update(HELLO="World") 702 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 703 value = popen.read().strip() 704 self.assertEqual(value, "World") 705 706 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 707 'requires a shell') 708 def test_os_popen_iter(self): 709 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 710 % unix_shell) as popen: 711 it = iter(popen) 712 self.assertEqual(next(it), "line1\n") 713 self.assertEqual(next(it), "line2\n") 714 self.assertEqual(next(it), "line3\n") 715 self.assertRaises(StopIteration, next, it) 716 717 # Verify environ keys and values from the OS are of the 718 # correct str type. 
719 def test_keyvalue_types(self): 720 for key, val in os.environ.items(): 721 self.assertEqual(type(key), str) 722 self.assertEqual(type(val), str) 723 724 def test_items(self): 725 for key, value in self._reference().items(): 726 self.assertEqual(os.environ.get(key), value) 727 728 # Issue 7310 729 def test___repr__(self): 730 """Check that the repr() of os.environ looks like environ({...}).""" 731 env = os.environ 732 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join( 733 '{!r}: {!r}'.format(key, value) 734 for key, value in env.items()))) 735 736 def test_get_exec_path(self): 737 defpath_list = os.defpath.split(os.pathsep) 738 test_path = ['/monty', '/python', '', '/flying/circus'] 739 test_env = {'PATH': os.pathsep.join(test_path)} 740 741 saved_environ = os.environ 742 try: 743 os.environ = dict(test_env) 744 # Test that defaulting to os.environ works. 745 self.assertSequenceEqual(test_path, os.get_exec_path()) 746 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 747 finally: 748 os.environ = saved_environ 749 750 # No PATH environment variable 751 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 752 # Empty PATH environment variable 753 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 754 # Supplied PATH environment variable 755 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 756 757 if os.supports_bytes_environ: 758 # env cannot contain 'PATH' and b'PATH' keys 759 try: 760 # ignore BytesWarning warning 761 with warnings.catch_warnings(record=True): 762 mixed_env = {'PATH': '1', b'PATH': b'2'} 763 except BytesWarning: 764 # mixed_env cannot be created with python -bb 765 pass 766 else: 767 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 768 769 # bytes key and/or value 770 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 771 ['abc']) 772 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 773 ['abc']) 774 self.assertSequenceEqual(os.get_exec_path({'PATH': 
b'abc'}), 775 ['abc']) 776 777 @unittest.skipUnless(os.supports_bytes_environ, 778 "os.environb required for this test.") 779 def test_environb(self): 780 # os.environ -> os.environb 781 value = 'euro\u20ac' 782 try: 783 value_bytes = value.encode(sys.getfilesystemencoding(), 784 'surrogateescape') 785 except UnicodeEncodeError: 786 msg = "U+20AC character is not encodable to %s" % ( 787 sys.getfilesystemencoding(),) 788 self.skipTest(msg) 789 os.environ['unicode'] = value 790 self.assertEqual(os.environ['unicode'], value) 791 self.assertEqual(os.environb[b'unicode'], value_bytes) 792 793 # os.environb -> os.environ 794 value = b'\xff' 795 os.environb[b'bytes'] = value 796 self.assertEqual(os.environb[b'bytes'], value) 797 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 798 self.assertEqual(os.environ['bytes'], value_str) 799 800 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). 801 @support.requires_mac_ver(10, 6) 802 def test_unset_error(self): 803 if sys.platform == "win32": 804 # an environment variable is limited to 32,767 characters 805 key = 'x' * 50000 806 self.assertRaises(ValueError, os.environ.__delitem__, key) 807 else: 808 # "=" is not allowed in a variable name 809 key = 'key=' 810 self.assertRaises(OSError, os.environ.__delitem__, key) 811 812 def test_key_type(self): 813 missing = 'missingkey' 814 self.assertNotIn(missing, os.environ) 815 816 with self.assertRaises(KeyError) as cm: 817 os.environ[missing] 818 self.assertIs(cm.exception.args[0], missing) 819 self.assertTrue(cm.exception.__suppress_context__) 820 821 with self.assertRaises(KeyError) as cm: 822 del os.environ[missing] 823 self.assertIs(cm.exception.args[0], missing) 824 self.assertTrue(cm.exception.__suppress_context__) 825 826 def _test_environ_iteration(self, collection): 827 iterator = iter(collection) 828 new_key = "__new_key__" 829 830 next(iterator) # start iteration over os.environ.items 831 832 # add a new key in os.environ mapping 833 
os.environ[new_key] = "test_environ_iteration" 834 835 try: 836 next(iterator) # force iteration over modified mapping 837 self.assertEqual(os.environ[new_key], "test_environ_iteration") 838 finally: 839 del os.environ[new_key] 840 841 def test_iter_error_when_changing_os_environ(self): 842 self._test_environ_iteration(os.environ) 843 844 def test_iter_error_when_changing_os_environ_items(self): 845 self._test_environ_iteration(os.environ.items()) 846 847 def test_iter_error_when_changing_os_environ_values(self): 848 self._test_environ_iteration(os.environ.values()) 849 850 851class WalkTests(unittest.TestCase): 852 """Tests for os.walk().""" 853 854 # Wrapper to hide minor differences between os.walk and os.fwalk 855 # to tests both functions with the same code base 856 def walk(self, top, **kwargs): 857 if 'follow_symlinks' in kwargs: 858 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 859 return os.walk(top, **kwargs) 860 861 def setUp(self): 862 join = os.path.join 863 self.addCleanup(support.rmtree, support.TESTFN) 864 865 # Build: 866 # TESTFN/ 867 # TEST1/ a file kid and two directory kids 868 # tmp1 869 # SUB1/ a file kid and a directory kid 870 # tmp2 871 # SUB11/ no kids 872 # SUB2/ a file kid and a dirsymlink kid 873 # tmp3 874 # SUB21/ not readable 875 # tmp5 876 # link/ a symlink to TESTFN.2 877 # broken_link 878 # broken_link2 879 # broken_link3 880 # TEST2/ 881 # tmp4 a lone file 882 self.walk_path = join(support.TESTFN, "TEST1") 883 self.sub1_path = join(self.walk_path, "SUB1") 884 self.sub11_path = join(self.sub1_path, "SUB11") 885 sub2_path = join(self.walk_path, "SUB2") 886 sub21_path = join(sub2_path, "SUB21") 887 tmp1_path = join(self.walk_path, "tmp1") 888 tmp2_path = join(self.sub1_path, "tmp2") 889 tmp3_path = join(sub2_path, "tmp3") 890 tmp5_path = join(sub21_path, "tmp3") 891 self.link_path = join(sub2_path, "link") 892 t2_path = join(support.TESTFN, "TEST2") 893 tmp4_path = join(support.TESTFN, "TEST2", "tmp4") 894 broken_link_path 
= join(sub2_path, "broken_link") 895 broken_link2_path = join(sub2_path, "broken_link2") 896 broken_link3_path = join(sub2_path, "broken_link3") 897 898 # Create stuff. 899 os.makedirs(self.sub11_path) 900 os.makedirs(sub2_path) 901 os.makedirs(sub21_path) 902 os.makedirs(t2_path) 903 904 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 905 with open(path, "x") as f: 906 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 907 908 if support.can_symlink(): 909 os.symlink(os.path.abspath(t2_path), self.link_path) 910 os.symlink('broken', broken_link_path, True) 911 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 912 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 913 self.sub2_tree = (sub2_path, ["SUB21", "link"], 914 ["broken_link", "broken_link2", "broken_link3", 915 "tmp3"]) 916 else: 917 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"]) 918 919 os.chmod(sub21_path, 0) 920 try: 921 os.listdir(sub21_path) 922 except PermissionError: 923 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 924 else: 925 os.chmod(sub21_path, stat.S_IRWXU) 926 os.unlink(tmp5_path) 927 os.rmdir(sub21_path) 928 del self.sub2_tree[1][:1] 929 930 def test_walk_topdown(self): 931 # Walk top-down. 932 all = list(self.walk(self.walk_path)) 933 934 self.assertEqual(len(all), 4) 935 # We can't know which order SUB1 and SUB2 will appear in. 
936 # Not flipped: TESTFN, SUB1, SUB11, SUB2 937 # flipped: TESTFN, SUB2, SUB1, SUB11 938 flipped = all[0][1][0] != "SUB1" 939 all[0][1].sort() 940 all[3 - 2 * flipped][-1].sort() 941 all[3 - 2 * flipped][1].sort() 942 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 943 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 944 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 945 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 946 947 def test_walk_prune(self, walk_path=None): 948 if walk_path is None: 949 walk_path = self.walk_path 950 # Prune the search. 951 all = [] 952 for root, dirs, files in self.walk(walk_path): 953 all.append((root, dirs, files)) 954 # Don't descend into SUB1. 955 if 'SUB1' in dirs: 956 # Note that this also mutates the dirs we appended to all! 957 dirs.remove('SUB1') 958 959 self.assertEqual(len(all), 2) 960 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) 961 962 all[1][-1].sort() 963 all[1][1].sort() 964 self.assertEqual(all[1], self.sub2_tree) 965 966 def test_file_like_path(self): 967 self.test_walk_prune(FakePath(self.walk_path)) 968 969 def test_walk_bottom_up(self): 970 # Walk bottom-up. 971 all = list(self.walk(self.walk_path, topdown=False)) 972 973 self.assertEqual(len(all), 4, all) 974 # We can't know which order SUB1 and SUB2 will appear in. 
975 # Not flipped: SUB11, SUB1, SUB2, TESTFN 976 # flipped: SUB2, SUB11, SUB1, TESTFN 977 flipped = all[3][1][0] != "SUB1" 978 all[3][1].sort() 979 all[2 - 2 * flipped][-1].sort() 980 all[2 - 2 * flipped][1].sort() 981 self.assertEqual(all[3], 982 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 983 self.assertEqual(all[flipped], 984 (self.sub11_path, [], [])) 985 self.assertEqual(all[flipped + 1], 986 (self.sub1_path, ["SUB11"], ["tmp2"])) 987 self.assertEqual(all[2 - 2 * flipped], 988 self.sub2_tree) 989 990 def test_walk_symlink(self): 991 if not support.can_symlink(): 992 self.skipTest("need symlink support") 993 994 # Walk, following symlinks. 995 walk_it = self.walk(self.walk_path, follow_symlinks=True) 996 for root, dirs, files in walk_it: 997 if root == self.link_path: 998 self.assertEqual(dirs, []) 999 self.assertEqual(files, ["tmp4"]) 1000 break 1001 else: 1002 self.fail("Didn't follow symlink with followlinks=True") 1003 1004 def test_walk_bad_dir(self): 1005 # Walk top-down. 1006 errors = [] 1007 walk_it = self.walk(self.walk_path, onerror=errors.append) 1008 root, dirs, files = next(walk_it) 1009 self.assertEqual(errors, []) 1010 dir1 = 'SUB1' 1011 path1 = os.path.join(root, dir1) 1012 path1new = os.path.join(root, dir1 + '.new') 1013 os.rename(path1, path1new) 1014 try: 1015 roots = [r for r, d, f in walk_it] 1016 self.assertTrue(errors) 1017 self.assertNotIn(path1, roots) 1018 self.assertNotIn(path1new, roots) 1019 for dir2 in dirs: 1020 if dir2 != dir1: 1021 self.assertIn(os.path.join(root, dir2), roots) 1022 finally: 1023 os.rename(path1new, path1) 1024 1025 1026@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1027class FwalkTests(WalkTests): 1028 """Tests for os.fwalk().""" 1029 1030 def walk(self, top, **kwargs): 1031 for root, dirs, files, root_fd in self.fwalk(top, **kwargs): 1032 yield (root, dirs, files) 1033 1034 def fwalk(self, *args, **kwargs): 1035 return os.fwalk(*args, **kwargs) 1036 1037 def _compare_to_walk(self, 
walk_kwargs, fwalk_kwargs): 1038 """ 1039 compare with walk() results. 1040 """ 1041 walk_kwargs = walk_kwargs.copy() 1042 fwalk_kwargs = fwalk_kwargs.copy() 1043 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1044 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1045 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks) 1046 1047 expected = {} 1048 for root, dirs, files in os.walk(**walk_kwargs): 1049 expected[root] = (set(dirs), set(files)) 1050 1051 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs): 1052 self.assertIn(root, expected) 1053 self.assertEqual(expected[root], (set(dirs), set(files))) 1054 1055 def test_compare_to_walk(self): 1056 kwargs = {'top': support.TESTFN} 1057 self._compare_to_walk(kwargs, kwargs) 1058 1059 def test_dir_fd(self): 1060 try: 1061 fd = os.open(".", os.O_RDONLY) 1062 walk_kwargs = {'top': support.TESTFN} 1063 fwalk_kwargs = walk_kwargs.copy() 1064 fwalk_kwargs['dir_fd'] = fd 1065 self._compare_to_walk(walk_kwargs, fwalk_kwargs) 1066 finally: 1067 os.close(fd) 1068 1069 def test_yields_correct_dir_fd(self): 1070 # check returned file descriptors 1071 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1072 args = support.TESTFN, topdown, None 1073 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): 1074 # check that the FD is valid 1075 os.fstat(rootfd) 1076 # redundant check 1077 os.stat(rootfd) 1078 # check that listdir() returns consistent information 1079 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) 1080 1081 def test_fd_leak(self): 1082 # Since we're opening a lot of FDs, we must be careful to avoid leaks: 1083 # we both check that calling fwalk() a large number of times doesn't 1084 # yield EMFILE, and that the minimum allocated FD hasn't changed. 
# (FwalkTests.test_fd_leak, continued from the previous line)
        # Record which descriptor number os.dup() hands out right now.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        # After all the walks, os.dup() must hand out the same number again.
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # The inherited tests pass `follow_symlinks`; os.walk() calls the
        # same option `followlinks`.
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files)
            # Copy any changes the caller made to the decoded lists back into
            # the bytes lists owned by os.walk() (e.g. pruning `dirs`).
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))

@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
            root = os.fsdecode(broot)
            dirs = list(map(os.fsdecode, bdirs))
            files = list(map(os.fsdecode, bfiles))
            yield (root, dirs, files, topfd)
            # Copy any changes the caller made to the decoded lists back into
            # the bytes lists owned by os.fwalk().
            bdirs[:] = list(map(os.fsencode, dirs))
            bfiles[:] = list(map(os.fsencode, files))


class MakedirTests(unittest.TestCase):
    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        # Extending an already existing chain by one level must work too.
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.'
in them 1132 self.assertRaises(OSError, os.makedirs, os.curdir) 1133 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 1134 os.makedirs(path) 1135 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 1136 'dir5', 'dir6') 1137 os.makedirs(path) 1138 1139 def test_mode(self): 1140 with support.temp_umask(0o002): 1141 base = support.TESTFN 1142 parent = os.path.join(base, 'dir1') 1143 path = os.path.join(parent, 'dir2') 1144 os.makedirs(path, 0o555) 1145 self.assertTrue(os.path.exists(path)) 1146 self.assertTrue(os.path.isdir(path)) 1147 if os.name != 'nt': 1148 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) 1149 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) 1150 1151 def test_exist_ok_existing_directory(self): 1152 path = os.path.join(support.TESTFN, 'dir1') 1153 mode = 0o777 1154 old_mask = os.umask(0o022) 1155 os.makedirs(path, mode) 1156 self.assertRaises(OSError, os.makedirs, path, mode) 1157 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1158 os.makedirs(path, 0o776, exist_ok=True) 1159 os.makedirs(path, mode=mode, exist_ok=True) 1160 os.umask(old_mask) 1161 1162 # Issue #25583: A drive root could raise PermissionError on Windows 1163 os.makedirs(os.path.abspath('/'), exist_ok=True) 1164 1165 def test_exist_ok_s_isgid_directory(self): 1166 path = os.path.join(support.TESTFN, 'dir1') 1167 S_ISGID = stat.S_ISGID 1168 mode = 0o777 1169 old_mask = os.umask(0o022) 1170 try: 1171 existing_testfn_mode = stat.S_IMODE( 1172 os.lstat(support.TESTFN).st_mode) 1173 try: 1174 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID) 1175 except PermissionError: 1176 raise unittest.SkipTest('Cannot set S_ISGID for dir.') 1177 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID): 1178 raise unittest.SkipTest('No support for S_ISGID dir mode.') 1179 # The os should apply S_ISGID from the parent dir for us, but 1180 # this test need not depend on that behavior. Be explicit. 
1181 os.makedirs(path, mode | S_ISGID) 1182 # http://bugs.python.org/issue14992 1183 # Should not fail when the bit is already set. 1184 os.makedirs(path, mode, exist_ok=True) 1185 # remove the bit. 1186 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID) 1187 # May work even when the bit is not already set when demanded. 1188 os.makedirs(path, mode | S_ISGID, exist_ok=True) 1189 finally: 1190 os.umask(old_mask) 1191 1192 def test_exist_ok_existing_regular_file(self): 1193 base = support.TESTFN 1194 path = os.path.join(support.TESTFN, 'dir1') 1195 f = open(path, 'w') 1196 f.write('abc') 1197 f.close() 1198 self.assertRaises(OSError, os.makedirs, path) 1199 self.assertRaises(OSError, os.makedirs, path, exist_ok=False) 1200 self.assertRaises(OSError, os.makedirs, path, exist_ok=True) 1201 os.remove(path) 1202 1203 def tearDown(self): 1204 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3', 1205 'dir4', 'dir5', 'dir6') 1206 # If the tests failed, the bottom-most directory ('../dir6') 1207 # may not have been created, so we look for the outermost directory 1208 # that exists. 
# (MakedirTests.tearDown, continued from the previous line)
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)


@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):

    # One directory named support.TESTFN is shared by all tests of the class.
    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # NOTE: the local name `stat` shadows the `stat` module inside this
        # method only.
        stat = os.stat(support.TESTFN)
        uid = stat.st_uid
        gid = stat.st_gid
        # float, complex, Decimal and Fraction values must be rejected for
        # both the uid and the gid argument.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid

        os.chown(support.TESTFN, uid, gid_1)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_1)

        os.chown(support.TESTFN, uid, gid_2)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_2)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        os.chown(support.TESTFN, uid_1, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_1)
        os.chown(support.TESTFN, uid_2, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_2)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # NOTE(review): both calls share one assertRaises block, so the second
        # os.chown() is never reached once the first one raises.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)


class RemoveDirsTests(unittest.TestCase):
    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    # Every directory in the chain is empty: removedirs() removes all of them.
    def test_remove_all(self):
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertFalse(os.path.exists(dira))
        self.assertFalse(os.path.exists(support.TESTFN))

    # A file in the parent stops the removal after the leaf directory.
    def test_remove_partial(self):
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    # A file in the leaf directory makes removedirs() fail and remove nothing.
    def test_remove_nothing(self):
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self.assertTrue(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))


class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
            # Explicit close inside the with block; the block's exit then
            # closes the already-closed file again.
            f.close()
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')


class URandomTests(unittest.TestCase):
    def test_urandom_length(self):
# (body of URandomTests.test_urandom_length, whose `def` line ends the previous line)
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        self.assertIsInstance(data1, bytes)
        data2 = os.urandom(16)
        # Two independent 16-byte draws are expected to differ.
        self.assertNotEqual(data1, data2)

    # Helper: run os.urandom(count) in a child interpreter and return the
    # bytes it wrote to stdout.
    def get_urandom_subprocess(self, count):
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        # Index 1 of the result is used as the child's stdout.
        stdout = out[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate child processes must not produce the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)


@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Probe once: skip the whole class when the call fails with ENOSYS,
        # re-raise any other OSError.
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno == errno.ENOSYS:
                # Python compiled on a more recent Linux version
                # than the current Linux kernel
                raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
            else:
                raise

    def test_getrandom_type(self):
        data = os.getrandom(16)
        self.assertIsInstance(data, bytes)
        self.assertEqual(len(data), 16)

    def test_getrandom0(self):
        empty = os.getrandom(0)
        self.assertEqual(empty, b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail.
Check also that the flag exists 1387 try: 1388 os.getrandom(1, os.GRND_NONBLOCK) 1389 except BlockingIOError: 1390 # System urandom is not initialized yet 1391 pass 1392 1393 def test_getrandom_value(self): 1394 data1 = os.getrandom(16) 1395 data2 = os.getrandom(16) 1396 self.assertNotEqual(data1, data2) 1397 1398 1399# os.urandom() doesn't use a file descriptor when it is implemented with the 1400# getentropy() function, the getrandom() function or the getrandom() syscall 1401OS_URANDOM_DONT_USE_FD = ( 1402 sysconfig.get_config_var('HAVE_GETENTROPY') == 1 1403 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 1404 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) 1405 1406@unittest.skipIf(OS_URANDOM_DONT_USE_FD , 1407 "os.random() does not use a file descriptor") 1408class URandomFDTests(unittest.TestCase): 1409 @unittest.skipUnless(resource, "test requires the resource module") 1410 def test_urandom_failure(self): 1411 # Check urandom() failing when it is not able to open /dev/random. 1412 # We spawn a new process to make the test more robust (if getrlimit() 1413 # failed to restore the file descriptor limit after this, the whole 1414 # test suite would crash; this actually happened on the OS X Tiger 1415 # buildbot). 1416 code = """if 1: 1417 import errno 1418 import os 1419 import resource 1420 1421 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 1422 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 1423 try: 1424 os.urandom(16) 1425 except OSError as e: 1426 assert e.errno == errno.EMFILE, e.errno 1427 else: 1428 raise AssertionError("OSError not raised") 1429 """ 1430 assert_python_ok('-c', code) 1431 1432 def test_urandom_fd_closed(self): 1433 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1434 # closed. 
# (URandomFDTests.test_urandom_fd_closed, continued from the previous line)
        # The child calls os.urandom() once, closes descriptors 3..255, and
        # must still be able to call os.urandom() afterwards.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only success of the child matters here; its output is not inspected.
        rc, out, err = assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        # 256 identical bytes: if urandom() read from this file instead, the
        # two 4-byte reads in the child would be equal.
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # A second run must not reproduce the output of the first one.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)


@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If defpath is not None, os.defpath is replaced by it for the duration
    of the context.  The context manager yields the list of recorded calls.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
# (_execvpe_mockup, continued from the previous line)
    calls = []

    # Stand-in for os.execv: record the call, then raise RuntimeError.
    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    # Stand-in for os.execve: record the call, then raise OSError(ENOTDIR).
    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        # Save the real attributes so the finally clause can restore them.
        orig_execv = os.execv
        orig_execve = os.execve
        orig_defpath = os.defpath
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath


class ExecTests(unittest.TestCase):
    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        self.assertRaises(OSError, os.execvpe, 'no such app-',
                          ['no such app-'], None)

    # An empty argument list, or an empty first argument, must be rejected
    # with ValueError.
    def test_execv_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execv, 'notepad', ())
        self.assertRaises(ValueError, os.execv, 'notepad', [])
        self.assertRaises(ValueError, os.execv, 'notepad', ('',))
        self.assertRaises(ValueError, os.execv, 'notepad', [''])

    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
        self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})

    # Helper shared by test_internal_execvpe_str(); `test_type` is `str` or
    # `bytes` and selects the type of the program name.
    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name != "nt":
1551 native_fullpath = os.fsencode(fullpath) 1552 else: 1553 native_fullpath = fullpath 1554 env = {'spam': 'beans'} 1555 1556 # test os._execvpe() with an absolute path 1557 with _execvpe_mockup() as calls: 1558 self.assertRaises(RuntimeError, 1559 os._execvpe, fullpath, arguments) 1560 self.assertEqual(len(calls), 1) 1561 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 1562 1563 # test os._execvpe() with a relative path: 1564 # os.get_exec_path() returns defpath 1565 with _execvpe_mockup(defpath=program_path) as calls: 1566 self.assertRaises(OSError, 1567 os._execvpe, program, arguments, env=env) 1568 self.assertEqual(len(calls), 1) 1569 self.assertSequenceEqual(calls[0], 1570 ('execve', native_fullpath, (arguments, env))) 1571 1572 # test os._execvpe() with a relative path: 1573 # os.get_exec_path() reads the 'PATH' variable 1574 with _execvpe_mockup() as calls: 1575 env_path = env.copy() 1576 if test_type is bytes: 1577 env_path[b'PATH'] = program_path 1578 else: 1579 env_path['PATH'] = program_path 1580 self.assertRaises(OSError, 1581 os._execvpe, program, arguments, env=env_path) 1582 self.assertEqual(len(calls), 1) 1583 self.assertSequenceEqual(calls[0], 1584 ('execve', native_fullpath, (arguments, env_path))) 1585 1586 def test_internal_execvpe_str(self): 1587 self._test_internal_execvpe(str) 1588 if os.name != "nt": 1589 self._test_internal_execvpe(bytes) 1590 1591 def test_execve_invalid_env(self): 1592 args = [sys.executable, '-c', 'pass'] 1593 1594 # null character in the environment variable name 1595 newenv = os.environ.copy() 1596 newenv["FRUIT\0VEGETABLE"] = "cabbage" 1597 with self.assertRaises(ValueError): 1598 os.execve(args[0], args, newenv) 1599 1600 # null character in the environment variable value 1601 newenv = os.environ.copy() 1602 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 1603 with self.assertRaises(ValueError): 1604 os.execve(args[0], args, newenv) 1605 1606 # equal character in the environment variable name 1607 newenv 
= os.environ.copy() 1608 newenv["FRUIT=ORANGE"] = "lemon" 1609 with self.assertRaises(ValueError): 1610 os.execve(args[0], args, newenv) 1611 1612 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") 1613 def test_execve_with_empty_path(self): 1614 # bpo-32890: Check GetLastError() misuse 1615 try: 1616 os.execve('', ['arg'], {}) 1617 except OSError as e: 1618 self.assertTrue(e.winerror is None or e.winerror != 0) 1619 else: 1620 self.fail('No OSError raised') 1621 1622 1623@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1624class Win32ErrorTests(unittest.TestCase): 1625 def setUp(self): 1626 try: 1627 os.stat(support.TESTFN) 1628 except FileNotFoundError: 1629 exists = False 1630 except OSError as exc: 1631 exists = True 1632 self.fail("file %s must not exist; os.stat failed with %s" 1633 % (support.TESTFN, exc)) 1634 else: 1635 self.fail("file %s must not exist" % support.TESTFN) 1636 1637 def test_rename(self): 1638 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") 1639 1640 def test_remove(self): 1641 self.assertRaises(OSError, os.remove, support.TESTFN) 1642 1643 def test_chdir(self): 1644 self.assertRaises(OSError, os.chdir, support.TESTFN) 1645 1646 def test_mkdir(self): 1647 self.addCleanup(support.unlink, support.TESTFN) 1648 1649 with open(support.TESTFN, "x") as f: 1650 self.assertRaises(OSError, os.mkdir, support.TESTFN) 1651 1652 def test_utime(self): 1653 self.assertRaises(OSError, os.utime, support.TESTFN, None) 1654 1655 def test_chmod(self): 1656 self.assertRaises(OSError, os.chmod, support.TESTFN, 0) 1657 1658 1659class TestInvalidFD(unittest.TestCase): 1660 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", 1661 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 1662 #singles.append("close") 1663 #We omit close because it doesn't raise an exception on some platforms 1664 def get_single(f): 1665 def helper(self): 1666 if hasattr(os, f): 1667 self.check(getattr(os, f)) 1668 return 
helper 1669 for f in singles: 1670 locals()["test_"+f] = get_single(f) 1671 1672 def check(self, f, *args): 1673 try: 1674 f(support.make_bad_fd(), *args) 1675 except OSError as e: 1676 self.assertEqual(e.errno, errno.EBADF) 1677 else: 1678 self.fail("%r didn't raise an OSError with a bad file descriptor" 1679 % f) 1680 1681 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 1682 def test_isatty(self): 1683 self.assertEqual(os.isatty(support.make_bad_fd()), False) 1684 1685 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 1686 def test_closerange(self): 1687 fd = support.make_bad_fd() 1688 # Make sure none of the descriptors we are about to close are 1689 # currently valid (issue 6542). 1690 for i in range(10): 1691 try: os.fstat(fd+i) 1692 except OSError: 1693 pass 1694 else: 1695 break 1696 if i < 2: 1697 raise unittest.SkipTest( 1698 "Unable to acquire a range of invalid file descriptors") 1699 self.assertEqual(os.closerange(fd, fd + i-1), None) 1700 1701 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 1702 def test_dup2(self): 1703 self.check(os.dup2, 20) 1704 1705 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 1706 def test_fchmod(self): 1707 self.check(os.fchmod, 0) 1708 1709 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 1710 def test_fchown(self): 1711 self.check(os.fchown, -1, -1) 1712 1713 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 1714 def test_fpathconf(self): 1715 self.check(os.pathconf, "PC_NAME_MAX") 1716 self.check(os.fpathconf, "PC_NAME_MAX") 1717 1718 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 1719 def test_ftruncate(self): 1720 self.check(os.truncate, 0) 1721 self.check(os.ftruncate, 0) 1722 1723 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 1724 def test_lseek(self): 1725 self.check(os.lseek, 0, 0) 1726 1727 @unittest.skipUnless(hasattr(os, 'read'), 
'test needs os.read()') 1728 def test_read(self): 1729 self.check(os.read, 1) 1730 1731 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') 1732 def test_readv(self): 1733 buf = bytearray(10) 1734 self.check(os.readv, [buf]) 1735 1736 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 1737 def test_tcsetpgrpt(self): 1738 self.check(os.tcsetpgrp, 0) 1739 1740 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 1741 def test_write(self): 1742 self.check(os.write, b" ") 1743 1744 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 1745 def test_writev(self): 1746 self.check(os.writev, [b'abc']) 1747 1748 def test_inheritable(self): 1749 self.check(os.get_inheritable) 1750 self.check(os.set_inheritable, True) 1751 1752 @unittest.skipUnless(hasattr(os, 'get_blocking'), 1753 'needs os.get_blocking() and os.set_blocking()') 1754 def test_blocking(self): 1755 self.check(os.get_blocking) 1756 self.check(os.set_blocking, True) 1757 1758 1759class LinkTests(unittest.TestCase): 1760 def setUp(self): 1761 self.file1 = support.TESTFN 1762 self.file2 = os.path.join(support.TESTFN + "2") 1763 1764 def tearDown(self): 1765 for file in (self.file1, self.file2): 1766 if os.path.exists(file): 1767 os.unlink(file) 1768 1769 def _test_link(self, file1, file2): 1770 create_file(file1) 1771 1772 try: 1773 os.link(file1, file2) 1774 except PermissionError as e: 1775 self.skipTest('os.link(): %s' % e) 1776 with open(file1, "r") as f1, open(file2, "r") as f2: 1777 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 1778 1779 def test_link(self): 1780 self._test_link(self.file1, self.file2) 1781 1782 def test_link_bytes(self): 1783 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 1784 bytes(self.file2, sys.getfilesystemencoding())) 1785 1786 def test_unicode_name(self): 1787 try: 1788 os.fsencode("\xf1") 1789 except UnicodeError: 1790 raise unittest.SkipTest("Unable to encode for this 
platform.") 1791 1792 self.file1 += "\xf1" 1793 self.file2 = self.file1 + "2" 1794 self._test_link(self.file1, self.file2) 1795 1796@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 1797class PosixUidGidTests(unittest.TestCase): 1798 # uid_t and gid_t are 32-bit unsigned integers on Linux 1799 UID_OVERFLOW = (1 << 32) 1800 GID_OVERFLOW = (1 << 32) 1801 1802 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 1803 def test_setuid(self): 1804 if os.getuid() != 0: 1805 self.assertRaises(OSError, os.setuid, 0) 1806 self.assertRaises(TypeError, os.setuid, 'not an int') 1807 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW) 1808 1809 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 1810 def test_setgid(self): 1811 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 1812 self.assertRaises(OSError, os.setgid, 0) 1813 self.assertRaises(TypeError, os.setgid, 'not an int') 1814 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW) 1815 1816 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 1817 def test_seteuid(self): 1818 if os.getuid() != 0: 1819 self.assertRaises(OSError, os.seteuid, 0) 1820 self.assertRaises(TypeError, os.setegid, 'not an int') 1821 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW) 1822 1823 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 1824 def test_setegid(self): 1825 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 1826 self.assertRaises(OSError, os.setegid, 0) 1827 self.assertRaises(TypeError, os.setegid, 'not an int') 1828 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW) 1829 1830 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 1831 def test_setreuid(self): 1832 if os.getuid() != 0: 1833 self.assertRaises(OSError, os.setreuid, 0, 0) 1834 self.assertRaises(TypeError, os.setreuid, 'not an int', 0) 1835 self.assertRaises(TypeError, os.setreuid, 0, 'not an int') 1836 
# (PosixUidGidTests.test_setreuid, continued from the previous line)
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        # The OSError check is skipped for root and when HAVE_WHEEL_GROUP is
        # set.
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
# (PosixUidGidTests.test_setregid_neg1, continued from the previous line)
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])

@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    def setUp(self):
        # Pick the directory name: unencodable if available, else non-ASCII,
        # else the plain TESTFN.
        if support.TESTFN_UNENCODABLE:
            self.dir = support.TESTFN_UNENCODABLE
        elif support.TESTFN_NONASCII:
            self.dir = support.TESTFN_NONASCII
        else:
            self.dir = support.TESTFN
        self.bdir = os.fsencode(self.dir)

        # Collect, as bytes, the candidate file names that can be encoded.
        bytesfn = []
        def add_filename(fn):
            try:
                fn = os.fsencode(fn)
            except UnicodeEncodeError:
                return
            bytesfn.append(fn)
        add_filename(support.TESTFN_UNICODE)
        if support.TESTFN_UNENCODABLE:
            add_filename(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            add_filename(support.TESTFN_NONASCII)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        # self.unicodefn holds the decoded names of the files created below.
        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for fn in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, fn))
                fn = os.fsdecode(fn)
                if fn in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(fn)
        except:
            # tearDown() does not run when setUp() fails, so clean up here
            # and re-raise.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        expected = self.unicodefn
        found = set(os.listdir(self.dir))
        self.assertEqual(found, expected)
        # test listdir without arguments
        current_directory = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(current_directory)

    def test_open(self):
        # Opening each created file by its decoded name must succeed.
        for fn in self.unicodefn:
            f = open(os.path.join(self.dir, fn), 'rb')
            f.close()

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                            "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not
found error 1929 fullname = os.path.join(self.dir, fn) 1930 os.statvfs(fullname) 1931 1932 def test_stat(self): 1933 for fn in self.unicodefn: 1934 os.stat(os.path.join(self.dir, fn)) 1935 1936@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1937class Win32KillTests(unittest.TestCase): 1938 def _kill(self, sig): 1939 # Start sys.executable as a subprocess and communicate from the 1940 # subprocess to the parent that the interpreter is ready. When it 1941 # becomes ready, send *sig* via os.kill to the subprocess and check 1942 # that the return code is equal to *sig*. 1943 import ctypes 1944 from ctypes import wintypes 1945 import msvcrt 1946 1947 # Since we can't access the contents of the process' stdout until the 1948 # process has exited, use PeekNamedPipe to see what's inside stdout 1949 # without waiting. This is done so we can tell that the interpreter 1950 # is started and running at a point where it could handle a signal. 1951 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 1952 PeekNamedPipe.restype = wintypes.BOOL 1953 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 1954 ctypes.POINTER(ctypes.c_char), # stdout buf 1955 wintypes.DWORD, # Buffer size 1956 ctypes.POINTER(wintypes.DWORD), # bytes read 1957 ctypes.POINTER(wintypes.DWORD), # bytes avail 1958 ctypes.POINTER(wintypes.DWORD)) # bytes left 1959 msg = "running" 1960 proc = subprocess.Popen([sys.executable, "-c", 1961 "import sys;" 1962 "sys.stdout.write('{}');" 1963 "sys.stdout.flush();" 1964 "input()".format(msg)], 1965 stdout=subprocess.PIPE, 1966 stderr=subprocess.PIPE, 1967 stdin=subprocess.PIPE) 1968 self.addCleanup(proc.stdout.close) 1969 self.addCleanup(proc.stderr.close) 1970 self.addCleanup(proc.stdin.close) 1971 1972 count, max = 0, 100 1973 while count < max and proc.poll() is None: 1974 # Create a string buffer to store the result of stdout from the pipe 1975 buf = ctypes.create_string_buffer(len(msg)) 1976 # Obtain the text currently in proc.stdout 
1977 # Bytes read/avail/left are left as NULL and unused 1978 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 1979 buf, ctypes.sizeof(buf), None, None, None) 1980 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 1981 if buf.value: 1982 self.assertEqual(msg, buf.value.decode()) 1983 break 1984 time.sleep(0.1) 1985 count += 1 1986 else: 1987 self.fail("Did not receive communication from the subprocess") 1988 1989 os.kill(proc.pid, sig) 1990 self.assertEqual(proc.wait(), sig) 1991 1992 def test_kill_sigterm(self): 1993 # SIGTERM doesn't mean anything special, but make sure it works 1994 self._kill(signal.SIGTERM) 1995 1996 def test_kill_int(self): 1997 # os.kill on Windows can take an int which gets set as the exit code 1998 self._kill(100) 1999 2000 def _kill_with_event(self, event, name): 2001 tagname = "test_os_%s" % uuid.uuid1() 2002 m = mmap.mmap(-1, 1, tagname) 2003 m[0] = 0 2004 # Run a script which has console control handling enabled. 2005 proc = subprocess.Popen([sys.executable, 2006 os.path.join(os.path.dirname(__file__), 2007 "win_console_handler.py"), tagname], 2008 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 2009 # Let the interpreter startup before we send signals. See #3137. 2010 count, max = 0, 100 2011 while count < max and proc.poll() is None: 2012 if m[0] == 1: 2013 break 2014 time.sleep(0.1) 2015 count += 1 2016 else: 2017 # Forcefully kill the process if we weren't able to signal it. 2018 os.kill(proc.pid, signal.SIGINT) 2019 self.fail("Subprocess didn't finish initialization") 2020 os.kill(proc.pid, event) 2021 # proc.send_signal(event) could also be done here. 2022 # Allow time for the signal to be passed and the process to exit. 2023 time.sleep(0.5) 2024 if not proc.poll(): 2025 # Forcefully kill the process if we weren't able to signal it. 
2026 os.kill(proc.pid, signal.SIGINT) 2027 self.fail("subprocess did not stop on {}".format(name)) 2028 2029 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 2030 def test_CTRL_C_EVENT(self): 2031 from ctypes import wintypes 2032 import ctypes 2033 2034 # Make a NULL value by creating a pointer with no argument. 2035 NULL = ctypes.POINTER(ctypes.c_int)() 2036 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 2037 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 2038 wintypes.BOOL) 2039 SetConsoleCtrlHandler.restype = wintypes.BOOL 2040 2041 # Calling this with NULL and FALSE causes the calling process to 2042 # handle Ctrl+C, rather than ignore it. This property is inherited 2043 # by subprocesses. 2044 SetConsoleCtrlHandler(NULL, 0) 2045 2046 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 2047 2048 def test_CTRL_BREAK_EVENT(self): 2049 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 2050 2051 2052@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2053class Win32ListdirTests(unittest.TestCase): 2054 """Test listdir on Windows.""" 2055 2056 def setUp(self): 2057 self.created_paths = [] 2058 for i in range(2): 2059 dir_name = 'SUB%d' % i 2060 dir_path = os.path.join(support.TESTFN, dir_name) 2061 file_name = 'FILE%d' % i 2062 file_path = os.path.join(support.TESTFN, file_name) 2063 os.makedirs(dir_path) 2064 with open(file_path, 'w') as f: 2065 f.write("I'm %s and proud of it. 
Blame test_os.\n" % file_path) 2066 self.created_paths.extend([dir_name, file_name]) 2067 self.created_paths.sort() 2068 2069 def tearDown(self): 2070 shutil.rmtree(support.TESTFN) 2071 2072 def test_listdir_no_extended_path(self): 2073 """Test when the path is not an "extended" path.""" 2074 # unicode 2075 self.assertEqual( 2076 sorted(os.listdir(support.TESTFN)), 2077 self.created_paths) 2078 2079 # bytes 2080 self.assertEqual( 2081 sorted(os.listdir(os.fsencode(support.TESTFN))), 2082 [os.fsencode(path) for path in self.created_paths]) 2083 2084 def test_listdir_extended_path(self): 2085 """Test when the path starts with '\\\\?\\'.""" 2086 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath 2087 # unicode 2088 path = '\\\\?\\' + os.path.abspath(support.TESTFN) 2089 self.assertEqual( 2090 sorted(os.listdir(path)), 2091 self.created_paths) 2092 2093 # bytes 2094 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) 2095 self.assertEqual( 2096 sorted(os.listdir(path)), 2097 [os.fsencode(path) for path in self.created_paths]) 2098 2099 2100@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2101@support.skip_unless_symlink 2102class Win32SymlinkTests(unittest.TestCase): 2103 filelink = 'filelinktest' 2104 filelink_target = os.path.abspath(__file__) 2105 dirlink = 'dirlinktest' 2106 dirlink_target = os.path.dirname(filelink_target) 2107 missing_link = 'missing link' 2108 2109 def setUp(self): 2110 assert os.path.exists(self.dirlink_target) 2111 assert os.path.exists(self.filelink_target) 2112 assert not os.path.exists(self.dirlink) 2113 assert not os.path.exists(self.filelink) 2114 assert not os.path.exists(self.missing_link) 2115 2116 def tearDown(self): 2117 if os.path.exists(self.filelink): 2118 os.remove(self.filelink) 2119 if os.path.exists(self.dirlink): 2120 os.rmdir(self.dirlink) 2121 if os.path.lexists(self.missing_link): 2122 os.remove(self.missing_link) 2123 2124 def 
test_directory_link(self): 2125 os.symlink(self.dirlink_target, self.dirlink) 2126 self.assertTrue(os.path.exists(self.dirlink)) 2127 self.assertTrue(os.path.isdir(self.dirlink)) 2128 self.assertTrue(os.path.islink(self.dirlink)) 2129 self.check_stat(self.dirlink, self.dirlink_target) 2130 2131 def test_file_link(self): 2132 os.symlink(self.filelink_target, self.filelink) 2133 self.assertTrue(os.path.exists(self.filelink)) 2134 self.assertTrue(os.path.isfile(self.filelink)) 2135 self.assertTrue(os.path.islink(self.filelink)) 2136 self.check_stat(self.filelink, self.filelink_target) 2137 2138 def _create_missing_dir_link(self): 2139 'Create a "directory" link to a non-existent target' 2140 linkname = self.missing_link 2141 if os.path.lexists(linkname): 2142 os.remove(linkname) 2143 target = r'c:\\target does not exist.29r3c740' 2144 assert not os.path.exists(target) 2145 target_is_dir = True 2146 os.symlink(target, linkname, target_is_dir) 2147 2148 def test_remove_directory_link_to_missing_target(self): 2149 self._create_missing_dir_link() 2150 # For compatibility with Unix, os.remove will check the 2151 # directory status and call RemoveDirectory if the symlink 2152 # was created with target_is_dir==True. 
2153 os.remove(self.missing_link) 2154 2155 @unittest.skip("currently fails; consider for improvement") 2156 def test_isdir_on_directory_link_to_missing_target(self): 2157 self._create_missing_dir_link() 2158 # consider having isdir return true for directory links 2159 self.assertTrue(os.path.isdir(self.missing_link)) 2160 2161 @unittest.skip("currently fails; consider for improvement") 2162 def test_rmdir_on_directory_link_to_missing_target(self): 2163 self._create_missing_dir_link() 2164 # consider allowing rmdir to remove directory links 2165 os.rmdir(self.missing_link) 2166 2167 def check_stat(self, link, target): 2168 self.assertEqual(os.stat(link), os.stat(target)) 2169 self.assertNotEqual(os.lstat(link), os.stat(link)) 2170 2171 bytes_link = os.fsencode(link) 2172 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2173 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2174 2175 def test_12084(self): 2176 level1 = os.path.abspath(support.TESTFN) 2177 level2 = os.path.join(level1, "level2") 2178 level3 = os.path.join(level2, "level3") 2179 self.addCleanup(support.rmtree, level1) 2180 2181 os.mkdir(level1) 2182 os.mkdir(level2) 2183 os.mkdir(level3) 2184 2185 file1 = os.path.abspath(os.path.join(level1, "file1")) 2186 create_file(file1) 2187 2188 orig_dir = os.getcwd() 2189 try: 2190 os.chdir(level2) 2191 link = os.path.join(level2, "link") 2192 os.symlink(os.path.relpath(file1), "link") 2193 self.assertIn("link", os.listdir(os.getcwd())) 2194 2195 # Check os.stat calls from the same dir as the link 2196 self.assertEqual(os.stat(file1), os.stat("link")) 2197 2198 # Check os.stat calls from a dir below the link 2199 os.chdir(level1) 2200 self.assertEqual(os.stat(file1), 2201 os.stat(os.path.relpath(link))) 2202 2203 # Check os.stat calls from a dir above the link 2204 os.chdir(level3) 2205 self.assertEqual(os.stat(file1), 2206 os.stat(os.path.relpath(link))) 2207 finally: 2208 os.chdir(orig_dir) 2209 2210 
@unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') 2211 and os.path.exists(r'C:\ProgramData'), 2212 'Test directories not found') 2213 def test_29248(self): 2214 # os.symlink() calls CreateSymbolicLink, which creates 2215 # the reparse data buffer with the print name stored 2216 # first, so the offset is always 0. CreateSymbolicLink 2217 # stores the "PrintName" DOS path (e.g. "C:\") first, 2218 # with an offset of 0, followed by the "SubstituteName" 2219 # NT path (e.g. "\??\C:\"). The "All Users" link, on 2220 # the other hand, seems to have been created manually 2221 # with an inverted order. 2222 target = os.readlink(r'C:\Users\All Users') 2223 self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) 2224 2225 def test_buffer_overflow(self): 2226 # Older versions would have a buffer overflow when detecting 2227 # whether a link source was a directory. This test ensures we 2228 # no longer crash, but does not otherwise validate the behavior 2229 segment = 'X' * 27 2230 path = os.path.join(*[segment] * 10) 2231 test_cases = [ 2232 # overflow with absolute src 2233 ('\\' + path, segment), 2234 # overflow dest with relative src 2235 (segment, path), 2236 # overflow when joining src 2237 (path[:180], path[:180]), 2238 ] 2239 for src, dest in test_cases: 2240 try: 2241 os.symlink(src, dest) 2242 except FileNotFoundError: 2243 pass 2244 else: 2245 try: 2246 os.remove(dest) 2247 except OSError: 2248 pass 2249 # Also test with bytes, since that is a separate code path. 
2250 try: 2251 os.symlink(os.fsencode(src), os.fsencode(dest)) 2252 except FileNotFoundError: 2253 pass 2254 else: 2255 try: 2256 os.remove(dest) 2257 except OSError: 2258 pass 2259 2260@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2261class Win32JunctionTests(unittest.TestCase): 2262 junction = 'junctiontest' 2263 junction_target = os.path.dirname(os.path.abspath(__file__)) 2264 2265 def setUp(self): 2266 assert os.path.exists(self.junction_target) 2267 assert not os.path.exists(self.junction) 2268 2269 def tearDown(self): 2270 if os.path.exists(self.junction): 2271 # os.rmdir delegates to Windows' RemoveDirectoryW, 2272 # which removes junction points safely. 2273 os.rmdir(self.junction) 2274 2275 def test_create_junction(self): 2276 _winapi.CreateJunction(self.junction_target, self.junction) 2277 self.assertTrue(os.path.exists(self.junction)) 2278 self.assertTrue(os.path.isdir(self.junction)) 2279 2280 # Junctions are not recognized as links. 2281 self.assertFalse(os.path.islink(self.junction)) 2282 2283 def test_unlink_removes_junction(self): 2284 _winapi.CreateJunction(self.junction_target, self.junction) 2285 self.assertTrue(os.path.exists(self.junction)) 2286 2287 os.unlink(self.junction) 2288 self.assertFalse(os.path.exists(self.junction)) 2289 2290@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2291class Win32NtTests(unittest.TestCase): 2292 def setUp(self): 2293 from test import support 2294 self.nt = support.import_module('nt') 2295 pass 2296 2297 def tearDown(self): 2298 pass 2299 2300 def test_getfinalpathname_handles(self): 2301 try: 2302 import ctypes, ctypes.wintypes 2303 except ImportError: 2304 raise unittest.SkipTest('ctypes module is required for this test') 2305 2306 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) 2307 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE 2308 2309 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL 2310 kernel.GetProcessHandleCount.argtypes 
= (ctypes.wintypes.HANDLE, 2311 ctypes.wintypes.LPDWORD) 2312 2313 # This is a pseudo-handle that doesn't need to be closed 2314 hproc = kernel.GetCurrentProcess() 2315 2316 handle_count = ctypes.wintypes.DWORD() 2317 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2318 self.assertEqual(1, ok) 2319 2320 before_count = handle_count.value 2321 2322 # The first two test the error path, __file__ tests the success path 2323 filenames = [ r'\\?\C:', 2324 r'\\?\NUL', 2325 r'\\?\CONIN', 2326 __file__ ] 2327 2328 for i in range(10): 2329 for name in filenames: 2330 try: 2331 tmp = self.nt._getfinalpathname(name) 2332 except: 2333 # Failure is expected 2334 pass 2335 try: 2336 tmp = os.stat(name) 2337 except: 2338 pass 2339 2340 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2341 self.assertEqual(1, ok) 2342 2343 handle_delta = handle_count.value - before_count 2344 2345 self.assertEqual(0, handle_delta) 2346 2347@support.skip_unless_symlink 2348class NonLocalSymlinkTests(unittest.TestCase): 2349 2350 def setUp(self): 2351 r""" 2352 Create this structure: 2353 2354 base 2355 \___ some_dir 2356 """ 2357 os.makedirs('base/some_dir') 2358 2359 def tearDown(self): 2360 shutil.rmtree('base') 2361 2362 def test_directory_link_nonlocal(self): 2363 """ 2364 The symlink target should resolve relative to the link, not relative 2365 to the current directory. 2366 2367 Then, link base/some_link -> base/some_dir and ensure that some_link 2368 is resolved as a directory. 2369 2370 In issue13772, it was discovered that directory detection failed if 2371 the symlink target was not specified relative to the current 2372 directory, which was a defect in the implementation. 
2373 """ 2374 src = os.path.join('base', 'some_link') 2375 os.symlink('some_dir', src) 2376 assert os.path.isdir(src) 2377 2378 2379class FSEncodingTests(unittest.TestCase): 2380 def test_nop(self): 2381 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 2382 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 2383 2384 def test_identity(self): 2385 # assert fsdecode(fsencode(x)) == x 2386 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 2387 try: 2388 bytesfn = os.fsencode(fn) 2389 except UnicodeEncodeError: 2390 continue 2391 self.assertEqual(os.fsdecode(bytesfn), fn) 2392 2393 2394 2395class DeviceEncodingTests(unittest.TestCase): 2396 2397 def test_bad_fd(self): 2398 # Return None when an fd doesn't actually exist. 2399 self.assertIsNone(os.device_encoding(123456)) 2400 2401 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or 2402 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))), 2403 'test requires a tty and either Windows or nl_langinfo(CODESET)') 2404 def test_device_encoding(self): 2405 encoding = os.device_encoding(0) 2406 self.assertIsNotNone(encoding) 2407 self.assertTrue(codecs.lookup(encoding)) 2408 2409 2410class PidTests(unittest.TestCase): 2411 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 2412 def test_getppid(self): 2413 p = subprocess.Popen([sys.executable, '-c', 2414 'import os; print(os.getppid())'], 2415 stdout=subprocess.PIPE) 2416 stdout, _ = p.communicate() 2417 # We are the parent of our subprocess 2418 self.assertEqual(int(stdout), os.getpid()) 2419 2420 def check_waitpid(self, code, exitcode): 2421 if sys.platform == 'win32': 2422 # On Windows, os.spawnv() simply joins arguments with spaces: 2423 # arguments need to be quoted 2424 args = [f'"{sys.executable}"', '-c', f'"{code}"'] 2425 else: 2426 args = [sys.executable, '-c', code] 2427 pid = os.spawnv(os.P_NOWAIT, sys.executable, args) 2428 2429 pid2, status = os.waitpid(pid, 0) 2430 if sys.platform == 'win32': 2431 
self.assertEqual(status, exitcode << 8) 2432 else: 2433 self.assertTrue(os.WIFEXITED(status), status) 2434 self.assertEqual(os.WEXITSTATUS(status), exitcode) 2435 self.assertEqual(pid2, pid) 2436 2437 def test_waitpid(self): 2438 self.check_waitpid(code='pass', exitcode=0) 2439 2440 def test_waitpid_exitcode(self): 2441 exitcode = 23 2442 code = f'import sys; sys.exit({exitcode})' 2443 self.check_waitpid(code, exitcode=exitcode) 2444 2445 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 2446 def test_waitpid_windows(self): 2447 # bpo-40138: test os.waitpid() with exit code larger than INT_MAX. 2448 STATUS_CONTROL_C_EXIT = 0xC000013A 2449 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' 2450 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) 2451 2452 2453class SpawnTests(unittest.TestCase): 2454 def create_args(self, *, with_env=False, use_bytes=False): 2455 self.exitcode = 17 2456 2457 filename = support.TESTFN 2458 self.addCleanup(support.unlink, filename) 2459 2460 if not with_env: 2461 code = 'import sys; sys.exit(%s)' % self.exitcode 2462 else: 2463 self.env = dict(os.environ) 2464 # create an unique key 2465 self.key = str(uuid.uuid4()) 2466 self.env[self.key] = self.key 2467 # read the variable from os.environ to check that it exists 2468 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' 2469 % (self.key, self.exitcode)) 2470 2471 with open(filename, "w") as fp: 2472 fp.write(code) 2473 2474 args = [sys.executable, filename] 2475 if use_bytes: 2476 args = [os.fsencode(a) for a in args] 2477 self.env = {os.fsencode(k): os.fsencode(v) 2478 for k, v in self.env.items()} 2479 2480 return args 2481 2482 @requires_os_func('spawnl') 2483 def test_spawnl(self): 2484 args = self.create_args() 2485 exitcode = os.spawnl(os.P_WAIT, args[0], *args) 2486 self.assertEqual(exitcode, self.exitcode) 2487 2488 @requires_os_func('spawnle') 2489 def test_spawnle(self): 2490 args = self.create_args(with_env=True) 2491 
exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env) 2492 self.assertEqual(exitcode, self.exitcode) 2493 2494 @requires_os_func('spawnlp') 2495 def test_spawnlp(self): 2496 args = self.create_args() 2497 exitcode = os.spawnlp(os.P_WAIT, args[0], *args) 2498 self.assertEqual(exitcode, self.exitcode) 2499 2500 @requires_os_func('spawnlpe') 2501 def test_spawnlpe(self): 2502 args = self.create_args(with_env=True) 2503 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env) 2504 self.assertEqual(exitcode, self.exitcode) 2505 2506 @requires_os_func('spawnv') 2507 def test_spawnv(self): 2508 args = self.create_args() 2509 exitcode = os.spawnv(os.P_WAIT, args[0], args) 2510 self.assertEqual(exitcode, self.exitcode) 2511 2512 @requires_os_func('spawnve') 2513 def test_spawnve(self): 2514 args = self.create_args(with_env=True) 2515 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 2516 self.assertEqual(exitcode, self.exitcode) 2517 2518 @requires_os_func('spawnvp') 2519 def test_spawnvp(self): 2520 args = self.create_args() 2521 exitcode = os.spawnvp(os.P_WAIT, args[0], args) 2522 self.assertEqual(exitcode, self.exitcode) 2523 2524 @requires_os_func('spawnvpe') 2525 def test_spawnvpe(self): 2526 args = self.create_args(with_env=True) 2527 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env) 2528 self.assertEqual(exitcode, self.exitcode) 2529 2530 @requires_os_func('spawnv') 2531 def test_nowait(self): 2532 args = self.create_args() 2533 pid = os.spawnv(os.P_NOWAIT, args[0], args) 2534 result = os.waitpid(pid, 0) 2535 self.assertEqual(result[0], pid) 2536 status = result[1] 2537 if hasattr(os, 'WIFEXITED'): 2538 self.assertTrue(os.WIFEXITED(status)) 2539 self.assertEqual(os.WEXITSTATUS(status), self.exitcode) 2540 else: 2541 self.assertEqual(status, self.exitcode << 8) 2542 2543 @requires_os_func('spawnve') 2544 def test_spawnve_bytes(self): 2545 # Test bytes handling in parse_arglist and parse_envlist (#28114) 2546 args = 
self.create_args(with_env=True, use_bytes=True) 2547 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 2548 self.assertEqual(exitcode, self.exitcode) 2549 2550 @requires_os_func('spawnl') 2551 def test_spawnl_noargs(self): 2552 args = self.create_args() 2553 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0]) 2554 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '') 2555 2556 @requires_os_func('spawnle') 2557 def test_spawnle_noargs(self): 2558 args = self.create_args() 2559 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {}) 2560 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {}) 2561 2562 @requires_os_func('spawnv') 2563 def test_spawnv_noargs(self): 2564 args = self.create_args() 2565 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ()) 2566 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], []) 2567 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',)) 2568 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ['']) 2569 2570 @requires_os_func('spawnve') 2571 def test_spawnve_noargs(self): 2572 args = self.create_args() 2573 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {}) 2574 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {}) 2575 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {}) 2576 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {}) 2577 2578 def _test_invalid_env(self, spawn): 2579 args = [sys.executable, '-c', 'pass'] 2580 2581 # null character in the environment variable name 2582 newenv = os.environ.copy() 2583 newenv["FRUIT\0VEGETABLE"] = "cabbage" 2584 try: 2585 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2586 except ValueError: 2587 pass 2588 else: 2589 self.assertEqual(exitcode, 127) 2590 2591 # null character in the environment variable value 2592 newenv = os.environ.copy() 2593 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 2594 
try: 2595 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2596 except ValueError: 2597 pass 2598 else: 2599 self.assertEqual(exitcode, 127) 2600 2601 # equal character in the environment variable name 2602 newenv = os.environ.copy() 2603 newenv["FRUIT=ORANGE"] = "lemon" 2604 try: 2605 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2606 except ValueError: 2607 pass 2608 else: 2609 self.assertEqual(exitcode, 127) 2610 2611 # equal character in the environment variable value 2612 filename = support.TESTFN 2613 self.addCleanup(support.unlink, filename) 2614 with open(filename, "w") as fp: 2615 fp.write('import sys, os\n' 2616 'if os.getenv("FRUIT") != "orange=lemon":\n' 2617 ' raise AssertionError') 2618 args = [sys.executable, filename] 2619 newenv = os.environ.copy() 2620 newenv["FRUIT"] = "orange=lemon" 2621 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2622 self.assertEqual(exitcode, 0) 2623 2624 @requires_os_func('spawnve') 2625 def test_spawnve_invalid_env(self): 2626 self._test_invalid_env(os.spawnve) 2627 2628 @requires_os_func('spawnvpe') 2629 def test_spawnvpe_invalid_env(self): 2630 self._test_invalid_env(os.spawnvpe) 2631 2632 2633# The introduction of this TestCase caused at least two different errors on 2634# *nix buildbots. Temporarily skip this to let the buildbots move along. 
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        user_name = os.getlogin()
        self.assertNotEqual(len(user_name), 0)


@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, check that it reads back, then
        # try to restore the original value.
        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            else:
                self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
            except OSError as err:
                # Being refused permission to restore the value is
                # tolerated; any other error is re-raised.
                if err.errno != errno.EACCES:
                    raise


class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    # asyncore-based TCP server that runs its polling loop in its own
    # thread. Each accepted connection gets a Handler, which records the
    # bytes it receives so the tests can compare them with what was sent.

    class Handler(asynchat.async_chat):

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # Tests may set this to False to stop recording received data.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            # Everything recorded so far, as one bytes object.
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        # Start the thread and block until run() has set the ready flag.
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        # Ask the polling loop to exit and wait for the thread to finish.
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # `with` releases the lock even if asyncore.loop() raises; the
            # previous acquire()/release() pair left it held in that case.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise


@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    # Tests needing sendfile()'s headers/trailers arguments are skipped on
    # these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')
    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
            'test is only meaningful on 32-bit builds')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

2770 @classmethod 2771 def tearDownClass(cls): 2772 support.threading_cleanup(*cls.key) 2773 support.unlink(support.TESTFN) 2774 2775 def setUp(self): 2776 self.server = SendfileTestServer((support.HOST, 0)) 2777 self.server.start() 2778 self.client = socket.socket() 2779 self.client.connect((self.server.host, self.server.port)) 2780 self.client.settimeout(1) 2781 # synchronize by waiting for "220 ready" response 2782 self.client.recv(1024) 2783 self.sockno = self.client.fileno() 2784 self.file = open(support.TESTFN, 'rb') 2785 self.fileno = self.file.fileno() 2786 2787 def tearDown(self): 2788 self.file.close() 2789 self.client.close() 2790 if self.server.running: 2791 self.server.stop() 2792 self.server = None 2793 2794 def sendfile_wrapper(self, *args, **kwargs): 2795 """A higher level wrapper representing how an application is 2796 supposed to use sendfile(). 2797 """ 2798 while True: 2799 try: 2800 return os.sendfile(*args, **kwargs) 2801 except OSError as err: 2802 if err.errno == errno.ECONNRESET: 2803 # disconnected 2804 raise 2805 elif err.errno in (errno.EAGAIN, errno.EBUSY): 2806 # we have to retry send data 2807 continue 2808 else: 2809 raise 2810 2811 def test_send_whole_file(self): 2812 # normal send 2813 total_sent = 0 2814 offset = 0 2815 nbytes = 4096 2816 while total_sent < len(self.DATA): 2817 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 2818 if sent == 0: 2819 break 2820 offset += sent 2821 total_sent += sent 2822 self.assertTrue(sent <= nbytes) 2823 self.assertEqual(offset, total_sent) 2824 2825 self.assertEqual(total_sent, len(self.DATA)) 2826 self.client.shutdown(socket.SHUT_RDWR) 2827 self.client.close() 2828 self.server.wait() 2829 data = self.server.handler_instance.get_data() 2830 self.assertEqual(len(data), len(self.DATA)) 2831 self.assertEqual(data, self.DATA) 2832 2833 def test_send_at_certain_offset(self): 2834 # start sending a file at a certain offset 2835 total_sent = 0 2836 offset = len(self.DATA) // 2 
2837 must_send = len(self.DATA) - offset 2838 nbytes = 4096 2839 while total_sent < must_send: 2840 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 2841 if sent == 0: 2842 break 2843 offset += sent 2844 total_sent += sent 2845 self.assertTrue(sent <= nbytes) 2846 2847 self.client.shutdown(socket.SHUT_RDWR) 2848 self.client.close() 2849 self.server.wait() 2850 data = self.server.handler_instance.get_data() 2851 expected = self.DATA[len(self.DATA) // 2:] 2852 self.assertEqual(total_sent, len(expected)) 2853 self.assertEqual(len(data), len(expected)) 2854 self.assertEqual(data, expected) 2855 2856 def test_offset_overflow(self): 2857 # specify an offset > file size 2858 offset = len(self.DATA) + 4096 2859 try: 2860 sent = os.sendfile(self.sockno, self.fileno, offset, 4096) 2861 except OSError as e: 2862 # Solaris can raise EINVAL if offset >= file length, ignore. 2863 if e.errno != errno.EINVAL: 2864 raise 2865 else: 2866 self.assertEqual(sent, 0) 2867 self.client.shutdown(socket.SHUT_RDWR) 2868 self.client.close() 2869 self.server.wait() 2870 data = self.server.handler_instance.get_data() 2871 self.assertEqual(data, b'') 2872 2873 def test_invalid_offset(self): 2874 with self.assertRaises(OSError) as cm: 2875 os.sendfile(self.sockno, self.fileno, -1, 4096) 2876 self.assertEqual(cm.exception.errno, errno.EINVAL) 2877 2878 def test_keywords(self): 2879 # Keyword arguments should be supported 2880 os.sendfile(out=self.sockno, offset=0, count=4096, 2881 **{'in': self.fileno}) 2882 if self.SUPPORT_HEADERS_TRAILERS: 2883 os.sendfile(self.sockno, self.fileno, offset=0, count=4096, 2884 headers=(), trailers=(), flags=0) 2885 2886 # --- headers / trailers tests 2887 2888 @requires_headers_trailers 2889 def test_headers(self): 2890 total_sent = 0 2891 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1] 2892 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, 2893 headers=[b"x" * 512, b"y" * 256]) 2894 self.assertLessEqual(sent, 512 + 256 + 4096) 
@requires_headers_trailers
@requires_32b
def test_headers_overflow_32bits(self):
    # 2**15 headers of 2**16 bytes each add up to 2**31 bytes; the call
    # is expected to be rejected with EINVAL.
    self.server.handler_instance.accumulate = False
    oversized_headers = [b"x" * 2**16] * 2**15
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, 0, 0,
                    headers=oversized_headers)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
def supports_extended_attributes():
    """Report whether extended attributes can be set on the test file.

    Returns False when os.setxattr() is missing, or when setting the
    ``user.test`` attribute on a freshly created support.TESTFN raises
    OSError; returns True otherwise.  support.TESTFN is unlinked again
    whatever the outcome.
    """
    if hasattr(os, "setxattr"):
        try:
            # "xb": exclusive creation, unbuffered binary file.
            with open(support.TESTFN, "xb", 0) as probe:
                try:
                    os.setxattr(probe.fileno(), b"user.test", b"")
                except OSError:
                    usable = False
                else:
                    usable = True
        finally:
            support.unlink(support.TESTFN)
        return usable

    return False
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    # Sanity checks for os.get_terminal_size().

    def _query_or_skip(self, query):
        # Call *query* (a zero-argument callable that asks for the terminal
        # size) and return its result.  An OSError raised on win32, or one
        # carrying EINVAL/ENOTTY, skips the running test; any other
        # OSError propagates.
        try:
            return query()
        except OSError as exc:
            if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be checked portably, so the test
        only verifies that both reported dimensions are non-negative.
        """
        size = self._query_or_skip(os.get_terminal_size)

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty inspects stdin, so get_terminal_size() is queried on stdin
        explicitly.  Whenever stty succeeds, get_terminal_size() is
        expected to succeed as well and to report the same size.
        """
        stty_failures = (FileNotFoundError, subprocess.CalledProcessError,
                         PermissionError)
        try:
            raw = subprocess.check_output(['stty', 'size'])
            size = raw.decode().split()
        except stty_failures:
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._query_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
self.bytes_filenames.append(encoded) 3123 self.bytes_filenames.append(bytearray(encoded)) 3124 self.bytes_filenames.append(memoryview(encoded)) 3125 3126 self.filenames = self.bytes_filenames + self.unicode_filenames 3127 3128 def test_oserror_filename(self): 3129 funcs = [ 3130 (self.filenames, os.chdir,), 3131 (self.filenames, os.chmod, 0o777), 3132 (self.filenames, os.lstat,), 3133 (self.filenames, os.open, os.O_RDONLY), 3134 (self.filenames, os.rmdir,), 3135 (self.filenames, os.stat,), 3136 (self.filenames, os.unlink,), 3137 ] 3138 if sys.platform == "win32": 3139 funcs.extend(( 3140 (self.bytes_filenames, os.rename, b"dst"), 3141 (self.bytes_filenames, os.replace, b"dst"), 3142 (self.unicode_filenames, os.rename, "dst"), 3143 (self.unicode_filenames, os.replace, "dst"), 3144 (self.unicode_filenames, os.listdir, ), 3145 )) 3146 else: 3147 funcs.extend(( 3148 (self.filenames, os.listdir,), 3149 (self.filenames, os.rename, "dst"), 3150 (self.filenames, os.replace, "dst"), 3151 )) 3152 if hasattr(os, "chown"): 3153 funcs.append((self.filenames, os.chown, 0, 0)) 3154 if hasattr(os, "lchown"): 3155 funcs.append((self.filenames, os.lchown, 0, 0)) 3156 if hasattr(os, "truncate"): 3157 funcs.append((self.filenames, os.truncate, 0)) 3158 if hasattr(os, "chflags"): 3159 funcs.append((self.filenames, os.chflags, 0)) 3160 if hasattr(os, "lchflags"): 3161 funcs.append((self.filenames, os.lchflags, 0)) 3162 if hasattr(os, "chroot"): 3163 funcs.append((self.filenames, os.chroot,)) 3164 if hasattr(os, "link"): 3165 if sys.platform == "win32": 3166 funcs.append((self.bytes_filenames, os.link, b"dst")) 3167 funcs.append((self.unicode_filenames, os.link, "dst")) 3168 else: 3169 funcs.append((self.filenames, os.link, "dst")) 3170 if hasattr(os, "listxattr"): 3171 funcs.extend(( 3172 (self.filenames, os.listxattr,), 3173 (self.filenames, os.getxattr, "user.test"), 3174 (self.filenames, os.setxattr, "user.test", b'user'), 3175 (self.filenames, os.removexattr, "user.test"), 3176 )) 
class FDInheritanceTests(unittest.TestCase):
    # Checks of the inheritable flag on descriptors created through os.

    def _open_this_file(self):
        # Open this source file read-only and close it again on cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def _cloexec_bit(self, fd):
        # Current FD_CLOEXEC bit of *fd* (0 when the flag is clear).
        return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(self._cloexec_bit(fd), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(self._cloexec_bit(fd), 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for fd in ends:
            self.addCleanup(os.close, fd)
        for fd in ends:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        nul_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, nul_fd)
        duplicate = os.dup(nul_fd)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        self.assertEqual(os.dup2(source, target), target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        private_target = self._open_this_file()
        self.assertEqual(os.dup2(source, private_target, inheritable=False),
                         private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for fd in pty_fds:
            self.addCleanup(os.close, fd)
        for fd in pty_fds:
            self.assertEqual(os.get_inheritable(fd), False)
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    # Round-trip checks for os.get_blocking() / os.set_blocking().

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)

        # The descriptor is expected to start out in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back; each change must be reported.
        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
def assert_stat_equal(self, stat1, stat2, skip_fields):
    """Assert that two stat results are equal.

    With a true *skip_fields*, every ``st_*`` attribute found on *stat1*
    is compared against the same attribute of *stat2*, except st_dev,
    st_ino and st_nlink.  Otherwise the two objects are compared as a
    whole.
    """
    if not skip_fields:
        self.assertEqual(stat1, stat2)
        return

    ignored = ("st_dev", "st_ino", "st_nlink")
    compared = [field for field in dir(stat1)
                if field.startswith("st_") and field not in ignored]
    for field in compared:
        self.assertEqual(getattr(stat1, field),
                         getattr(stat2, field),
                         (stat1, stat2, field))
def get_entry(self, name):
    """Return the single directory entry of the test directory.

    The directory is scanned through its bytes path when *name* is a
    bytes object and through its str path otherwise.  The scan must
    yield exactly one entry, and that entry must be called *name*.
    """
    directory = self.bytes_path if isinstance(name, bytes) else self.path
    found = list(os.scandir(directory))
    self.assertEqual(len(found), 1)

    only_entry = found[0]
    self.assertEqual(only_entry.name, name)
    return only_entry
def test_removed_dir(self):
    # Query a DirEntry after the directory it refers to has been removed.
    target = os.path.join(self.path, 'dir')

    os.mkdir(target)
    entry = self.get_entry('dir')
    os.rmdir(target)

    on_windows = os.name == 'nt'

    # On POSIX, is_dir() result depends if scandir() filled d_type or not
    if on_windows:
        self.assertTrue(entry.is_dir())
    self.assertFalse(entry.is_file())
    self.assertFalse(entry.is_symlink())

    if not on_windows:
        self.assertGreater(entry.inode(), 0)
        for stat_kwargs in ({}, {'follow_symlinks': False}):
            self.assertRaises(FileNotFoundError, entry.stat, **stat_kwargs)
    else:
        self.assertRaises(FileNotFoundError, entry.inode)
        # don't fail
        entry.stat()
        entry.stat(follow_symlinks=False)
def test_bytes(self):
    # Scanning a bytes path must produce a bytes name and a bytes path.
    self.create_file("file.txt")

    found = list(os.scandir(os.fsencode(self.path)))
    self.assertEqual(len(found), 1, found)

    only_entry = found[0]
    self.assertEqual(only_entry.name, b'file.txt')
    expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))
    self.assertEqual(only_entry.path, expected_path)
def test_consume_iterator_twice(self):
    self.create_file("file.txt")
    scan = os.scandir(self.path)

    first_pass = list(scan)
    self.assertEqual(len(first_pass), 1, first_pass)

    # check that consuming the iterator a second time doesn't raise an
    # exception and simply yields nothing
    second_pass = list(scan)
    self.assertEqual(len(second_pass), 0, second_pass)
def test_resource_warning(self):
    # Dropping a scandir() iterator that was neither exhausted nor closed
    # must emit a ResourceWarning; dropping an exhausted one must not.
    self.create_file("file.txt")
    self.create_file("file2.txt")
    iterator = os.scandir(self.path)
    # Consume only one of the two entries, leaving the iterator unfinished.
    next(iterator)
    with self.assertWarns(ResourceWarning):
        del iterator
        # NOTE(review): presumably forces finalization where "del" alone
        # does not destroy the object immediately -- confirm against
        # support.gc_collect().
        support.gc_collect()
    # exhausted iterator
    iterator = os.scandir(self.path)
    list(iterator)
    with self.check_no_resource_warning():
        del iterator
def test_bad_pathlike(self):
    # __fspath__ returns a value other than str or bytes.
    returns_int = FakePath(42)
    with self.assertRaises(TypeError):
        self.fspath(returns_int)

    # __fspath__ attribute that is not callable.
    not_callable = type('foo', (), {'__fspath__': 1})()
    with self.assertRaises(TypeError):
        self.fspath(not_callable)

    # __fspath__ raises an exception.
    raises_error = FakePath(ZeroDivisionError())
    with self.assertRaises(ZeroDivisionError):
        self.fspath(raises_error)
class TimesTests(unittest.TestCase):
    # Checks on the structure returned by os.times().

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every documented field must be present and be a float.
        float_fields = ('user', 'system', 'children_user',
                        'children_system', 'elapsed')
        for name in float_fields:
            self.assertIsInstance(getattr(result, name), float)

        if os.name != 'nt':
            return

        # On Windows these three fields are expected to be zero.
        for name in ('children_user', 'children_system', 'elapsed'):
            self.assertEqual(getattr(result, name), 0)