1import unittest 2from unittest import mock 3from test import support 4import subprocess 5import sys 6import signal 7import io 8import itertools 9import os 10import errno 11import tempfile 12import time 13import traceback 14import selectors 15import sysconfig 16import select 17import shutil 18import threading 19import gc 20import textwrap 21from test.support import FakePath 22 23try: 24 import _testcapi 25except ImportError: 26 _testcapi = None 27 28 29if support.PGO: 30 raise unittest.SkipTest("test is not helpful for PGO") 31 32mswindows = (sys.platform == "win32") 33 34# 35# Depends on the following external programs: Python 36# 37 38if mswindows: 39 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 40 'os.O_BINARY);') 41else: 42 SETBINARY = '' 43 44NONEXISTING_CMD = ('nonexisting_i_hope',) 45# Ignore errors that indicate the command was not found 46NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError) 47 48ZERO_RETURN_CMD = (sys.executable, '-c', 'pass') 49 50 51def setUpModule(): 52 shell_true = shutil.which('true') 53 if shell_true is None: 54 return 55 if (os.access(shell_true, os.X_OK) and 56 subprocess.run([shell_true]).returncode == 0): 57 global ZERO_RETURN_CMD 58 ZERO_RETURN_CMD = (shell_true,) # Faster than Python startup. 59 60 61class BaseTestCase(unittest.TestCase): 62 def setUp(self): 63 # Try to minimize the number of children we have so this test 64 # doesn't crash on some buildbots (Alphas in particular). 65 support.reap_children() 66 67 def tearDown(self): 68 if not mswindows: 69 # subprocess._active is not used on Windows and is set to None. 70 for inst in subprocess._active: 71 inst.wait() 72 subprocess._cleanup() 73 self.assertFalse( 74 subprocess._active, "subprocess._active not empty" 75 ) 76 self.doCleanups() 77 support.reap_children() 78 79 def assertStderrEqual(self, stderr, expected, msg=None): 80 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 81 # shutdown time. That frustrates tests trying to check stderr produced 82 # from a spawned Python process. 83 actual = support.strip_python_stderr(stderr) 84 # strip_python_stderr also strips whitespace, so we do too. 85 expected = expected.strip() 86 self.assertEqual(actual, expected, msg) 87 88 89class PopenTestException(Exception): 90 pass 91 92 93class PopenExecuteChildRaises(subprocess.Popen): 94 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 95 _execute_child fails. 96 """ 97 def _execute_child(self, *args, **kwargs): 98 raise PopenTestException("Forced Exception for Test") 99 100 101class ProcessTestCase(BaseTestCase): 102 103 def test_io_buffered_by_default(self): 104 p = subprocess.Popen(ZERO_RETURN_CMD, 105 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 106 stderr=subprocess.PIPE) 107 try: 108 self.assertIsInstance(p.stdin, io.BufferedIOBase) 109 self.assertIsInstance(p.stdout, io.BufferedIOBase) 110 self.assertIsInstance(p.stderr, io.BufferedIOBase) 111 finally: 112 p.stdin.close() 113 p.stdout.close() 114 p.stderr.close() 115 p.wait() 116 117 def test_io_unbuffered_works(self): 118 p = subprocess.Popen(ZERO_RETURN_CMD, 119 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 120 stderr=subprocess.PIPE, bufsize=0) 121 try: 122 self.assertIsInstance(p.stdin, io.RawIOBase) 123 self.assertIsInstance(p.stdout, io.RawIOBase) 124 self.assertIsInstance(p.stderr, io.RawIOBase) 125 finally: 126 p.stdin.close() 127 p.stdout.close() 128 p.stderr.close() 129 p.wait() 130 131 def test_call_seq(self): 132 # call() function with sequence argument 133 rc = subprocess.call([sys.executable, "-c", 134 "import sys; sys.exit(47)"]) 135 self.assertEqual(rc, 47) 136 137 def test_call_timeout(self): 138 # call() function with timeout argument; we want to test that the child 139 # process gets killed when the timeout expires. If the child isn't 140 # killed, this call will deadlock since subprocess.call waits for the 141 # child. 142 self.assertRaises(subprocess.TimeoutExpired, subprocess.call, 143 [sys.executable, "-c", "while True: pass"], 144 timeout=0.1) 145 146 def test_check_call_zero(self): 147 # check_call() function with zero return code 148 rc = subprocess.check_call(ZERO_RETURN_CMD) 149 self.assertEqual(rc, 0) 150 151 def test_check_call_nonzero(self): 152 # check_call() function with non-zero return code 153 with self.assertRaises(subprocess.CalledProcessError) as c: 154 subprocess.check_call([sys.executable, "-c", 155 "import sys; sys.exit(47)"]) 156 self.assertEqual(c.exception.returncode, 47) 157 158 def test_check_output(self): 159 # check_output() function with zero return code 160 output = subprocess.check_output( 161 [sys.executable, "-c", "print('BDFL')"]) 162 self.assertIn(b'BDFL', output) 163 164 def test_check_output_nonzero(self): 165 # check_call() function with non-zero return code 166 with self.assertRaises(subprocess.CalledProcessError) as c: 167 subprocess.check_output( 168 [sys.executable, "-c", "import sys; sys.exit(5)"]) 169 self.assertEqual(c.exception.returncode, 5) 170 171 def test_check_output_stderr(self): 172 # check_output() function stderr redirected to stdout 173 output = subprocess.check_output( 174 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 175 stderr=subprocess.STDOUT) 176 self.assertIn(b'BDFL', output) 177 178 def test_check_output_stdin_arg(self): 179 # check_output() can be called with stdin set to a file 180 tf = tempfile.TemporaryFile() 181 self.addCleanup(tf.close) 182 tf.write(b'pear') 183 tf.seek(0) 184 output = subprocess.check_output( 185 [sys.executable, "-c", 186 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 187 stdin=tf) 188 self.assertIn(b'PEAR', output) 189 190 def test_check_output_input_arg(self): 191 # check_output() can be called with input set to a string 192 output = subprocess.check_output( 193 [sys.executable, "-c", 194 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 195 input=b'pear') 196 self.assertIn(b'PEAR', output) 197 198 def test_check_output_input_none(self): 199 """input=None has a legacy meaning of input='' on check_output.""" 200 output = subprocess.check_output( 201 [sys.executable, "-c", 202 "import sys; print('XX' if sys.stdin.read() else '')"], 203 input=None) 204 self.assertNotIn(b'XX', output) 205 206 def test_check_output_input_none_text(self): 207 output = subprocess.check_output( 208 [sys.executable, "-c", 209 "import sys; print('XX' if sys.stdin.read() else '')"], 210 input=None, text=True) 211 self.assertNotIn('XX', output) 212 213 def test_check_output_input_none_universal_newlines(self): 214 output = subprocess.check_output( 215 [sys.executable, "-c", 216 "import sys; print('XX' if sys.stdin.read() else '')"], 217 input=None, universal_newlines=True) 218 self.assertNotIn('XX', output) 219 220 def test_check_output_stdout_arg(self): 221 # check_output() refuses to accept 'stdout' argument 222 with self.assertRaises(ValueError) as c: 223 output = subprocess.check_output( 224 [sys.executable, "-c", "print('will not be run')"], 225 stdout=sys.stdout) 226 self.fail("Expected ValueError when stdout arg supplied.") 227 self.assertIn('stdout', c.exception.args[0]) 228 229 def test_check_output_stdin_with_input_arg(self): 230 # check_output() refuses to accept 'stdin' with 'input' 231 tf = tempfile.TemporaryFile() 232 self.addCleanup(tf.close) 233 tf.write(b'pear') 234 tf.seek(0) 235 with self.assertRaises(ValueError) as c: 236 output = subprocess.check_output( 237 [sys.executable, "-c", "print('will not be run')"], 238 stdin=tf, input=b'hare') 239 self.fail("Expected ValueError when stdin and input args supplied.") 240 self.assertIn('stdin', c.exception.args[0]) 241 self.assertIn('input', c.exception.args[0]) 242 243 def test_check_output_timeout(self): 244 # check_output() function with timeout arg 245 with self.assertRaises(subprocess.TimeoutExpired) as c: 246 output = subprocess.check_output( 247 [sys.executable, "-c", 248 "import sys, time\n" 249 "sys.stdout.write('BDFL')\n" 250 "sys.stdout.flush()\n" 251 "time.sleep(3600)"], 252 # Some heavily loaded buildbots (sparc Debian 3.x) require 253 # this much time to start and print. 254 timeout=3) 255 self.fail("Expected TimeoutExpired.") 256 self.assertEqual(c.exception.output, b'BDFL') 257 258 def test_call_kwargs(self): 259 # call() function with keyword args 260 newenv = os.environ.copy() 261 newenv["FRUIT"] = "banana" 262 rc = subprocess.call([sys.executable, "-c", 263 'import sys, os;' 264 'sys.exit(os.getenv("FRUIT")=="banana")'], 265 env=newenv) 266 self.assertEqual(rc, 1) 267 268 def test_invalid_args(self): 269 # Popen() called with invalid arguments should raise TypeError 270 # but Popen.__del__ should not complain (issue #12085) 271 with support.captured_stderr() as s: 272 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 273 argcount = subprocess.Popen.__init__.__code__.co_argcount 274 too_many_args = [0] * (argcount + 1) 275 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 276 self.assertEqual(s.getvalue(), '') 277 278 def test_stdin_none(self): 279 # .stdin is None when not redirected 280 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 281 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 282 self.addCleanup(p.stdout.close) 283 self.addCleanup(p.stderr.close) 284 p.wait() 285 self.assertEqual(p.stdin, None) 286 287 def test_stdout_none(self): 288 # .stdout is None when not redirected, and the child's stdout will 289 # be inherited from the parent. In order to test this we run a 290 # subprocess in a subprocess: 291 # this_test 292 # \-- subprocess created by this test (parent) 293 # \-- subprocess created by the parent subprocess (child) 294 # The parent doesn't specify stdout, so the child will use the 295 # parent's stdout. This test checks that the message printed by the 296 # child goes to the parent stdout. The parent also checks that the 297 # child's stdout is None. See #11963. 298 code = ('import sys; from subprocess import Popen, PIPE;' 299 'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],' 300 ' stdin=PIPE, stderr=PIPE);' 301 'p.wait(); assert p.stdout is None;') 302 p = subprocess.Popen([sys.executable, "-c", code], 303 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 304 self.addCleanup(p.stdout.close) 305 self.addCleanup(p.stderr.close) 306 out, err = p.communicate() 307 self.assertEqual(p.returncode, 0, err) 308 self.assertEqual(out.rstrip(), b'test_stdout_none') 309 310 def test_stderr_none(self): 311 # .stderr is None when not redirected 312 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 313 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 314 self.addCleanup(p.stdout.close) 315 self.addCleanup(p.stdin.close) 316 p.wait() 317 self.assertEqual(p.stderr, None) 318 319 def _assert_python(self, pre_args, **kwargs): 320 # We include sys.exit() to prevent the test runner from hanging 321 # whenever python is found. 322 args = pre_args + ["import sys; sys.exit(47)"] 323 p = subprocess.Popen(args, **kwargs) 324 p.wait() 325 self.assertEqual(47, p.returncode) 326 327 def test_executable(self): 328 # Check that the executable argument works. 329 # 330 # On Unix (non-Mac and non-Windows), Python looks at args[0] to 331 # determine where its standard library is, so we need the directory 332 # of args[0] to be valid for the Popen() call to Python to succeed. 333 # See also issue #16170 and issue #7774. 334 doesnotexist = os.path.join(os.path.dirname(sys.executable), 335 "doesnotexist") 336 self._assert_python([doesnotexist, "-c"], executable=sys.executable) 337 338 def test_bytes_executable(self): 339 doesnotexist = os.path.join(os.path.dirname(sys.executable), 340 "doesnotexist") 341 self._assert_python([doesnotexist, "-c"], 342 executable=os.fsencode(sys.executable)) 343 344 def test_pathlike_executable(self): 345 doesnotexist = os.path.join(os.path.dirname(sys.executable), 346 "doesnotexist") 347 self._assert_python([doesnotexist, "-c"], 348 executable=FakePath(sys.executable)) 349 350 def test_executable_takes_precedence(self): 351 # Check that the executable argument takes precedence over args[0]. 352 # 353 # Verify first that the call succeeds without the executable arg. 354 pre_args = [sys.executable, "-c"] 355 self._assert_python(pre_args) 356 self.assertRaises(NONEXISTING_ERRORS, 357 self._assert_python, pre_args, 358 executable=NONEXISTING_CMD[0]) 359 360 @unittest.skipIf(mswindows, "executable argument replaces shell") 361 def test_executable_replaces_shell(self): 362 # Check that the executable argument replaces the default shell 363 # when shell=True. 364 self._assert_python([], executable=sys.executable, shell=True) 365 366 @unittest.skipIf(mswindows, "executable argument replaces shell") 367 def test_bytes_executable_replaces_shell(self): 368 self._assert_python([], executable=os.fsencode(sys.executable), 369 shell=True) 370 371 @unittest.skipIf(mswindows, "executable argument replaces shell") 372 def test_pathlike_executable_replaces_shell(self): 373 self._assert_python([], executable=FakePath(sys.executable), 374 shell=True) 375 376 # For use in the test_cwd* tests below. 377 def _normalize_cwd(self, cwd): 378 # Normalize an expected cwd (for Tru64 support). 379 # We can't use os.path.realpath since it doesn't expand Tru64 {memb} 380 # strings. See bug #1063571. 381 with support.change_cwd(cwd): 382 return os.getcwd() 383 384 # For use in the test_cwd* tests below. 385 def _split_python_path(self): 386 # Return normalized (python_dir, python_base). 387 python_path = os.path.realpath(sys.executable) 388 return os.path.split(python_path) 389 390 # For use in the test_cwd* tests below. 391 def _assert_cwd(self, expected_cwd, python_arg, **kwargs): 392 # Invoke Python via Popen, and assert that (1) the call succeeds, 393 # and that (2) the current working directory of the child process 394 # matches *expected_cwd*. 395 p = subprocess.Popen([python_arg, "-c", 396 "import os, sys; " 397 "buf = sys.stdout.buffer; " 398 "buf.write(os.getcwd().encode()); " 399 "buf.flush(); " 400 "sys.exit(47)"], 401 stdout=subprocess.PIPE, 402 **kwargs) 403 self.addCleanup(p.stdout.close) 404 p.wait() 405 self.assertEqual(47, p.returncode) 406 normcase = os.path.normcase 407 self.assertEqual(normcase(expected_cwd), 408 normcase(p.stdout.read().decode())) 409 410 def test_cwd(self): 411 # Check that cwd changes the cwd for the child process. 412 temp_dir = tempfile.gettempdir() 413 temp_dir = self._normalize_cwd(temp_dir) 414 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) 415 416 def test_cwd_with_bytes(self): 417 temp_dir = tempfile.gettempdir() 418 temp_dir = self._normalize_cwd(temp_dir) 419 self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir)) 420 421 def test_cwd_with_pathlike(self): 422 temp_dir = tempfile.gettempdir() 423 temp_dir = self._normalize_cwd(temp_dir) 424 self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir)) 425 426 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 427 def test_cwd_with_relative_arg(self): 428 # Check that Popen looks for args[0] relative to cwd if args[0] 429 # is relative. 430 python_dir, python_base = self._split_python_path() 431 rel_python = os.path.join(os.curdir, python_base) 432 with support.temp_cwd() as wrong_dir: 433 # Before calling with the correct cwd, confirm that the call fails 434 # without cwd and with the wrong cwd. 435 self.assertRaises(FileNotFoundError, subprocess.Popen, 436 [rel_python]) 437 self.assertRaises(FileNotFoundError, subprocess.Popen, 438 [rel_python], cwd=wrong_dir) 439 python_dir = self._normalize_cwd(python_dir) 440 self._assert_cwd(python_dir, rel_python, cwd=python_dir) 441 442 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 443 def test_cwd_with_relative_executable(self): 444 # Check that Popen looks for executable relative to cwd if executable 445 # is relative (and that executable takes precedence over args[0]). 446 python_dir, python_base = self._split_python_path() 447 rel_python = os.path.join(os.curdir, python_base) 448 doesntexist = "somethingyoudonthave" 449 with support.temp_cwd() as wrong_dir: 450 # Before calling with the correct cwd, confirm that the call fails 451 # without cwd and with the wrong cwd. 452 self.assertRaises(FileNotFoundError, subprocess.Popen, 453 [doesntexist], executable=rel_python) 454 self.assertRaises(FileNotFoundError, subprocess.Popen, 455 [doesntexist], executable=rel_python, 456 cwd=wrong_dir) 457 python_dir = self._normalize_cwd(python_dir) 458 self._assert_cwd(python_dir, doesntexist, executable=rel_python, 459 cwd=python_dir) 460 461 def test_cwd_with_absolute_arg(self): 462 # Check that Popen can find the executable when the cwd is wrong 463 # if args[0] is an absolute path. 464 python_dir, python_base = self._split_python_path() 465 abs_python = os.path.join(python_dir, python_base) 466 rel_python = os.path.join(os.curdir, python_base) 467 with support.temp_dir() as wrong_dir: 468 # Before calling with an absolute path, confirm that using a 469 # relative path fails. 470 self.assertRaises(FileNotFoundError, subprocess.Popen, 471 [rel_python], cwd=wrong_dir) 472 wrong_dir = self._normalize_cwd(wrong_dir) 473 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) 474 475 @unittest.skipIf(sys.base_prefix != sys.prefix, 476 'Test is not venv-compatible') 477 def test_executable_with_cwd(self): 478 python_dir, python_base = self._split_python_path() 479 python_dir = self._normalize_cwd(python_dir) 480 self._assert_cwd(python_dir, "somethingyoudonthave", 481 executable=sys.executable, cwd=python_dir) 482 483 @unittest.skipIf(sys.base_prefix != sys.prefix, 484 'Test is not venv-compatible') 485 @unittest.skipIf(sysconfig.is_python_build(), 486 "need an installed Python. See #7774") 487 def test_executable_without_cwd(self): 488 # For a normal installation, it should work without 'cwd' 489 # argument. For test runs in the build directory, see #7774. 490 self._assert_cwd(os.getcwd(), "somethingyoudonthave", 491 executable=sys.executable) 492 493 def test_stdin_pipe(self): 494 # stdin redirection 495 p = subprocess.Popen([sys.executable, "-c", 496 'import sys; sys.exit(sys.stdin.read() == "pear")'], 497 stdin=subprocess.PIPE) 498 p.stdin.write(b"pear") 499 p.stdin.close() 500 p.wait() 501 self.assertEqual(p.returncode, 1) 502 503 def test_stdin_filedes(self): 504 # stdin is set to open file descriptor 505 tf = tempfile.TemporaryFile() 506 self.addCleanup(tf.close) 507 d = tf.fileno() 508 os.write(d, b"pear") 509 os.lseek(d, 0, 0) 510 p = subprocess.Popen([sys.executable, "-c", 511 'import sys; sys.exit(sys.stdin.read() == "pear")'], 512 stdin=d) 513 p.wait() 514 self.assertEqual(p.returncode, 1) 515 516 def test_stdin_fileobj(self): 517 # stdin is set to open file object 518 tf = tempfile.TemporaryFile() 519 self.addCleanup(tf.close) 520 tf.write(b"pear") 521 tf.seek(0) 522 p = subprocess.Popen([sys.executable, "-c", 523 'import sys; sys.exit(sys.stdin.read() == "pear")'], 524 stdin=tf) 525 p.wait() 526 self.assertEqual(p.returncode, 1) 527 528 def test_stdout_pipe(self): 529 # stdout redirection 530 p = subprocess.Popen([sys.executable, "-c", 531 'import sys; sys.stdout.write("orange")'], 532 stdout=subprocess.PIPE) 533 with p: 534 self.assertEqual(p.stdout.read(), b"orange") 535 536 def test_stdout_filedes(self): 537 # stdout is set to open file descriptor 538 tf = tempfile.TemporaryFile() 539 self.addCleanup(tf.close) 540 d = tf.fileno() 541 p = subprocess.Popen([sys.executable, "-c", 542 'import sys; sys.stdout.write("orange")'], 543 stdout=d) 544 p.wait() 545 os.lseek(d, 0, 0) 546 self.assertEqual(os.read(d, 1024), b"orange") 547 548 def test_stdout_fileobj(self): 549 # stdout is set to open file object 550 tf = tempfile.TemporaryFile() 551 self.addCleanup(tf.close) 552 p = subprocess.Popen([sys.executable, "-c", 553 'import sys; sys.stdout.write("orange")'], 554 stdout=tf) 555 p.wait() 556 tf.seek(0) 557 self.assertEqual(tf.read(), b"orange") 558 559 def test_stderr_pipe(self): 560 # stderr redirection 561 p = subprocess.Popen([sys.executable, "-c", 562 'import sys; sys.stderr.write("strawberry")'], 563 stderr=subprocess.PIPE) 564 with p: 565 self.assertStderrEqual(p.stderr.read(), b"strawberry") 566 567 def test_stderr_filedes(self): 568 # stderr is set to open file descriptor 569 tf = tempfile.TemporaryFile() 570 self.addCleanup(tf.close) 571 d = tf.fileno() 572 p = subprocess.Popen([sys.executable, "-c", 573 'import sys; sys.stderr.write("strawberry")'], 574 stderr=d) 575 p.wait() 576 os.lseek(d, 0, 0) 577 self.assertStderrEqual(os.read(d, 1024), b"strawberry") 578 579 def test_stderr_fileobj(self): 580 # stderr is set to open file object 581 tf = tempfile.TemporaryFile() 582 self.addCleanup(tf.close) 583 p = subprocess.Popen([sys.executable, "-c", 584 'import sys; sys.stderr.write("strawberry")'], 585 stderr=tf) 586 p.wait() 587 tf.seek(0) 588 self.assertStderrEqual(tf.read(), b"strawberry") 589 590 def test_stderr_redirect_with_no_stdout_redirect(self): 591 # test stderr=STDOUT while stdout=None (not set) 592 593 # - grandchild prints to stderr 594 # - child redirects grandchild's stderr to its stdout 595 # - the parent should get grandchild's stderr in child's stdout 596 p = subprocess.Popen([sys.executable, "-c", 597 'import sys, subprocess;' 598 'rc = subprocess.call([sys.executable, "-c",' 599 ' "import sys;"' 600 ' "sys.stderr.write(\'42\')"],' 601 ' stderr=subprocess.STDOUT);' 602 'sys.exit(rc)'], 603 stdout=subprocess.PIPE, 604 stderr=subprocess.PIPE) 605 stdout, stderr = p.communicate() 606 #NOTE: stdout should get stderr from grandchild 607 self.assertStderrEqual(stdout, b'42') 608 self.assertStderrEqual(stderr, b'') # should be empty 609 self.assertEqual(p.returncode, 0) 610 611 def test_stdout_stderr_pipe(self): 612 # capture stdout and stderr to the same pipe 613 p = subprocess.Popen([sys.executable, "-c", 614 'import sys;' 615 'sys.stdout.write("apple");' 616 'sys.stdout.flush();' 617 'sys.stderr.write("orange")'], 618 stdout=subprocess.PIPE, 619 stderr=subprocess.STDOUT) 620 with p: 621 self.assertStderrEqual(p.stdout.read(), b"appleorange") 622 623 def test_stdout_stderr_file(self): 624 # capture stdout and stderr to the same open file 625 tf = tempfile.TemporaryFile() 626 self.addCleanup(tf.close) 627 p = subprocess.Popen([sys.executable, "-c", 628 'import sys;' 629 'sys.stdout.write("apple");' 630 'sys.stdout.flush();' 631 'sys.stderr.write("orange")'], 632 stdout=tf, 633 stderr=tf) 634 p.wait() 635 tf.seek(0) 636 self.assertStderrEqual(tf.read(), b"appleorange") 637 638 def test_stdout_filedes_of_stdout(self): 639 # stdout is set to 1 (#1531862). 640 # To avoid printing the text on stdout, we do something similar to 641 # test_stdout_none (see above). The parent subprocess calls the child 642 # subprocess passing stdout=1, and this test uses stdout=PIPE in 643 # order to capture and check the output of the parent. See #11963. 644 code = ('import sys, subprocess; ' 645 'rc = subprocess.call([sys.executable, "-c", ' 646 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 647 'b\'test with stdout=1\'))"], stdout=1); ' 648 'assert rc == 18') 649 p = subprocess.Popen([sys.executable, "-c", code], 650 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 651 self.addCleanup(p.stdout.close) 652 self.addCleanup(p.stderr.close) 653 out, err = p.communicate() 654 self.assertEqual(p.returncode, 0, err) 655 self.assertEqual(out.rstrip(), b'test with stdout=1') 656 657 def test_stdout_devnull(self): 658 p = subprocess.Popen([sys.executable, "-c", 659 'for i in range(10240):' 660 'print("x" * 1024)'], 661 stdout=subprocess.DEVNULL) 662 p.wait() 663 self.assertEqual(p.stdout, None) 664 665 def test_stderr_devnull(self): 666 p = subprocess.Popen([sys.executable, "-c", 667 'import sys\n' 668 'for i in range(10240):' 669 'sys.stderr.write("x" * 1024)'], 670 stderr=subprocess.DEVNULL) 671 p.wait() 672 self.assertEqual(p.stderr, None) 673 674 def test_stdin_devnull(self): 675 p = subprocess.Popen([sys.executable, "-c", 676 'import sys;' 677 'sys.stdin.read(1)'], 678 stdin=subprocess.DEVNULL) 679 p.wait() 680 self.assertEqual(p.stdin, None) 681 682 def test_env(self): 683 newenv = os.environ.copy() 684 newenv["FRUIT"] = "orange" 685 with subprocess.Popen([sys.executable, "-c", 686 'import sys,os;' 687 'sys.stdout.write(os.getenv("FRUIT"))'], 688 stdout=subprocess.PIPE, 689 env=newenv) as p: 690 stdout, stderr = p.communicate() 691 self.assertEqual(stdout, b"orange") 692 693 # Windows requires at least the SYSTEMROOT environment variable to start 694 # Python 695 @unittest.skipIf(sys.platform == 'win32', 696 'cannot test an empty env on Windows') 697 @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1, 698 'The Python shared library cannot be loaded ' 699 'with an empty environment.') 700 def test_empty_env(self): 701 """Verify that env={} is as empty as possible.""" 702 703 def is_env_var_to_ignore(n): 704 """Determine if an environment variable is under our control.""" 705 # This excludes some __CF_* and VERSIONER_* keys MacOS insists 706 # on adding even when the environment in exec is empty. 707 # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist. 708 return ('VERSIONER' in n or '__CF' in n or # MacOS 709 n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo 710 n == 'LC_CTYPE') # Locale coercion triggered 711 712 with subprocess.Popen([sys.executable, "-c", 713 'import os; print(list(os.environ.keys()))'], 714 stdout=subprocess.PIPE, env={}) as p: 715 stdout, stderr = p.communicate() 716 child_env_names = eval(stdout.strip()) 717 self.assertIsInstance(child_env_names, list) 718 child_env_names = [k for k in child_env_names 719 if not is_env_var_to_ignore(k)] 720 self.assertEqual(child_env_names, []) 721 722 def test_invalid_cmd(self): 723 # null character in the command name 724 cmd = sys.executable + '\0' 725 with self.assertRaises(ValueError): 726 subprocess.Popen([cmd, "-c", "pass"]) 727 728 # null character in the command argument 729 with self.assertRaises(ValueError): 730 subprocess.Popen([sys.executable, "-c", "pass#\0"]) 731 732 def test_invalid_env(self): 733 # null character in the environment variable name 734 newenv = os.environ.copy() 735 newenv["FRUIT\0VEGETABLE"] = "cabbage" 736 with self.assertRaises(ValueError): 737 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 738 739 # null character in the environment variable value 740 newenv = os.environ.copy() 741 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 742 with self.assertRaises(ValueError): 743 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 744 745 # equal character in the environment variable name 746 newenv = os.environ.copy() 747 newenv["FRUIT=ORANGE"] = "lemon" 748 with self.assertRaises(ValueError): 749 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 750 751 # equal character in the environment variable value 752 newenv = os.environ.copy() 753 newenv["FRUIT"] = "orange=lemon" 754 with subprocess.Popen([sys.executable, "-c", 755 'import sys, os;' 756 'sys.stdout.write(os.getenv("FRUIT"))'], 757 stdout=subprocess.PIPE, 758 env=newenv) as p: 759 stdout, stderr = p.communicate() 760 self.assertEqual(stdout, b"orange=lemon") 761 762 def test_communicate_stdin(self): 763 p = subprocess.Popen([sys.executable, "-c", 764 'import sys;' 765 'sys.exit(sys.stdin.read() == "pear")'], 766 stdin=subprocess.PIPE) 767 p.communicate(b"pear") 768 self.assertEqual(p.returncode, 1) 769 770 def test_communicate_stdout(self): 771 p = subprocess.Popen([sys.executable, "-c", 772 'import sys; sys.stdout.write("pineapple")'], 773 stdout=subprocess.PIPE) 774 (stdout, stderr) = p.communicate() 775 self.assertEqual(stdout, b"pineapple") 776 self.assertEqual(stderr, None) 777 778 def test_communicate_stderr(self): 779 p = subprocess.Popen([sys.executable, "-c", 780 'import sys; sys.stderr.write("pineapple")'], 781 stderr=subprocess.PIPE) 782 (stdout, stderr) = p.communicate() 783 self.assertEqual(stdout, None) 784 self.assertStderrEqual(stderr, b"pineapple") 785 786 def test_communicate(self): 787 p = subprocess.Popen([sys.executable, "-c", 788 'import sys,os;' 789 'sys.stderr.write("pineapple");' 790 'sys.stdout.write(sys.stdin.read())'], 791 stdin=subprocess.PIPE, 792 stdout=subprocess.PIPE, 793 stderr=subprocess.PIPE) 794 self.addCleanup(p.stdout.close) 795 self.addCleanup(p.stderr.close) 796 self.addCleanup(p.stdin.close) 797 (stdout, stderr) = p.communicate(b"banana") 798 self.assertEqual(stdout, b"banana") 799 self.assertStderrEqual(stderr, b"pineapple") 800 801 def test_communicate_timeout(self): 802 p = subprocess.Popen([sys.executable, "-c", 803 'import sys,os,time;' 804 'sys.stderr.write("pineapple\\n");' 805 'time.sleep(1);' 806 'sys.stderr.write("pear\\n");' 807 'sys.stdout.write(sys.stdin.read())'], 808 universal_newlines=True, 809 stdin=subprocess.PIPE, 810 stdout=subprocess.PIPE, 811 stderr=subprocess.PIPE) 812 self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", 813 timeout=0.3) 814 # Make sure we can keep waiting for it, and that we get the whole output 815 # after it completes. 816 (stdout, stderr) = p.communicate() 817 self.assertEqual(stdout, "banana") 818 self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") 819 820 def test_communicate_timeout_large_output(self): 821 # Test an expiring timeout while the child is outputting lots of data. 822 p = subprocess.Popen([sys.executable, "-c", 823 'import sys,os,time;' 824 'sys.stdout.write("a" * (64 * 1024));' 825 'time.sleep(0.2);' 826 'sys.stdout.write("a" * (64 * 1024));' 827 'time.sleep(0.2);' 828 'sys.stdout.write("a" * (64 * 1024));' 829 'time.sleep(0.2);' 830 'sys.stdout.write("a" * (64 * 1024));'], 831 stdout=subprocess.PIPE) 832 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) 833 (stdout, _) = p.communicate() 834 self.assertEqual(len(stdout), 4 * 64 * 1024) 835 836 # Test for the fd leak reported in http://bugs.python.org/issue2791. 837 def test_communicate_pipe_fd_leak(self): 838 for stdin_pipe in (False, True): 839 for stdout_pipe in (False, True): 840 for stderr_pipe in (False, True): 841 options = {} 842 if stdin_pipe: 843 options['stdin'] = subprocess.PIPE 844 if stdout_pipe: 845 options['stdout'] = subprocess.PIPE 846 if stderr_pipe: 847 options['stderr'] = subprocess.PIPE 848 if not options: 849 continue 850 p = subprocess.Popen(ZERO_RETURN_CMD, **options) 851 p.communicate() 852 if p.stdin is not None: 853 self.assertTrue(p.stdin.closed) 854 if p.stdout is not None: 855 self.assertTrue(p.stdout.closed) 856 if p.stderr is not None: 857 self.assertTrue(p.stderr.closed) 858 859 def test_communicate_returns(self): 860 # communicate() should return None if no redirection is active 861 p = subprocess.Popen([sys.executable, "-c", 862 "import sys; sys.exit(47)"]) 863 (stdout, stderr) = p.communicate() 864 self.assertEqual(stdout, None) 865 self.assertEqual(stderr, None) 866 867 def test_communicate_pipe_buf(self): 868 # communicate() with writes larger than pipe_buf 869 # This test will probably deadlock rather than fail, if 870 # communicate() does not work properly. 871 x, y = os.pipe() 872 os.close(x) 873 os.close(y) 874 p = subprocess.Popen([sys.executable, "-c", 875 'import sys,os;' 876 'sys.stdout.write(sys.stdin.read(47));' 877 'sys.stderr.write("x" * %d);' 878 'sys.stdout.write(sys.stdin.read())' % 879 support.PIPE_MAX_SIZE], 880 stdin=subprocess.PIPE, 881 stdout=subprocess.PIPE, 882 stderr=subprocess.PIPE) 883 self.addCleanup(p.stdout.close) 884 self.addCleanup(p.stderr.close) 885 self.addCleanup(p.stdin.close) 886 string_to_write = b"a" * support.PIPE_MAX_SIZE 887 (stdout, stderr) = p.communicate(string_to_write) 888 self.assertEqual(stdout, string_to_write) 889 890 def test_writes_before_communicate(self): 891 # stdin.write before communicate() 892 p = subprocess.Popen([sys.executable, "-c", 893 'import sys,os;' 894 'sys.stdout.write(sys.stdin.read())'], 895 stdin=subprocess.PIPE, 896 stdout=subprocess.PIPE, 897 stderr=subprocess.PIPE) 898 self.addCleanup(p.stdout.close) 899 self.addCleanup(p.stderr.close) 900 self.addCleanup(p.stdin.close) 901 p.stdin.write(b"banana") 902 (stdout, stderr) = p.communicate(b"split") 903 self.assertEqual(stdout, b"bananasplit") 904 self.assertStderrEqual(stderr, b"") 905 906 def test_universal_newlines_and_text(self): 907 args = [ 908 sys.executable, "-c", 909 'import sys,os;' + SETBINARY + 910 'buf = sys.stdout.buffer;' 911 'buf.write(sys.stdin.readline().encode());' 912 'buf.flush();' 913 'buf.write(b"line2\\n");' 914 'buf.flush();' 915 'buf.write(sys.stdin.read().encode());' 916 'buf.flush();' 917 'buf.write(b"line4\\n");' 918 'buf.flush();' 919 'buf.write(b"line5\\r\\n");' 920 'buf.flush();' 921 'buf.write(b"line6\\r");' 922 'buf.flush();' 923 'buf.write(b"\\nline7");' 924 'buf.flush();' 925 'buf.write(b"\\nline8");'] 926 927 for extra_kwarg in ('universal_newlines', 'text'): 928 p = subprocess.Popen(args, **{'stdin': subprocess.PIPE, 929 'stdout': subprocess.PIPE, 930 extra_kwarg: True}) 931 with p: 932 p.stdin.write("line1\n") 933 p.stdin.flush() 934 self.assertEqual(p.stdout.readline(), "line1\n") 935 p.stdin.write("line3\n") 936 p.stdin.close() 937 self.addCleanup(p.stdout.close) 938 self.assertEqual(p.stdout.readline(), 939 "line2\n") 940 self.assertEqual(p.stdout.read(6), 941 "line3\n") 942 self.assertEqual(p.stdout.read(), 943 "line4\nline5\nline6\nline7\nline8") 944 945 def test_universal_newlines_communicate(self): 946 # universal newlines through communicate() 947 p = subprocess.Popen([sys.executable, "-c", 948 'import sys,os;' + SETBINARY + 949 'buf = sys.stdout.buffer;' 950 'buf.write(b"line2\\n");' 951 'buf.flush();' 952 'buf.write(b"line4\\n");' 953 'buf.flush();' 954 'buf.write(b"line5\\r\\n");' 955 'buf.flush();' 956 'buf.write(b"line6\\r");' 957 'buf.flush();' 958 'buf.write(b"\\nline7");' 959 'buf.flush();' 960 'buf.write(b"\\nline8");'], 961 stderr=subprocess.PIPE, 962 stdout=subprocess.PIPE, 963 universal_newlines=1) 964 self.addCleanup(p.stdout.close) 965 self.addCleanup(p.stderr.close) 966 (stdout, stderr) = p.communicate() 967 self.assertEqual(stdout, 968 "line2\nline4\nline5\nline6\nline7\nline8") 969 970 def test_universal_newlines_communicate_stdin(self): 971 # universal newlines through communicate(), with only stdin 972 p = subprocess.Popen([sys.executable, "-c", 973 'import sys,os;' + SETBINARY + textwrap.dedent(''' 974 s = sys.stdin.readline() 975 assert s == "line1\\n", repr(s) 976 s = sys.stdin.read() 977 assert s == "line3\\n", repr(s) 978 ''')], 979 stdin=subprocess.PIPE, 980 universal_newlines=1) 981 (stdout, stderr) = p.communicate("line1\nline3\n") 982 self.assertEqual(p.returncode, 0) 983 984 def test_universal_newlines_communicate_input_none(self): 985 # Test communicate(input=None) with universal newlines. 986 # 987 # We set stdout to PIPE because, as of this writing, a different 988 # code path is tested when the number of pipes is zero or one. 989 p = subprocess.Popen(ZERO_RETURN_CMD, 990 stdin=subprocess.PIPE, 991 stdout=subprocess.PIPE, 992 universal_newlines=True) 993 p.communicate() 994 self.assertEqual(p.returncode, 0) 995 996 def test_universal_newlines_communicate_stdin_stdout_stderr(self): 997 # universal newlines through communicate(), with stdin, stdout, stderr 998 p = subprocess.Popen([sys.executable, "-c", 999 'import sys,os;' + SETBINARY + textwrap.dedent(''' 1000 s = sys.stdin.buffer.readline() 1001 sys.stdout.buffer.write(s) 1002 sys.stdout.buffer.write(b"line2\\r") 1003 sys.stderr.buffer.write(b"eline2\\n") 1004 s = sys.stdin.buffer.read() 1005 sys.stdout.buffer.write(s) 1006 sys.stdout.buffer.write(b"line4\\n") 1007 sys.stdout.buffer.write(b"line5\\r\\n") 1008 sys.stderr.buffer.write(b"eline6\\r") 1009 sys.stderr.buffer.write(b"eline7\\r\\nz") 1010 ''')], 1011 stdin=subprocess.PIPE, 1012 stderr=subprocess.PIPE, 1013 stdout=subprocess.PIPE, 1014 universal_newlines=True) 1015 self.addCleanup(p.stdout.close) 1016 self.addCleanup(p.stderr.close) 1017 (stdout, stderr) = p.communicate("line1\nline3\n") 1018 self.assertEqual(p.returncode, 0) 1019 self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout) 1020 # Python debug build push something like "[42442 refs]\n" 1021 # to stderr at exit of subprocess. 1022 # Don't use assertStderrEqual because it strips CR and LF from output. 1023 self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) 1024 1025 def test_universal_newlines_communicate_encodings(self): 1026 # Check that universal newlines mode works for various encodings, 1027 # in particular for encodings in the UTF-16 and UTF-32 families. 1028 # See issue #15595. 1029 # 1030 # UTF-16 and UTF-32-BE are sufficient to check both with BOM and 1031 # without, and UTF-16 and UTF-32. 1032 for encoding in ['utf-16', 'utf-32-be']: 1033 code = ("import sys; " 1034 r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % 1035 encoding) 1036 args = [sys.executable, '-c', code] 1037 # We set stdin to be non-None because, as of this writing, 1038 # a different code path is used when the number of pipes is 1039 # zero or one. 1040 popen = subprocess.Popen(args, 1041 stdin=subprocess.PIPE, 1042 stdout=subprocess.PIPE, 1043 encoding=encoding) 1044 stdout, stderr = popen.communicate(input='') 1045 self.assertEqual(stdout, '1\n2\n3\n4') 1046 1047 def test_communicate_errors(self): 1048 for errors, expected in [ 1049 ('ignore', ''), 1050 ('replace', '\ufffd\ufffd'), 1051 ('surrogateescape', '\udc80\udc80'), 1052 ('backslashreplace', '\\x80\\x80'), 1053 ]: 1054 code = ("import sys; " 1055 r"sys.stdout.buffer.write(b'[\x80\x80]')") 1056 args = [sys.executable, '-c', code] 1057 # We set stdin to be non-None because, as of this writing, 1058 # a different code path is used when the number of pipes is 1059 # zero or one. 1060 popen = subprocess.Popen(args, 1061 stdin=subprocess.PIPE, 1062 stdout=subprocess.PIPE, 1063 encoding='utf-8', 1064 errors=errors) 1065 stdout, stderr = popen.communicate(input='') 1066 self.assertEqual(stdout, '[{}]'.format(expected)) 1067 1068 def test_no_leaking(self): 1069 # Make sure we leak no resources 1070 if not mswindows: 1071 max_handles = 1026 # too much for most UNIX systems 1072 else: 1073 max_handles = 2050 # too much for (at least some) Windows setups 1074 handles = [] 1075 tmpdir = tempfile.mkdtemp() 1076 try: 1077 for i in range(max_handles): 1078 try: 1079 tmpfile = os.path.join(tmpdir, support.TESTFN) 1080 handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT)) 1081 except OSError as e: 1082 if e.errno != errno.EMFILE: 1083 raise 1084 break 1085 else: 1086 self.skipTest("failed to reach the file descriptor limit " 1087 "(tried %d)" % max_handles) 1088 # Close a couple of them (should be enough for a subprocess) 1089 for i in range(10): 1090 os.close(handles.pop()) 1091 # Loop creating some subprocesses. If one of them leaks some fds, 1092 # the next loop iteration will fail by reaching the max fd limit. 1093 for i in range(15): 1094 p = subprocess.Popen([sys.executable, "-c", 1095 "import sys;" 1096 "sys.stdout.write(sys.stdin.read())"], 1097 stdin=subprocess.PIPE, 1098 stdout=subprocess.PIPE, 1099 stderr=subprocess.PIPE) 1100 data = p.communicate(b"lime")[0] 1101 self.assertEqual(data, b"lime") 1102 finally: 1103 for h in handles: 1104 os.close(h) 1105 shutil.rmtree(tmpdir) 1106 1107 def test_list2cmdline(self): 1108 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 1109 '"a b c" d e') 1110 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 1111 'ab\\"c \\ d') 1112 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 1113 'ab\\"c " \\\\" d') 1114 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 1115 'a\\\\\\b "de fg" h') 1116 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 1117 'a\\\\\\"b c d') 1118 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 1119 '"a\\\\b c" d e') 1120 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 1121 '"a\\\\b\\ c" d e') 1122 self.assertEqual(subprocess.list2cmdline(['ab', '']), 1123 'ab ""') 1124 1125 def test_poll(self): 1126 p = subprocess.Popen([sys.executable, "-c", 1127 "import os; os.read(0, 1)"], 1128 stdin=subprocess.PIPE) 1129 self.addCleanup(p.stdin.close) 1130 self.assertIsNone(p.poll()) 1131 os.write(p.stdin.fileno(), b'A') 1132 p.wait() 1133 # Subsequent invocations should just return the returncode 1134 self.assertEqual(p.poll(), 0) 1135 1136 def test_wait(self): 1137 p = subprocess.Popen(ZERO_RETURN_CMD) 1138 self.assertEqual(p.wait(), 0) 1139 # Subsequent invocations should just return the returncode 1140 self.assertEqual(p.wait(), 0) 1141 1142 def test_wait_timeout(self): 1143 p = subprocess.Popen([sys.executable, 1144 "-c", "import time; time.sleep(0.3)"]) 1145 with self.assertRaises(subprocess.TimeoutExpired) as c: 1146 p.wait(timeout=0.0001) 1147 self.assertIn("0.0001", str(c.exception)) # For coverage of __str__. 1148 # Some heavily loaded buildbots (sparc Debian 3.x) require this much 1149 # time to start. 1150 self.assertEqual(p.wait(timeout=3), 0) 1151 1152 def test_invalid_bufsize(self): 1153 # an invalid type of the bufsize argument should raise 1154 # TypeError. 1155 with self.assertRaises(TypeError): 1156 subprocess.Popen(ZERO_RETURN_CMD, "orange") 1157 1158 def test_bufsize_is_none(self): 1159 # bufsize=None should be the same as bufsize=0. 1160 p = subprocess.Popen(ZERO_RETURN_CMD, None) 1161 self.assertEqual(p.wait(), 0) 1162 # Again with keyword arg 1163 p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None) 1164 self.assertEqual(p.wait(), 0) 1165 1166 def _test_bufsize_equal_one(self, line, expected, universal_newlines): 1167 # subprocess may deadlock with bufsize=1, see issue #21332 1168 with subprocess.Popen([sys.executable, "-c", "import sys;" 1169 "sys.stdout.write(sys.stdin.readline());" 1170 "sys.stdout.flush()"], 1171 stdin=subprocess.PIPE, 1172 stdout=subprocess.PIPE, 1173 stderr=subprocess.DEVNULL, 1174 bufsize=1, 1175 universal_newlines=universal_newlines) as p: 1176 p.stdin.write(line) # expect that it flushes the line in text mode 1177 os.close(p.stdin.fileno()) # close it without flushing the buffer 1178 read_line = p.stdout.readline() 1179 with support.SuppressCrashReport(): 1180 try: 1181 p.stdin.close() 1182 except OSError: 1183 pass 1184 p.stdin = None 1185 self.assertEqual(p.returncode, 0) 1186 self.assertEqual(read_line, expected) 1187 1188 def test_bufsize_equal_one_text_mode(self): 1189 # line is flushed in text mode with bufsize=1. 1190 # we should get the full line in return 1191 line = "line\n" 1192 self._test_bufsize_equal_one(line, line, universal_newlines=True) 1193 1194 def test_bufsize_equal_one_binary_mode(self): 1195 # line is not flushed in binary mode with bufsize=1. 1196 # we should get empty response 1197 line = b'line' + os.linesep.encode() # assume ascii-based locale 1198 with self.assertWarnsRegex(RuntimeWarning, 'line buffering'): 1199 self._test_bufsize_equal_one(line, b'', universal_newlines=False) 1200 1201 def test_leaking_fds_on_error(self): 1202 # see bug #5179: Popen leaks file descriptors to PIPEs if 1203 # the child fails to execute; this will eventually exhaust 1204 # the maximum number of open fds. 1024 seems a very common 1205 # value for that limit, but Windows has 2048, so we loop 1206 # 1024 times (each call leaked two fds). 1207 for i in range(1024): 1208 with self.assertRaises(NONEXISTING_ERRORS): 1209 subprocess.Popen(NONEXISTING_CMD, 1210 stdout=subprocess.PIPE, 1211 stderr=subprocess.PIPE) 1212 1213 def test_nonexisting_with_pipes(self): 1214 # bpo-30121: Popen with pipes must close properly pipes on error. 1215 # Previously, os.close() was called with a Windows handle which is not 1216 # a valid file descriptor. 1217 # 1218 # Run the test in a subprocess to control how the CRT reports errors 1219 # and to get stderr content. 1220 try: 1221 import msvcrt 1222 msvcrt.CrtSetReportMode 1223 except (AttributeError, ImportError): 1224 self.skipTest("need msvcrt.CrtSetReportMode") 1225 1226 code = textwrap.dedent(f""" 1227 import msvcrt 1228 import subprocess 1229 1230 cmd = {NONEXISTING_CMD!r} 1231 1232 for report_type in [msvcrt.CRT_WARN, 1233 msvcrt.CRT_ERROR, 1234 msvcrt.CRT_ASSERT]: 1235 msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE) 1236 msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR) 1237 1238 try: 1239 subprocess.Popen(cmd, 1240 stdout=subprocess.PIPE, 1241 stderr=subprocess.PIPE) 1242 except OSError: 1243 pass 1244 """) 1245 cmd = [sys.executable, "-c", code] 1246 proc = subprocess.Popen(cmd, 1247 stderr=subprocess.PIPE, 1248 universal_newlines=True) 1249 with proc: 1250 stderr = proc.communicate()[1] 1251 self.assertEqual(stderr, "") 1252 self.assertEqual(proc.returncode, 0) 1253 1254 def test_double_close_on_error(self): 1255 # Issue #18851 1256 fds = [] 1257 def open_fds(): 1258 for i in range(20): 1259 fds.extend(os.pipe()) 1260 time.sleep(0.001) 1261 t = threading.Thread(target=open_fds) 1262 t.start() 1263 try: 1264 with self.assertRaises(EnvironmentError): 1265 subprocess.Popen(NONEXISTING_CMD, 1266 stdin=subprocess.PIPE, 1267 stdout=subprocess.PIPE, 1268 stderr=subprocess.PIPE) 1269 finally: 1270 t.join() 1271 exc = None 1272 for fd in fds: 1273 # If a double close occurred, some of those fds will 1274 # already have been closed by mistake, and os.close() 1275 # here will raise. 1276 try: 1277 os.close(fd) 1278 except OSError as e: 1279 exc = e 1280 if exc is not None: 1281 raise exc 1282 1283 def test_threadsafe_wait(self): 1284 """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" 1285 proc = subprocess.Popen([sys.executable, '-c', 1286 'import time; time.sleep(12)']) 1287 self.assertEqual(proc.returncode, None) 1288 results = [] 1289 1290 def kill_proc_timer_thread(): 1291 results.append(('thread-start-poll-result', proc.poll())) 1292 # terminate it from the thread and wait for the result. 1293 proc.kill() 1294 proc.wait() 1295 results.append(('thread-after-kill-and-wait', proc.returncode)) 1296 # this wait should be a no-op given the above. 1297 proc.wait() 1298 results.append(('thread-after-second-wait', proc.returncode)) 1299 1300 # This is a timing sensitive test, the failure mode is 1301 # triggered when both the main thread and this thread are in 1302 # the wait() call at once. The delay here is to allow the 1303 # main thread to most likely be blocked in its wait() call. 1304 t = threading.Timer(0.2, kill_proc_timer_thread) 1305 t.start() 1306 1307 if mswindows: 1308 expected_errorcode = 1 1309 else: 1310 # Should be -9 because of the proc.kill() from the thread. 1311 expected_errorcode = -9 1312 1313 # Wait for the process to finish; the thread should kill it 1314 # long before it finishes on its own. Supplying a timeout 1315 # triggers a different code path for better coverage. 1316 proc.wait(timeout=20) 1317 self.assertEqual(proc.returncode, expected_errorcode, 1318 msg="unexpected result in wait from main thread") 1319 1320 # This should be a no-op with no change in returncode. 1321 proc.wait() 1322 self.assertEqual(proc.returncode, expected_errorcode, 1323 msg="unexpected result in second main wait.") 1324 1325 t.join() 1326 # Ensure that all of the thread results are as expected. 1327 # When a race condition occurs in wait(), the returncode could 1328 # be set by the wrong thread that doesn't actually have it 1329 # leading to an incorrect value. 1330 self.assertEqual([('thread-start-poll-result', None), 1331 ('thread-after-kill-and-wait', expected_errorcode), 1332 ('thread-after-second-wait', expected_errorcode)], 1333 results) 1334 1335 def test_issue8780(self): 1336 # Ensure that stdout is inherited from the parent 1337 # if stdout=PIPE is not used 1338 code = ';'.join(( 1339 'import subprocess, sys', 1340 'retcode = subprocess.call(' 1341 "[sys.executable, '-c', 'print(\"Hello World!\")'])", 1342 'assert retcode == 0')) 1343 output = subprocess.check_output([sys.executable, '-c', code]) 1344 self.assertTrue(output.startswith(b'Hello World!'), ascii(output)) 1345 1346 def test_handles_closed_on_exception(self): 1347 # If CreateProcess exits with an error, ensure the 1348 # duplicate output handles are released 1349 ifhandle, ifname = tempfile.mkstemp() 1350 ofhandle, ofname = tempfile.mkstemp() 1351 efhandle, efname = tempfile.mkstemp() 1352 try: 1353 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 1354 stderr=efhandle) 1355 except OSError: 1356 os.close(ifhandle) 1357 os.remove(ifname) 1358 os.close(ofhandle) 1359 os.remove(ofname) 1360 os.close(efhandle) 1361 os.remove(efname) 1362 self.assertFalse(os.path.exists(ifname)) 1363 self.assertFalse(os.path.exists(ofname)) 1364 self.assertFalse(os.path.exists(efname)) 1365 1366 def test_communicate_epipe(self): 1367 # Issue 10963: communicate() should hide EPIPE 1368 p = subprocess.Popen(ZERO_RETURN_CMD, 1369 stdin=subprocess.PIPE, 1370 stdout=subprocess.PIPE, 1371 stderr=subprocess.PIPE) 1372 self.addCleanup(p.stdout.close) 1373 self.addCleanup(p.stderr.close) 1374 self.addCleanup(p.stdin.close) 1375 p.communicate(b"x" * 2**20) 1376 1377 def test_communicate_epipe_only_stdin(self): 1378 # Issue 10963: communicate() should hide EPIPE 1379 p = subprocess.Popen(ZERO_RETURN_CMD, 1380 stdin=subprocess.PIPE) 1381 self.addCleanup(p.stdin.close) 1382 p.wait() 1383 p.communicate(b"x" * 2**20) 1384 1385 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 1386 "Requires signal.SIGUSR1") 1387 @unittest.skipUnless(hasattr(os, 'kill'), 1388 "Requires os.kill") 1389 @unittest.skipUnless(hasattr(os, 'getppid'), 1390 "Requires os.getppid") 1391 def test_communicate_eintr(self): 1392 # Issue #12493: communicate() should handle EINTR 1393 def handler(signum, frame): 1394 pass 1395 old_handler = signal.signal(signal.SIGUSR1, handler) 1396 self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) 1397 1398 args = [sys.executable, "-c", 1399 'import os, signal;' 1400 'os.kill(os.getppid(), signal.SIGUSR1)'] 1401 for stream in ('stdout', 'stderr'): 1402 kw = {stream: subprocess.PIPE} 1403 with subprocess.Popen(args, **kw) as process: 1404 # communicate() will be interrupted by SIGUSR1 1405 process.communicate() 1406 1407 1408 # This test is Linux-ish specific for simplicity to at least have 1409 # some coverage. It is not a platform specific bug. 1410 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 1411 "Linux specific") 1412 def test_failed_child_execute_fd_leak(self): 1413 """Test for the fork() failure fd leak reported in issue16327.""" 1414 fd_directory = '/proc/%d/fd' % os.getpid() 1415 fds_before_popen = os.listdir(fd_directory) 1416 with self.assertRaises(PopenTestException): 1417 PopenExecuteChildRaises( 1418 ZERO_RETURN_CMD, stdin=subprocess.PIPE, 1419 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1420 1421 # NOTE: This test doesn't verify that the real _execute_child 1422 # does not close the file descriptors itself on the way out 1423 # during an exception. Code inspection has confirmed that. 1424 1425 fds_after_exception = os.listdir(fd_directory) 1426 self.assertEqual(fds_before_popen, fds_after_exception) 1427 1428 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1429 def test_file_not_found_includes_filename(self): 1430 with self.assertRaises(FileNotFoundError) as c: 1431 subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) 1432 self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') 1433 1434 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1435 def test_file_not_found_with_bad_cwd(self): 1436 with self.assertRaises(FileNotFoundError) as c: 1437 subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory') 1438 self.assertEqual(c.exception.filename, '/some/nonexistent/directory') 1439 1440 1441class RunFuncTestCase(BaseTestCase): 1442 def run_python(self, code, **kwargs): 1443 """Run Python code in a subprocess using subprocess.run""" 1444 argv = [sys.executable, "-c", code] 1445 return subprocess.run(argv, **kwargs) 1446 1447 def test_returncode(self): 1448 # call() function with sequence argument 1449 cp = self.run_python("import sys; sys.exit(47)") 1450 self.assertEqual(cp.returncode, 47) 1451 with self.assertRaises(subprocess.CalledProcessError): 1452 cp.check_returncode() 1453 1454 def test_check(self): 1455 with self.assertRaises(subprocess.CalledProcessError) as c: 1456 self.run_python("import sys; sys.exit(47)", check=True) 1457 self.assertEqual(c.exception.returncode, 47) 1458 1459 def test_check_zero(self): 1460 # check_returncode shouldn't raise when returncode is zero 1461 cp = subprocess.run(ZERO_RETURN_CMD, check=True) 1462 self.assertEqual(cp.returncode, 0) 1463 1464 def test_timeout(self): 1465 # run() function with timeout argument; we want to test that the child 1466 # process gets killed when the timeout expires. If the child isn't 1467 # killed, this call will deadlock since subprocess.run waits for the 1468 # child. 1469 with self.assertRaises(subprocess.TimeoutExpired): 1470 self.run_python("while True: pass", timeout=0.0001) 1471 1472 def test_capture_stdout(self): 1473 # capture stdout with zero return code 1474 cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) 1475 self.assertIn(b'BDFL', cp.stdout) 1476 1477 def test_capture_stderr(self): 1478 cp = self.run_python("import sys; sys.stderr.write('BDFL')", 1479 stderr=subprocess.PIPE) 1480 self.assertIn(b'BDFL', cp.stderr) 1481 1482 def test_check_output_stdin_arg(self): 1483 # run() can be called with stdin set to a file 1484 tf = tempfile.TemporaryFile() 1485 self.addCleanup(tf.close) 1486 tf.write(b'pear') 1487 tf.seek(0) 1488 cp = self.run_python( 1489 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1490 stdin=tf, stdout=subprocess.PIPE) 1491 self.assertIn(b'PEAR', cp.stdout) 1492 1493 def test_check_output_input_arg(self): 1494 # check_output() can be called with input set to a string 1495 cp = self.run_python( 1496 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1497 input=b'pear', stdout=subprocess.PIPE) 1498 self.assertIn(b'PEAR', cp.stdout) 1499 1500 def test_check_output_stdin_with_input_arg(self): 1501 # run() refuses to accept 'stdin' with 'input' 1502 tf = tempfile.TemporaryFile() 1503 self.addCleanup(tf.close) 1504 tf.write(b'pear') 1505 tf.seek(0) 1506 with self.assertRaises(ValueError, 1507 msg="Expected ValueError when stdin and input args supplied.") as c: 1508 output = self.run_python("print('will not be run')", 1509 stdin=tf, input=b'hare') 1510 self.assertIn('stdin', c.exception.args[0]) 1511 self.assertIn('input', c.exception.args[0]) 1512 1513 def test_check_output_timeout(self): 1514 with self.assertRaises(subprocess.TimeoutExpired) as c: 1515 cp = self.run_python(( 1516 "import sys, time\n" 1517 "sys.stdout.write('BDFL')\n" 1518 "sys.stdout.flush()\n" 1519 "time.sleep(3600)"), 1520 # Some heavily loaded buildbots (sparc Debian 3.x) require 1521 # this much time to start and print. 1522 timeout=3, stdout=subprocess.PIPE) 1523 self.assertEqual(c.exception.output, b'BDFL') 1524 # output is aliased to stdout 1525 self.assertEqual(c.exception.stdout, b'BDFL') 1526 1527 def test_run_kwargs(self): 1528 newenv = os.environ.copy() 1529 newenv["FRUIT"] = "banana" 1530 cp = self.run_python(('import sys, os;' 1531 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), 1532 env=newenv) 1533 self.assertEqual(cp.returncode, 33) 1534 1535 def test_run_with_pathlike_path(self): 1536 # bpo-31961: test run(pathlike_object) 1537 # the name of a command that can be run without 1538 # any argumenets that exit fast 1539 prog = 'tree.com' if mswindows else 'ls' 1540 path = shutil.which(prog) 1541 if path is None: 1542 self.skipTest(f'{prog} required for this test') 1543 path = FakePath(path) 1544 res = subprocess.run(path, stdout=subprocess.DEVNULL) 1545 self.assertEqual(res.returncode, 0) 1546 with self.assertRaises(TypeError): 1547 subprocess.run(path, stdout=subprocess.DEVNULL, shell=True) 1548 1549 def test_run_with_bytes_path_and_arguments(self): 1550 # bpo-31961: test run([bytes_object, b'additional arguments']) 1551 path = os.fsencode(sys.executable) 1552 args = [path, '-c', b'import sys; sys.exit(57)'] 1553 res = subprocess.run(args) 1554 self.assertEqual(res.returncode, 57) 1555 1556 def test_run_with_pathlike_path_and_arguments(self): 1557 # bpo-31961: test run([pathlike_object, 'additional arguments']) 1558 path = FakePath(sys.executable) 1559 args = [path, '-c', 'import sys; sys.exit(57)'] 1560 res = subprocess.run(args) 1561 self.assertEqual(res.returncode, 57) 1562 1563 def test_capture_output(self): 1564 cp = self.run_python(("import sys;" 1565 "sys.stdout.write('BDFL'); " 1566 "sys.stderr.write('FLUFL')"), 1567 capture_output=True) 1568 self.assertIn(b'BDFL', cp.stdout) 1569 self.assertIn(b'FLUFL', cp.stderr) 1570 1571 def test_stdout_with_capture_output_arg(self): 1572 # run() refuses to accept 'stdout' with 'capture_output' 1573 tf = tempfile.TemporaryFile() 1574 self.addCleanup(tf.close) 1575 with self.assertRaises(ValueError, 1576 msg=("Expected ValueError when stdout and capture_output " 1577 "args supplied.")) as c: 1578 output = self.run_python("print('will not be run')", 1579 capture_output=True, stdout=tf) 1580 self.assertIn('stdout', c.exception.args[0]) 1581 self.assertIn('capture_output', c.exception.args[0]) 1582 1583 def test_stderr_with_capture_output_arg(self): 1584 # run() refuses to accept 'stderr' with 'capture_output' 1585 tf = tempfile.TemporaryFile() 1586 self.addCleanup(tf.close) 1587 with self.assertRaises(ValueError, 1588 msg=("Expected ValueError when stderr and capture_output " 1589 "args supplied.")) as c: 1590 output = self.run_python("print('will not be run')", 1591 capture_output=True, stderr=tf) 1592 self.assertIn('stderr', c.exception.args[0]) 1593 self.assertIn('capture_output', c.exception.args[0]) 1594 1595 # This test _might_ wind up a bit fragile on loaded build+test machines 1596 # as it depends on the timing with wide enough margins for normal situations 1597 # but does assert that it happened "soon enough" to believe the right thing 1598 # happened. 1599 @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command") 1600 def test_run_with_shell_timeout_and_capture_output(self): 1601 """Output capturing after a timeout mustn't hang forever on open filehandles.""" 1602 before_secs = time.monotonic() 1603 try: 1604 subprocess.run('sleep 3', shell=True, timeout=0.1, 1605 capture_output=True) # New session unspecified. 1606 except subprocess.TimeoutExpired as exc: 1607 after_secs = time.monotonic() 1608 stacks = traceback.format_exc() # assertRaises doesn't give this. 1609 else: 1610 self.fail("TimeoutExpired not raised.") 1611 self.assertLess(after_secs - before_secs, 1.5, 1612 msg="TimeoutExpired was delayed! Bad traceback:\n```\n" 1613 f"{stacks}```") 1614 1615 1616@unittest.skipIf(mswindows, "POSIX specific tests") 1617class POSIXProcessTestCase(BaseTestCase): 1618 1619 def setUp(self): 1620 super().setUp() 1621 self._nonexistent_dir = "/_this/pa.th/does/not/exist" 1622 1623 def _get_chdir_exception(self): 1624 try: 1625 os.chdir(self._nonexistent_dir) 1626 except OSError as e: 1627 # This avoids hard coding the errno value or the OS perror() 1628 # string and instead capture the exception that we want to see 1629 # below for comparison. 1630 desired_exception = e 1631 else: 1632 self.fail("chdir to nonexistent directory %s succeeded." % 1633 self._nonexistent_dir) 1634 return desired_exception 1635 1636 def test_exception_cwd(self): 1637 """Test error in the child raised in the parent for a bad cwd.""" 1638 desired_exception = self._get_chdir_exception() 1639 try: 1640 p = subprocess.Popen([sys.executable, "-c", ""], 1641 cwd=self._nonexistent_dir) 1642 except OSError as e: 1643 # Test that the child process chdir failure actually makes 1644 # it up to the parent process as the correct exception. 1645 self.assertEqual(desired_exception.errno, e.errno) 1646 self.assertEqual(desired_exception.strerror, e.strerror) 1647 self.assertEqual(desired_exception.filename, e.filename) 1648 else: 1649 self.fail("Expected OSError: %s" % desired_exception) 1650 1651 def test_exception_bad_executable(self): 1652 """Test error in the child raised in the parent for a bad executable.""" 1653 desired_exception = self._get_chdir_exception() 1654 try: 1655 p = subprocess.Popen([sys.executable, "-c", ""], 1656 executable=self._nonexistent_dir) 1657 except OSError as e: 1658 # Test that the child process exec failure actually makes 1659 # it up to the parent process as the correct exception. 1660 self.assertEqual(desired_exception.errno, e.errno) 1661 self.assertEqual(desired_exception.strerror, e.strerror) 1662 self.assertEqual(desired_exception.filename, e.filename) 1663 else: 1664 self.fail("Expected OSError: %s" % desired_exception) 1665 1666 def test_exception_bad_args_0(self): 1667 """Test error in the child raised in the parent for a bad args[0].""" 1668 desired_exception = self._get_chdir_exception() 1669 try: 1670 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) 1671 except OSError as e: 1672 # Test that the child process exec failure actually makes 1673 # it up to the parent process as the correct exception. 1674 self.assertEqual(desired_exception.errno, e.errno) 1675 self.assertEqual(desired_exception.strerror, e.strerror) 1676 self.assertEqual(desired_exception.filename, e.filename) 1677 else: 1678 self.fail("Expected OSError: %s" % desired_exception) 1679 1680 # We mock the __del__ method for Popen in the next two tests 1681 # because it does cleanup based on the pid returned by fork_exec 1682 # along with issuing a resource warning if it still exists. Since 1683 # we don't actually spawn a process in these tests we can forego 1684 # the destructor. An alternative would be to set _child_created to 1685 # False before the destructor is called but there is no easy way 1686 # to do that 1687 class PopenNoDestructor(subprocess.Popen): 1688 def __del__(self): 1689 pass 1690 1691 @mock.patch("subprocess._posixsubprocess.fork_exec") 1692 def test_exception_errpipe_normal(self, fork_exec): 1693 """Test error passing done through errpipe_write in the good case""" 1694 def proper_error(*args): 1695 errpipe_write = args[13] 1696 # Write the hex for the error code EISDIR: 'is a directory' 1697 err_code = '{:x}'.format(errno.EISDIR).encode() 1698 os.write(errpipe_write, b"OSError:" + err_code + b":") 1699 return 0 1700 1701 fork_exec.side_effect = proper_error 1702 1703 with mock.patch("subprocess.os.waitpid", 1704 side_effect=ChildProcessError): 1705 with self.assertRaises(IsADirectoryError): 1706 self.PopenNoDestructor(["non_existent_command"]) 1707 1708 @mock.patch("subprocess._posixsubprocess.fork_exec") 1709 def test_exception_errpipe_bad_data(self, fork_exec): 1710 """Test error passing done through errpipe_write where its not 1711 in the expected format""" 1712 error_data = b"\xFF\x00\xDE\xAD" 1713 def bad_error(*args): 1714 errpipe_write = args[13] 1715 # Anything can be in the pipe, no assumptions should 1716 # be made about its encoding, so we'll write some 1717 # arbitrary hex bytes to test it out 1718 os.write(errpipe_write, error_data) 1719 return 0 1720 1721 fork_exec.side_effect = bad_error 1722 1723 with mock.patch("subprocess.os.waitpid", 1724 side_effect=ChildProcessError): 1725 with self.assertRaises(subprocess.SubprocessError) as e: 1726 self.PopenNoDestructor(["non_existent_command"]) 1727 1728 self.assertIn(repr(error_data), str(e.exception)) 1729 1730 @unittest.skipIf(not os.path.exists('/proc/self/status'), 1731 "need /proc/self/status") 1732 def test_restore_signals(self): 1733 # Blindly assume that cat exists on systems with /proc/self/status... 1734 default_proc_status = subprocess.check_output( 1735 ['cat', '/proc/self/status'], 1736 restore_signals=False) 1737 for line in default_proc_status.splitlines(): 1738 if line.startswith(b'SigIgn'): 1739 default_sig_ign_mask = line 1740 break 1741 else: 1742 self.skipTest("SigIgn not found in /proc/self/status.") 1743 restored_proc_status = subprocess.check_output( 1744 ['cat', '/proc/self/status'], 1745 restore_signals=True) 1746 for line in restored_proc_status.splitlines(): 1747 if line.startswith(b'SigIgn'): 1748 restored_sig_ign_mask = line 1749 break 1750 self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask, 1751 msg="restore_signals=True should've unblocked " 1752 "SIGPIPE and friends.") 1753 1754 def test_start_new_session(self): 1755 # For code coverage of calling setsid(). We don't care if we get an 1756 # EPERM error from it depending on the test execution environment, that 1757 # still indicates that it was called. 1758 try: 1759 output = subprocess.check_output( 1760 [sys.executable, "-c", "import os; print(os.getsid(0))"], 1761 start_new_session=True) 1762 except OSError as e: 1763 if e.errno != errno.EPERM: 1764 raise 1765 else: 1766 parent_sid = os.getsid(0) 1767 child_sid = int(output) 1768 self.assertNotEqual(parent_sid, child_sid) 1769 1770 def test_run_abort(self): 1771 # returncode handles signal termination 1772 with support.SuppressCrashReport(): 1773 p = subprocess.Popen([sys.executable, "-c", 1774 'import os; os.abort()']) 1775 p.wait() 1776 self.assertEqual(-p.returncode, signal.SIGABRT) 1777 1778 def test_CalledProcessError_str_signal(self): 1779 err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd") 1780 error_string = str(err) 1781 # We're relying on the repr() of the signal.Signals intenum to provide 1782 # the word signal, the signal name and the numeric value. 1783 self.assertIn("signal", error_string.lower()) 1784 # We're not being specific about the signal name as some signals have 1785 # multiple names and which name is revealed can vary. 1786 self.assertIn("SIG", error_string) 1787 self.assertIn(str(signal.SIGABRT), error_string) 1788 1789 def test_CalledProcessError_str_unknown_signal(self): 1790 err = subprocess.CalledProcessError(-9876543, "fake cmd") 1791 error_string = str(err) 1792 self.assertIn("unknown signal 9876543.", error_string) 1793 1794 def test_CalledProcessError_str_non_zero(self): 1795 err = subprocess.CalledProcessError(2, "fake cmd") 1796 error_string = str(err) 1797 self.assertIn("non-zero exit status 2.", error_string) 1798 1799 def test_preexec(self): 1800 # DISCLAIMER: Setting environment variables is *not* a good use 1801 # of a preexec_fn. This is merely a test. 1802 p = subprocess.Popen([sys.executable, "-c", 1803 'import sys,os;' 1804 'sys.stdout.write(os.getenv("FRUIT"))'], 1805 stdout=subprocess.PIPE, 1806 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 1807 with p: 1808 self.assertEqual(p.stdout.read(), b"apple") 1809 1810 def test_preexec_exception(self): 1811 def raise_it(): 1812 raise ValueError("What if two swallows carried a coconut?") 1813 try: 1814 p = subprocess.Popen([sys.executable, "-c", ""], 1815 preexec_fn=raise_it) 1816 except subprocess.SubprocessError as e: 1817 self.assertTrue( 1818 subprocess._posixsubprocess, 1819 "Expected a ValueError from the preexec_fn") 1820 except ValueError as e: 1821 self.assertIn("coconut", e.args[0]) 1822 else: 1823 self.fail("Exception raised by preexec_fn did not make it " 1824 "to the parent process.") 1825 1826 class _TestExecuteChildPopen(subprocess.Popen): 1827 """Used to test behavior at the end of _execute_child.""" 1828 def __init__(self, testcase, *args, **kwargs): 1829 self._testcase = testcase 1830 subprocess.Popen.__init__(self, *args, **kwargs) 1831 1832 def _execute_child(self, *args, **kwargs): 1833 try: 1834 subprocess.Popen._execute_child(self, *args, **kwargs) 1835 finally: 1836 # Open a bunch of file descriptors and verify that 1837 # none of them are the same as the ones the Popen 1838 # instance is using for stdin/stdout/stderr. 1839 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 1840 for _ in range(8)] 1841 try: 1842 for fd in devzero_fds: 1843 self._testcase.assertNotIn( 1844 fd, (self.stdin.fileno(), self.stdout.fileno(), 1845 self.stderr.fileno()), 1846 msg="At least one fd was closed early.") 1847 finally: 1848 for fd in devzero_fds: 1849 os.close(fd) 1850 1851 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 1852 def test_preexec_errpipe_does_not_double_close_pipes(self): 1853 """Issue16140: Don't double close pipes on preexec error.""" 1854 1855 def raise_it(): 1856 raise subprocess.SubprocessError( 1857 "force the _execute_child() errpipe_data path.") 1858 1859 with self.assertRaises(subprocess.SubprocessError): 1860 self._TestExecuteChildPopen( 1861 self, ZERO_RETURN_CMD, 1862 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1863 stderr=subprocess.PIPE, preexec_fn=raise_it) 1864 1865 def test_preexec_gc_module_failure(self): 1866 # This tests the code that disables garbage collection if the child 1867 # process will execute any Python. 1868 def raise_runtime_error(): 1869 raise RuntimeError("this shouldn't escape") 1870 enabled = gc.isenabled() 1871 orig_gc_disable = gc.disable 1872 orig_gc_isenabled = gc.isenabled 1873 try: 1874 gc.disable() 1875 self.assertFalse(gc.isenabled()) 1876 subprocess.call([sys.executable, '-c', ''], 1877 preexec_fn=lambda: None) 1878 self.assertFalse(gc.isenabled(), 1879 "Popen enabled gc when it shouldn't.") 1880 1881 gc.enable() 1882 self.assertTrue(gc.isenabled()) 1883 subprocess.call([sys.executable, '-c', ''], 1884 preexec_fn=lambda: None) 1885 self.assertTrue(gc.isenabled(), "Popen left gc disabled.") 1886 1887 gc.disable = raise_runtime_error 1888 self.assertRaises(RuntimeError, subprocess.Popen, 1889 [sys.executable, '-c', ''], 1890 preexec_fn=lambda: None) 1891 1892 del gc.isenabled # force an AttributeError 1893 self.assertRaises(AttributeError, subprocess.Popen, 1894 [sys.executable, '-c', ''], 1895 preexec_fn=lambda: None) 1896 finally: 1897 gc.disable = orig_gc_disable 1898 gc.isenabled = orig_gc_isenabled 1899 if not enabled: 1900 gc.disable() 1901 1902 @unittest.skipIf( 1903 sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') 1904 def test_preexec_fork_failure(self): 1905 # The internal code did not preserve the previous exception when 1906 # re-enabling garbage collection 1907 try: 1908 from resource import getrlimit, setrlimit, RLIMIT_NPROC 1909 except ImportError as err: 1910 self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD 1911 limits = getrlimit(RLIMIT_NPROC) 1912 [_, hard] = limits 1913 setrlimit(RLIMIT_NPROC, (0, hard)) 1914 self.addCleanup(setrlimit, RLIMIT_NPROC, limits) 1915 try: 1916 subprocess.call([sys.executable, '-c', ''], 1917 preexec_fn=lambda: None) 1918 except BlockingIOError: 1919 # Forking should raise EAGAIN, translated to BlockingIOError 1920 pass 1921 else: 1922 self.skipTest('RLIMIT_NPROC had no effect; probably superuser') 1923 1924 def test_args_string(self): 1925 # args is a string 1926 fd, fname = tempfile.mkstemp() 1927 # reopen in text mode 1928 with open(fd, "w", errors="surrogateescape") as fobj: 1929 fobj.write("#!%s\n" % support.unix_shell) 1930 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 1931 sys.executable) 1932 os.chmod(fname, 0o700) 1933 p = subprocess.Popen(fname) 1934 p.wait() 1935 os.remove(fname) 1936 self.assertEqual(p.returncode, 47) 1937 1938 def test_invalid_args(self): 1939 # invalid arguments should raise ValueError 1940 self.assertRaises(ValueError, subprocess.call, 1941 [sys.executable, "-c", 1942 "import sys; sys.exit(47)"], 1943 startupinfo=47) 1944 self.assertRaises(ValueError, subprocess.call, 1945 [sys.executable, "-c", 1946 "import sys; sys.exit(47)"], 1947 creationflags=47) 1948 1949 def test_shell_sequence(self): 1950 # Run command through the shell (sequence) 1951 newenv = os.environ.copy() 1952 newenv["FRUIT"] = "apple" 1953 p = subprocess.Popen(["echo $FRUIT"], shell=1, 1954 stdout=subprocess.PIPE, 1955 env=newenv) 1956 with p: 1957 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 1958 1959 def test_shell_string(self): 1960 # Run command through the shell (string) 1961 newenv = os.environ.copy() 1962 newenv["FRUIT"] = "apple" 1963 p = subprocess.Popen("echo $FRUIT", shell=1, 1964 stdout=subprocess.PIPE, 1965 env=newenv) 1966 with p: 1967 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 1968 1969 def test_call_string(self): 1970 # call() function with string argument on UNIX 1971 fd, fname = tempfile.mkstemp() 1972 # reopen in text mode 1973 with open(fd, "w", errors="surrogateescape") as fobj: 1974 fobj.write("#!%s\n" % support.unix_shell) 1975 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 1976 sys.executable) 1977 os.chmod(fname, 0o700) 1978 rc = subprocess.call(fname) 1979 os.remove(fname) 1980 self.assertEqual(rc, 47) 1981 1982 def test_specific_shell(self): 1983 # Issue #9265: Incorrect name passed as arg[0]. 1984 shells = [] 1985 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 1986 for name in ['bash', 'ksh']: 1987 sh = os.path.join(prefix, name) 1988 if os.path.isfile(sh): 1989 shells.append(sh) 1990 if not shells: # Will probably work for any shell but csh. 1991 self.skipTest("bash or ksh required for this test") 1992 sh = '/bin/sh' 1993 if os.path.isfile(sh) and not os.path.islink(sh): 1994 # Test will fail if /bin/sh is a symlink to csh. 1995 shells.append(sh) 1996 for sh in shells: 1997 p = subprocess.Popen("echo $0", executable=sh, shell=True, 1998 stdout=subprocess.PIPE) 1999 with p: 2000 self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) 2001 2002 def _kill_process(self, method, *args): 2003 # Do not inherit file handles from the parent. 2004 # It should fix failures on some platforms. 2005 # Also set the SIGINT handler to the default to make sure it's not 2006 # being ignored (some tests rely on that.) 2007 old_handler = signal.signal(signal.SIGINT, signal.default_int_handler) 2008 try: 2009 p = subprocess.Popen([sys.executable, "-c", """if 1: 2010 import sys, time 2011 sys.stdout.write('x\\n') 2012 sys.stdout.flush() 2013 time.sleep(30) 2014 """], 2015 close_fds=True, 2016 stdin=subprocess.PIPE, 2017 stdout=subprocess.PIPE, 2018 stderr=subprocess.PIPE) 2019 finally: 2020 signal.signal(signal.SIGINT, old_handler) 2021 # Wait for the interpreter to be completely initialized before 2022 # sending any signal. 2023 p.stdout.read(1) 2024 getattr(p, method)(*args) 2025 return p 2026 2027 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 2028 "Due to known OS bug (issue #16762)") 2029 def _kill_dead_process(self, method, *args): 2030 # Do not inherit file handles from the parent. 2031 # It should fix failures on some platforms. 2032 p = subprocess.Popen([sys.executable, "-c", """if 1: 2033 import sys, time 2034 sys.stdout.write('x\\n') 2035 sys.stdout.flush() 2036 """], 2037 close_fds=True, 2038 stdin=subprocess.PIPE, 2039 stdout=subprocess.PIPE, 2040 stderr=subprocess.PIPE) 2041 # Wait for the interpreter to be completely initialized before 2042 # sending any signal. 2043 p.stdout.read(1) 2044 # The process should end after this 2045 time.sleep(1) 2046 # This shouldn't raise even though the child is now dead 2047 getattr(p, method)(*args) 2048 p.communicate() 2049 2050 def test_send_signal(self): 2051 p = self._kill_process('send_signal', signal.SIGINT) 2052 _, stderr = p.communicate() 2053 self.assertIn(b'KeyboardInterrupt', stderr) 2054 self.assertNotEqual(p.wait(), 0) 2055 2056 def test_kill(self): 2057 p = self._kill_process('kill') 2058 _, stderr = p.communicate() 2059 self.assertStderrEqual(stderr, b'') 2060 self.assertEqual(p.wait(), -signal.SIGKILL) 2061 2062 def test_terminate(self): 2063 p = self._kill_process('terminate') 2064 _, stderr = p.communicate() 2065 self.assertStderrEqual(stderr, b'') 2066 self.assertEqual(p.wait(), -signal.SIGTERM) 2067 2068 def test_send_signal_dead(self): 2069 # Sending a signal to a dead process 2070 self._kill_dead_process('send_signal', signal.SIGINT) 2071 2072 def test_kill_dead(self): 2073 # Killing a dead process 2074 self._kill_dead_process('kill') 2075 2076 def test_terminate_dead(self): 2077 # Terminating a dead process 2078 self._kill_dead_process('terminate') 2079 2080 def _save_fds(self, save_fds): 2081 fds = [] 2082 for fd in save_fds: 2083 inheritable = os.get_inheritable(fd) 2084 saved = os.dup(fd) 2085 fds.append((fd, saved, inheritable)) 2086 return fds 2087 2088 def _restore_fds(self, fds): 2089 for fd, saved, inheritable in fds: 2090 os.dup2(saved, fd, inheritable=inheritable) 2091 os.close(saved) 2092 2093 def check_close_std_fds(self, fds): 2094 # Issue #9905: test that subprocess pipes still work properly with 2095 # some standard fds closed 2096 stdin = 0 2097 saved_fds = self._save_fds(fds) 2098 for fd, saved, inheritable in saved_fds: 2099 if fd == 0: 2100 stdin = saved 2101 break 2102 try: 2103 for fd in fds: 2104 os.close(fd) 2105 out, err = subprocess.Popen([sys.executable, "-c", 2106 'import sys;' 2107 'sys.stdout.write("apple");' 2108 'sys.stdout.flush();' 2109 'sys.stderr.write("orange")'], 2110 stdin=stdin, 2111 stdout=subprocess.PIPE, 2112 stderr=subprocess.PIPE).communicate() 2113 err = support.strip_python_stderr(err) 2114 self.assertEqual((out, err), (b'apple', b'orange')) 2115 finally: 2116 self._restore_fds(saved_fds) 2117 2118 def test_close_fd_0(self): 2119 self.check_close_std_fds([0]) 2120 2121 def test_close_fd_1(self): 2122 self.check_close_std_fds([1]) 2123 2124 def test_close_fd_2(self): 2125 self.check_close_std_fds([2]) 2126 2127 def test_close_fds_0_1(self): 2128 self.check_close_std_fds([0, 1]) 2129 2130 def test_close_fds_0_2(self): 2131 self.check_close_std_fds([0, 2]) 2132 2133 def test_close_fds_1_2(self): 2134 self.check_close_std_fds([1, 2]) 2135 2136 def test_close_fds_0_1_2(self): 2137 # Issue #10806: test that subprocess pipes still work properly with 2138 # all standard fds closed. 2139 self.check_close_std_fds([0, 1, 2]) 2140 2141 def test_small_errpipe_write_fd(self): 2142 """Issue #15798: Popen should work when stdio fds are available.""" 2143 new_stdin = os.dup(0) 2144 new_stdout = os.dup(1) 2145 try: 2146 os.close(0) 2147 os.close(1) 2148 2149 # Side test: if errpipe_write fails to have its CLOEXEC 2150 # flag set this should cause the parent to think the exec 2151 # failed. Extremely unlikely: everyone supports CLOEXEC. 2152 subprocess.Popen([ 2153 sys.executable, "-c", 2154 "print('AssertionError:0:CLOEXEC failure.')"]).wait() 2155 finally: 2156 # Restore original stdin and stdout 2157 os.dup2(new_stdin, 0) 2158 os.dup2(new_stdout, 1) 2159 os.close(new_stdin) 2160 os.close(new_stdout) 2161 2162 def test_remapping_std_fds(self): 2163 # open up some temporary files 2164 temps = [tempfile.mkstemp() for i in range(3)] 2165 try: 2166 temp_fds = [fd for fd, fname in temps] 2167 2168 # unlink the files -- we won't need to reopen them 2169 for fd, fname in temps: 2170 os.unlink(fname) 2171 2172 # write some data to what will become stdin, and rewind 2173 os.write(temp_fds[1], b"STDIN") 2174 os.lseek(temp_fds[1], 0, 0) 2175 2176 # move the standard file descriptors out of the way 2177 saved_fds = self._save_fds(range(3)) 2178 try: 2179 # duplicate the file objects over the standard fd's 2180 for fd, temp_fd in enumerate(temp_fds): 2181 os.dup2(temp_fd, fd) 2182 2183 # now use those files in the "wrong" order, so that subprocess 2184 # has to rearrange them in the child 2185 p = subprocess.Popen([sys.executable, "-c", 2186 'import sys; got = sys.stdin.read();' 2187 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2188 stdin=temp_fds[1], 2189 stdout=temp_fds[2], 2190 stderr=temp_fds[0]) 2191 p.wait() 2192 finally: 2193 self._restore_fds(saved_fds) 2194 2195 for fd in temp_fds: 2196 os.lseek(fd, 0, 0) 2197 2198 out = os.read(temp_fds[2], 1024) 2199 err = support.strip_python_stderr(os.read(temp_fds[0], 1024)) 2200 self.assertEqual(out, b"got STDIN") 2201 self.assertEqual(err, b"err") 2202 2203 finally: 2204 for fd in temp_fds: 2205 os.close(fd) 2206 2207 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 2208 # open up some temporary files 2209 temps = [tempfile.mkstemp() for i in range(3)] 2210 temp_fds = [fd for fd, fname in temps] 2211 try: 2212 # unlink the files -- we won't need to reopen them 2213 for fd, fname in temps: 2214 os.unlink(fname) 2215 2216 # save a copy of the standard file descriptors 2217 saved_fds = self._save_fds(range(3)) 2218 try: 2219 # duplicate the temp files over the standard fd's 0, 1, 2 2220 for fd, temp_fd in enumerate(temp_fds): 2221 os.dup2(temp_fd, fd) 2222 2223 # write some data to what will become stdin, and rewind 2224 os.write(stdin_no, b"STDIN") 2225 os.lseek(stdin_no, 0, 0) 2226 2227 # now use those files in the given order, so that subprocess 2228 # has to rearrange them in the child 2229 p = subprocess.Popen([sys.executable, "-c", 2230 'import sys; got = sys.stdin.read();' 2231 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2232 stdin=stdin_no, 2233 stdout=stdout_no, 2234 stderr=stderr_no) 2235 p.wait() 2236 2237 for fd in temp_fds: 2238 os.lseek(fd, 0, 0) 2239 2240 out = os.read(stdout_no, 1024) 2241 err = support.strip_python_stderr(os.read(stderr_no, 1024)) 2242 finally: 2243 self._restore_fds(saved_fds) 2244 2245 self.assertEqual(out, b"got STDIN") 2246 self.assertEqual(err, b"err") 2247 2248 finally: 2249 for fd in temp_fds: 2250 os.close(fd) 2251 2252 # When duping fds, if there arises a situation where one of the fds is 2253 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 2254 # This tests all combinations of this. 2255 def test_swap_fds(self): 2256 self.check_swap_fds(0, 1, 2) 2257 self.check_swap_fds(0, 2, 1) 2258 self.check_swap_fds(1, 0, 2) 2259 self.check_swap_fds(1, 2, 0) 2260 self.check_swap_fds(2, 0, 1) 2261 self.check_swap_fds(2, 1, 0) 2262 2263 def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds): 2264 saved_fds = self._save_fds(range(3)) 2265 try: 2266 for from_fd in from_fds: 2267 with tempfile.TemporaryFile() as f: 2268 os.dup2(f.fileno(), from_fd) 2269 2270 fd_to_close = (set(range(3)) - set(from_fds)).pop() 2271 os.close(fd_to_close) 2272 2273 arg_names = ['stdin', 'stdout', 'stderr'] 2274 kwargs = {} 2275 for from_fd, to_fd in zip(from_fds, to_fds): 2276 kwargs[arg_names[to_fd]] = from_fd 2277 2278 code = textwrap.dedent(r''' 2279 import os, sys 2280 skipped_fd = int(sys.argv[1]) 2281 for fd in range(3): 2282 if fd != skipped_fd: 2283 os.write(fd, str(fd).encode('ascii')) 2284 ''') 2285 2286 skipped_fd = (set(range(3)) - set(to_fds)).pop() 2287 2288 rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)], 2289 **kwargs) 2290 self.assertEqual(rc, 0) 2291 2292 for from_fd, to_fd in zip(from_fds, to_fds): 2293 os.lseek(from_fd, 0, os.SEEK_SET) 2294 read_bytes = os.read(from_fd, 1024) 2295 read_fds = list(map(int, read_bytes.decode('ascii'))) 2296 msg = textwrap.dedent(f""" 2297 When testing {from_fds} to {to_fds} redirection, 2298 parent descriptor {from_fd} got redirected 2299 to descriptor(s) {read_fds} instead of descriptor {to_fd}. 2300 """) 2301 self.assertEqual([to_fd], read_fds, msg) 2302 finally: 2303 self._restore_fds(saved_fds) 2304 2305 # Check that subprocess can remap std fds correctly even 2306 # if one of them is closed (#32844). 2307 def test_swap_std_fds_with_one_closed(self): 2308 for from_fds in itertools.combinations(range(3), 2): 2309 for to_fds in itertools.permutations(range(3), 2): 2310 self._check_swap_std_fds_with_one_closed(from_fds, to_fds) 2311 2312 def test_surrogates_error_message(self): 2313 def prepare(): 2314 raise ValueError("surrogate:\uDCff") 2315 2316 try: 2317 subprocess.call( 2318 ZERO_RETURN_CMD, 2319 preexec_fn=prepare) 2320 except ValueError as err: 2321 # Pure Python implementations keeps the message 2322 self.assertIsNone(subprocess._posixsubprocess) 2323 self.assertEqual(str(err), "surrogate:\uDCff") 2324 except subprocess.SubprocessError as err: 2325 # _posixsubprocess uses a default message 2326 self.assertIsNotNone(subprocess._posixsubprocess) 2327 self.assertEqual(str(err), "Exception occurred in preexec_fn.") 2328 else: 2329 self.fail("Expected ValueError or subprocess.SubprocessError") 2330 2331 def test_undecodable_env(self): 2332 for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): 2333 encoded_value = value.encode("ascii", "surrogateescape") 2334 2335 # test str with surrogates 2336 script = "import os; print(ascii(os.getenv(%s)))" % repr(key) 2337 env = os.environ.copy() 2338 env[key] = value 2339 # Use C locale to get ASCII for the locale encoding to force 2340 # surrogate-escaping of \xFF in the child process 2341 env['LC_ALL'] = 'C' 2342 decoded_value = value 2343 stdout = subprocess.check_output( 2344 [sys.executable, "-c", script], 2345 env=env) 2346 stdout = stdout.rstrip(b'\n\r') 2347 self.assertEqual(stdout.decode('ascii'), ascii(decoded_value)) 2348 2349 # test bytes 2350 key = key.encode("ascii", "surrogateescape") 2351 script = "import os; print(ascii(os.getenvb(%s)))" % repr(key) 2352 env = os.environ.copy() 2353 env[key] = encoded_value 2354 stdout = subprocess.check_output( 2355 [sys.executable, "-c", script], 2356 env=env) 2357 stdout = stdout.rstrip(b'\n\r') 2358 self.assertEqual(stdout.decode('ascii'), ascii(encoded_value)) 2359 2360 def test_bytes_program(self): 2361 abs_program = os.fsencode(ZERO_RETURN_CMD[0]) 2362 args = list(ZERO_RETURN_CMD[1:]) 2363 path, program = os.path.split(ZERO_RETURN_CMD[0]) 2364 program = os.fsencode(program) 2365 2366 # absolute bytes path 2367 exitcode = subprocess.call([abs_program]+args) 2368 self.assertEqual(exitcode, 0) 2369 2370 # absolute bytes path as a string 2371 cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8")) 2372 exitcode = subprocess.call(cmd, shell=True) 2373 self.assertEqual(exitcode, 0) 2374 2375 # bytes program, unicode PATH 2376 env = os.environ.copy() 2377 env["PATH"] = path 2378 exitcode = subprocess.call([program]+args, env=env) 2379 self.assertEqual(exitcode, 0) 2380 2381 # bytes program, bytes PATH 2382 envb = os.environb.copy() 2383 envb[b"PATH"] = os.fsencode(path) 2384 exitcode = subprocess.call([program]+args, env=envb) 2385 self.assertEqual(exitcode, 0) 2386 2387 def test_pipe_cloexec(self): 2388 sleeper = support.findfile("input_reader.py", subdir="subprocessdata") 2389 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2390 2391 p1 = subprocess.Popen([sys.executable, sleeper], 2392 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2393 stderr=subprocess.PIPE, close_fds=False) 2394 2395 self.addCleanup(p1.communicate, b'') 2396 2397 p2 = subprocess.Popen([sys.executable, fd_status], 2398 stdout=subprocess.PIPE, close_fds=False) 2399 2400 output, error = p2.communicate() 2401 result_fds = set(map(int, output.split(b','))) 2402 unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(), 2403 p1.stderr.fileno()]) 2404 2405 self.assertFalse(result_fds & unwanted_fds, 2406 "Expected no fds from %r to be open in child, " 2407 "found %r" % 2408 (unwanted_fds, result_fds & unwanted_fds)) 2409 2410 def test_pipe_cloexec_real_tools(self): 2411 qcat = support.findfile("qcat.py", subdir="subprocessdata") 2412 qgrep = support.findfile("qgrep.py", subdir="subprocessdata") 2413 2414 subdata = b'zxcvbn' 2415 data = subdata * 4 + b'\n' 2416 2417 p1 = subprocess.Popen([sys.executable, qcat], 2418 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2419 close_fds=False) 2420 2421 p2 = subprocess.Popen([sys.executable, qgrep, subdata], 2422 stdin=p1.stdout, stdout=subprocess.PIPE, 2423 close_fds=False) 2424 2425 self.addCleanup(p1.wait) 2426 self.addCleanup(p2.wait) 2427 def kill_p1(): 2428 try: 2429 p1.terminate() 2430 except ProcessLookupError: 2431 pass 2432 def kill_p2(): 2433 try: 2434 p2.terminate() 2435 except ProcessLookupError: 2436 pass 2437 self.addCleanup(kill_p1) 2438 self.addCleanup(kill_p2) 2439 2440 p1.stdin.write(data) 2441 p1.stdin.close() 2442 2443 readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10) 2444 2445 self.assertTrue(readfiles, "The child hung") 2446 self.assertEqual(p2.stdout.read(), data) 2447 2448 p1.stdout.close() 2449 p2.stdout.close() 2450 2451 def test_close_fds(self): 2452 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2453 2454 fds = os.pipe() 2455 self.addCleanup(os.close, fds[0]) 2456 self.addCleanup(os.close, fds[1]) 2457 2458 open_fds = set(fds) 2459 # add a bunch more fds 2460 for _ in range(9): 2461 fd = os.open(os.devnull, os.O_RDONLY) 2462 self.addCleanup(os.close, fd) 2463 open_fds.add(fd) 2464 2465 for fd in open_fds: 2466 os.set_inheritable(fd, True) 2467 2468 p = subprocess.Popen([sys.executable, fd_status], 2469 stdout=subprocess.PIPE, close_fds=False) 2470 output, ignored = p.communicate() 2471 remaining_fds = set(map(int, output.split(b','))) 2472 2473 self.assertEqual(remaining_fds & open_fds, open_fds, 2474 "Some fds were closed") 2475 2476 p = subprocess.Popen([sys.executable, fd_status], 2477 stdout=subprocess.PIPE, close_fds=True) 2478 output, ignored = p.communicate() 2479 remaining_fds = set(map(int, output.split(b','))) 2480 2481 self.assertFalse(remaining_fds & open_fds, 2482 "Some fds were left open") 2483 self.assertIn(1, remaining_fds, "Subprocess failed") 2484 2485 # Keep some of the fd's we opened open in the subprocess. 2486 # This tests _posixsubprocess.c's proper handling of fds_to_keep. 2487 fds_to_keep = set(open_fds.pop() for _ in range(8)) 2488 p = subprocess.Popen([sys.executable, fd_status], 2489 stdout=subprocess.PIPE, close_fds=True, 2490 pass_fds=fds_to_keep) 2491 output, ignored = p.communicate() 2492 remaining_fds = set(map(int, output.split(b','))) 2493 2494 self.assertFalse((remaining_fds - fds_to_keep) & open_fds, 2495 "Some fds not in pass_fds were left open") 2496 self.assertIn(1, remaining_fds, "Subprocess failed") 2497 2498 2499 @unittest.skipIf(sys.platform.startswith("freebsd") and 2500 os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev, 2501 "Requires fdescfs mounted on /dev/fd on FreeBSD.") 2502 def test_close_fds_when_max_fd_is_lowered(self): 2503 """Confirm that issue21618 is fixed (may fail under valgrind).""" 2504 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2505 2506 # This launches the meat of the test in a child process to 2507 # avoid messing with the larger unittest processes maximum 2508 # number of file descriptors. 2509 # This process launches: 2510 # +--> Process that lowers its RLIMIT_NOFILE aftr setting up 2511 # a bunch of high open fds above the new lower rlimit. 2512 # Those are reported via stdout before launching a new 2513 # process with close_fds=False to run the actual test: 2514 # +--> The TEST: This one launches a fd_status.py 2515 # subprocess with close_fds=True so we can find out if 2516 # any of the fds above the lowered rlimit are still open. 2517 p = subprocess.Popen([sys.executable, '-c', textwrap.dedent( 2518 ''' 2519 import os, resource, subprocess, sys, textwrap 2520 open_fds = set() 2521 # Add a bunch more fds to pass down. 2522 for _ in range(40): 2523 fd = os.open(os.devnull, os.O_RDONLY) 2524 open_fds.add(fd) 2525 2526 # Leave a two pairs of low ones available for use by the 2527 # internal child error pipe and the stdout pipe. 2528 # We also leave 10 more open as some Python buildbots run into 2529 # "too many open files" errors during the test if we do not. 2530 for fd in sorted(open_fds)[:14]: 2531 os.close(fd) 2532 open_fds.remove(fd) 2533 2534 for fd in open_fds: 2535 #self.addCleanup(os.close, fd) 2536 os.set_inheritable(fd, True) 2537 2538 max_fd_open = max(open_fds) 2539 2540 # Communicate the open_fds to the parent unittest.TestCase process. 2541 print(','.join(map(str, sorted(open_fds)))) 2542 sys.stdout.flush() 2543 2544 rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE) 2545 try: 2546 # 29 is lower than the highest fds we are leaving open. 2547 resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max)) 2548 # Launch a new Python interpreter with our low fd rlim_cur that 2549 # inherits open fds above that limit. It then uses subprocess 2550 # with close_fds=True to get a report of open fds in the child. 2551 # An explicit list of fds to check is passed to fd_status.py as 2552 # letting fd_status rely on its default logic would miss the 2553 # fds above rlim_cur as it normally only checks up to that limit. 2554 subprocess.Popen( 2555 [sys.executable, '-c', 2556 textwrap.dedent(""" 2557 import subprocess, sys 2558 subprocess.Popen([sys.executable, %r] + 2559 [str(x) for x in range({max_fd})], 2560 close_fds=True).wait() 2561 """.format(max_fd=max_fd_open+1))], 2562 close_fds=False).wait() 2563 finally: 2564 resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max)) 2565 ''' % fd_status)], stdout=subprocess.PIPE) 2566 2567 output, unused_stderr = p.communicate() 2568 output_lines = output.splitlines() 2569 self.assertEqual(len(output_lines), 2, 2570 msg="expected exactly two lines of output:\n%r" % output) 2571 opened_fds = set(map(int, output_lines[0].strip().split(b','))) 2572 remaining_fds = set(map(int, output_lines[1].strip().split(b','))) 2573 2574 self.assertFalse(remaining_fds & opened_fds, 2575 msg="Some fds were left open.") 2576 2577 2578 # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file 2579 # descriptor of a pipe closed in the parent process is valid in the 2580 # child process according to fstat(), but the mode of the file 2581 # descriptor is invalid, and read or write raise an error. 2582 @support.requires_mac_ver(10, 5) 2583 def test_pass_fds(self): 2584 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2585 2586 open_fds = set() 2587 2588 for x in range(5): 2589 fds = os.pipe() 2590 self.addCleanup(os.close, fds[0]) 2591 self.addCleanup(os.close, fds[1]) 2592 os.set_inheritable(fds[0], True) 2593 os.set_inheritable(fds[1], True) 2594 open_fds.update(fds) 2595 2596 for fd in open_fds: 2597 p = subprocess.Popen([sys.executable, fd_status], 2598 stdout=subprocess.PIPE, close_fds=True, 2599 pass_fds=(fd, )) 2600 output, ignored = p.communicate() 2601 2602 remaining_fds = set(map(int, output.split(b','))) 2603 to_be_closed = open_fds - {fd} 2604 2605 self.assertIn(fd, remaining_fds, "fd to be passed not passed") 2606 self.assertFalse(remaining_fds & to_be_closed, 2607 "fd to be closed passed") 2608 2609 # pass_fds overrides close_fds with a warning. 2610 with self.assertWarns(RuntimeWarning) as context: 2611 self.assertFalse(subprocess.call( 2612 ZERO_RETURN_CMD, 2613 close_fds=False, pass_fds=(fd, ))) 2614 self.assertIn('overriding close_fds', str(context.warning)) 2615 2616 def test_pass_fds_inheritable(self): 2617 script = support.findfile("fd_status.py", subdir="subprocessdata") 2618 2619 inheritable, non_inheritable = os.pipe() 2620 self.addCleanup(os.close, inheritable) 2621 self.addCleanup(os.close, non_inheritable) 2622 os.set_inheritable(inheritable, True) 2623 os.set_inheritable(non_inheritable, False) 2624 pass_fds = (inheritable, non_inheritable) 2625 args = [sys.executable, script] 2626 args += list(map(str, pass_fds)) 2627 2628 p = subprocess.Popen(args, 2629 stdout=subprocess.PIPE, close_fds=True, 2630 pass_fds=pass_fds) 2631 output, ignored = p.communicate() 2632 fds = set(map(int, output.split(b','))) 2633 2634 # the inheritable file descriptor must be inherited, so its inheritable 2635 # flag must be set in the child process after fork() and before exec() 2636 self.assertEqual(fds, set(pass_fds), "output=%a" % output) 2637 2638 # inheritable flag must not be changed in the parent process 2639 self.assertEqual(os.get_inheritable(inheritable), True) 2640 self.assertEqual(os.get_inheritable(non_inheritable), False) 2641 2642 2643 # bpo-32270: Ensure that descriptors specified in pass_fds 2644 # are inherited even if they are used in redirections. 2645 # Contributed by @izbyshev. 2646 def test_pass_fds_redirected(self): 2647 """Regression test for https://bugs.python.org/issue32270.""" 2648 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2649 pass_fds = [] 2650 for _ in range(2): 2651 fd = os.open(os.devnull, os.O_RDWR) 2652 self.addCleanup(os.close, fd) 2653 pass_fds.append(fd) 2654 2655 stdout_r, stdout_w = os.pipe() 2656 self.addCleanup(os.close, stdout_r) 2657 self.addCleanup(os.close, stdout_w) 2658 pass_fds.insert(1, stdout_w) 2659 2660 with subprocess.Popen([sys.executable, fd_status], 2661 stdin=pass_fds[0], 2662 stdout=pass_fds[1], 2663 stderr=pass_fds[2], 2664 close_fds=True, 2665 pass_fds=pass_fds): 2666 output = os.read(stdout_r, 1024) 2667 fds = {int(num) for num in output.split(b',')} 2668 2669 self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}") 2670 2671 2672 def test_stdout_stdin_are_single_inout_fd(self): 2673 with io.open(os.devnull, "r+") as inout: 2674 p = subprocess.Popen(ZERO_RETURN_CMD, 2675 stdout=inout, stdin=inout) 2676 p.wait() 2677 2678 def test_stdout_stderr_are_single_inout_fd(self): 2679 with io.open(os.devnull, "r+") as inout: 2680 p = subprocess.Popen(ZERO_RETURN_CMD, 2681 stdout=inout, stderr=inout) 2682 p.wait() 2683 2684 def test_stderr_stdin_are_single_inout_fd(self): 2685 with io.open(os.devnull, "r+") as inout: 2686 p = subprocess.Popen(ZERO_RETURN_CMD, 2687 stderr=inout, stdin=inout) 2688 p.wait() 2689 2690 def test_wait_when_sigchild_ignored(self): 2691 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 2692 sigchild_ignore = support.findfile("sigchild_ignore.py", 2693 subdir="subprocessdata") 2694 p = subprocess.Popen([sys.executable, sigchild_ignore], 2695 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 2696 stdout, stderr = p.communicate() 2697 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 2698 " non-zero with this error:\n%s" % 2699 stderr.decode('utf-8')) 2700 2701 def test_select_unbuffered(self): 2702 # Issue #11459: bufsize=0 should really set the pipes as 2703 # unbuffered (and therefore let select() work properly). 2704 select = support.import_module("select") 2705 p = subprocess.Popen([sys.executable, "-c", 2706 'import sys;' 2707 'sys.stdout.write("apple")'], 2708 stdout=subprocess.PIPE, 2709 bufsize=0) 2710 f = p.stdout 2711 self.addCleanup(f.close) 2712 try: 2713 self.assertEqual(f.read(4), b"appl") 2714 self.assertIn(f, select.select([f], [], [], 0.0)[0]) 2715 finally: 2716 p.wait() 2717 2718 def test_zombie_fast_process_del(self): 2719 # Issue #12650: on Unix, if Popen.__del__() was called before the 2720 # process exited, it wouldn't be added to subprocess._active, and would 2721 # remain a zombie. 2722 # spawn a Popen, and delete its reference before it exits 2723 p = subprocess.Popen([sys.executable, "-c", 2724 'import sys, time;' 2725 'time.sleep(0.2)'], 2726 stdout=subprocess.PIPE, 2727 stderr=subprocess.PIPE) 2728 self.addCleanup(p.stdout.close) 2729 self.addCleanup(p.stderr.close) 2730 ident = id(p) 2731 pid = p.pid 2732 with support.check_warnings(('', ResourceWarning)): 2733 p = None 2734 2735 if mswindows: 2736 # subprocess._active is not used on Windows and is set to None. 2737 self.assertIsNone(subprocess._active) 2738 else: 2739 # check that p is in the active processes list 2740 self.assertIn(ident, [id(o) for o in subprocess._active]) 2741 2742 def test_leak_fast_process_del_killed(self): 2743 # Issue #12650: on Unix, if Popen.__del__() was called before the 2744 # process exited, and the process got killed by a signal, it would never 2745 # be removed from subprocess._active, which triggered a FD and memory 2746 # leak. 2747 # spawn a Popen, delete its reference and kill it 2748 p = subprocess.Popen([sys.executable, "-c", 2749 'import time;' 2750 'time.sleep(3)'], 2751 stdout=subprocess.PIPE, 2752 stderr=subprocess.PIPE) 2753 self.addCleanup(p.stdout.close) 2754 self.addCleanup(p.stderr.close) 2755 ident = id(p) 2756 pid = p.pid 2757 with support.check_warnings(('', ResourceWarning)): 2758 p = None 2759 2760 os.kill(pid, signal.SIGKILL) 2761 if mswindows: 2762 # subprocess._active is not used on Windows and is set to None. 2763 self.assertIsNone(subprocess._active) 2764 else: 2765 # check that p is in the active processes list 2766 self.assertIn(ident, [id(o) for o in subprocess._active]) 2767 2768 # let some time for the process to exit, and create a new Popen: this 2769 # should trigger the wait() of p 2770 time.sleep(0.2) 2771 with self.assertRaises(OSError): 2772 with subprocess.Popen(NONEXISTING_CMD, 2773 stdout=subprocess.PIPE, 2774 stderr=subprocess.PIPE) as proc: 2775 pass 2776 # p should have been wait()ed on, and removed from the _active list 2777 self.assertRaises(OSError, os.waitpid, pid, 0) 2778 if mswindows: 2779 # subprocess._active is not used on Windows and is set to None. 2780 self.assertIsNone(subprocess._active) 2781 else: 2782 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 2783 2784 def test_close_fds_after_preexec(self): 2785 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2786 2787 # this FD is used as dup2() target by preexec_fn, and should be closed 2788 # in the child process 2789 fd = os.dup(1) 2790 self.addCleanup(os.close, fd) 2791 2792 p = subprocess.Popen([sys.executable, fd_status], 2793 stdout=subprocess.PIPE, close_fds=True, 2794 preexec_fn=lambda: os.dup2(1, fd)) 2795 output, ignored = p.communicate() 2796 2797 remaining_fds = set(map(int, output.split(b','))) 2798 2799 self.assertNotIn(fd, remaining_fds) 2800 2801 @support.cpython_only 2802 def test_fork_exec(self): 2803 # Issue #22290: fork_exec() must not crash on memory allocation failure 2804 # or other errors 2805 import _posixsubprocess 2806 gc_enabled = gc.isenabled() 2807 try: 2808 # Use a preexec function and enable the garbage collector 2809 # to force fork_exec() to re-enable the garbage collector 2810 # on error. 2811 func = lambda: None 2812 gc.enable() 2813 2814 for args, exe_list, cwd, env_list in ( 2815 (123, [b"exe"], None, [b"env"]), 2816 ([b"arg"], 123, None, [b"env"]), 2817 ([b"arg"], [b"exe"], 123, [b"env"]), 2818 ([b"arg"], [b"exe"], None, 123), 2819 ): 2820 with self.assertRaises(TypeError): 2821 _posixsubprocess.fork_exec( 2822 args, exe_list, 2823 True, (), cwd, env_list, 2824 -1, -1, -1, -1, 2825 1, 2, 3, 4, 2826 True, True, func) 2827 finally: 2828 if not gc_enabled: 2829 gc.disable() 2830 2831 @support.cpython_only 2832 def test_fork_exec_sorted_fd_sanity_check(self): 2833 # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. 2834 import _posixsubprocess 2835 class BadInt: 2836 first = True 2837 def __init__(self, value): 2838 self.value = value 2839 def __int__(self): 2840 if self.first: 2841 self.first = False 2842 return self.value 2843 raise ValueError 2844 2845 gc_enabled = gc.isenabled() 2846 try: 2847 gc.enable() 2848 2849 for fds_to_keep in ( 2850 (-1, 2, 3, 4, 5), # Negative number. 2851 ('str', 4), # Not an int. 2852 (18, 23, 42, 2**63), # Out of range. 2853 (5, 4), # Not sorted. 2854 (6, 7, 7, 8), # Duplicate. 2855 (BadInt(1), BadInt(2)), 2856 ): 2857 with self.assertRaises( 2858 ValueError, 2859 msg='fds_to_keep={}'.format(fds_to_keep)) as c: 2860 _posixsubprocess.fork_exec( 2861 [b"false"], [b"false"], 2862 True, fds_to_keep, None, [b"env"], 2863 -1, -1, -1, -1, 2864 1, 2, 3, 4, 2865 True, True, None) 2866 self.assertIn('fds_to_keep', str(c.exception)) 2867 finally: 2868 if not gc_enabled: 2869 gc.disable() 2870 2871 def test_communicate_BrokenPipeError_stdin_close(self): 2872 # By not setting stdout or stderr or a timeout we force the fast path 2873 # that just calls _stdin_write() internally due to our mock. 2874 proc = subprocess.Popen(ZERO_RETURN_CMD) 2875 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2876 mock_proc_stdin.close.side_effect = BrokenPipeError 2877 proc.communicate() # Should swallow BrokenPipeError from close. 2878 mock_proc_stdin.close.assert_called_with() 2879 2880 def test_communicate_BrokenPipeError_stdin_write(self): 2881 # By not setting stdout or stderr or a timeout we force the fast path 2882 # that just calls _stdin_write() internally due to our mock. 2883 proc = subprocess.Popen(ZERO_RETURN_CMD) 2884 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2885 mock_proc_stdin.write.side_effect = BrokenPipeError 2886 proc.communicate(b'stuff') # Should swallow the BrokenPipeError. 2887 mock_proc_stdin.write.assert_called_once_with(b'stuff') 2888 mock_proc_stdin.close.assert_called_once_with() 2889 2890 def test_communicate_BrokenPipeError_stdin_flush(self): 2891 # Setting stdin and stdout forces the ._communicate() code path. 2892 # python -h exits faster than python -c pass (but spams stdout). 2893 proc = subprocess.Popen([sys.executable, '-h'], 2894 stdin=subprocess.PIPE, 2895 stdout=subprocess.PIPE) 2896 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ 2897 open(os.devnull, 'wb') as dev_null: 2898 mock_proc_stdin.flush.side_effect = BrokenPipeError 2899 # because _communicate registers a selector using proc.stdin... 2900 mock_proc_stdin.fileno.return_value = dev_null.fileno() 2901 # _communicate() should swallow BrokenPipeError from flush. 2902 proc.communicate(b'stuff') 2903 mock_proc_stdin.flush.assert_called_once_with() 2904 2905 def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): 2906 # Setting stdin and stdout forces the ._communicate() code path. 2907 # python -h exits faster than python -c pass (but spams stdout). 2908 proc = subprocess.Popen([sys.executable, '-h'], 2909 stdin=subprocess.PIPE, 2910 stdout=subprocess.PIPE) 2911 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2912 mock_proc_stdin.close.side_effect = BrokenPipeError 2913 # _communicate() should swallow BrokenPipeError from close. 2914 proc.communicate(timeout=999) 2915 mock_proc_stdin.close.assert_called_once_with() 2916 2917 @unittest.skipUnless(_testcapi is not None 2918 and hasattr(_testcapi, 'W_STOPCODE'), 2919 'need _testcapi.W_STOPCODE') 2920 def test_stopped(self): 2921 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 2922 args = ZERO_RETURN_CMD 2923 proc = subprocess.Popen(args) 2924 2925 # Wait until the real process completes to avoid zombie process 2926 pid = proc.pid 2927 pid, status = os.waitpid(pid, 0) 2928 self.assertEqual(status, 0) 2929 2930 status = _testcapi.W_STOPCODE(3) 2931 with mock.patch('subprocess.os.waitpid', return_value=(pid, status)): 2932 returncode = proc.wait() 2933 2934 self.assertEqual(returncode, -3) 2935 2936 def test_communicate_repeated_call_after_stdout_close(self): 2937 proc = subprocess.Popen([sys.executable, '-c', 2938 'import os, time; os.close(1), time.sleep(2)'], 2939 stdout=subprocess.PIPE) 2940 while True: 2941 try: 2942 proc.communicate(timeout=0.1) 2943 return 2944 except subprocess.TimeoutExpired: 2945 pass 2946 2947 2948@unittest.skipUnless(mswindows, "Windows specific tests") 2949class Win32ProcessTestCase(BaseTestCase): 2950 2951 def test_startupinfo(self): 2952 # startupinfo argument 2953 # We uses hardcoded constants, because we do not want to 2954 # depend on win32all. 2955 STARTF_USESHOWWINDOW = 1 2956 SW_MAXIMIZE = 3 2957 startupinfo = subprocess.STARTUPINFO() 2958 startupinfo.dwFlags = STARTF_USESHOWWINDOW 2959 startupinfo.wShowWindow = SW_MAXIMIZE 2960 # Since Python is a console process, it won't be affected 2961 # by wShowWindow, but the argument should be silently 2962 # ignored 2963 subprocess.call(ZERO_RETURN_CMD, 2964 startupinfo=startupinfo) 2965 2966 def test_startupinfo_keywords(self): 2967 # startupinfo argument 2968 # We use hardcoded constants, because we do not want to 2969 # depend on win32all. 2970 STARTF_USERSHOWWINDOW = 1 2971 SW_MAXIMIZE = 3 2972 startupinfo = subprocess.STARTUPINFO( 2973 dwFlags=STARTF_USERSHOWWINDOW, 2974 wShowWindow=SW_MAXIMIZE 2975 ) 2976 # Since Python is a console process, it won't be affected 2977 # by wShowWindow, but the argument should be silently 2978 # ignored 2979 subprocess.call(ZERO_RETURN_CMD, 2980 startupinfo=startupinfo) 2981 2982 def test_startupinfo_copy(self): 2983 # bpo-34044: Popen must not modify input STARTUPINFO structure 2984 startupinfo = subprocess.STARTUPINFO() 2985 startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW 2986 startupinfo.wShowWindow = subprocess.SW_HIDE 2987 2988 # Call Popen() twice with the same startupinfo object to make sure 2989 # that it's not modified 2990 for _ in range(2): 2991 cmd = ZERO_RETURN_CMD 2992 with open(os.devnull, 'w') as null: 2993 proc = subprocess.Popen(cmd, 2994 stdout=null, 2995 stderr=subprocess.STDOUT, 2996 startupinfo=startupinfo) 2997 with proc: 2998 proc.communicate() 2999 self.assertEqual(proc.returncode, 0) 3000 3001 self.assertEqual(startupinfo.dwFlags, 3002 subprocess.STARTF_USESHOWWINDOW) 3003 self.assertIsNone(startupinfo.hStdInput) 3004 self.assertIsNone(startupinfo.hStdOutput) 3005 self.assertIsNone(startupinfo.hStdError) 3006 self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE) 3007 self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []}) 3008 3009 def test_creationflags(self): 3010 # creationflags argument 3011 CREATE_NEW_CONSOLE = 16 3012 sys.stderr.write(" a DOS box should flash briefly ...\n") 3013 subprocess.call(sys.executable + 3014 ' -c "import time; time.sleep(0.25)"', 3015 creationflags=CREATE_NEW_CONSOLE) 3016 3017 def test_invalid_args(self): 3018 # invalid arguments should raise ValueError 3019 self.assertRaises(ValueError, subprocess.call, 3020 [sys.executable, "-c", 3021 "import sys; sys.exit(47)"], 3022 preexec_fn=lambda: 1) 3023 3024 @support.cpython_only 3025 def test_issue31471(self): 3026 # There shouldn't be an assertion failure in Popen() in case the env 3027 # argument has a bad keys() method. 3028 class BadEnv(dict): 3029 keys = None 3030 with self.assertRaises(TypeError): 3031 subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv()) 3032 3033 def test_close_fds(self): 3034 # close file descriptors 3035 rc = subprocess.call([sys.executable, "-c", 3036 "import sys; sys.exit(47)"], 3037 close_fds=True) 3038 self.assertEqual(rc, 47) 3039 3040 def test_close_fds_with_stdio(self): 3041 import msvcrt 3042 3043 fds = os.pipe() 3044 self.addCleanup(os.close, fds[0]) 3045 self.addCleanup(os.close, fds[1]) 3046 3047 handles = [] 3048 for fd in fds: 3049 os.set_inheritable(fd, True) 3050 handles.append(msvcrt.get_osfhandle(fd)) 3051 3052 p = subprocess.Popen([sys.executable, "-c", 3053 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3054 stdout=subprocess.PIPE, close_fds=False) 3055 stdout, stderr = p.communicate() 3056 self.assertEqual(p.returncode, 0) 3057 int(stdout.strip()) # Check that stdout is an integer 3058 3059 p = subprocess.Popen([sys.executable, "-c", 3060 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3061 stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) 3062 stdout, stderr = p.communicate() 3063 self.assertEqual(p.returncode, 1) 3064 self.assertIn(b"OSError", stderr) 3065 3066 # The same as the previous call, but with an empty handle_list 3067 handle_list = [] 3068 startupinfo = subprocess.STARTUPINFO() 3069 startupinfo.lpAttributeList = {"handle_list": handle_list} 3070 p = subprocess.Popen([sys.executable, "-c", 3071 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3072 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3073 startupinfo=startupinfo, close_fds=True) 3074 stdout, stderr = p.communicate() 3075 self.assertEqual(p.returncode, 1) 3076 self.assertIn(b"OSError", stderr) 3077 3078 # Check for a warning due to using handle_list and close_fds=False 3079 with support.check_warnings((".*overriding close_fds", RuntimeWarning)): 3080 startupinfo = subprocess.STARTUPINFO() 3081 startupinfo.lpAttributeList = {"handle_list": handles[:]} 3082 p = subprocess.Popen([sys.executable, "-c", 3083 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3084 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3085 startupinfo=startupinfo, close_fds=False) 3086 stdout, stderr = p.communicate() 3087 self.assertEqual(p.returncode, 0) 3088 3089 def test_empty_attribute_list(self): 3090 startupinfo = subprocess.STARTUPINFO() 3091 startupinfo.lpAttributeList = {} 3092 subprocess.call(ZERO_RETURN_CMD, 3093 startupinfo=startupinfo) 3094 3095 def test_empty_handle_list(self): 3096 startupinfo = subprocess.STARTUPINFO() 3097 startupinfo.lpAttributeList = {"handle_list": []} 3098 subprocess.call(ZERO_RETURN_CMD, 3099 startupinfo=startupinfo) 3100 3101 def test_shell_sequence(self): 3102 # Run command through the shell (sequence) 3103 newenv = os.environ.copy() 3104 newenv["FRUIT"] = "physalis" 3105 p = subprocess.Popen(["set"], shell=1, 3106 stdout=subprocess.PIPE, 3107 env=newenv) 3108 with p: 3109 self.assertIn(b"physalis", p.stdout.read()) 3110 3111 def test_shell_string(self): 3112 # Run command through the shell (string) 3113 newenv = os.environ.copy() 3114 newenv["FRUIT"] = "physalis" 3115 p = subprocess.Popen("set", shell=1, 3116 stdout=subprocess.PIPE, 3117 env=newenv) 3118 with p: 3119 self.assertIn(b"physalis", p.stdout.read()) 3120 3121 def test_shell_encodings(self): 3122 # Run command through the shell (string) 3123 for enc in ['ansi', 'oem']: 3124 newenv = os.environ.copy() 3125 newenv["FRUIT"] = "physalis" 3126 p = subprocess.Popen("set", shell=1, 3127 stdout=subprocess.PIPE, 3128 env=newenv, 3129 encoding=enc) 3130 with p: 3131 self.assertIn("physalis", p.stdout.read(), enc) 3132 3133 def test_call_string(self): 3134 # call() function with string argument on Windows 3135 rc = subprocess.call(sys.executable + 3136 ' -c "import sys; sys.exit(47)"') 3137 self.assertEqual(rc, 47) 3138 3139 def _kill_process(self, method, *args): 3140 # Some win32 buildbot raises EOFError if stdin is inherited 3141 p = subprocess.Popen([sys.executable, "-c", """if 1: 3142 import sys, time 3143 sys.stdout.write('x\\n') 3144 sys.stdout.flush() 3145 time.sleep(30) 3146 """], 3147 stdin=subprocess.PIPE, 3148 stdout=subprocess.PIPE, 3149 stderr=subprocess.PIPE) 3150 with p: 3151 # Wait for the interpreter to be completely initialized before 3152 # sending any signal. 3153 p.stdout.read(1) 3154 getattr(p, method)(*args) 3155 _, stderr = p.communicate() 3156 self.assertStderrEqual(stderr, b'') 3157 returncode = p.wait() 3158 self.assertNotEqual(returncode, 0) 3159 3160 def _kill_dead_process(self, method, *args): 3161 p = subprocess.Popen([sys.executable, "-c", """if 1: 3162 import sys, time 3163 sys.stdout.write('x\\n') 3164 sys.stdout.flush() 3165 sys.exit(42) 3166 """], 3167 stdin=subprocess.PIPE, 3168 stdout=subprocess.PIPE, 3169 stderr=subprocess.PIPE) 3170 with p: 3171 # Wait for the interpreter to be completely initialized before 3172 # sending any signal. 3173 p.stdout.read(1) 3174 # The process should end after this 3175 time.sleep(1) 3176 # This shouldn't raise even though the child is now dead 3177 getattr(p, method)(*args) 3178 _, stderr = p.communicate() 3179 self.assertStderrEqual(stderr, b'') 3180 rc = p.wait() 3181 self.assertEqual(rc, 42) 3182 3183 def test_send_signal(self): 3184 self._kill_process('send_signal', signal.SIGTERM) 3185 3186 def test_kill(self): 3187 self._kill_process('kill') 3188 3189 def test_terminate(self): 3190 self._kill_process('terminate') 3191 3192 def test_send_signal_dead(self): 3193 self._kill_dead_process('send_signal', signal.SIGTERM) 3194 3195 def test_kill_dead(self): 3196 self._kill_dead_process('kill') 3197 3198 def test_terminate_dead(self): 3199 self._kill_dead_process('terminate') 3200 3201class MiscTests(unittest.TestCase): 3202 3203 class RecordingPopen(subprocess.Popen): 3204 """A Popen that saves a reference to each instance for testing.""" 3205 instances_created = [] 3206 3207 def __init__(self, *args, **kwargs): 3208 super().__init__(*args, **kwargs) 3209 self.instances_created.append(self) 3210 3211 @mock.patch.object(subprocess.Popen, "_communicate") 3212 def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate, 3213 **kwargs): 3214 """Fake a SIGINT happening during Popen._communicate() and ._wait(). 3215 3216 This avoids the need to actually try and get test environments to send 3217 and receive signals reliably across platforms. The net effect of a ^C 3218 happening during a blocking subprocess execution which we want to clean 3219 up from is a KeyboardInterrupt coming out of communicate() or wait(). 3220 """ 3221 3222 mock__communicate.side_effect = KeyboardInterrupt 3223 try: 3224 with mock.patch.object(subprocess.Popen, "_wait") as mock__wait: 3225 # We patch out _wait() as no signal was involved so the 3226 # child process isn't actually going to exit rapidly. 3227 mock__wait.side_effect = KeyboardInterrupt 3228 with mock.patch.object(subprocess, "Popen", 3229 self.RecordingPopen): 3230 with self.assertRaises(KeyboardInterrupt): 3231 popener([sys.executable, "-c", 3232 "import time\ntime.sleep(9)\nimport sys\n" 3233 "sys.stderr.write('\\n!runaway child!\\n')"], 3234 stdout=subprocess.DEVNULL, **kwargs) 3235 for call in mock__wait.call_args_list[1:]: 3236 self.assertNotEqual( 3237 call, mock.call(timeout=None), 3238 "no open-ended wait() after the first allowed: " 3239 f"{mock__wait.call_args_list}") 3240 sigint_calls = [] 3241 for call in mock__wait.call_args_list: 3242 if call == mock.call(timeout=0.25): # from Popen.__init__ 3243 sigint_calls.append(call) 3244 self.assertLessEqual(mock__wait.call_count, 2, 3245 msg=mock__wait.call_args_list) 3246 self.assertEqual(len(sigint_calls), 1, 3247 msg=mock__wait.call_args_list) 3248 finally: 3249 # cleanup the forgotten (due to our mocks) child process 3250 process = self.RecordingPopen.instances_created.pop() 3251 process.kill() 3252 process.wait() 3253 self.assertEqual([], self.RecordingPopen.instances_created) 3254 3255 def test_call_keyboardinterrupt_no_kill(self): 3256 self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282) 3257 3258 def test_run_keyboardinterrupt_no_kill(self): 3259 self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282) 3260 3261 def test_context_manager_keyboardinterrupt_no_kill(self): 3262 def popen_via_context_manager(*args, **kwargs): 3263 with subprocess.Popen(*args, **kwargs) as unused_process: 3264 raise KeyboardInterrupt # Test how __exit__ handles ^C. 3265 self._test_keyboardinterrupt_no_kill(popen_via_context_manager) 3266 3267 def test_getoutput(self): 3268 self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') 3269 self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), 3270 (0, 'xyzzy')) 3271 3272 # we use mkdtemp in the next line to create an empty directory 3273 # under our exclusive control; from that, we can invent a pathname 3274 # that we _know_ won't exist. This is guaranteed to fail. 3275 dir = None 3276 try: 3277 dir = tempfile.mkdtemp() 3278 name = os.path.join(dir, "foo") 3279 status, output = subprocess.getstatusoutput( 3280 ("type " if mswindows else "cat ") + name) 3281 self.assertNotEqual(status, 0) 3282 finally: 3283 if dir is not None: 3284 os.rmdir(dir) 3285 3286 def test__all__(self): 3287 """Ensure that __all__ is populated properly.""" 3288 intentionally_excluded = {"list2cmdline", "Handle"} 3289 exported = set(subprocess.__all__) 3290 possible_exports = set() 3291 import types 3292 for name, value in subprocess.__dict__.items(): 3293 if name.startswith('_'): 3294 continue 3295 if isinstance(value, (types.ModuleType,)): 3296 continue 3297 possible_exports.add(name) 3298 self.assertEqual(exported, possible_exports - intentionally_excluded) 3299 3300 3301@unittest.skipUnless(hasattr(selectors, 'PollSelector'), 3302 "Test needs selectors.PollSelector") 3303class ProcessTestCaseNoPoll(ProcessTestCase): 3304 def setUp(self): 3305 self.orig_selector = subprocess._PopenSelector 3306 subprocess._PopenSelector = selectors.SelectSelector 3307 ProcessTestCase.setUp(self) 3308 3309 def tearDown(self): 3310 subprocess._PopenSelector = self.orig_selector 3311 ProcessTestCase.tearDown(self) 3312 3313 3314@unittest.skipUnless(mswindows, "Windows-specific tests") 3315class CommandsWithSpaces (BaseTestCase): 3316 3317 def setUp(self): 3318 super().setUp() 3319 f, fname = tempfile.mkstemp(".py", "te st") 3320 self.fname = fname.lower () 3321 os.write(f, b"import sys;" 3322 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 3323 ) 3324 os.close(f) 3325 3326 def tearDown(self): 3327 os.remove(self.fname) 3328 super().tearDown() 3329 3330 def with_spaces(self, *args, **kwargs): 3331 kwargs['stdout'] = subprocess.PIPE 3332 p = subprocess.Popen(*args, **kwargs) 3333 with p: 3334 self.assertEqual( 3335 p.stdout.read ().decode("mbcs"), 3336 "2 [%r, 'ab cd']" % self.fname 3337 ) 3338 3339 def test_shell_string_with_spaces(self): 3340 # call() function with string argument with spaces on Windows 3341 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3342 "ab cd"), shell=1) 3343 3344 def test_shell_sequence_with_spaces(self): 3345 # call() function with sequence argument with spaces on Windows 3346 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 3347 3348 def test_noshell_string_with_spaces(self): 3349 # call() function with string argument with spaces on Windows 3350 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3351 "ab cd")) 3352 3353 def test_noshell_sequence_with_spaces(self): 3354 # call() function with sequence argument with spaces on Windows 3355 self.with_spaces([sys.executable, self.fname, "ab cd"]) 3356 3357 3358class ContextManagerTests(BaseTestCase): 3359 3360 def test_pipe(self): 3361 with subprocess.Popen([sys.executable, "-c", 3362 "import sys;" 3363 "sys.stdout.write('stdout');" 3364 "sys.stderr.write('stderr');"], 3365 stdout=subprocess.PIPE, 3366 stderr=subprocess.PIPE) as proc: 3367 self.assertEqual(proc.stdout.read(), b"stdout") 3368 self.assertStderrEqual(proc.stderr.read(), b"stderr") 3369 3370 self.assertTrue(proc.stdout.closed) 3371 self.assertTrue(proc.stderr.closed) 3372 3373 def test_returncode(self): 3374 with subprocess.Popen([sys.executable, "-c", 3375 "import sys; sys.exit(100)"]) as proc: 3376 pass 3377 # __exit__ calls wait(), so the returncode should be set 3378 self.assertEqual(proc.returncode, 100) 3379 3380 def test_communicate_stdin(self): 3381 with subprocess.Popen([sys.executable, "-c", 3382 "import sys;" 3383 "sys.exit(sys.stdin.read() == 'context')"], 3384 stdin=subprocess.PIPE) as proc: 3385 proc.communicate(b"context") 3386 self.assertEqual(proc.returncode, 1) 3387 3388 def test_invalid_args(self): 3389 with self.assertRaises(NONEXISTING_ERRORS): 3390 with subprocess.Popen(NONEXISTING_CMD, 3391 stdout=subprocess.PIPE, 3392 stderr=subprocess.PIPE) as proc: 3393 pass 3394 3395 def test_broken_pipe_cleanup(self): 3396 """Broken pipe error should not prevent wait() (Issue 21619)""" 3397 proc = subprocess.Popen(ZERO_RETURN_CMD, 3398 stdin=subprocess.PIPE, 3399 bufsize=support.PIPE_MAX_SIZE*2) 3400 proc = proc.__enter__() 3401 # Prepare to send enough data to overflow any OS pipe buffering and 3402 # guarantee a broken pipe error. Data is held in BufferedWriter 3403 # buffer until closed. 3404 proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) 3405 self.assertIsNone(proc.returncode) 3406 # EPIPE expected under POSIX; EINVAL under Windows 3407 self.assertRaises(OSError, proc.__exit__, None, None, None) 3408 self.assertEqual(proc.returncode, 0) 3409 self.assertTrue(proc.stdin.closed) 3410 3411 3412if __name__ == "__main__": 3413 unittest.main() 3414